From 284202950f92f0a39f1727d4553cb0802d0a1e0b Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 28 Jan 2025 12:56:48 +1030 Subject: [PATCH] x-pack/filebeat/input/httpjson: add metrics for number of events and pages published This is intended to match the events and batches published by the CEL input, but there is no concept of batches in HTTPJSON, the nearest being paging. --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 2 ++ x-pack/filebeat/input/httpjson/input.go | 2 +- x-pack/filebeat/input/httpjson/metrics.go | 12 ++++++++++ x-pack/filebeat/input/httpjson/request.go | 24 +++++++++++-------- .../filebeat/input/httpjson/request_test.go | 2 +- 6 files changed, 31 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6fe4669fd840..44340c9d4346 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -411,6 +411,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804] - Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462] - Publish events progressively in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42567[42567] +- Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index fb246803411e..9f3a9ecdd813 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -1623,6 +1623,8 @@ observe the activity of the input. [options="header"] |======= | Metric | Description +| `events_published_total` | Total number of events published. +| `pages_published_total` | Total number of pages of events published. 
| `http_request_total` | Total number of processed requests. | `http_request_errors_total` | Total number of request errors. | `http_request_delete_total` | Total number of `DELETE` requests. diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index f7db8dc6794e..90f9b124b781 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -215,7 +215,7 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso } pagination := newPagination(cfg, client, log) responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log) - requester := newRequester(client, requestFactory, responseProcessor, log) + requester := newRequester(client, requestFactory, responseProcessor, metrics, log) trCtx := emptyTransformContext() trCtx.cursor = newCursor(cfg.Cursor, log) diff --git a/x-pack/filebeat/input/httpjson/metrics.go b/x-pack/filebeat/input/httpjson/metrics.go index 4e0ba508c015..16ffd69d274b 100644 --- a/x-pack/filebeat/input/httpjson/metrics.go +++ b/x-pack/filebeat/input/httpjson/metrics.go @@ -19,6 +19,8 @@ type inputMetrics struct { intervalPages metrics.Sample // histogram of pages per interval intervals *monitoring.Uint // total number of intervals executed intervalErrs *monitoring.Uint // total number of interval errors + eventsPublished *monitoring.Uint // number of events published + pagesPublished *monitoring.Uint // number of pages of events published } func newInputMetrics(reg *monitoring.Registry) *inputMetrics { @@ -29,6 +31,8 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics { out := &inputMetrics{ intervals: monitoring.NewUint(reg, "httpjson_interval_total"), intervalErrs: monitoring.NewUint(reg, "httpjson_interval_errors_total"), + eventsPublished: monitoring.NewUint(reg, "events_published_total"), + pagesPublished: monitoring.NewUint(reg, "pages_published_total"), intervalExecutionTime: metrics.NewUniformSample(1024), 
intervalPageExecutionTime: metrics.NewUniformSample(1024), intervalPages: metrics.NewUniformSample(1024), @@ -44,6 +48,13 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics { return out } +func (m *inputMetrics) addEventsPublished(n uint64) { + if m == nil { + return + } + m.eventsPublished.Add(n) +} + func (m *inputMetrics) updateIntervalMetrics(err error, t time.Time) { if m == nil { return @@ -59,6 +70,7 @@ func (m *inputMetrics) updatePageExecutionTime(t time.Time) { if m == nil { return } + m.pagesPublished.Add(1) m.intervalPageExecutionTime.Update(time.Since(t).Nanoseconds()) } diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index fb847570826a..aef078dad200 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -82,7 +82,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) - p := newPublisher(trCtx, publisher, true, r.log) + p := newPublisher(trCtx, publisher, true, r.metrics, r.log) r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p) n = p.eventCount() continue @@ -119,7 +119,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ return err } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values - p := newPublisher(trCtx, publisher, false, r.log) + p := newPublisher(trCtx, publisher, false, r.metrics, r.log) r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p) n = p.eventCount() } else { @@ -189,7 +189,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ resps = intermediateResps } - p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) + p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log) if rf.isChain { 
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p) } else { @@ -474,14 +474,16 @@ type requester struct { client *httpClient requestFactories []*requestFactory responseProcessors []*responseProcessor + metrics *inputMetrics log *logp.Logger } -func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, log *logp.Logger) *requester { +func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, metrics *inputMetrics, log *logp.Logger) *requester { return &requester{ client: client, requestFactories: reqs, responseProcessors: resps, + metrics: metrics, log: log, } } @@ -716,7 +718,7 @@ func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *tra } resps = intermediateResps } - p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) + p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log) rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p) n += p.eventCount() } @@ -752,13 +754,14 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) { // publisher is an event publication handler. 
type publisher struct { - trCtx *transformContext - pub inputcursor.Publisher - n int - log *logp.Logger + trCtx *transformContext + pub inputcursor.Publisher + n int + log *logp.Logger + metrics *inputMetrics } -func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher { +func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, metrics *inputMetrics, log *logp.Logger) *publisher { if !publish { pub = nil } @@ -789,6 +792,7 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) { p.trCtx.updateLastEvent(msg) p.trCtx.updateCursor() + p.metrics.addEventsPublished(1) p.n++ } diff --git a/x-pack/filebeat/input/httpjson/request_test.go b/x-pack/filebeat/input/httpjson/request_test.go index 2bd3aab675a4..89e077b8ee69 100644 --- a/x-pack/filebeat/input/httpjson/request_test.go +++ b/x-pack/filebeat/input/httpjson/request_test.go @@ -66,7 +66,7 @@ func TestCtxAfterDoRequest(t *testing.T) { pagination := newPagination(config, client, log) responseProcessor := newResponseProcessor(config, pagination, nil, nil, log) - requester := newRequester(client, requestFactory, responseProcessor, log) + requester := newRequester(client, requestFactory, responseProcessor, nil, log) trCtx := emptyTransformContext() trCtx.cursor = newCursor(config.Cursor, log)