Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ingester limits #8

Merged
merged 6 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 125 additions & 1 deletion service/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ type Ingester struct {
Loki *loki.Loki
Timescale *timescale.Timescale
StreamCache *StreamCache
Opts IngesterOptions
}

type IngesterOptions struct {
MaxLabels int
MaxLabelNameLen int
MaxLabelLen int
MaxMessages int
MaxMessageLen int
KeepEmptyLabels bool
}

func (this *Ingester) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -82,6 +92,97 @@ func (this *Ingester) HandleRequest(req *http.Request) error {
return errors.New("service not found")
}

var elipsis = func(token string, maxLen int) (string, int) {

if maxLen <= 0 {
return "", 0
}

if maxLen <= len(token) {
return token, 0
}

return token, 3
}

var streamLabelsCount int

var truncateLabels = func(labels map[string]string, isStream bool) {

var discardWarn bool
var count int

if !isStream {
count = streamLabelsCount
}

nameTrunkToken, nameTrunkTrim := elipsis("___", this.Opts.MaxLabelNameLen)
valueTruncToken, valueTruncTrim := elipsis("...", this.Opts.MaxLabelLen)

for key, val := range labels {

if this.Opts.MaxLabels > 0 && count >= this.Opts.MaxLabels {

if !discardWarn {

if isStream {
slog.Warn("WEB STREAM: Discard excess stream labels",
slog.String("stream_id", logStream.ID.String()),
slog.Int("count", len(labels)-this.Opts.MaxLabels),
slog.Int("max", this.Opts.MaxLabels),
slog.String("remote_addr", req.RemoteAddr))
} else {
slog.Warn("WEB STREAM: Discard excess entry labels",
slog.String("stream_id", logStream.ID.String()),
slog.Int("count", len(labels)-(this.Opts.MaxLabels-streamLabelsCount)),
slog.Int("max", this.Opts.MaxLabels-streamLabelsCount),
slog.String("remote_addr", req.RemoteAddr))
}

discardWarn = true
}

delete(labels, key)
continue
}

if this.Opts.MaxLabelNameLen > 0 && len(key) > this.Opts.MaxLabelNameLen {

slog.Warn("WEB STREAM: Label name truncated",
slog.String("stream_id", logStream.ID.String()),
slog.String("label", key),
slog.String("remote_addr", req.RemoteAddr))

// reset record with a truncated key
delete(labels, key)
key = key[:this.Opts.MaxLabelNameLen-nameTrunkTrim] + nameTrunkToken
labels[key] = val
}

val = strings.TrimSpace(val)
if val == "" {

if !this.Opts.KeepEmptyLabels {
delete(labels, key)
continue
}

val = "[null]"
labels[key] = val
}

if this.Opts.MaxLabelLen > 0 && len(val) > this.Opts.MaxLabelLen {
labels[key] = val[:this.Opts.MaxLabelLen-valueTruncTrim] + valueTruncToken
}

count++
}

if isStream {
streamLabelsCount = count
}
}

contentType := req.Header.Get("content-type")
switch {
case strings.Contains(contentType, "json"):
Expand All @@ -95,11 +196,34 @@ func (this *Ingester) HandleRequest(req *http.Request) error {
return errors.New("invalid batch payload")
}

slog.Debug("WEB STREAM: Ingesting entries",
slog.Debug("WEB STREAM: Ingest entries",
slog.Int("count", len(payload.Entries)),
slog.String("stream_id", logStream.ID.String()),
slog.String("remote_addr", req.RemoteAddr))

truncateLabels(payload.Meta, true)

if this.Opts.MaxMessages > 0 && len(payload.Entries) > this.Opts.MaxMessages {

slog.Warn("WEB STREAM: Discard excess entries",
slog.String("stream_id", logStream.ID.String()),
slog.Int("count", len(payload.Entries)-this.Opts.MaxMessages),
slog.String("remote_addr", req.RemoteAddr))

payload.Entries = payload.Entries[:this.Opts.MaxMessages]
}

msgTruncToken, msgTruncTrim := elipsis("...", this.Opts.MaxMessageLen)

for idx, entry := range payload.Entries {

truncateLabels(payload.Entries[idx].Meta, false)

if this.Opts.MaxMessageLen > 0 && len(entry.Message) > this.Opts.MaxMessageLen {
payload.Entries[idx].Message = entry.Message[:this.Opts.MaxMessageLen-msgTruncTrim] + msgTruncToken
}
}

txID := uuid.New()

if this.Loki != nil {
Expand Down
5 changes: 2 additions & 3 deletions service/ingester/streams/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package streams
import "github.com/maddsua/logpush/service/logdata"

type WebStream struct {
ServiceID string `json:"service_id"`
Meta map[string]string `json:"meta"`
Entries []WebLogEntry `json:"entries"`
Meta map[string]string `json:"meta"`
Entries []WebLogEntry `json:"entries"`
}

type WebLogEntry struct {
Expand Down
1 change: 0 additions & 1 deletion service/logdata/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (um UnixMilli) Time(sequence int) time.Time {
ts = time.Now()
}

// todo: ensure correct result
return ts.Add(time.Duration(sequence))
}

Expand Down
25 changes: 24 additions & 1 deletion service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
}

if err := dbconn.Ping(); err != nil {
slog.Error("STARTUP: Unable to open DB connection",
slog.Error("STARTUP: Unable to connect to the DB",
slog.String("err", err.Error()))
os.Exit(1)
}
Expand Down Expand Up @@ -103,6 +103,14 @@ func main() {
DB: dbops.New(dbconn),
Timescale: &timescale.Timescale{DB: dbconn},
StreamCache: ingester.NewStreamCache(),
Opts: ingester.IngesterOptions{
MaxLabels: envInt("INGESTER_MAX_LABELS"),
MaxLabelNameLen: envInt("INGESTER_MAX_LABEL_NAME_LEN"),
MaxLabelLen: envInt("INGESTER_MAX_LABEL_LEN"),
MaxMessages: envInt("INGESTER_MAX_MESSAGES"),
MaxMessageLen: envInt("INGESTER_MAX_MESSAGE_LEN"),
KeepEmptyLabels: strings.ToLower(os.Getenv("INGESTER_KEEP_EMPTY_LABELS")) != "false",
},
}

mux.HandleFunc("POST /push/stream/{id}", func(writer http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -195,3 +203,18 @@ func rootMiddleware(next http.Handler) http.Handler {
next.ServeHTTP(writer, req)
})
}

func envInt(name string) int {

envVal := os.Getenv(name)
if envVal == "" {
return 0
}

val, err := strconv.Atoi(envVal)
if err != nil {
return 0
}

return val
}