Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for docker containers restart in loki.source.docker #742

Merged
merged 5 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ v1.1.0-rc.0
- Fix a bug where custom components would not shadow the stdlib. If you have a module whose name conflicts with an stdlib function
and if you use this exact function in your config, then you will need to rename your module. (@wildum)

- Fix an issue where `loki.source.docker` stops collecting logs after a container restart. (@wildum)

### Other changes

- Update `alloy-mixin` to use more specific alert group names (for example,
Expand Down
7 changes: 4 additions & 3 deletions internal/component/loki/source/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,10 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) {
}

return &options{
client: client,
handler: loki.NewEntryHandler(c.handler.Chan(), func() {}),
positions: c.posFile,
client: client,
handler: loki.NewEntryHandler(c.handler.Chan(), func() {}),
positions: c.posFile,
targetRestartInterval: 5 * time.Second,
}, nil
}

Expand Down
105 changes: 105 additions & 0 deletions internal/component/loki/source/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,32 @@ package docker

import (
"context"
"io"
"os"
"strings"
"testing"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/go-kit/log"
"github.com/grafana/alloy/internal/alloy/componenttest"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki/client/fake"
"github.com/grafana/alloy/internal/component/common/loki/positions"
dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const targetRestartInterval = 20 * time.Millisecond

func Test(t *testing.T) {
// Use host that works on all platforms (including Windows).
var cfg = `
Expand Down Expand Up @@ -73,3 +88,93 @@ func TestDuplicateTargets(t *testing.T) {

require.Len(t, cmp.manager.tasks, 1)
}

func TestRestart(t *testing.T) {
runningState := true
client := clientMock{
logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor",
running: func() bool { return runningState },
}
expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor"

tailer, entryHandler := setupTailer(t, client)
go tailer.Run(context.Background())

// The container is already running, expect log lines.
assert.EventuallyWithT(t, func(c *assert.CollectT) {
logLines := entryHandler.Received()
if assert.NotEmpty(c, logLines) {
assert.Equal(c, expectedLogLine, logLines[0].Line)
}
}, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.")

// Stops the container.
runningState = false
time.Sleep(targetRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines.
entryHandler.Clear()
time.Sleep(targetRestartInterval + 10*time.Millisecond)
assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running.

// Restart the container and expect log lines.
runningState = true
assert.EventuallyWithT(t, func(c *assert.CollectT) {
logLines := entryHandler.Received()
if assert.NotEmpty(c, logLines) {
assert.Equal(c, expectedLogLine, logLines[0].Line)
}
}, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.")
}

func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler *fake.Client) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
entryHandler = fake.NewClient(func() {})

ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: t.TempDir() + "/positions.yml",
})
require.NoError(t, err)

tgt, err := dt.NewTarget(
dt.NewMetrics(prometheus.NewRegistry()),
logger,
entryHandler,
ps,
"flog",
model.LabelSet{"job": "docker"},
[]*relabel.Config{},
client,
)
require.NoError(t, err)
tailerTask := &tailerTask{
options: &options{
client: client,
targetRestartInterval: targetRestartInterval,
},
target: tgt,
}
return newTailer(logger, tailerTask), entryHandler
}

type clientMock struct {
client.APIClient
logLine string
running func() bool
}

func (mock clientMock) ContainerInspect(ctx context.Context, c string) (types.ContainerJSON, error) {
return types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: c,
State: &types.ContainerState{
Running: mock.running(),
},
},
Config: &container.Config{Tty: true},
}, nil
}

func (mock clientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
return io.NopCloser(strings.NewReader(mock.logLine)), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ func (t *Target) StartIfNotRunning() {
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
go t.processLoop(ctx)
} else {
level.Debug(t.logger).Log("msg", "attempted to start process loop but it's already running", "container", t.containerName)
wildum marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
41 changes: 23 additions & 18 deletions internal/component/loki/source/docker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package docker
import (
"context"
"sync"
"time"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/go-kit/log"
"github.com/grafana/alloy/internal/alloy/logging/level"
Expand Down Expand Up @@ -52,6 +52,9 @@ type options struct {

// positions interface so tailers can save/restore offsets in log files.
positions positions.Positions

// targetRestartInterval to restart task that has stopped running.
targetRestartInterval time.Duration
}

// tailerTask is the payload used to create tailers. It implements runner.Task.
Expand Down Expand Up @@ -95,23 +98,25 @@ func newTailer(l log.Logger, task *tailerTask) *tailer {
}

func (t *tailer) Run(ctx context.Context) {
ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit)
wildum marked this conversation as resolved.
Show resolved Hide resolved

t.target.StartIfNotRunning()

select {
case err := <-chErr:
// Error setting up the Wait request from the client; either failed to
// read from /containers/{containerID}/wait, or couldn't parse the
// response. Stop the target and exit the task after logging; if it was
// a transient error, the target will be retried on the next discovery
// refresh.
level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err)
t.target.Stop()
return
case <-ch:
t.target.Stop()
return
ticker := time.NewTicker(t.opts.targetRestartInterval)
tickerC := ticker.C

for {
select {
case <-tickerC:
res, err := t.opts.client.ContainerInspect(ctx, t.target.Name())
if err != nil {
level.Error(t.log).Log("msg", "error inspecting Docker container", "id", t.target.Name(), "error", err)
continue
}
if res.State.Running {
t.target.StartIfNotRunning()
}
case <-ctx.Done():
t.target.Stop()
ticker.Stop()
return
}
}
}

Expand Down
Loading