Initial MVP commit

This commit is contained in:
Tony Blyler 2024-09-28 16:20:24 -04:00
commit 7bcf16a1d4
No known key found for this signature in database
24 changed files with 2781 additions and 0 deletions

1
.containerignore Normal file
View file

@ -0,0 +1 @@
.git

19
Containerfile Normal file
View file

@ -0,0 +1,19 @@
FROM docker.io/golang:1.23 AS builder
RUN apt-get update && apt-get install -y libsystemd-dev
WORKDIR /goatcounter-systemd/
COPY ./ /goatcounter-systemd/
ENV GOCACHE="/go/pkg/mod"
RUN --mount=type=cache,target="/go/pkg/mod" go build \
-o /usr/local/bin/goatcounter-systemd
FROM docker.io/debian:stable-slim
COPY --from=builder /usr/local/bin/goatcounter-systemd /usr/local/bin/goatcounter-systemd
USER nobody
ENTRYPOINT [ "/usr/local/bin/goatcounter-systemd" ]

64
caddy/log_parse.go Normal file
View file

@ -0,0 +1,64 @@
package caddy
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
type LogEntry struct {
rawLogEntry
Timestamp time.Time
Duration time.Duration
}
type logRequest struct {
RemoteIP string `json:"remote_ip"`
RemotePort string `json:"remote_port"`
ClientIP string `json:"client_ip"`
Proto string `json:"proto"`
Method string `json:"method"`
Host string `json:"host"`
URI string `json:"uri"`
Headers http.Header `json:"headers"`
}
type rawLogEntry struct {
Level string `json:"level"`
Timestamp float64 `json:"ts"`
Logger string `json:"logger"`
Message string `json:"msg"`
Upstream string `json:"upstream"`
Duration float64 `json:"duration"`
Request logRequest `json:"request"`
}
func FromJSON(jsonData []byte) (logEntry LogEntry, err error) {
rawLogEntry := rawLogEntry{}
err = json.Unmarshal(jsonData, &rawLogEntry)
if err != nil {
return logEntry, fmt.Errorf("failed to JSON unmarshal caddy log entry: %w", err)
}
rawLogEntry.Request.Host = stripPort(rawLogEntry.Request.Host)
seconds := int64(rawLogEntry.Timestamp)
nanoSeconds := int64((rawLogEntry.Timestamp - float64(seconds)) * 1e9)
logEntry.Timestamp = time.Unix(seconds, nanoSeconds)
logEntry.Duration = time.Duration(float64(time.Second) * rawLogEntry.Duration)
logEntry.rawLogEntry = rawLogEntry
return logEntry, nil
}
func stripPort(host string) string {
portIndex := strings.Index(host, ":")
if portIndex >= 0 {
return host[:portIndex]
}
return host
}

32
config/config.go Normal file
View file

@ -0,0 +1,32 @@
package config
import (
"encoding/json"
"fmt"
"os"
)
type Site struct {
URL string
APIKey string
Hosts []string
}
type Config struct {
LogIdentifier string
Sites []Site
}
func FromFilePath(filePath string) (config Config, err error) {
data, err := os.ReadFile(filePath)
if err != nil {
return config, fmt.Errorf("failed to read config file path %s: %w", filePath, err)
}
err = json.Unmarshal(data, &config)
if err != nil {
return config, fmt.Errorf("failed to unmarshal JSON from file path %s: %w", filePath, err)
}
return config, nil
}

137
di/container.go Normal file
View file

@ -0,0 +1,137 @@
package di
import (
"errors"
"fmt"
"log/slog"
"os"
"strings"
"git.0xdad.com/tblyler/goatcounter-systemd/config"
"git.0xdad.com/tblyler/goatcounter-systemd/goatcounter"
"git.0xdad.com/tblyler/goatcounter-systemd/journald"
)
type Container struct {
config config.Config
journald *journald.Journald
goatcounterMultiSiteClient *goatcounter.MultiSiteClient
closers []func() error
}
func NewContainer() (*Container, error) {
container := &Container{}
sets := []func() error{
container.setSlog,
container.setConfig,
container.setJournald,
container.setGoatcounterMultiSiteClient,
}
for _, set := range sets {
err := set()
if err != nil {
return container, err
}
}
return container, nil
}
func (c *Container) Close() error {
errs := make([]error, 0, len(c.closers))
for _, closer := range c.closers {
errs = append(errs, closer())
}
return errors.Join(errs...)
}
func (c *Container) setSlog() error {
var logLevel slog.Level
switch strings.ToLower(strings.TrimSpace(os.Getenv("LOG_LEVEL"))) {
case "debug":
logLevel = slog.LevelDebug
case "info", "":
logLevel = slog.LevelInfo
case "warn":
logLevel = slog.LevelWarn
case "error":
logLevel = slog.LevelError
default:
return fmt.Errorf("invalid log level: %s", os.Getenv("LOG_LEVEL"))
}
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
AddSource: logLevel <= slog.LevelDebug,
Level: logLevel,
}))
slog.SetDefault(logger)
slog.SetLogLoggerLevel(logLevel)
return nil
}
func (c *Container) Config() config.Config {
return c.config
}
func (c *Container) setConfig() error {
configPath := os.Getenv("CONFIG_FILE")
if configPath == "" {
configPath = "/etc/goatcounter-systemd/config.json"
}
config, err := config.FromFilePath(configPath)
if err != nil {
return fmt.Errorf("failed to set config: %w", err)
}
c.config = config
return nil
}
func (c *Container) Journald() *journald.Journald {
return c.journald
}
func (c *Container) setJournald() error {
config := c.Config()
journald, err := journald.NewJournald(config.LogIdentifier)
if err != nil {
return fmt.Errorf("failed to set journald: %w", err)
}
c.closers = append(c.closers, journald.Close)
c.journald = journald
return nil
}
func (c *Container) GoatcounterMultiSiteClient() *goatcounter.MultiSiteClient {
return c.goatcounterMultiSiteClient
}
func (c *Container) setGoatcounterMultiSiteClient() error {
config := c.Config()
siteToClient := map[string]goatcounter.SiteClient{}
for _, site := range config.Sites {
client, err := goatcounter.NewClient(site.URL, site.APIKey)
if err != nil {
return fmt.Errorf("failed to create client for site %s: %w", site.URL, err)
}
for _, host := range site.Hosts {
siteToClient[host] = client
}
}
c.goatcounterMultiSiteClient = goatcounter.NewMultiSiteClient(siteToClient, true)
return nil
}

8
go.mod Normal file
View file

@ -0,0 +1,8 @@
module git.0xdad.com/tblyler/goatcounter-systemd
go 1.23.1
require (
github.com/coreos/go-systemd/v22 v22.5.0
golang.org/x/sync v0.8.0
)

5
go.sum Normal file
View file

@ -0,0 +1,5 @@
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=

136
goatcounter/client.go Normal file
View file

@ -0,0 +1,136 @@
package goatcounter
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"slices"
"time"
"git.0xdad.com/tblyler/goatcounter-systemd/caddy"
)
const (
MaxHitsPerRequest = 500
)
type Hit struct {
Host string `json:"-"`
Bot int `json:"bot,omitempty"`
CreatedAt string `json:"created_at"`
Event bool `json:"event"`
IP string `json:"ip"`
Location string `json:"location,omitempty"`
Path string `json:"path"`
Query string `json:"query,omitempty"`
Referrer string `json:"ref,omitempty"`
Session string `json:"session,omitempty"`
ScreenSize string `json:"size,omitempty"`
Title string `json:"title,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
}
type CountRequest struct {
Filter []string `json:"filter,omitempty"`
Hits []Hit `json:"hits"`
NoSessions bool `json:"no_sessions"`
}
type Client struct {
url *url.URL
apiKey string
httpClient *http.Client
defaultHeaders http.Header
}
func HitFromCaddyLogEntry(logEntry caddy.LogEntry) Hit {
path := ""
query := ""
requestURI, err := url.ParseRequestURI(logEntry.Request.URI)
if err == nil {
path = requestURI.Path
query = requestURI.RawQuery
}
return Hit{
Bot: 0,
Host: logEntry.Request.Host,
CreatedAt: logEntry.Timestamp.Format(time.RFC3339Nano),
Event: false,
IP: logEntry.Request.ClientIP,
Location: "",
Path: path,
Query: query,
Referrer: logEntry.Request.Headers.Get("Referer"),
Session: "",
ScreenSize: "",
Title: "",
UserAgent: logEntry.Request.Headers.Get("User-Agent"),
}
}
func NewClient(address, apiKey string) (*Client, error) {
url, err := url.ParseRequestURI(address)
if err != nil {
return nil, fmt.Errorf("failed to parse address as URL %s: %w", address, err)
}
return &Client{
url: url,
apiKey: apiKey,
httpClient: &http.Client{},
defaultHeaders: http.Header{
"Content-Type": []string{"application/json"},
"Authorization": []string{"Bearer " + apiKey},
},
}, nil
}
func (c *Client) countURL() string {
return c.url.JoinPath("/api/v0/count").String()
}
func (c *Client) URL() *url.URL {
return c.url
}
func (c *Client) Count(ctx context.Context, hits ...Hit) (counted uint64, err error) {
for hits := range slices.Chunk(hits, MaxHitsPerRequest) {
if len(hits) == 0 {
continue
}
body, err := json.Marshal(CountRequest{
NoSessions: true,
Hits: hits,
})
if err != nil {
return counted, fmt.Errorf("failed to marshal count request: %w", err)
}
request, err := http.NewRequestWithContext(ctx, http.MethodPost, c.countURL(), bytes.NewReader(body))
if err != nil {
return counted, fmt.Errorf("failed to create count request: %w", err)
}
request.Header = c.defaultHeaders
response, err := c.httpClient.Do(request)
if err != nil {
return counted, fmt.Errorf("failed to perform count request: %w", err)
}
if response.StatusCode != http.StatusAccepted {
responseBody, _ := io.ReadAll(response.Body)
return counted, fmt.Errorf("unexpected status code %d, response: %s", response.StatusCode, string(responseBody))
}
counted += uint64(len(hits))
}
return counted, nil
}

View file

@ -0,0 +1,69 @@
package goatcounter
import (
"context"
"log/slog"
"net/url"
)
type SiteClient interface {
URL() *url.URL
Count(context.Context, ...Hit) (uint64, error)
}
type MultiSiteClient struct {
siteToClient map[string]SiteClient
ignoreSites map[string]struct{}
}
func NewMultiSiteClient(siteToClient map[string]SiteClient, ignoreGoatSites bool) *MultiSiteClient {
ignoreSites := map[string]struct{}{}
if ignoreGoatSites {
for _, client := range siteToClient {
ignoreSite := client.URL().Host
slog.Debug("ignoring goat site", slog.String("site", ignoreSite))
ignoreSites[ignoreSite] = struct{}{}
}
}
return &MultiSiteClient{
siteToClient: siteToClient,
ignoreSites: ignoreSites,
}
}
func (m *MultiSiteClient) Count(ctx context.Context, hits ...Hit) (counted uint64, err error) {
siteHits := map[string][]Hit{}
for _, hit := range hits {
siteHits[hit.Host] = append(siteHits[hit.Host], hit)
}
for site, hits := range siteHits {
hitCount := len(hits)
logger := slog.With(slog.String("site", site), slog.Int("count", hitCount))
if _, ignore := m.ignoreSites[site]; ignore {
logger.DebugContext(ctx, "ignoring hits for site")
continue
}
client, ok := m.siteToClient[site]
if !ok {
logger.ErrorContext(ctx, "no client for site, skipping")
continue
}
subCount, err := client.Count(ctx, hits...)
if err != nil {
logger.ErrorContext(ctx, "failed to count hits", slog.String("err", err.Error()))
continue
}
logger.InfoContext(ctx, "counted hits", slog.Uint64("counted", subCount))
counted += subCount
}
return counted, nil
}

114
journald/journald.go Normal file
View file

@ -0,0 +1,114 @@
package journald
import (
"context"
"fmt"
"iter"
"strings"
"sync"
"time"
"github.com/coreos/go-systemd/v22/sdjournal"
)
func syslogMatchData(syslogIdentifier string) string {
builder := strings.Builder{}
builder.WriteString(sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER)
builder.WriteString("=")
builder.WriteString(syslogIdentifier)
return builder.String()
}
type Journald struct {
journal *sdjournal.Journal
lock sync.Mutex
}
func NewJournald(syslogName string) (*Journald, error) {
journal, err := sdjournal.NewJournal()
if err != nil {
return nil, fmt.Errorf("failed to initialize journal: %w", err)
}
if syslogName != "" {
matchData := syslogMatchData(syslogName)
err = journal.AddMatch(matchData)
if err != nil {
return nil, fmt.Errorf("failed to add match %s to journal: %w", matchData, err)
}
}
err = journal.SeekTail()
if err != nil {
return nil, fmt.Errorf("failed to seek to tail: %w", err)
}
_, err = journal.Previous()
if err != nil {
return nil, fmt.Errorf("failed to go to tail message: %w", err)
}
return &Journald{
journal: journal,
}, nil
}
func (j *Journald) Close() error {
err := j.journal.Close()
if err != nil {
return fmt.Errorf("failed to close journal: %w", err)
}
return nil
}
func (j *Journald) Next() (*sdjournal.JournalEntry, error) {
j.lock.Lock()
defer j.lock.Unlock()
entryCount, err := j.journal.Next()
if err != nil {
return nil, fmt.Errorf("failed to get next journald entry: %w", err)
}
if entryCount == 0 {
return nil, nil
}
entry, err := j.journal.GetEntry()
if err != nil {
return nil, fmt.Errorf("failed to get journald entry: %w", err)
}
return entry, nil
}
func (j *Journald) Iter(ctx context.Context) iter.Seq2[*sdjournal.JournalEntry, error] {
return func(yield func(*sdjournal.JournalEntry, error) bool) {
for {
j.journal.Wait(time.Second)
if ctx.Err() != nil {
yield(nil, fmt.Errorf("failed to iterate on journald, context error: %w", ctx.Err()))
return
}
entry, err := j.Next()
if err != nil {
yield(nil, err)
return
}
// no more entries at the moment
if entry == nil {
continue
}
// we were told to stop
if !yield(entry, nil) {
return
}
}
}
}

136
main.go Normal file
View file

@ -0,0 +1,136 @@
package main
import (
"context"
"errors"
"log/slog"
"os"
"os/signal"
"time"
"git.0xdad.com/tblyler/goatcounter-systemd/caddy"
"git.0xdad.com/tblyler/goatcounter-systemd/di"
"git.0xdad.com/tblyler/goatcounter-systemd/goatcounter"
"github.com/coreos/go-systemd/v22/sdjournal"
"golang.org/x/sync/errgroup"
)
func main() {
signalNotifyChan := make(chan os.Signal, 1)
signal.Notify(signalNotifyChan, os.Interrupt)
signalContextErr := errors.New("interrupt signal received")
ctx, cancel := context.WithCancelCause(context.Background())
go func() {
<-signalNotifyChan
slog.WarnContext(ctx, "shutting down due to interrupt signal")
cancel(signalContextErr)
}()
container, err := di.NewContainer()
defer container.Close()
if err != nil {
slog.ErrorContext(ctx, "failed to initialize dependency injection container", slog.String("err", err.Error()))
os.Exit(1)
}
journald := container.Journald()
eg, ctx := errgroup.WithContext(ctx)
rawJSONChan := make(chan []byte, 12)
eg.Go(func() error {
defer close(rawJSONChan)
for entry, err := range journald.Iter(ctx) {
if err != nil {
return err
}
rawJSONChan <- []byte(entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE])
}
return nil
})
caddyLogChan := make(chan caddy.LogEntry, 12)
eg.Go(func() error {
defer close(caddyLogChan)
for rawJSON := range rawJSONChan {
logEntry, err := caddy.FromJSON(rawJSON)
if err != nil {
slog.ErrorContext(ctx, "failed to parse caddy log", slog.String("err", err.Error()), slog.String("json", string(rawJSON)))
continue
}
caddyLogChan <- logEntry
}
return nil
})
goatHitChan := make(chan goatcounter.Hit, 12)
eg.Go(func() error {
defer close(goatHitChan)
for caddyLog := range caddyLogChan {
goatHitChan <- goatcounter.HitFromCaddyLogEntry(caddyLog)
}
return nil
})
eg.Go(func() error {
// send hits to goatcounter at least every 10 seconds
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
goatcounterMultiSiteClient := container.GoatcounterMultiSiteClient()
hits := make([]goatcounter.Hit, 0, goatcounter.MaxHitsPerRequest)
count := func() {
counted, err := goatcounterMultiSiteClient.Count(ctx, hits...)
if err != nil {
slog.ErrorContext(ctx, "failed to count hits", slog.String("err", err.Error()))
return
}
slog.InfoContext(ctx, "counted hits across all sites", slog.Uint64("counted", counted))
hits = hits[:0]
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case hit := <-goatHitChan:
hits = append(hits, hit)
// send hits to goatcounter if we have the max number for a request
// this might be spread across multiple sites
if len(hits) >= goatcounter.MaxHitsPerRequest {
count()
}
case <-ticker.C:
count()
}
}
})
err = eg.Wait()
if err != nil {
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), signalContextErr) {
return
}
slog.ErrorContext(ctx, "shutting down due to error", slog.String("err", err.Error()))
os.Exit(2)
}
}

191
vendor/github.com/coreos/go-systemd/v22/LICENSE generated vendored Normal file
View file

@ -0,0 +1,191 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and
distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright
owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities
that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or
indirect, to cause the direction or management of such entity, whether by
contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising
permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including
but not limited to software source code, documentation source, and configuration
files.
"Object" form shall mean any form resulting from mechanical transformation or
translation of a Source form, including but not limited to compiled object code,
generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made
available under the License, as indicated by a copyright notice that is included
in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that
is based on (or derived from) the Work and for which the editorial revisions,
annotations, elaborations, or other modifications represent, as a whole, an
original work of authorship. For the purposes of this License, Derivative Works
shall not include works that remain separable from, or merely link (or bind by
name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version
of the Work and any modifications or additions to that Work or Derivative Works
thereof, that is intentionally submitted to Licensor for inclusion in the Work
by the copyright owner or by an individual or Legal Entity authorized to submit
on behalf of the copyright owner. For the purposes of this definition,
"submitted" means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems, and
issue tracking systems that are managed by, or on behalf of, the Licensor for
the purpose of discussing and improving the Work, but excluding communication
that is conspicuously marked or otherwise designated in writing by the copyright
owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
of whom a Contribution has been received by Licensor and subsequently
incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the Work and such
Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to make, have
made, use, offer to sell, sell, import, and otherwise transfer the Work, where
such license applies only to those patent claims licensable by such Contributor
that are necessarily infringed by their Contribution(s) alone or by combination
of their Contribution(s) with the Work to which such Contribution(s) was
submitted. If You institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work or a
Contribution incorporated within the Work constitutes direct or contributory
patent infringement, then any patent licenses granted to You under this License
for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof
in any medium, with or without modifications, and in Source or Object form,
provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of
this License; and
You must cause any modified files to carry prominent notices stating that You
changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute,
all copyright, patent, trademark, and attribution notices from the Source form
of the Work, excluding those notices that do not pertain to any part of the
Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any
Derivative Works that You distribute must include a readable copy of the
attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the
following places: within a NOTICE text file distributed as part of the
Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative
Works, if and wherever such third-party notices normally appear. The contents of
the NOTICE file are for informational purposes only and do not modify the
License. You may add Your own attribution notices within Derivative Works that
You distribute, alongside or as an addendum to the NOTICE text from the Work,
provided that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and may provide
additional or different license terms and conditions for use, reproduction, or
distribution of Your modifications, or for any such Derivative Works as a whole,
provided Your use, reproduction, and distribution of the Work otherwise complies
with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted
for inclusion in the Work by You to the Licensor shall be under the terms and
conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of
any separate license agreement you may have executed with Licensor regarding
such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks,
service marks, or product names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the
Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
including, without limitation, any warranties or conditions of TITLE,
NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of
permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence),
contract, or otherwise, unless required by applicable law (such as deliberate
and grossly negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special, incidental,
or consequential damages of any character arising as a result of this License or
out of the use or inability to use the Work (including but not limited to
damages for loss of goodwill, work stoppage, computer failure or malfunction, or
any and all other commercial damages or losses), even if such Contributor has
been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to
offer, and charge a fee for, acceptance of support, warranty, indemnity, or
other liability obligations and/or rights consistent with this License. However,
in accepting such obligations, You may act only on Your own behalf and on Your
sole responsibility, not on behalf of any other Contributor, and only if You
agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your
accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate
notice, with the fields enclosed by brackets "[]" replaced with your own
identifying information. (Don't include the brackets!) The text should be
enclosed in the appropriate comment syntax for the file format. We also
recommend that a file or class name and description of purpose be included on
the same "printed page" as the copyright notice for easier identification within
third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

5
vendor/github.com/coreos/go-systemd/v22/NOTICE generated vendored Normal file
View file

@ -0,0 +1,5 @@
CoreOS Project
Copyright 2018 CoreOS, Inc
This product includes software developed at CoreOS, Inc.
(http://www.coreos.com/).

View file

@ -0,0 +1,82 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package dlopen provides some convenience functions to dlopen a library and
// get its symbols.
package dlopen
// #cgo LDFLAGS: -ldl
// #include <stdlib.h>
// #include <dlfcn.h>
import "C"
import (
"errors"
"fmt"
"unsafe"
)
var ErrSoNotFound = errors.New("unable to open a handle to the library")
// LibHandle represents an open handle to a library (.so)
type LibHandle struct {
Handle unsafe.Pointer
Libname string
}
// GetHandle tries to get a handle to a library (.so), attempting to access it
// by the names specified in libs and returning the first that is successfully
// opened. Callers are responsible for closing the handler. If no library can
// be successfully opened, an error is returned.
func GetHandle(libs []string) (*LibHandle, error) {
for _, name := range libs {
libname := C.CString(name)
defer C.free(unsafe.Pointer(libname))
handle := C.dlopen(libname, C.RTLD_LAZY)
if handle != nil {
h := &LibHandle{
Handle: handle,
Libname: name,
}
return h, nil
}
}
return nil, ErrSoNotFound
}
// GetSymbolPointer takes a symbol name and returns a pointer to the symbol.
func (l *LibHandle) GetSymbolPointer(symbol string) (unsafe.Pointer, error) {
sym := C.CString(symbol)
defer C.free(unsafe.Pointer(sym))
C.dlerror()
p := C.dlsym(l.Handle, sym)
e := C.dlerror()
if e != nil {
return nil, fmt.Errorf("error resolving symbol %q: %v", symbol, errors.New(C.GoString(e)))
}
return p, nil
}
// Close closes a LibHandle.
func (l *LibHandle) Close() error {
C.dlerror()
C.dlclose(l.Handle)
e := C.dlerror()
if e != nil {
return fmt.Errorf("error closing %v: %v", l.Libname, errors.New(C.GoString(e)))
}
return nil
}

View file

@ -0,0 +1,57 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//go:build linux
// +build linux
package dlopen
// #include <string.h>
// #include <stdlib.h>
//
// int
// my_strlen(void *f, const char *s)
// {
// size_t (*strlen)(const char *);
//
// strlen = (size_t (*)(const char *))f;
// return strlen(s);
// }
import "C"
import (
"fmt"
"unsafe"
)
func strlen(libs []string, s string) (int, error) {
h, err := GetHandle(libs)
if err != nil {
return -1, fmt.Errorf(`couldn't get a handle to the library: %v`, err)
}
defer h.Close()
f := "strlen"
cs := C.CString(s)
defer C.free(unsafe.Pointer(cs))
strlen, err := h.GetSymbolPointer(f)
if err != nil {
return -1, fmt.Errorf(`couldn't get symbol %q: %v`, f, err)
}
len := C.my_strlen(strlen, cs)
return int(len), nil
}

View file

@ -0,0 +1,66 @@
// Copyright 2015 RedHat, Inc.
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sdjournal
import (
"github.com/coreos/go-systemd/v22/internal/dlopen"
"sync"
"unsafe"
)
var (
// lazy initialized
libsystemdHandle *dlopen.LibHandle
libsystemdMutex = &sync.Mutex{}
libsystemdFunctions = map[string]unsafe.Pointer{}
libsystemdNames = []string{
// systemd < 209
"libsystemd-journal.so.0",
"libsystemd-journal.so",
// systemd >= 209 merged libsystemd-journal into libsystemd proper
"libsystemd.so.0",
"libsystemd.so",
}
)
func getFunction(name string) (unsafe.Pointer, error) {
libsystemdMutex.Lock()
defer libsystemdMutex.Unlock()
if libsystemdHandle == nil {
h, err := dlopen.GetHandle(libsystemdNames)
if err != nil {
return nil, err
}
libsystemdHandle = h
}
f, ok := libsystemdFunctions[name]
if !ok {
var err error
f, err = libsystemdHandle.GetSymbolPointer(name)
if err != nil {
return nil, err
}
libsystemdFunctions[name] = f
}
return f, nil
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,272 @@
// Copyright 2015 RedHat, Inc.
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sdjournal
import (
"errors"
"fmt"
"io"
"log"
"strings"
"sync"
"time"
)
var (
// ErrExpired gets returned when the Follow function runs into the
// specified timeout.
ErrExpired = errors.New("Timeout expired")
)
// JournalReaderConfig represents options to drive the behavior of a JournalReader.
type JournalReaderConfig struct {
// The Since, NumFromTail and Cursor options are mutually exclusive and
// determine where the reading begins within the journal. The order in which
// options are written is exactly the order of precedence.
Since time.Duration // start relative to a Duration from now
NumFromTail uint64 // start relative to the tail
Cursor string // start relative to the cursor
// Show only journal entries whose fields match the supplied values. If
// the array is empty, entries will not be filtered.
Matches []Match
// If not empty, the journal instance will point to a journal residing
// in this directory. The supplied path may be relative or absolute.
Path string
// If not nil, Formatter will be used to translate the resulting entries
// into strings. If not set, the default format (timestamp and message field)
// will be used. If Formatter returns an error, Read will stop and return the error.
Formatter func(entry *JournalEntry) (string, error)
}
// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
// systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines.
type JournalReader struct {
journal *Journal
msgReader *strings.Reader
formatter func(entry *JournalEntry) (string, error)
}
// NewJournalReader creates a new JournalReader with configuration options that are similar to the
// systemd journalctl tool's iteration and filtering features.
func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
// use simpleMessageFormatter as default formatter.
if config.Formatter == nil {
config.Formatter = simpleMessageFormatter
}
r := &JournalReader{
formatter: config.Formatter,
}
// Open the journal
var err error
if config.Path != "" {
r.journal, err = NewJournalFromDir(config.Path)
} else {
r.journal, err = NewJournal()
}
if err != nil {
return nil, err
}
// Add any supplied matches
for _, m := range config.Matches {
if err = r.journal.AddMatch(m.String()); err != nil {
return nil, err
}
}
// Set the start position based on options
if config.Since != 0 {
// Start based on a relative time
start := time.Now().Add(config.Since)
if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
return nil, err
}
} else if config.NumFromTail != 0 {
// Start based on a number of lines before the tail
if err := r.journal.SeekTail(); err != nil {
return nil, err
}
// Move the read pointer into position near the tail. Go one further than
// the option so that the initial cursor advancement positions us at the
// correct starting point.
skip, err := r.journal.PreviousSkip(config.NumFromTail + 1)
if err != nil {
return nil, err
}
// If we skipped fewer lines than expected, we have reached journal start.
// Thus, we seek to head so that next invocation can read the first line.
if skip != config.NumFromTail+1 {
if err := r.journal.SeekHead(); err != nil {
return nil, err
}
}
} else if config.Cursor != "" {
// Start based on a custom cursor
if err := r.journal.SeekCursor(config.Cursor); err != nil {
return nil, err
}
}
return r, nil
}
// Read reads entries from the journal. Read follows the Reader interface so
// it must be able to read a specific amount of bytes. Journald on the other
// hand only allows us to read full entries of arbitrary size (without byte
// granularity). JournalReader is therefore internally buffering entries that
// don't fit in the read buffer. Callers should keep calling until 0 and/or an
// error is returned.
func (r *JournalReader) Read(b []byte) (int, error) {
if r.msgReader == nil {
// Advance the journal cursor. It has to be called at least one time
// before reading
c, err := r.journal.Next()
// An unexpected error
if err != nil {
return 0, err
}
// EOF detection
if c == 0 {
return 0, io.EOF
}
entry, err := r.journal.GetEntry()
if err != nil {
return 0, err
}
// Build a message
msg, err := r.formatter(entry)
if err != nil {
return 0, err
}
r.msgReader = strings.NewReader(msg)
}
// Copy and return the message
sz, err := r.msgReader.Read(b)
if err == io.EOF {
// The current entry has been fully read. Don't propagate this
// EOF, so the next entry can be read at the next Read()
// iteration.
r.msgReader = nil
return sz, nil
}
if err != nil {
return sz, err
}
if r.msgReader.Len() == 0 {
r.msgReader = nil
}
return sz, nil
}
// Close closes the JournalReader's handle to the journal.
func (r *JournalReader) Close() error {
return r.journal.Close()
}
// Rewind attempts to rewind the JournalReader to the first entry.
func (r *JournalReader) Rewind() error {
r.msgReader = nil
return r.journal.SeekHead()
}
// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
// follow will continue until a single time.Time is received on the until channel.
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error {
// Process journal entries and events. Entries are flushed until the tail or
// timeout is reached, and then we wait for new events or the timeout.
var msg = make([]byte, 64*1<<(10))
var waitCh = make(chan int, 1)
var waitGroup sync.WaitGroup
defer waitGroup.Wait()
process:
for {
c, err := r.Read(msg)
if err != nil && err != io.EOF {
return err
}
select {
case <-until:
return ErrExpired
default:
}
if c > 0 {
if _, err = writer.Write(msg[:c]); err != nil {
return err
}
continue process
}
// We're at the tail, so wait for new events or time out.
// Holds journal events to process. Tightly bounded for now unless there's a
// reason to unblock the journal watch routine more quickly.
for {
waitGroup.Add(1)
go func() {
status := r.journal.Wait(100 * time.Millisecond)
waitCh <- status
waitGroup.Done()
}()
select {
case <-until:
return ErrExpired
case e := <-waitCh:
switch e {
case SD_JOURNAL_NOP:
// the journal did not change since the last invocation
case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
continue process
default:
if e < 0 {
return fmt.Errorf("received error event: %d", e)
}
log.Printf("received unknown event: %d\n", e)
}
}
}
}
}
// simpleMessageFormatter is the default formatter.
// It returns a string representing the current journal entry in a simple format which
// includes the entry timestamp and MESSAGE field.
func simpleMessageFormatter(entry *JournalEntry) (string, error) {
msg, ok := entry.Fields["MESSAGE"]
if !ok {
return "", fmt.Errorf("no MESSAGE field present in journal entry")
}
usec := entry.RealtimeTimestamp
timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
return fmt.Sprintf("%s %s\n", timestamp, msg), nil
}

27
vendor/golang.org/x/sync/LICENSE generated vendored Normal file
View file

@ -0,0 +1,27 @@
Copyright 2009 The Go Authors.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google LLC nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

22
vendor/golang.org/x/sync/PATENTS generated vendored Normal file
View file

@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

135
vendor/golang.org/x/sync/errgroup/errgroup.go generated vendored Normal file
View file

@ -0,0 +1,135 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
//
// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
// returning errors.
package errgroup
import (
"context"
"fmt"
"sync"
)
type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
cancel func(error)
wg sync.WaitGroup
sem chan token
errOnce sync.Once
err error
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := withCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
return g.err
}
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
}
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
return true
}
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}

13
vendor/golang.org/x/sync/errgroup/go120.go generated vendored Normal file
View file

@ -0,0 +1,13 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
return context.WithCancelCause(parent)
}

14
vendor/golang.org/x/sync/errgroup/pre_go120.go generated vendored Normal file
View file

@ -0,0 +1,14 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
ctx, cancel := context.WithCancel(parent)
return ctx, func(error) { cancel() }
}

7
vendor/modules.txt vendored Normal file
View file

@ -0,0 +1,7 @@
# github.com/coreos/go-systemd/v22 v22.5.0
## explicit; go 1.12
github.com/coreos/go-systemd/v22/internal/dlopen
github.com/coreos/go-systemd/v22/sdjournal
# golang.org/x/sync v0.8.0
## explicit; go 1.18
golang.org/x/sync/errgroup