diff --git a/heartbeat/monitors/wrappers/summarizer/plugdrop.go b/heartbeat/monitors/wrappers/summarizer/plugdrop.go index fff6c143bf02..a4ddc61abe7d 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugdrop.go +++ b/heartbeat/monitors/wrappers/summarizer/plugdrop.go @@ -43,3 +43,7 @@ func (d DropBrowserExtraEvents) BeforeSummary(event *beat.Event) BeforeSummaryAc func (d DropBrowserExtraEvents) BeforeRetry() { // noop } + +func (d DropBrowserExtraEvents) BeforeEachEvent(event *beat.Event) { + // noop +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugerr.go b/heartbeat/monitors/wrappers/summarizer/plugerr.go index 1010370f520c..83ab6de4f5ab 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugerr.go +++ b/heartbeat/monitors/wrappers/summarizer/plugerr.go @@ -46,6 +46,8 @@ func NewBrowserErrPlugin() *BrowserErrPlugin { } } +func (esp *BrowserErrPlugin) BeforeEachEvent(event *beat.Event) {} // noop + func (esp *BrowserErrPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions { // track these to determine if the journey // needs an error injected due to incompleteness @@ -127,6 +129,10 @@ func (esp *LightweightErrPlugin) BeforeRetry() { // noop } +func (esp *LightweightErrPlugin) BeforeEachEvent(event *beat.Event) { + // noop +} + // errToFieldVal reflects on the error and returns either an *ecserr.ECSErr if possible, and a look.Reason otherwise func errToFieldVal(eventErr error) (errVal interface{}) { var asECS *ecserr.ECSErr diff --git a/heartbeat/monitors/wrappers/summarizer/plugmondur.go b/heartbeat/monitors/wrappers/summarizer/plugmondur.go index 171e3870269b..78627b8c79da 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugmondur.go +++ b/heartbeat/monitors/wrappers/summarizer/plugmondur.go @@ -31,11 +31,15 @@ type LightweightDurationPlugin struct { } func (lwdsp *LightweightDurationPlugin) EachEvent(event *beat.Event, _ error) EachEventActions { - // Effectively only runs once, on the first event + return 0 // noop +} + +func (lwdsp *LightweightDurationPlugin) BeforeEachEvent(event *beat.Event) { + // Effectively capture on the first event, on the first event if lwdsp.startedAt == nil { - lwdsp.setEventStartAt() + now := time.Now() + lwdsp.startedAt = &now } - return 0 } func (lwdsp *LightweightDurationPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { @@ -44,15 +48,10 @@ func (lwdsp *LightweightDurationPlugin) BeforeSummary(event *beat.Event) BeforeS } func (lwdsp *LightweightDurationPlugin) BeforeRetry() { - // Reset event.startAt + // Reset event start time lwdsp.startedAt = nil } -func (lwdsp *LightweightDurationPlugin) setEventStartAt() { - now := time.Now() - lwdsp.startedAt = &now -} - // BrowserDurationPlugin handles the logic for writing the `monitor.duration.us` field // for browser monitors. type BrowserDurationPlugin struct { @@ -89,4 +88,5 @@ func (bwdsp *BrowserDurationPlugin) BeforeSummary(event *beat.Event) BeforeSumma return 0 } -func (bwdsp *BrowserDurationPlugin) BeforeRetry() {} +func (bwdsp *BrowserDurationPlugin) BeforeRetry() {} +func (bwdsp *BrowserDurationPlugin) BeforeEachEvent(event *beat.Event) {} // noop diff --git a/heartbeat/monitors/wrappers/summarizer/plugstatestat.go b/heartbeat/monitors/wrappers/summarizer/plugstatestat.go index 4acfee4dc361..cf7e90af5f30 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugstatestat.go +++ b/heartbeat/monitors/wrappers/summarizer/plugstatestat.go @@ -74,6 +74,8 @@ func (ssp *BrowserStateStatusPlugin) BeforeRetry() { ssp.cssp.BeforeRetry() } +func (ssp *BrowserStateStatusPlugin) BeforeEachEvent(event *beat.Event) {} //noop + // LightweightStateStatusPlugin encapsulates the writing of the primary fields used by the summary, // those being `state.*`, `status.*` , `event.type`, and `monitor.check_group` type LightweightStateStatusPlugin struct { @@ -113,6 +115,8 @@ func (ssp *LightweightStateStatusPlugin) BeforeRetry() { ssp.cssp.BeforeRetry() } +func (ssp *LightweightStateStatusPlugin) BeforeEachEvent(event *beat.Event) {} // noop + type commonSSP struct { js *jobsummary.JobSummary stateTracker *monitorstate.Tracker diff --git a/heartbeat/monitors/wrappers/summarizer/plugurl.go b/heartbeat/monitors/wrappers/summarizer/plugurl.go index dc4394aa42ad..e47463575a31 100644 --- a/heartbeat/monitors/wrappers/summarizer/plugurl.go +++ b/heartbeat/monitors/wrappers/summarizer/plugurl.go @@ -52,3 +52,5 @@ func (busp *BrowserURLPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActi func (busp *BrowserURLPlugin) BeforeRetry() { busp.urlFields = nil } + +func (busp *BrowserURLPlugin) BeforeEachEvent(event *beat.Event) {} //noop diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer.go b/heartbeat/monitors/wrappers/summarizer/summarizer.go index cca3fada663a..ad0902d45af7 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer.go @@ -42,6 +42,12 @@ type Summarizer struct { startedAt time.Time } +func (s Summarizer) beforeEachEvent(event *beat.Event) { + for _, plugin := range s.plugins { + plugin.BeforeEachEvent(event) + } +} + // EachEventActions is a set of options using bitmasks to inform execution after the EachEvent callback type EachEventActions uint8 @@ -58,6 +64,9 @@ const RetryBeforeSummary = 1 // in one location. Prior to this code was strewn about a bit more and following it was // a bit trickier. type SummarizerPlugin interface { + // BeforeEachEvent is called on each event, and allows for the mutation of events + // before monitor execution + BeforeEachEvent(event *beat.Event) // EachEvent is called on each event, and allows for the mutation of events EachEvent(event *beat.Event, err error) EachEventActions // BeforeSummary is run on the final (summary) event for each monitor. @@ -106,6 +115,10 @@ func (s *Summarizer) setupPlugins() { // This adds the state and summary top level fields. func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { + + // call BeforeEachEvent for each plugin before running job + s.beforeEachEvent(event) + conts, eventErr := j(event) s.mtx.Lock() @@ -149,10 +162,10 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { for _, p := range s.plugins { p.BeforeRetry() } - return s.rootJob(event) + return s.Wrap(s.rootJob)(event) } - conts = []jobs.Job{delayedRootJob} + return []jobs.Job{delayedRootJob}, eventErr } } diff --git a/x-pack/heartbeat/heartbeat.yml b/x-pack/heartbeat/heartbeat.yml index d11f77e8b723..4e638690721b 100644 --- a/x-pack/heartbeat/heartbeat.yml +++ b/x-pack/heartbeat/heartbeat.yml @@ -23,7 +23,7 @@ heartbeat.config.monitors: heartbeat.monitors: - type: http # Set enabled to true (or delete the following line) to enable this monitor - enabled: true + enabled: false # ID used to uniquely identify this monitor in Elasticsearch even if the config changes id: my-monitor # Human readable display name for this service in Uptime UI and elsewhere