Skip to content

Commit

Permalink
Patch: loki strict labels (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
maddsua authored Feb 21, 2025
1 parent c588943 commit 0a658c9
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 18 deletions.
2 changes: 1 addition & 1 deletion client/tests/main.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const fakeHandler = async (request: Request, waitUntil: (task: Promise<any>) =>
const fakeProcedure = async (_request: Request, logger: Logger): Promise<Response> => {

logger.info('Processing lead form request');
logger.error('Some important API is down, rerouting', { api_down: 'catpics' });
logger.error('Some important API is down, rerouting', { api_down: 'catpics', sus_field: 'ok\0;drop table;' });
logger.warn('Rerouting to an internal API', { timeout: 3600 });
logger.info('Recaptcha OK', { score: 0.9 });

Expand Down
1 change: 1 addition & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Use the docker image and you're all set.
- `DATABASE_URL`: `{string}` - PostgreSQL/TImescaleDB database URL
- `LOKI_URL`: `{string}|null` - Loki HTTP API url (only schema and host are used)
- `LOKI_STRUCTURED_METADATA`: `true|false|null` - Use loki structured metadata instead of labels for everything
- `LOKI_STRICT_LABELS`: `true|false|null` - Restrict label expressions to a basic character set (latin letters, numbers, commonly used symbols)
- `DB_MIGRATE`: `true|false|null` - Run DB migration on startup
- `RPC_TOKEN`: `{string}|null` - Management API token
- `INGESTER_MAX_LABELS`: `{int|null}` - Max number of labels per ingested stream (it's shared between `stream` and `stream.entry[idx]`; having multiple entries doesn't exhaust this quota)
Expand Down
91 changes: 91 additions & 0 deletions service/forwarder/loki/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package loki

import (
"log/slog"
"strings"
"unicode"
)

func filterLabelFormat(labels map[string]string) {

for key, value := range labels {

if stripped := stripLabelKey(key); stripped == "" {

slog.Error("FORWARDER: LOKI: LABEL FILTER: Label removed (illformed key)",
slog.String("key", key))

delete(labels, key)
continue

} else if stripped != key {

slog.Warn("FORWARDER: LOKI: LABEL FILTER: Label moved (illformed key)",
slog.String("key", key),
slog.String("new_key", stripped))

delete(labels, key)
key = stripped
labels[key] = value
}

if stripped := stripLabelValue(value); stripped == "" {

slog.Error("FORWARDER: LOKI: LABEL FILTER: Label removed (stripped value is empty)",
slog.String("key", key))

delete(labels, key)

} else if stripped != value {

slog.Warn("FORWARDER: LOKI: LABEL FILTER: Label value stripped",
slog.String("key", key))

labels[key] = stripped
}
}
}

func stripLabelKey(key string) string {

var stripped string

for _, next := range key {

switch next {
case '_', '-', '+', '*', '=':
stripped += "_"
continue
}

switch {
case
(next >= 'A' && next <= 'Z'),
(next >= 'a' && next <= 'z'),
(next >= '0' && next <= '9'):
stripped += string(next)
continue
}
}

return strings.TrimSpace(stripped)
}

func stripLabelValue(key string) string {

var stripped string

for _, next := range key {

switch {
case next == '\\':
stripped += "/"
case unicode.IsPrint(next):
stripped += string(next)
default:
stripped += "?"
}
}

return stripped
}
24 changes: 14 additions & 10 deletions service/forwarder/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"log/slog"
"net/http"
"net/url"
"os"
"strings"

"github.com/google/uuid"
Expand All @@ -17,12 +16,17 @@ import (
)

type Loki struct {
url string
UseStructMeta bool
LokiOptions
url string
}

type LokiOptions struct {
Retries int
UseStructMeta bool
StrictLabels bool
}

func ParseLokiUrl(params string) (*Loki, error) {
func ParseLokiUrl(params string, opts LokiOptions) (*Loki, error) {

if params = strings.TrimSpace(params); params == "" {
return nil, nil
Expand All @@ -47,8 +51,8 @@ func ParseLokiUrl(params string) (*Loki, error) {
}

return &Loki{
url: lokiUrl.String(),
UseStructMeta: strings.ToLower(os.Getenv("LOKI_STRUCTURED_METADATA")) != "false",
LokiOptions: opts,
url: lokiUrl.String(),
}, nil
}

Expand Down Expand Up @@ -92,7 +96,7 @@ func (this *Loki) retryNumber() int {

func (this *Loki) PushStreams(streams []LokiStream) error {

payload, err := lokiSerializeStreams(streams)
payload, err := this.serializeStreams(streams)
if err != nil {
return err
}
Expand Down Expand Up @@ -136,11 +140,11 @@ func (this *Loki) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAd

var streams []LokiStream
if this.UseStructMeta {
if next := webStreamToStructured(payload, streamSource, txID); len(next.Values) > 0 {
if next := this.webStreamToStructured(payload, streamSource, txID); len(next.Values) > 0 {
streams = []LokiStream{next}
}
} else {
streams = webStreamToLabeled(payload, streamSource, txID)
streams = this.webStreamToLabeled(payload, streamSource, txID)
}

if len(streams) == 0 {
Expand Down Expand Up @@ -171,7 +175,7 @@ func (this *Loki) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAd
slog.String("remote_addr", remoteAddr))
}

func lokiSerializeStreams(streams []LokiStream) (*bytes.Buffer, error) {
func (this *Loki) serializeStreams(streams []LokiStream) (*bytes.Buffer, error) {

payload := LokiHttpBatch{Streams: streams}

Expand Down
19 changes: 16 additions & 3 deletions service/forwarder/loki/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/maddsua/logpush/service/logdata"
)

func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) []LokiStream {
func (this *Loki) webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) []LokiStream {

baseLabels := map[string]string{
"logpush_source": "web",
Expand All @@ -34,6 +34,10 @@ func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, tx
logdata.CopyMetaFields(labels, entry.Meta)
labels["detected_level"] = entry.Level.String()

if this.StrictLabels {
filterLabelFormat(labels)
}

result = append(result, LokiStream{
Stream: labels,
Values: [][]any{
Expand All @@ -48,7 +52,7 @@ func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, tx
return result
}

func webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) LokiStream {
func (this *Loki) webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) LokiStream {

labels := map[string]string{
"logpush_source": "web",
Expand Down Expand Up @@ -83,19 +87,28 @@ func webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream,

meta := map[string]string{}
maps.Copy(meta, metaFields)
meta["detected_level"] = entry.Level.String()

if entry.Meta != nil {
maps.Copy(meta, entry.Meta)
}

meta["detected_level"] = entry.Level.String()

if this.StrictLabels {
filterLabelFormat(meta)
}

streamValues = append(streamValues, []any{
entry.Date.String(idx),
entry.Message,
meta,
})
}

if this.StrictLabels {
filterLabelFormat(labels)
}

return LokiStream{
Stream: labels,
Values: streamValues,
Expand Down
19 changes: 15 additions & 4 deletions service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
}

if strings.ToLower(os.Getenv("DEBUG")) == "true" {
if envBool("DEBUG") {
slog.SetLogLoggerLevel(slog.LevelDebug)
slog.Debug("Logging enabled")
}
Expand All @@ -64,7 +64,10 @@ func main() {

slog.Info("STARTUP: DB connection OK")

lokiConn, err := loki.ParseLokiUrl(os.Getenv("LOKI_URL"))
lokiConn, err := loki.ParseLokiUrl(os.Getenv("LOKI_URL"), loki.LokiOptions{
UseStructMeta: envBoolNf("LOKI_STRUCTURED_METADATA"),
StrictLabels: envBoolNf("LOKI_STRICT_LABELS"),
})
if err != nil {
slog.Error("STARTUP: Unable to parse LOKI_HOST",
slog.String("err", err.Error()))
Expand All @@ -85,7 +88,7 @@ func main() {
slog.Info("STARTUP: Loki not configured. Using Timescale/Postgres")
}

if strings.ToLower(os.Getenv("DB_MIGRATE")) == "true" {
if envBool("DB_MIGRATEDEBUG") {

slog.Info("STARTUP: Running DB migrations")

Expand All @@ -109,7 +112,7 @@ func main() {
MaxLabelLen: envInt("INGESTER_MAX_LABEL_LEN"),
MaxMessages: envInt("INGESTER_MAX_MESSAGES"),
MaxMessageLen: envInt("INGESTER_MAX_MESSAGE_LEN"),
KeepEmptyLabels: strings.ToLower(os.Getenv("INGESTER_KEEP_EMPTY_LABELS")) != "false",
KeepEmptyLabels: envBoolNf("INGESTER_KEEP_EMPTY_LABELS"),
},
}

Expand Down Expand Up @@ -218,3 +221,11 @@ func envInt(name string) int {

return val
}

func envBool(name string) bool {
return strings.ToLower(os.Getenv(name)) == "true"
}

func envBoolNf(name string) bool {
return strings.ToLower(os.Getenv(name)) != "false"
}

0 comments on commit 0a658c9

Please sign in to comment.