goatcounter-systemd/main.go

141 lines
3 KiB
Go
Raw Normal View History

2024-09-28 16:20:24 -04:00
package main
import (
"context"
"errors"
"log/slog"
"os"
"os/signal"
"time"
"git.0xdad.com/tblyler/goatcounter-systemd/caddy"
"git.0xdad.com/tblyler/goatcounter-systemd/di"
"git.0xdad.com/tblyler/goatcounter-systemd/goatcounter"
"github.com/coreos/go-systemd/v22/sdjournal"
"golang.org/x/sync/errgroup"
)
func main() {
signalNotifyChan := make(chan os.Signal, 1)
signal.Notify(signalNotifyChan, os.Interrupt)
signalContextErr := errors.New("interrupt signal received")
ctx, cancel := context.WithCancelCause(context.Background())
go func() {
<-signalNotifyChan
slog.WarnContext(ctx, "shutting down due to interrupt signal")
cancel(signalContextErr)
}()
container, err := di.NewContainer()
defer container.Close()
if err != nil {
slog.ErrorContext(ctx, "failed to initialize dependency injection container", slog.String("err", err.Error()))
os.Exit(1)
}
journald := container.Journald()
eg, ctx := errgroup.WithContext(ctx)
rawJSONChan := make(chan []byte, 12)
eg.Go(func() error {
defer close(rawJSONChan)
for entry, err := range journald.Iter(ctx) {
if err != nil {
return err
}
rawJSONChan <- []byte(entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE])
}
return nil
})
caddyLogChan := make(chan caddy.LogEntry, 12)
eg.Go(func() error {
defer close(caddyLogChan)
for rawJSON := range rawJSONChan {
logEntry, err := caddy.FromJSON(rawJSON)
if err != nil {
slog.ErrorContext(ctx, "failed to parse caddy log", slog.String("err", err.Error()), slog.String("json", string(rawJSON)))
continue
}
caddyLogChan <- logEntry
}
return nil
})
goatHitChan := make(chan goatcounter.Hit, 12)
eg.Go(func() error {
defer close(goatHitChan)
for caddyLog := range caddyLogChan {
goatHitChan <- goatcounter.HitFromCaddyLogEntry(caddyLog)
}
return nil
})
eg.Go(func() error {
// send hits to goatcounter at least every 10 seconds
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
goatcounterMultiSiteClient := container.GoatcounterMultiSiteClient()
hits := make([]goatcounter.Hit, 0, goatcounter.MaxHitsPerRequest)
count := func() {
counted, err := goatcounterMultiSiteClient.Count(ctx, hits...)
if err != nil {
slog.ErrorContext(ctx, "failed to count hits", slog.String("err", err.Error()))
return
}
2024-09-28 16:24:43 -04:00
if counted == 0 {
return
}
2024-09-28 16:20:24 -04:00
slog.InfoContext(ctx, "counted hits across all sites", slog.Uint64("counted", counted))
hits = hits[:0]
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case hit := <-goatHitChan:
hits = append(hits, hit)
// send hits to goatcounter if we have the max number for a request
// this might be spread across multiple sites
if len(hits) >= goatcounter.MaxHitsPerRequest {
count()
}
case <-ticker.C:
count()
}
}
})
err = eg.Wait()
if err != nil {
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), signalContextErr) {
return
}
slog.ErrorContext(ctx, "shutting down due to error", slog.String("err", err.Error()))
os.Exit(2)
}
}