From 226b5ffda7264c11c69ea95ac7ec335483b70832 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 2 May 2024 15:18:50 +0200 Subject: [PATCH 1/5] Attempt to restart docker target for loki source at a fixed interval to support containers restart --- CHANGELOG.md | 2 + .../component/loki/source/docker/docker.go | 7 ++-- .../docker/internal/dockertarget/target.go | 2 - .../component/loki/source/docker/runner.go | 40 ++++++++++--------- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2df9337a99..a9fa7f5b8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -79,6 +79,8 @@ v1.1.0-rc.0 - Fix a bug where custom components would not shadow the stdlib. If you have a module whose name conflicts with an stdlib function and if you use this exact function in your config, then you will need to rename your module. (@wildum) +- Fix an issue where `loki.source.docker` stops collecting logs after a container restart. (@wildum) + ### Other changes - Update `alloy-mixin` to use more specific alert group names (for example, diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index b7d06b91b8..12b636c647 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -305,9 +305,10 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { } return &options{ - client: client, - handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), - positions: c.posFile, + client: client, + handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), + positions: c.posFile, + taskRestartInterval: 5 * time.Second, }, nil } diff --git a/internal/component/loki/source/docker/internal/dockertarget/target.go b/internal/component/loki/source/docker/internal/dockertarget/target.go index b30241ab7d..02d48b31c9 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/target.go +++ b/internal/component/loki/source/docker/internal/dockertarget/target.go @@ -230,8 +230,6 @@ func (t *Target) StartIfNotRunning() { ctx, cancel := context.WithCancel(context.Background()) t.cancel = cancel go t.processLoop(ctx) - } else { - level.Debug(t.logger).Log("msg", "attempted to start process loop but it's already running", "container", t.containerName) } } diff --git a/internal/component/loki/source/docker/runner.go b/internal/component/loki/source/docker/runner.go index 2afe48adef..984da73fe8 100644 --- a/internal/component/loki/source/docker/runner.go +++ b/internal/component/loki/source/docker/runner.go @@ -5,8 +5,8 @@ package docker import ( "context" "sync" + "time" - "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/go-kit/log" "github.com/grafana/alloy/internal/alloy/logging/level" @@ -52,6 +52,9 @@ type options struct { // positions interface so tailers can save/restore offsets in log files. positions positions.Positions + + // taskRestartInterval to restart task that has stopped running. + taskRestartInterval time.Duration } // tailerTask is the payload used to create tailers. It implements runner.Task. @@ -95,23 +98,24 @@ func newTailer(l log.Logger, task *tailerTask) *tailer { } func (t *tailer) Run(ctx context.Context) { - ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit) - - t.target.StartIfNotRunning() - - select { - case err := <-chErr: - // Error setting up the Wait request from the client; either failed to - // read from /containers/{containerID}/wait, or couldn't parse the - // response. Stop the target and exit the task after logging; if it was - // a transient error, the target will be retried on the next discovery - // refresh. - level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err) - t.target.Stop() - return - case <-ch: - t.target.Stop() - return + ticker := time.NewTicker(t.opts.taskRestartInterval) + tickerC := ticker.C + + for { + select { + case <-tickerC: + res, err := t.opts.client.ContainerInspect(ctx, t.target.Name()) + if err != nil { + level.Error(t.log).Log("msg", "error inspecting Docker container", "id", t.target.Name(), "error", err) + continue + } + if res.State.Running { + t.target.StartIfNotRunning() + } + case <-ctx.Done(): + ticker.Stop() + return + } } } From 4651df356377fb54caad226fc860699498018e45 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 2 May 2024 17:47:28 +0200 Subject: [PATCH 2/5] add test --- .../loki/source/docker/docker_test.go | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/internal/component/loki/source/docker/docker_test.go b/internal/component/loki/source/docker/docker_test.go index e020bcda09..97a3b21f3b 100644 --- a/internal/component/loki/source/docker/docker_test.go +++ b/internal/component/loki/source/docker/docker_test.go @@ -4,14 +4,27 @@ package docker import ( "context" + "io" + "os" + "strings" "testing" "time" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" + "github.com/go-kit/log" "github.com/grafana/alloy/internal/alloy/componenttest" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/loki/client/fake" + "github.com/grafana/alloy/internal/component/common/loki/positions" + dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -73,3 +86,93 @@ func TestDuplicateTargets(t *testing.T) { require.Len(t, cmp.manager.tasks, 1) } + +func TestRestart(t *testing.T) { + runningState := true + client := clientMock{ + logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", + running: func() bool { return runningState }, + } + expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" + + tailer, entryHandler := setupTailer(t, client) + go tailer.Run(context.Background()) + + // The container is already running, expect log lines. + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.") + + // Stops the container. + runningState = false + time.Sleep(30 * time.Millisecond) // Sleep for a duration greater than the interval to make sure it stops sending log lines. + entryHandler.Clear() + time.Sleep(30 * time.Millisecond) + assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. + + // Restart the container and expect log lines. + runningState = true + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") +} + +func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler *fake.Client) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + entryHandler = fake.NewClient(func() {}) + + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Second, + PositionsFile: t.TempDir() + "/positions.yml", + }) + require.NoError(t, err) + + tgt, err := dt.NewTarget( + dt.NewMetrics(prometheus.NewRegistry()), + logger, + entryHandler, + ps, + "flog", + model.LabelSet{"job": "docker"}, + []*relabel.Config{}, + client, + ) + require.NoError(t, err) + tailerTask := &tailerTask{ + options: &options{ + client: client, + taskRestartInterval: 20 * time.Millisecond, + }, + target: tgt, + } + return newTailer(logger, tailerTask), entryHandler +} + +type clientMock struct { + client.APIClient + logLine string + running func() bool +} + +func (mock clientMock) ContainerInspect(ctx context.Context, c string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: c, + State: &types.ContainerState{ + Running: mock.running(), + }, + }, + Config: &container.Config{Tty: true}, + }, nil +} + +func (mock clientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(mock.logLine)), nil +} From fc4b93a05a3eee9e67bfdc243332cb863c9d8811 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Tue, 7 May 2024 14:42:10 +0200 Subject: [PATCH 3/5] add missing target cleanup --- internal/component/loki/source/docker/runner.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/component/loki/source/docker/runner.go b/internal/component/loki/source/docker/runner.go index 984da73fe8..e3256bbc12 100644 --- a/internal/component/loki/source/docker/runner.go +++ b/internal/component/loki/source/docker/runner.go @@ -113,6 +113,7 @@ func (t *tailer) Run(ctx context.Context) { t.target.StartIfNotRunning() } case <-ctx.Done(): + t.target.Stop() ticker.Stop() return } From 585bf5bf753cd69ccda7e67dad35bc27b2e511ac Mon Sep 17 00:00:00 2001 From: William Dumont Date: Tue, 7 May 2024 14:43:12 +0200 Subject: [PATCH 4/5] improve test readability --- internal/component/loki/source/docker/docker_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/component/loki/source/docker/docker_test.go b/internal/component/loki/source/docker/docker_test.go index 97a3b21f3b..d981a45535 100644 --- a/internal/component/loki/source/docker/docker_test.go +++ b/internal/component/loki/source/docker/docker_test.go @@ -28,6 +28,8 @@ import ( "github.com/stretchr/testify/require" ) +const taskRestartInterval = 20 * time.Millisecond + func Test(t *testing.T) { // Use host that works on all platforms (including Windows). var cfg = ` @@ -108,9 +110,9 @@ func TestRestart(t *testing.T) { // Stops the container. runningState = false - time.Sleep(30 * time.Millisecond) // Sleep for a duration greater than the interval to make sure it stops sending log lines. + time.Sleep(taskRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than taskRestartInterval to make sure it stops sending log lines. entryHandler.Clear() - time.Sleep(30 * time.Millisecond) + time.Sleep(taskRestartInterval + 10*time.Millisecond) assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. // Restart the container and expect log lines. @@ -148,7 +150,7 @@ func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler tailerTask := &tailerTask{ options: &options{ client: client, - taskRestartInterval: 20 * time.Millisecond, + taskRestartInterval: taskRestartInterval, }, target: tgt, } From ddd83328d9174f1036b713b802733ca0c5dd29e0 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Tue, 7 May 2024 16:52:24 +0200 Subject: [PATCH 5/5] rename config option --- internal/component/loki/source/docker/docker.go | 8 ++++---- internal/component/loki/source/docker/docker_test.go | 10 +++++----- internal/component/loki/source/docker/runner.go | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 12b636c647..a64b142d8f 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -305,10 +305,10 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { } return &options{ - client: client, - handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), - positions: c.posFile, - taskRestartInterval: 5 * time.Second, + client: client, + handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), + positions: c.posFile, + targetRestartInterval: 5 * time.Second, }, nil } diff --git a/internal/component/loki/source/docker/docker_test.go b/internal/component/loki/source/docker/docker_test.go index d981a45535..96d847a66c 100644 --- a/internal/component/loki/source/docker/docker_test.go +++ b/internal/component/loki/source/docker/docker_test.go @@ -28,7 +28,7 @@ import ( "github.com/stretchr/testify/require" ) -const taskRestartInterval = 20 * time.Millisecond +const targetRestartInterval = 20 * time.Millisecond func Test(t *testing.T) { // Use host that works on all platforms (including Windows). @@ -110,9 +110,9 @@ func TestRestart(t *testing.T) { // Stops the container. runningState = false - time.Sleep(taskRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than taskRestartInterval to make sure it stops sending log lines. + time.Sleep(targetRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines. entryHandler.Clear() - time.Sleep(taskRestartInterval + 10*time.Millisecond) + time.Sleep(targetRestartInterval + 10*time.Millisecond) assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. // Restart the container and expect log lines. @@ -149,8 +149,8 @@ func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler require.NoError(t, err) tailerTask := &tailerTask{ options: &options{ - client: client, - taskRestartInterval: taskRestartInterval, + client: client, + targetRestartInterval: targetRestartInterval, }, target: tgt, } diff --git a/internal/component/loki/source/docker/runner.go b/internal/component/loki/source/docker/runner.go index e3256bbc12..7f3dfdd0c8 100644 --- a/internal/component/loki/source/docker/runner.go +++ b/internal/component/loki/source/docker/runner.go @@ -53,8 +53,8 @@ type options struct { // positions interface so tailers can save/restore offsets in log files. positions positions.Positions - // taskRestartInterval to restart task that has stopped running. - taskRestartInterval time.Duration + // targetRestartInterval to restart task that has stopped running. + targetRestartInterval time.Duration } // tailerTask is the payload used to create tailers. It implements runner.Task. @@ -98,7 +98,7 @@ func newTailer(l log.Logger, task *tailerTask) *tailer { } func (t *tailer) Run(ctx context.Context) { - ticker := time.NewTicker(t.opts.taskRestartInterval) + ticker := time.NewTicker(t.opts.targetRestartInterval) tickerC := ticker.C for {