x-pack/filebeat/input/httpjson: add metrics for number of events and pages published (#42442)

This is intended to match the events and batches metrics published by the CEL
input, but there is no concept of batches in HTTPJSON, the nearest equivalent
being pages.
efd6 authored Feb 4, 2025
1 parent accc5e1 commit 71900c4
Showing 6 changed files with 31 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -412,6 +412,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]
- Publish events progressively in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42567[42567]
- The journald input is now generally available. {pull}42107[42107]
- Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442]

*Auditbeat*

2 changes: 2 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
@@ -1623,6 +1623,8 @@ observe the activity of the input.
[options="header"]
|=======
| Metric | Description
| `events_published_total` | Total number of events published.
| `pages_published_total` | Total number of pages of events published.
| `http_request_total` | Total number of processed requests.
| `http_request_errors_total` | Total number of request errors.
| `http_request_delete_total` | Total number of `DELETE` requests.
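The two new counters sit alongside the existing HTTPJSON input metrics, so they should be observable wherever those metrics already are. As a usage illustration only (not part of this change), the following Go sketch polls Filebeat's monitoring HTTP endpoint and prints them; it assumes the Beat runs with `http.enabled: true` on the default `localhost:5066`, that the `/inputs/` path serves a JSON array with one object per running input, and that each object carries an `input` field naming the input type.

// readinputmetrics.go: minimal sketch, not part of this commit. Host, port,
// and response shape are assumptions as described above.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:5066/inputs/")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Assumed shape: one JSON object per running input, with the registered
	// metric names as top-level keys.
	var inputs []map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&inputs); err != nil {
		log.Fatal(err)
	}
	for _, in := range inputs {
		if in["input"] == "httpjson" {
			fmt.Printf("events=%v pages=%v\n",
				in["events_published_total"], in["pages_published_total"])
		}
	}
}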
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input.go
@@ -215,7 +215,7 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso
}
pagination := newPagination(cfg, client, log)
responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log)
requester := newRequester(client, requestFactory, responseProcessor, log)
requester := newRequester(client, requestFactory, responseProcessor, metrics, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(cfg.Cursor, log)
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/httpjson/metrics.go
@@ -19,6 +19,8 @@ type inputMetrics struct {
intervalPages metrics.Sample // histogram of pages per interval
intervals *monitoring.Uint // total number of intervals executed
intervalErrs *monitoring.Uint // total number of interval errors
eventsPublished *monitoring.Uint // number of events published
pagesPublished *monitoring.Uint // number of pages of events published
}

func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
@@ -29,6 +31,8 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
out := &inputMetrics{
intervals: monitoring.NewUint(reg, "httpjson_interval_total"),
intervalErrs: monitoring.NewUint(reg, "httpjson_interval_errors_total"),
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
pagesPublished: monitoring.NewUint(reg, "pages_published_total"),
intervalExecutionTime: metrics.NewUniformSample(1024),
intervalPageExecutionTime: metrics.NewUniformSample(1024),
intervalPages: metrics.NewUniformSample(1024),
@@ -44,6 +48,13 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
return out
}

func (m *inputMetrics) addEventsPublished(n uint64) {
if m == nil {
return
}
m.eventsPublished.Add(n)
}

func (m *inputMetrics) updateIntervalMetrics(err error, t time.Time) {
if m == nil {
return
@@ -59,6 +70,7 @@ func (m *inputMetrics) updatePageExecutionTime(t time.Time) {
if m == nil {
return
}
m.pagesPublished.Add(1)
m.intervalPageExecutionTime.Update(time.Since(t).Nanoseconds())
}

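Both new helpers keep the nil-receiver guard used by the other inputMetrics methods, which is what lets callers opt out of metrics entirely, as the updated request_test.go below does by passing nil to newRequester. A hedged sketch of a test for that guard (hypothetical, not part of this commit, assumed to live in the httpjson package):

package httpjson

import (
	"testing"
	"time"
)

// TestNilMetricsSafe checks that the counter helpers are safe no-ops on a
// nil *inputMetrics, mirroring how the test requester is built with nil metrics.
func TestNilMetricsSafe(t *testing.T) {
	var m *inputMetrics // nil, as passed to newRequester in TestCtxAfterDoRequest
	m.addEventsPublished(1)               // guarded by the nil receiver check; must not panic
	m.updatePageExecutionTime(time.Now()) // also guarded; must not panic
}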
24 changes: 14 additions & 10 deletions x-pack/filebeat/input/httpjson/request.go
@@ -82,7 +82,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
p := newPublisher(trCtx, publisher, true, r.log)
p := newPublisher(trCtx, publisher, true, r.metrics, r.log)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p)
n = p.eventCount()
continue
@@ -119,7 +119,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
return err
}
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
p := newPublisher(trCtx, publisher, false, r.log)
p := newPublisher(trCtx, publisher, false, r.metrics, r.log)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p)
n = p.eventCount()
} else {
@@ -189,7 +189,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
resps = intermediateResps
}

p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log)
if rf.isChain {
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
} else {
@@ -474,14 +474,16 @@ type requester struct {
client *httpClient
requestFactories []*requestFactory
responseProcessors []*responseProcessor
metrics *inputMetrics
log *logp.Logger
}

func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, log *logp.Logger) *requester {
func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, metrics *inputMetrics, log *logp.Logger) *requester {
return &requester{
client: client,
requestFactories: reqs,
responseProcessors: resps,
metrics: metrics,
log: log,
}
}
@@ -716,7 +718,7 @@ func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *tra
}
resps = intermediateResps
}
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log)
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
n += p.eventCount()
}
@@ -752,13 +754,14 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {

// publisher is an event publication handler.
type publisher struct {
trCtx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
trCtx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
metrics *inputMetrics
}

func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, metrics *inputMetrics, log *logp.Logger) *publisher {
if !publish {
pub = nil
}
@@ -789,6 +792,7 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) {
p.trCtx.updateLastEvent(msg)
p.trCtx.updateCursor()

p.metrics.addEventsPublished(1)
p.n++
}

2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/request_test.go
@@ -66,7 +66,7 @@ func TestCtxAfterDoRequest(t *testing.T) {
pagination := newPagination(config, client, log)
responseProcessor := newResponseProcessor(config, pagination, nil, nil, log)

requester := newRequester(client, requestFactory, responseProcessor, log)
requester := newRequester(client, requestFactory, responseProcessor, nil, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(config.Cursor, log)
