From 9e4c6665132b0412ece5a4aaa68b85caf823dbd9 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 5 Feb 2025 06:43:22 +1030 Subject: [PATCH] [8.16](backport #42442) x-pack/filebeat/input/httpjson: add metrics for number of events and pages published (#42588) * x-pack/filebeat/input/httpjson: add metrics for number of events and pages published (#42442) This is intended to match the events and batches published by the CEL input, but there is no concept of batches in HTTPJSON, the nearest being paging. (cherry picked from commit 71900c4d89fa3b29f8709bb46a61a9ad78a1e9c2) * remove irrelevant changelog entries --------- Co-authored-by: Dan Kortschak --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 2 ++ x-pack/filebeat/input/httpjson/input.go | 2 +- x-pack/filebeat/input/httpjson/metrics.go | 12 ++++++++++ x-pack/filebeat/input/httpjson/request.go | 24 +++++++++++-------- .../filebeat/input/httpjson/request_test.go | 2 +- 6 files changed, 31 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9ecaceb8f50b..e04ab3a0ad6a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -202,6 +202,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update CEL mito extensions to v1.12.2. {pull}39755[39755] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] +- Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index a7dd5d7634fa..f99c0f70fe26 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -1622,6 +1622,8 @@ observe the activity of the input. [options="header"] |======= | Metric | Description +| `events_published_total` | Total number of events published. +| `pages_published_total` | Total number of pages of event published. | `http_request_total` | Total number of processed requests. | `http_request_errors_total` | Total number of request errors. | `http_request_delete_total` | Total number of `DELETE` requests. diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index ad61aceff895..95ab66474e81 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -159,7 +159,7 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso } pagination := newPagination(cfg, client, log) responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log) - requester := newRequester(client, requestFactory, responseProcessor, log) + requester := newRequester(client, requestFactory, responseProcessor, metrics, log) trCtx := emptyTransformContext() trCtx.cursor = newCursor(cfg.Cursor, log) diff --git a/x-pack/filebeat/input/httpjson/metrics.go b/x-pack/filebeat/input/httpjson/metrics.go index 4e0ba508c015..16ffd69d274b 100644 --- a/x-pack/filebeat/input/httpjson/metrics.go +++ b/x-pack/filebeat/input/httpjson/metrics.go @@ -19,6 +19,8 @@ type inputMetrics struct { intervalPages metrics.Sample // histogram of pages per interval intervals *monitoring.Uint // total number of intervals executed intervalErrs *monitoring.Uint // total number of interval errors + eventsPublished *monitoring.Uint // number of events published + pagesPublished *monitoring.Uint // number of pages of event published } func newInputMetrics(reg *monitoring.Registry) *inputMetrics { @@ -29,6 +31,8 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics { out := &inputMetrics{ intervals: monitoring.NewUint(reg, "httpjson_interval_total"), intervalErrs: monitoring.NewUint(reg, "httpjson_interval_errors_total"), + eventsPublished: monitoring.NewUint(reg, "events_published_total"), + pagesPublished: monitoring.NewUint(reg, "pages_published_total"), intervalExecutionTime: metrics.NewUniformSample(1024), intervalPageExecutionTime: metrics.NewUniformSample(1024), intervalPages: metrics.NewUniformSample(1024), @@ -44,6 +48,13 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics { return out } +func (m *inputMetrics) addEventsPublished(n uint64) { + if m == nil { + return + } + m.eventsPublished.Add(n) +} + func (m *inputMetrics) updateIntervalMetrics(err error, t time.Time) { if m == nil { return @@ -59,6 +70,7 @@ func (m *inputMetrics) updatePageExecutionTime(t time.Time) { if m == nil { return } + m.pagesPublished.Add(1) m.intervalPageExecutionTime.Update(time.Since(t).Nanoseconds()) } diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 160ac67fe9e6..a526e9f56af3 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -82,7 +82,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) - p := newPublisher(trCtx, publisher, true, r.log) + p := newPublisher(trCtx, publisher, true, r.metrics, r.log) r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p) n = p.eventCount() continue @@ -119,7 +119,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ return err } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values - p := newPublisher(trCtx, publisher, false, r.log) + p := newPublisher(trCtx, publisher, false, r.metrics, r.log) r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p) n = p.eventCount() } else { @@ -189,7 +189,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ resps = intermediateResps } - p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) + p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log) if rf.isChain { rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p) } else { @@ -474,14 +474,16 @@ type requester struct { client *httpClient requestFactories []*requestFactory responseProcessors []*responseProcessor + metrics *inputMetrics log *logp.Logger } -func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, log *logp.Logger) *requester { +func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, metrics *inputMetrics, log *logp.Logger) *requester { return &requester{ client: client, requestFactories: reqs, responseProcessors: resps, + metrics: metrics, log: log, } } @@ -716,7 +718,7 @@ func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *tra } resps = intermediateResps } - p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) + p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log) rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p) n += p.eventCount() } @@ -752,13 +754,14 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) { // publisher is an event publication handler. type publisher struct { - trCtx *transformContext - pub inputcursor.Publisher - n int - log *logp.Logger + trCtx *transformContext + pub inputcursor.Publisher + n int + log *logp.Logger + metrics *inputMetrics } -func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher { +func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, metrics *inputMetrics, log *logp.Logger) *publisher { if !publish { pub = nil } @@ -789,6 +792,7 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) { p.trCtx.updateLastEvent(msg) p.trCtx.updateCursor() + p.metrics.addEventsPublished(1) p.n++ } diff --git a/x-pack/filebeat/input/httpjson/request_test.go b/x-pack/filebeat/input/httpjson/request_test.go index 2bd3aab675a4..89e077b8ee69 100644 --- a/x-pack/filebeat/input/httpjson/request_test.go +++ b/x-pack/filebeat/input/httpjson/request_test.go @@ -66,7 +66,7 @@ func TestCtxAfterDoRequest(t *testing.T) { pagination := newPagination(config, client, log) responseProcessor := newResponseProcessor(config, pagination, nil, nil, log) - requester := newRequester(client, requestFactory, responseProcessor, log) + requester := newRequester(client, requestFactory, responseProcessor, nil, log) trCtx := emptyTransformContext() trCtx.cursor = newCursor(config.Cursor, log)