Skip to content

Commit aa39888

Browse files
Add status reporting for Journald input (#42462) (#42561)
This commit adds the status reporting for the Journald input. It also adds a debug log to the `UpdateStatus` function from `v2.Context`. (cherry picked from commit 76f4086) Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
1 parent 5263646 commit aa39888

File tree

5 files changed

+96
-12
lines changed

5 files changed

+96
-12
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
329329
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
330330
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
331331
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]
332+
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]
332333

333334
*Auditbeat*
334335

filebeat/input/journald/environment_test.go

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
3333
"github.com/elastic/beats/v7/libbeat/beat"
3434
"github.com/elastic/beats/v7/libbeat/common/acker"
35+
"github.com/elastic/beats/v7/libbeat/management/status"
3536
"github.com/elastic/beats/v7/libbeat/statestore"
3637
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
3738
conf "github.com/elastic/elastic-agent-libs/config"
@@ -40,10 +41,11 @@ import (
4041
)
4142

4243
type inputTestingEnvironment struct {
43-
t *testing.T
44-
workingDir string
45-
stateStore *testInputStore
46-
pipeline *mockPipelineConnector
44+
t *testing.T
45+
workingDir string
46+
stateStore *testInputStore
47+
pipeline *mockPipelineConnector
48+
statusReporter *mockStatusReporter
4749

4850
pluginInitOnce sync.Once
4951
plugin v2.Plugin
@@ -54,10 +56,11 @@ type inputTestingEnvironment struct {
5456

5557
func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
5658
return &inputTestingEnvironment{
57-
t: t,
58-
workingDir: t.TempDir(),
59-
stateStore: openTestStatestore(),
60-
pipeline: &mockPipelineConnector{},
59+
t: t,
60+
workingDir: t.TempDir(),
61+
stateStore: openTestStatestore(),
62+
pipeline: &mockPipelineConnector{},
63+
statusReporter: &mockStatusReporter{},
6164
}
6265
}
6366

@@ -95,7 +98,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input)
9598
}
9699
}()
97100

98-
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx}
101+
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, StatusReporter: e.statusReporter}
99102
if err := inp.Run(inputCtx, e.pipeline); err != nil {
100103
e.t.Errorf("input 'Run' method returned an error: %s", err)
101104
}
@@ -125,6 +128,25 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
125128
}, 5*time.Second, 10*time.Millisecond, &msg)
126129
}
127130

131+
func (e *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) {
132+
t := e.t
133+
t.Helper()
134+
got := e.statusReporter.GetUpdates()
135+
if len(got) != len(expected) {
136+
t.Fatalf("expecting %d updates, got %d", len(expected), len(got))
137+
}
138+
139+
for i := range expected {
140+
g, e := got[i], expected[i]
141+
if g != e {
142+
t.Errorf(
143+
"expecting [%d] status update to be {state:%s, msg:%s}, got {state:%s, msg:%s}",
144+
i, e.state.String(), e.msg, g.state.String(), g.msg,
145+
)
146+
}
147+
}
148+
}
149+
128150
type testInputStore struct {
129151
registry *statestore.Registry
130152
}
@@ -251,3 +273,25 @@ func blockingACKer(starter context.Context) beat.EventListener {
251273
}
252274
})
253275
}
276+
277+
type statusUpdate struct {
278+
state status.Status
279+
msg string
280+
}
281+
282+
type mockStatusReporter struct {
283+
mutex sync.RWMutex
284+
updates []statusUpdate
285+
}
286+
287+
func (m *mockStatusReporter) UpdateStatus(status status.Status, msg string) {
288+
m.mutex.Lock()
289+
m.updates = append(m.updates, statusUpdate{status, msg})
290+
m.mutex.Unlock()
291+
}
292+
293+
func (m *mockStatusReporter) GetUpdates() []statusUpdate {
294+
m.mutex.RLock()
295+
defer m.mutex.RUnlock()
296+
return append([]statusUpdate{}, m.updates...)
297+
}

filebeat/input/journald/input.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
input "github.com/elastic/beats/v7/filebeat/input/v2"
3030
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
3131
"github.com/elastic/beats/v7/libbeat/feature"
32+
"github.com/elastic/beats/v7/libbeat/management/status"
3233
"github.com/elastic/beats/v7/libbeat/reader"
3334
"github.com/elastic/beats/v7/libbeat/reader/parser"
3435
conf "github.com/elastic/elastic-agent-libs/config"
@@ -154,6 +155,8 @@ func (inp *journald) Run(
154155
logger := ctx.Logger.
155156
With("path", src.Name()).
156157
With("input_id", inp.ID)
158+
159+
ctx.UpdateStatus(status.Starting, "Starting")
157160
currentCheckpoint := initCheckpoint(logger, cursor)
158161

159162
mode := inp.Seek
@@ -173,7 +176,9 @@ func (inp *journald) Run(
173176
journalctl.Factory,
174177
)
175178
if err != nil {
176-
return fmt.Errorf("could not start journal reader: %w", err)
179+
wrappedErr := fmt.Errorf("could not start journal reader: %w", err)
180+
ctx.UpdateStatus(status.Failed, wrappedErr.Error())
181+
return wrappedErr
177182
}
178183

179184
defer reader.Close()
@@ -186,6 +191,7 @@ func (inp *journald) Run(
186191
saveRemoteHostname: inp.SaveRemoteHostname,
187192
})
188193

194+
ctx.UpdateStatus(status.Running, "Running")
189195
for {
190196
entry, err := parser.Next()
191197
if err != nil {
@@ -197,14 +203,18 @@ func (inp *journald) Run(
197203
case errors.Is(err, journalctl.ErrRestarting):
198204
continue
199205
default:
200-
logger.Errorf("could not read event: %s", err)
206+
msg := fmt.Sprintf("could not read event: %s", err)
207+
ctx.UpdateStatus(status.Failed, msg)
208+
logger.Error(msg)
201209
return err
202210
}
203211
}
204212

205213
event := entry.ToEvent()
206214
if err := publisher.Publish(event, event.Private); err != nil {
207-
logger.Errorf("could not publish event: %s", err)
215+
msg := fmt.Sprintf("could not publish event: %s", err)
216+
ctx.UpdateStatus(status.Failed, msg)
217+
logger.Errorf(msg)
208218
return err
209219
}
210220
}

filebeat/input/journald/input_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield"
3838
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
3939
"github.com/elastic/beats/v7/libbeat/beat"
40+
"github.com/elastic/beats/v7/libbeat/management/status"
4041
"github.com/elastic/elastic-agent-libs/logp"
4142
"github.com/elastic/elastic-agent-libs/mapstr"
4243
)
@@ -330,6 +331,33 @@ func TestReaderAdapterCanHandleNonStringFields(t *testing.T) {
330331
}
331332
}
332333

334+
func TestInputCanReportStatus(t *testing.T) {
335+
out := decompress(t, filepath.Join("testdata", "multiple-boots.journal.gz"))
336+
337+
env := newInputTestingEnvironment(t)
338+
cfg := mapstr.M{
339+
"paths": []string{out},
340+
}
341+
inp := env.mustCreateInput(cfg)
342+
343+
ctx, cancelInput := context.WithCancel(context.Background())
344+
t.Cleanup(cancelInput)
345+
346+
env.startInput(ctx, inp)
347+
env.waitUntilEventCount(6)
348+
349+
env.RequireStatuses([]statusUpdate{
350+
{
351+
state: status.Starting,
352+
msg: "Starting",
353+
},
354+
{
355+
state: status.Running,
356+
msg: "Running",
357+
},
358+
})
359+
}
360+
333361
func decompress(t *testing.T, namegz string) string {
334362
t.Helper()
335363

filebeat/input/v2/input.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ type Context struct {
9797

9898
func (c Context) UpdateStatus(status status.Status, msg string) {
9999
if c.StatusReporter != nil {
100+
c.Logger.Debugf("updating status, status: '%s', message: '%s'", status.String(), msg)
100101
c.StatusReporter.UpdateStatus(status, msg)
101102
}
102103
}

0 commit comments

Comments
 (0)