Skip to content

Commit e3e2332

Browse files
AndersonQbelimawrjlind23
authored
Filestream returns error when an input with duplicated ID is created (#41954)
* filestream: do not run input with duplicated ID * wip * wip * Restore configuration files/remove test files * Move reload retry logic to reload.go * Fix isReloadable and improve tests * Add changelog * remove comments * Fix build * Update reloadInputs to use the new error types from cfgfile/list.go * Fix more tests * Add `allow_deprecated_id_duplication` flag * Add non reloadable logic to autodiscover * Add test to autodiscover not reloading common.ErrNonReloadable * Add test for common.IsInputReloadable * Update changelog * address lint warnings * Update notice to 2025 * Implement review suggestions * Fix OTel API * Fix flakiness on TestFilestreamMetadataUpdatedOnRename For some reason this test became flaky, the root of the flakiness is not on the test, it is on how a rename operation is detected. Even though this test uses `os.Rename`, it does not seem to be an atomic operation. https://www.man7.org/linux/man-pages/man2/rename.2.html does not make it clear whether 'renameat' (used by `os.Rename`) is atomic. On a flaky execution, the file is actually perceived as removed and then a new file is created, both with the same inode. This happens on a system that does not reuse inodes as soon they're freed. Because the file is detected as removed, it's state is also removed. Then when more data is added, only the offset of the new data is tracked by the registry, causing the test to fail. A workaround for this is to not remove the state when the file is removed, hence `clean_removed: false` is set in the test config. --------- Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co> Co-authored-by: Julien Lind <julien.lind@elastic.co>
1 parent ef29005 commit e3e2332

File tree

20 files changed

+530
-56
lines changed

20 files changed

+530
-56
lines changed

CHANGELOG.next.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
5353
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
5454
- Add kafka compression support for ZSTD.
5555
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731]
56+
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]
57+
- Filestream inputs can define `allow_deprecated_id_duplication: true` to run keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954]
5658
- The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because the fingerprint` is the default file identity now. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762]
59+
5760
*Heartbeat*
5861

5962

filebeat/beater/crawler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ func (c *crawler) Start(
7979
}
8080

8181
if configInputs.Enabled() {
82-
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
82+
c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs)
8383
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
8484
return fmt.Errorf("creating input reloader failed: %w", err)
8585
}
8686
}
8787

8888
if configModules.Enabled() {
89-
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
89+
c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules)
9090
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
9191
return fmt.Errorf("creating module reloader failed: %w", err)
9292
}

filebeat/beater/filebeat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
224224
newPath := strings.TrimSuffix(origPath, ".yml")
225225
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
226226
}
227-
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
227+
modulesLoader := cfgfile.NewReloader(logp.L().Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
228228
modulesLoader.Load(modulesFactory)
229229
}
230230

filebeat/docs/inputs/input-filestream-file-options.asciidoc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,27 @@ supported.
1919
===== `id`
2020

2121
A unique identifier for this filestream input. Each filestream input
22-
must have a unique ID.
22+
must have a unique ID. Filestream will not start inputs with duplicated IDs.
2323

2424
WARNING: Changing input ID may cause data duplication because the
2525
state of the files will be lost and they will be read from the
2626
beginning again.
2727

28+
[float]
29+
[[filestream-input-allow_deprecated_id_duplication]]
30+
===== `allow_deprecated_id_duplication`
31+
32+
This allows {beatname_uc} to run multiple instances of the {type}
33+
input with the same ID. This is intended to add backwards
34+
compatibility with the behaviour prior to 9.0. It defaults to `false`
35+
and is **not recommended** in new configurations.
36+
37+
This setting is per input, so make sure to enable it in all {type}
38+
inputs that use duplicated IDs.
39+
40+
WARNING: Duplicated IDs will lead to data duplication and some input
41+
instances will not produce any metrics.
42+
2843
[float]
2944
[[filestream-input-paths]]
3045
===== `paths`

filebeat/input/filestream/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type config struct {
4949
IgnoreInactive ignoreInactiveType `config:"ignore_inactive"`
5050
Rotation *conf.Namespace `config:"rotation"`
5151
TakeOver bool `config:"take_over"`
52+
53+
// AllowIDDuplication is used by InputManager.Create
54+
// (see internal/input-logfile/manager.go).
55+
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
5256
}
5357

5458
type closerConfig struct {
@@ -142,6 +146,13 @@ func (c *config) Validate() error {
142146
return fmt.Errorf("no path is configured")
143147
}
144148

149+
if c.AllowIDDuplication {
150+
logp.L().Named("input.filestream").Warn(
151+
"setting `allow_deprecated_id_duplication` will lead to data " +
152+
"duplication and incomplete input metrics, it's use is " +
153+
"highly discouraged.")
154+
}
155+
145156
return nil
146157
}
147158

filebeat/input/filestream/input_integration_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,23 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) {
101101
"prospector.scanner.check_interval": "1ms",
102102
"prospector.scanner.fingerprint.enabled": false,
103103
"file_identity.native": map[string]any{},
104+
// For some reason this test became flaky, the root of the flakiness
105+
// is not on the test, it is on how a rename operation is detected.
106+
// Even though this test uses `os.Rename`, it does not seem to be an atomic
107+
// operation. https://www.man7.org/linux/man-pages/man2/rename.2.html
108+
// does not make it clear whether 'renameat' (used by `os.Rename`) is
109+
// atomic.
110+
//
111+
// On a flaky execution, the file is actually perceived as removed
112+
// and then a new file is created, both with the same inode. This
113+
// happens on a system that does not reuse inodes as soon they're
114+
// freed. Because the file is detected as removed, it's state is also
115+
// removed. Then when more data is added, only the offset of the new
116+
// data is tracked by the registry, causing the test to fail.
117+
//
118+
// A workaround for this is to not remove the state when the file is
119+
// removed, hence `clean_removed: false` is set here.
120+
"clean_removed": false,
104121
})
105122

106123
testline := []byte("log line\n")

filebeat/input/filestream/internal/input-logfile/manager.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync"
2626
"time"
2727

28+
"github.com/elastic/beats/v7/libbeat/common"
2829
"github.com/elastic/go-concert/unison"
2930

3031
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
@@ -155,26 +156,54 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
155156
}
156157

157158
settings := struct {
158-
ID string `config:"id"`
159-
CleanInactive time.Duration `config:"clean_inactive"`
160-
HarvesterLimit uint64 `config:"harvester_limit"`
159+
ID string `config:"id"`
160+
CleanInactive time.Duration `config:"clean_inactive"`
161+
HarvesterLimit uint64 `config:"harvester_limit"`
162+
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
161163
}{CleanInactive: cim.DefaultCleanTimeout}
162164
if err := config.Unpack(&settings); err != nil {
163165
return nil, err
164166
}
165167

166168
if settings.ID == "" {
167-
cim.Logger.Error("filestream input ID without ID might lead to data" +
168-
" duplication, please add an ID and restart Filebeat")
169+
cim.Logger.Warn("filestream input without ID is discouraged, please add an ID and restart Filebeat")
169170
}
170171

171172
metricsID := settings.ID
172173
cim.idsMux.Lock()
173174
if _, exists := cim.ids[settings.ID]; exists {
174-
cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+
175-
"will lead to data duplication, please use a different ID. Metrics "+
176-
"collection has been disabled on this input.", settings.ID)
177-
metricsID = ""
175+
duplicatedInput := map[string]any{}
176+
unpackErr := config.Unpack(&duplicatedInput)
177+
if unpackErr != nil {
178+
duplicatedInput["error"] = fmt.Errorf("failed to unpack duplicated input config: %w", unpackErr).Error()
179+
}
180+
181+
// Keep old behaviour so users can upgrade to 9.0 without
182+
// having their inputs not starting.
183+
if settings.AllowIDDuplication {
184+
cim.Logger.Errorf("filestream input with ID '%s' already exists, "+
185+
"this will lead to data duplication, please use a different "+
186+
"ID. Metrics collection has been disabled on this input. The "+
187+
" input will start only because "+
188+
"'allow_deprecated_id_duplication' is set to true",
189+
settings.ID)
190+
metricsID = ""
191+
} else {
192+
cim.Logger.Errorw(
193+
fmt.Sprintf(
194+
"filestream input ID '%s' is duplicated: input will NOT start",
195+
settings.ID,
196+
),
197+
"input.cfg", conf.DebugString(config, true))
198+
199+
cim.idsMux.Unlock()
200+
return nil, &common.ErrNonReloadable{
201+
Err: fmt.Errorf(
202+
"filestream input with ID '%s' already exists, this "+
203+
"will lead to data duplication, please use a different ID",
204+
settings.ID,
205+
)}
206+
}
178207
}
179208

180209
// TODO: improve how inputs with empty IDs are tracked.

filebeat/input/filestream/internal/input-logfile/manager_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ package input_logfile
1919

2020
import (
2121
"bytes"
22+
"fmt"
23+
"sync"
2224
"testing"
2325

2426
"github.com/stretchr/testify/assert"
2527
"github.com/stretchr/testify/require"
2628
"go.uber.org/zap"
2729
"go.uber.org/zap/zapcore"
2830

31+
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
2932
"github.com/elastic/beats/v7/libbeat/statestore"
3033
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
3134
"github.com/elastic/elastic-agent-libs/config"
@@ -42,6 +45,18 @@ func (s *testSource) Name() string {
4245
return s.name
4346
}
4447

48+
type noopProspector struct{}
49+
50+
func (m noopProspector) Init(_, _ ProspectorCleaner, _ func(Source) string) error {
51+
return nil
52+
}
53+
54+
func (m noopProspector) Run(_ v2.Context, _ StateMetadataUpdater, _ HarvesterGroup) {}
55+
56+
func (m noopProspector) Test() error {
57+
return nil
58+
}
59+
4560
func TestSourceIdentifier_ID(t *testing.T) {
4661
testCases := map[string]struct {
4762
userID string
@@ -198,6 +213,115 @@ func TestInputManager_Create(t *testing.T) {
198213
assert.NotContains(t, buff.String(),
199214
"already exists")
200215
})
216+
217+
t.Run("does not start an input with duplicated ID", func(t *testing.T) {
218+
tcs := []struct {
219+
name string
220+
id string
221+
}{
222+
{name: "ID is empty", id: ""},
223+
{name: "non-empty ID", id: "non-empty-ID"},
224+
}
225+
226+
for _, tc := range tcs {
227+
t.Run(tc.name, func(t *testing.T) {
228+
storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
229+
testStore, err := storeReg.Get("test")
230+
require.NoError(t, err)
231+
232+
log, buff := newBufferLogger()
233+
234+
cim := &InputManager{
235+
Logger: log,
236+
StateStore: testStateStore{Store: testStore},
237+
Configure: func(_ *config.C) (Prospector, Harvester, error) {
238+
var wg sync.WaitGroup
239+
240+
return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil
241+
}}
242+
cfg1 := config.MustNewConfigFrom(fmt.Sprintf(`
243+
type: filestream
244+
id: %s
245+
paths:
246+
- /var/log/foo
247+
`, tc.id))
248+
249+
// Create a different 2nd config with duplicated ID to ensure
250+
// the ID itself is the only requirement to prevent the 2nd input
251+
// from being created.
252+
cfg2 := config.MustNewConfigFrom(fmt.Sprintf(`
253+
type: filestream
254+
id: %s
255+
paths:
256+
- /var/log/bar
257+
`, tc.id))
258+
259+
_, err = cim.Create(cfg1)
260+
require.NoError(t, err, "1st input should have been created")
261+
262+
// Attempt to create an input with a duplicated ID
263+
_, err = cim.Create(cfg2)
264+
require.Error(t, err, "filestream should not have created an input with a duplicated ID")
265+
266+
logs := buff.String()
267+
// Assert the logs contain the correct log message
268+
assert.Contains(t, logs,
269+
fmt.Sprintf("filestream input ID '%s' is duplicated:", tc.id))
270+
271+
// Assert the error contains the correct text
272+
assert.Contains(t, err.Error(),
273+
fmt.Sprintf("filestream input with ID '%s' already exists", tc.id))
274+
})
275+
}
276+
})
277+
278+
t.Run("allow duplicated IDs setting", func(t *testing.T) {
279+
storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
280+
testStore, err := storeReg.Get("test")
281+
require.NoError(t, err)
282+
283+
log, buff := newBufferLogger()
284+
285+
cim := &InputManager{
286+
Logger: log,
287+
StateStore: testStateStore{Store: testStore},
288+
Configure: func(_ *config.C) (Prospector, Harvester, error) {
289+
var wg sync.WaitGroup
290+
291+
return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil
292+
}}
293+
cfg1 := config.MustNewConfigFrom(`
294+
type: filestream
295+
id: duplicated-id
296+
allow_deprecated_id_duplication: true
297+
paths:
298+
- /var/log/foo
299+
`)
300+
301+
// Create a different 2nd config with duplicated ID to ensure
302+
// the ID itself is the only requirement to prevent the 2nd input
303+
// from being created.
304+
cfg2 := config.MustNewConfigFrom(`
305+
type: filestream
306+
id: duplicated-id
307+
allow_deprecated_id_duplication: true
308+
paths:
309+
- /var/log/bar
310+
`)
311+
_, err = cim.Create(cfg1)
312+
require.NoError(t, err, "1st input should have been created")
313+
// Create an input with a duplicated ID
314+
_, err = cim.Create(cfg2)
315+
require.NoError(t, err, "filestream should not have created an input with a duplicated ID")
316+
317+
logs := buff.String()
318+
// Assert the logs contain the correct log message
319+
assert.Contains(t, logs,
320+
"filestream input with ID 'duplicated-id' already exists, this "+
321+
"will lead to data duplication, please use a different ID. Metrics "+
322+
"collection has been disabled on this input.",
323+
"did not find the expected message about the duplicated input ID")
324+
})
201325
}
202326

203327
func newBufferLogger() (*logp.Logger, *bytes.Buffer) {

filebeat/tests/integration/filestream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ logging:
274274
filebeat.WaitForLogs(
275275
"Input 'filestream' starting",
276276
10*time.Second,
277-
"Filebeat did log a validation error")
277+
"Filebeat did not log a validation error")
278278
}
279279

280280
func TestFilestreamCanMigrateIdentity(t *testing.T) {

heartbeat/beater/heartbeat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
184184
}
185185

186186
if bt.config.ConfigMonitors.Enabled() {
187-
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
187+
bt.monitorReloader = cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigMonitors)
188188
defer bt.monitorReloader.Stop()
189189

190190
err := bt.RunReloadableMonitors()

libbeat/autodiscover/autodiscover.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/elastic/beats/v7/libbeat/autodiscover/meta"
2525
"github.com/elastic/beats/v7/libbeat/beat"
2626
"github.com/elastic/beats/v7/libbeat/cfgfile"
27+
"github.com/elastic/beats/v7/libbeat/common"
2728
"github.com/elastic/beats/v7/libbeat/common/reload"
2829
"github.com/elastic/elastic-agent-autodiscover/bus"
2930
conf "github.com/elastic/elastic-agent-libs/config"
@@ -169,7 +170,10 @@ func (a *Autodiscover) worker() {
169170
updated = false
170171

171172
// On error, make sure the next run also updates because some runners were not properly loaded
172-
retry = err != nil
173+
retry = common.IsInputReloadable(err)
174+
if err != nil && !retry {
175+
a.logger.Errorw("all new inputs failed to start with a non-retriable error", err)
176+
}
173177
if retry {
174178
// The recoverable errors that can lead to retry are related
175179
// to the harvester state, so we need to give the publishing

0 commit comments

Comments
 (0)