Skip to content

Commit

Permalink
Add ingester limits (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
maddsua authored Feb 4, 2025
1 parent a32184b commit 4350da5
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 6 deletions.
126 changes: 125 additions & 1 deletion service/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ type Ingester struct {
Loki *loki.Loki
Timescale *timescale.Timescale
StreamCache *StreamCache
Opts IngesterOptions
}

type IngesterOptions struct {
MaxLabels int
MaxLabelNameLen int
MaxLabelLen int
MaxMessages int
MaxMessageLen int
KeepEmptyLabels bool
}

func (this *Ingester) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -82,6 +92,97 @@ func (this *Ingester) HandleRequest(req *http.Request) error {
return errors.New("service not found")
}

var elipsis = func(token string, maxLen int) (string, int) {

if maxLen <= 0 {
return "", 0
}

if maxLen <= len(token) {
return token, 0
}

return token, 3
}

var streamLabelsCount int

var truncateLabels = func(labels map[string]string, isStream bool) {

var discardWarn bool
var count int

if !isStream {
count = streamLabelsCount
}

nameTrunkToken, nameTrunkTrim := elipsis("___", this.Opts.MaxLabelNameLen)
valueTruncToken, valueTruncTrim := elipsis("...", this.Opts.MaxLabelLen)

for key, val := range labels {

if this.Opts.MaxLabels > 0 && count >= this.Opts.MaxLabels {

if !discardWarn {

if isStream {
slog.Warn("WEB STREAM: Discard excess stream labels",
slog.String("stream_id", logStream.ID.String()),
slog.Int("count", len(labels)-this.Opts.MaxLabels),
slog.Int("max", this.Opts.MaxLabels),
slog.String("remote_addr", req.RemoteAddr))
} else {
slog.Warn("WEB STREAM: Discard excess entry labels",
slog.String("stream_id", logStream.ID.String()),
slog.Int("count", len(labels)-(this.Opts.MaxLabels-streamLabelsCount)),
slog.Int("max", this.Opts.MaxLabels-streamLabelsCount),
slog.String("remote_addr", req.RemoteAddr))
}

discardWarn = true
}

delete(labels, key)
continue
}

if this.Opts.MaxLabelNameLen > 0 && len(key) > this.Opts.MaxLabelNameLen {

slog.Warn("WEB STREAM: Label name truncated",
slog.String("stream_id", logStream.ID.String()),
slog.String("label", key),
slog.String("remote_addr", req.RemoteAddr))

// reset record with a truncated key
delete(labels, key)
key = key[:this.Opts.MaxLabelNameLen-nameTrunkTrim] + nameTrunkToken
labels[key] = val
}

val = strings.TrimSpace(val)
if val == "" {

if !this.Opts.KeepEmptyLabels {
delete(labels, key)
continue
}

val = "[null]"
labels[key] = val
}

if this.Opts.MaxLabelLen > 0 && len(val) > this.Opts.MaxLabelLen {
labels[key] = val[:this.Opts.MaxLabelLen-valueTruncTrim] + valueTruncToken
}

count++
}

if isStream {
streamLabelsCount = count
}
}

contentType := req.Header.Get("content-type")
switch {
case strings.Contains(contentType, "json"):
Expand All @@ -95,11 +196,34 @@ func (this *Ingester) HandleRequest(req *http.Request) error {
return errors.New("invalid batch payload")
}

slog.Debug("WEB STREAM: Ingesting entries",
slog.Debug("WEB STREAM: Ingest entries",
slog.Int("count", len(payload.Entries)),
slog.String("stream_id", logStream.ID.String()),
slog.String("remote_addr", req.RemoteAddr))

truncateLabels(payload.Meta, true)

if this.Opts.MaxMessages > 0 && len(payload.Entries) > this.Opts.MaxMessages {

slog.Warn("WEB STREAM: Discard excess entries",
slog.String("stream_id", logStream.ID.String()),
slog.Int("count", len(payload.Entries)-this.Opts.MaxMessages),
slog.String("remote_addr", req.RemoteAddr))

payload.Entries = payload.Entries[:this.Opts.MaxMessages]
}

msgTruncToken, msgTruncTrim := elipsis("...", this.Opts.MaxMessageLen)

for idx, entry := range payload.Entries {

truncateLabels(payload.Entries[idx].Meta, false)

if this.Opts.MaxMessageLen > 0 && len(entry.Message) > this.Opts.MaxMessageLen {
payload.Entries[idx].Message = entry.Message[:this.Opts.MaxMessageLen-msgTruncTrim] + msgTruncToken
}
}

txID := uuid.New()

if this.Loki != nil {
Expand Down
5 changes: 2 additions & 3 deletions service/ingester/streams/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package streams
import "github.com/maddsua/logpush/service/logdata"

type WebStream struct {
ServiceID string `json:"service_id"`
Meta map[string]string `json:"meta"`
Entries []WebLogEntry `json:"entries"`
Meta map[string]string `json:"meta"`
Entries []WebLogEntry `json:"entries"`
}

type WebLogEntry struct {
Expand Down
1 change: 0 additions & 1 deletion service/logdata/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (um UnixMilli) Time(sequence int) time.Time {
ts = time.Now()
}

// todo: ensure correct result
return ts.Add(time.Duration(sequence))
}

Expand Down
25 changes: 24 additions & 1 deletion service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
}

if err := dbconn.Ping(); err != nil {
slog.Error("STARTUP: Unable to open DB connection",
slog.Error("STARTUP: Unable to connect to the DB",
slog.String("err", err.Error()))
os.Exit(1)
}
Expand Down Expand Up @@ -103,6 +103,14 @@ func main() {
DB: dbops.New(dbconn),
Timescale: &timescale.Timescale{DB: dbconn},
StreamCache: ingester.NewStreamCache(),
Opts: ingester.IngesterOptions{
MaxLabels: envInt("INGESTER_MAX_LABELS"),
MaxLabelNameLen: envInt("INGESTER_MAX_LABEL_NAME_LEN"),
MaxLabelLen: envInt("INGESTER_MAX_LABEL_LEN"),
MaxMessages: envInt("INGESTER_MAX_MESSAGES"),
MaxMessageLen: envInt("INGESTER_MAX_MESSAGE_LEN"),
KeepEmptyLabels: strings.ToLower(os.Getenv("INGESTER_KEEP_EMPTY_LABELS")) != "false",
},
}

mux.HandleFunc("POST /push/stream/{id}", func(writer http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -195,3 +203,18 @@ func rootMiddleware(next http.Handler) http.Handler {
next.ServeHTTP(writer, req)
})
}

func envInt(name string) int {

envVal := os.Getenv(name)
if envVal == "" {
return 0
}

val, err := strconv.Atoi(envVal)
if err != nil {
return 0
}

return val
}

0 comments on commit 4350da5

Please sign in to comment.