diff --git a/client/tests/main.test.ts b/client/tests/main.test.ts index e1c7a0b..a7c5c8a 100644 --- a/client/tests/main.test.ts +++ b/client/tests/main.test.ts @@ -21,7 +21,7 @@ const fakeHandler = async (request: Request, waitUntil: (task: Promise) => const fakeProcedure = async (_request: Request, logger: Logger): Promise => { logger.info('Processing lead form request'); - logger.error('Some important API is down, rerouting', { api_down: 'catpics' }); + logger.error('Some important API is down, rerouting', { api_down: 'catpics', sus_field: 'ok\0;drop table;' }); logger.warn('Rerouting to an internal API', { timeout: 3600 }); logger.info('Recaptcha OK', { score: 0.9 }); diff --git a/readme.md b/readme.md index af741e5..ccd0d39 100644 --- a/readme.md +++ b/readme.md @@ -51,6 +51,7 @@ Use the docker image and you're all set. - `DATABASE_URL`: `{string}` - PostgreSQL/TImescaleDB database URL - `LOKI_URL`: `{string}|null` - Loki HTTP API url (only schema and host are used) - `LOKI_STRUCTURED_METADATA`: `true|false|null` - Use loki structured metadata instead of labels for everything +- `LOKI_STRICT_LABELS`: `true|false|null` - Restrict label expressions to a basic character set (latin letters, numbers, commonly used symbols) - `DB_MIGRATE`: `true|false|null` - Run DB migration on startup - `RPC_TOKEN`: `{string}|null` - Management API token - `INGESTER_MAX_LABELS`: `{int|null}` - Max number of labels per ingested stream (it's shared between `stream` and `stream.entry[idx]`; having multiple entries doesn't exhaust this quota) diff --git a/service/forwarder/loki/format.go b/service/forwarder/loki/format.go new file mode 100644 index 0000000..241e74c --- /dev/null +++ b/service/forwarder/loki/format.go @@ -0,0 +1,91 @@ +package loki + +import ( + "log/slog" + "strings" + "unicode" +) + +func filterLabelFormat(labels map[string]string) { + + for key, value := range labels { + + if stripped := stripLabelKey(key); stripped == "" { + + slog.Error("FORWARDER: LOKI: LABEL 
FILTER: Label removed (illformed key)", + slog.String("key", key)) + + delete(labels, key) + continue + + } else if stripped != key { + + slog.Warn("FORWARDER: LOKI: LABEL FILTER: Label moved (illformed key)", + slog.String("key", key), + slog.String("new_key", stripped)) + + delete(labels, key) + key = stripped + labels[key] = value + } + + if stripped := stripLabelValue(value); stripped == "" { + + slog.Error("FORWARDER: LOKI: LABEL FILTER: Label removed (stripped value is empty)", + slog.String("key", key)) + + delete(labels, key) + + } else if stripped != value { + + slog.Warn("FORWARDER: LOKI: LABEL FILTER: Label value stripped", + slog.String("key", key)) + + labels[key] = stripped + } + } +} + +func stripLabelKey(key string) string { + + var stripped string + + for _, next := range key { + + switch next { + case '_', '-', '+', '*', '=': + stripped += "_" + continue + } + + switch { + case + (next >= 'A' && next <= 'Z'), + (next >= 'a' && next <= 'z'), + (next >= '0' && next <= '9'): + stripped += string(next) + continue + } + } + + return strings.TrimSpace(stripped) +} + +func stripLabelValue(key string) string { + + var stripped string + + for _, next := range key { + + switch { + case next == '\\': + stripped += "/" + case unicode.IsPrint(next): + stripped += string(next) + default: + stripped += "?" 
+ } + } + + return stripped +} diff --git a/service/forwarder/loki/loki.go b/service/forwarder/loki/loki.go index 3283e2f..a6a3501 100644 --- a/service/forwarder/loki/loki.go +++ b/service/forwarder/loki/loki.go @@ -8,7 +8,6 @@ import ( "log/slog" "net/http" "net/url" - "os" "strings" "github.com/google/uuid" @@ -17,12 +16,17 @@ import ( ) type Loki struct { - url string - UseStructMeta bool + LokiOptions + url string +} + +type LokiOptions struct { Retries int + UseStructMeta bool + StrictLabels bool } -func ParseLokiUrl(params string) (*Loki, error) { +func ParseLokiUrl(params string, opts LokiOptions) (*Loki, error) { if params = strings.TrimSpace(params); params == "" { return nil, nil @@ -47,8 +51,8 @@ func ParseLokiUrl(params string) (*Loki, error) { } return &Loki{ - url: lokiUrl.String(), - UseStructMeta: strings.ToLower(os.Getenv("LOKI_STRUCTURED_METADATA")) != "false", + LokiOptions: opts, + url: lokiUrl.String(), }, nil } @@ -92,7 +96,7 @@ func (this *Loki) retryNumber() int { func (this *Loki) PushStreams(streams []LokiStream) error { - payload, err := lokiSerializeStreams(streams) + payload, err := this.serializeStreams(streams) if err != nil { return err } @@ -136,11 +140,11 @@ func (this *Loki) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAd var streams []LokiStream if this.UseStructMeta { - if next := webStreamToStructured(payload, streamSource, txID); len(next.Values) > 0 { + if next := this.webStreamToStructured(payload, streamSource, txID); len(next.Values) > 0 { streams = []LokiStream{next} } } else { - streams = webStreamToLabeled(payload, streamSource, txID) + streams = this.webStreamToLabeled(payload, streamSource, txID) } if len(streams) == 0 { @@ -171,7 +175,7 @@ func (this *Loki) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAd slog.String("remote_addr", remoteAddr)) } -func lokiSerializeStreams(streams []LokiStream) (*bytes.Buffer, error) { +func (this *Loki) serializeStreams(streams []LokiStream) 
(*bytes.Buffer, error) { payload := LokiHttpBatch{Streams: streams} diff --git a/service/forwarder/loki/streams.go b/service/forwarder/loki/streams.go index 73429f2..213f795 100644 --- a/service/forwarder/loki/streams.go +++ b/service/forwarder/loki/streams.go @@ -10,7 +10,7 @@ import ( "github.com/maddsua/logpush/service/logdata" ) -func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) []LokiStream { +func (this *Loki) webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) []LokiStream { baseLabels := map[string]string{ "logpush_source": "web", @@ -34,6 +34,10 @@ func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, tx logdata.CopyMetaFields(labels, entry.Meta) labels["detected_level"] = entry.Level.String() + if this.StrictLabels { + filterLabelFormat(labels) + } + result = append(result, LokiStream{ Stream: labels, Values: [][]any{ @@ -48,7 +52,7 @@ func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, tx return result } -func webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) LokiStream { +func (this *Loki) webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) LokiStream { labels := map[string]string{ "logpush_source": "web", @@ -83,12 +87,17 @@ func webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream, meta := map[string]string{} maps.Copy(meta, metaFields) - meta["detected_level"] = entry.Level.String() if entry.Meta != nil { maps.Copy(meta, entry.Meta) } + meta["detected_level"] = entry.Level.String() + + if this.StrictLabels { + filterLabelFormat(meta) + } + streamValues = append(streamValues, []any{ entry.Date.String(idx), entry.Message, @@ -96,6 +105,10 @@ func webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream, }) } + if this.StrictLabels { + filterLabelFormat(labels) + } + return LokiStream{ Stream: labels, 
Values: streamValues, diff --git a/service/main.go b/service/main.go index ebd597d..b17b43f 100644 --- a/service/main.go +++ b/service/main.go @@ -39,7 +39,7 @@ func main() { slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil))) } - if strings.ToLower(os.Getenv("DEBUG")) == "true" { + if envBool("DEBUG") { slog.SetLogLoggerLevel(slog.LevelDebug) slog.Debug("Logging enabled") } @@ -64,7 +64,10 @@ func main() { slog.Info("STARTUP: DB connection OK") - lokiConn, err := loki.ParseLokiUrl(os.Getenv("LOKI_URL")) + lokiConn, err := loki.ParseLokiUrl(os.Getenv("LOKI_URL"), loki.LokiOptions{ + UseStructMeta: envBoolNf("LOKI_STRUCTURED_METADATA"), + StrictLabels: envBoolNf("LOKI_STRICT_LABELS"), + }) if err != nil { slog.Error("STARTUP: Unable to parse LOKI_HOST", slog.String("err", err.Error())) @@ -85,7 +88,7 @@ func main() { slog.Info("STARTUP: Loki not configured. Using Timescale/Postgres") } - if strings.ToLower(os.Getenv("DB_MIGRATE")) == "true" { + if envBool("DB_MIGRATE") { slog.Info("STARTUP: Running DB migrations") @@ -109,7 +112,7 @@ func main() { MaxLabelLen: envInt("INGESTER_MAX_LABEL_LEN"), MaxMessages: envInt("INGESTER_MAX_MESSAGES"), MaxMessageLen: envInt("INGESTER_MAX_MESSAGE_LEN"), - KeepEmptyLabels: strings.ToLower(os.Getenv("INGESTER_KEEP_EMPTY_LABELS")) != "false", + KeepEmptyLabels: envBoolNf("INGESTER_KEEP_EMPTY_LABELS"), }, } @@ -218,3 +221,11 @@ func envInt(name string) int { return val } + +func envBool(name string) bool { + return strings.ToLower(os.Getenv(name)) == "true" +} + +func envBoolNf(name string) bool { + return strings.ToLower(os.Getenv(name)) != "false" +}