Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch: loki strict labels #9

Merged
merged 6 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/tests/main.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const fakeHandler = async (request: Request, waitUntil: (task: Promise<any>) =>
const fakeProcedure = async (_request: Request, logger: Logger): Promise<Response> => {

logger.info('Processing lead form request');
logger.error('Some important API is down, rerouting', { api_down: 'catpics' });
logger.error('Some important API is down, rerouting', { api_down: 'catpics', sus_field: 'ok\0;drop table;' });
logger.warn('Rerouting to an internal API', { timeout: 3600 });
logger.info('Recaptcha OK', { score: 0.9 });

Expand Down
1 change: 1 addition & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Use the docker image and you're all set.
- `DATABASE_URL`: `{string}` - PostgreSQL/TimescaleDB database URL
- `LOKI_URL`: `{string}|null` - Loki HTTP API URL (only scheme and host are used)
- `LOKI_STRUCTURED_METADATA`: `true|false|null` - Use loki structured metadata instead of labels for everything
- `LOKI_STRICT_LABELS`: `true|false|null` - Restrict label expressions to a basic character set (latin letters, numbers, commonly used symbols)
- `DB_MIGRATE`: `true|false|null` - Run DB migration on startup
- `RPC_TOKEN`: `{string}|null` - Management API token
- `INGESTER_MAX_LABELS`: `{int|null}` - Max number of labels per ingested stream (it's shared between `stream` and `stream.entry[idx]`; having multiple entries doesn't exhaust this quota)
Expand Down
91 changes: 91 additions & 0 deletions service/forwarder/loki/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package loki

import (
"log/slog"
"strings"
"unicode"
)

func filterLabelFormat(labels map[string]string) {

for key, value := range labels {

if stripped := stripLabelKey(key); stripped == "" {

slog.Error("FORWARDER: LOKI: LABEL FILTER: Label removed (illformed key)",
slog.String("key", key))

delete(labels, key)
continue

} else if stripped != key {

slog.Warn("FORWARDER: LOKI: LABEL FILTER: Label moved (illformed key)",
slog.String("key", key),
slog.String("new_key", stripped))

delete(labels, key)
key = stripped
labels[key] = value
}

if stripped := stripLabelValue(value); stripped == "" {

slog.Error("FORWARDER: LOKI: LABEL FILTER: Label removed (stripped value is empty)",
slog.String("key", key))

delete(labels, key)

} else if stripped != value {

slog.Warn("FORWARDER: LOKI: LABEL FILTER: Label value stripped",
slog.String("key", key))

labels[key] = stripped
}
}
}

// stripLabelKey reduces a label key to the characters [A-Za-z0-9_].
//
// The separators '_', '-', '+', '*' and '=' are each mapped to '_'; ASCII
// letters and digits are kept as they are; every other rune (whitespace,
// punctuation, non-ASCII text, control characters) is dropped. An empty
// result means nothing usable was left of the key.
func stripLabelKey(key string) string {

	// The output is never longer than the input, so a single allocation is enough.
	var stripped strings.Builder
	stripped.Grow(len(key))

	for _, next := range key {

		switch {
		case next == '_', next == '-', next == '+', next == '*', next == '=':
			stripped.WriteByte('_')
		case
			(next >= 'A' && next <= 'Z'),
			(next >= 'a' && next <= 'z'),
			(next >= '0' && next <= '9'):
			// The rune is known to be ASCII here, so the byte conversion is lossless.
			stripped.WriteByte(byte(next))
		}
	}

	// No trimming is needed: whitespace can never make it into the builder.
	return stripped.String()
}

// stripLabelValue makes a label value safe to forward.
//
// Despite its name, the key parameter holds the label *value* to sanitize.
// Backslashes are replaced with forward slashes, runes that unicode.IsPrint
// accepts are kept, and everything else (control characters, tabs, newlines,
// NUL bytes, ...) is replaced with '?'. The result has the same number of
// runes as the input, so it is empty only when the input is empty.
func stripLabelValue(key string) string {

	// Each replacement is at most as wide as the rune it replaces, so the
	// input length is an upper bound for the output.
	var stripped strings.Builder
	stripped.Grow(len(key))

	for _, next := range key {

		switch {
		case next == '\\':
			stripped.WriteByte('/')
		case unicode.IsPrint(next):
			stripped.WriteRune(next)
		default:
			stripped.WriteByte('?')
		}
	}

	return stripped.String()
}
24 changes: 14 additions & 10 deletions service/forwarder/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"log/slog"
"net/http"
"net/url"
"os"
"strings"

"github.com/google/uuid"
Expand All @@ -17,12 +16,17 @@ import (
)

type Loki struct {
url string
UseStructMeta bool
LokiOptions
url string
}

// LokiOptions groups the optional settings that are passed to ParseLokiUrl
// and embedded into the Loki forwarder.
type LokiOptions struct {
// Retries is presumably the number of push retries; its consumer is not
// visible in this view — TODO confirm.
Retries int
// UseStructMeta makes IngestWeb build one stream via webStreamToStructured
// instead of the per-entry streams produced by webStreamToLabeled.
UseStructMeta bool
// StrictLabels runs filterLabelFormat over labels and metadata before
// they are added to a stream.
StrictLabels bool
}

func ParseLokiUrl(params string) (*Loki, error) {
func ParseLokiUrl(params string, opts LokiOptions) (*Loki, error) {

if params = strings.TrimSpace(params); params == "" {
return nil, nil
Expand All @@ -47,8 +51,8 @@ func ParseLokiUrl(params string) (*Loki, error) {
}

return &Loki{
url: lokiUrl.String(),
UseStructMeta: strings.ToLower(os.Getenv("LOKI_STRUCTURED_METADATA")) != "false",
LokiOptions: opts,
url: lokiUrl.String(),
}, nil
}

Expand Down Expand Up @@ -92,7 +96,7 @@ func (this *Loki) retryNumber() int {

func (this *Loki) PushStreams(streams []LokiStream) error {

payload, err := lokiSerializeStreams(streams)
payload, err := this.serializeStreams(streams)
if err != nil {
return err
}
Expand Down Expand Up @@ -136,11 +140,11 @@ func (this *Loki) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAd

var streams []LokiStream
if this.UseStructMeta {
if next := webStreamToStructured(payload, streamSource, txID); len(next.Values) > 0 {
if next := this.webStreamToStructured(payload, streamSource, txID); len(next.Values) > 0 {
streams = []LokiStream{next}
}
} else {
streams = webStreamToLabeled(payload, streamSource, txID)
streams = this.webStreamToLabeled(payload, streamSource, txID)
}

if len(streams) == 0 {
Expand Down Expand Up @@ -171,7 +175,7 @@ func (this *Loki) IngestWeb(streamSource *dbops.Stream, txID uuid.UUID, remoteAd
slog.String("remote_addr", remoteAddr))
}

func lokiSerializeStreams(streams []LokiStream) (*bytes.Buffer, error) {
func (this *Loki) serializeStreams(streams []LokiStream) (*bytes.Buffer, error) {

payload := LokiHttpBatch{Streams: streams}

Expand Down
19 changes: 16 additions & 3 deletions service/forwarder/loki/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/maddsua/logpush/service/logdata"
)

func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) []LokiStream {
func (this *Loki) webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) []LokiStream {

baseLabels := map[string]string{
"logpush_source": "web",
Expand All @@ -34,6 +34,10 @@ func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, tx
logdata.CopyMetaFields(labels, entry.Meta)
labels["detected_level"] = entry.Level.String()

if this.StrictLabels {
filterLabelFormat(labels)
}

result = append(result, LokiStream{
Stream: labels,
Values: [][]any{
Expand All @@ -48,7 +52,7 @@ func webStreamToLabeled(logStream *streams.WebStream, instance *dbops.Stream, tx
return result
}

func webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) LokiStream {
func (this *Loki) webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream, txID uuid.UUID) LokiStream {

labels := map[string]string{
"logpush_source": "web",
Expand Down Expand Up @@ -83,19 +87,28 @@ func webStreamToStructured(logStream *streams.WebStream, instance *dbops.Stream,

meta := map[string]string{}
maps.Copy(meta, metaFields)
meta["detected_level"] = entry.Level.String()

if entry.Meta != nil {
maps.Copy(meta, entry.Meta)
}

meta["detected_level"] = entry.Level.String()

if this.StrictLabels {
filterLabelFormat(meta)
}

streamValues = append(streamValues, []any{
entry.Date.String(idx),
entry.Message,
meta,
})
}

if this.StrictLabels {
filterLabelFormat(labels)
}

return LokiStream{
Stream: labels,
Values: streamValues,
Expand Down
19 changes: 15 additions & 4 deletions service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
}

if strings.ToLower(os.Getenv("DEBUG")) == "true" {
if envBool("DEBUG") {
slog.SetLogLoggerLevel(slog.LevelDebug)
slog.Debug("Logging enabled")
}
Expand All @@ -64,7 +64,10 @@ func main() {

slog.Info("STARTUP: DB connection OK")

lokiConn, err := loki.ParseLokiUrl(os.Getenv("LOKI_URL"))
lokiConn, err := loki.ParseLokiUrl(os.Getenv("LOKI_URL"), loki.LokiOptions{
UseStructMeta: envBoolNf("LOKI_STRUCTURED_METADATA"),
StrictLabels: envBoolNf("LOKI_STRICT_LABELS"),
})
if err != nil {
slog.Error("STARTUP: Unable to parse LOKI_HOST",
slog.String("err", err.Error()))
Expand All @@ -85,7 +88,7 @@ func main() {
slog.Info("STARTUP: Loki not configured. Using Timescale/Postgres")
}

if strings.ToLower(os.Getenv("DB_MIGRATE")) == "true" {
if envBool("DB_MIGRATEDEBUG") {

slog.Info("STARTUP: Running DB migrations")

Expand All @@ -109,7 +112,7 @@ func main() {
MaxLabelLen: envInt("INGESTER_MAX_LABEL_LEN"),
MaxMessages: envInt("INGESTER_MAX_MESSAGES"),
MaxMessageLen: envInt("INGESTER_MAX_MESSAGE_LEN"),
KeepEmptyLabels: strings.ToLower(os.Getenv("INGESTER_KEEP_EMPTY_LABELS")) != "false",
KeepEmptyLabels: envBoolNf("INGESTER_KEEP_EMPTY_LABELS"),
},
}

Expand Down Expand Up @@ -218,3 +221,11 @@ func envInt(name string) int {

return val
}

// envBool reports whether the environment variable name is set to "true",
// compared case-insensitively. Unset, empty or any other value yields false.
func envBool(name string) bool {
	raw := os.Getenv(name)
	if strings.ToLower(raw) == "true" {
		return true
	}
	return false
}

// envBoolNf ("not false") reports true unless the environment variable name
// is set to "false", compared case-insensitively. An unset or empty variable
// therefore counts as true.
func envBoolNf(name string) bool {
	raw := os.Getenv(name)
	if strings.ToLower(raw) == "false" {
		return false
	}
	return true
}