diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go
index 4f849a84e367..51db8c8db307 100644
--- a/pkg/stanza/fileconsumer/file.go
+++ b/pkg/stanza/fileconsumer/file.go
@@ -110,6 +110,7 @@ func (m *Manager) poll(ctx context.Context) {
 	if err != nil {
 		m.Debugf("finding files: %v", err)
 	}
+	m.Debugw("matched files", zap.Strings("paths", matches))
 
 	for len(matches) > m.maxBatchFiles {
 		m.consume(ctx, matches[:m.maxBatchFiles])
diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go
index 069ab0fbb043..48f07f1a5e15 100644
--- a/pkg/stanza/fileconsumer/file_test.go
+++ b/pkg/stanza/fileconsumer/file_test.go
@@ -17,9 +17,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/featuregate"
-	"go.uber.org/zap"
-	"go.uber.org/zap/zapcore"
-	"go.uber.org/zap/zaptest/observer"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
@@ -1014,12 +1011,9 @@ func TestFileBatching(t *testing.T) {
 	files := 100
 	linesPerFile := 10
 	maxConcurrentFiles := 20
-	maxBatchFiles := maxConcurrentFiles / 2
 	// Explicitly setting maxBatches to ensure a value of 0 does not enforce a limit
 	maxBatches := 0
 
-	expectedBatches := files / maxBatchFiles // assumes no remainder
-
 	tempDir := t.TempDir()
 	cfg := NewConfig().includeDir(tempDir)
 	cfg.StartAt = "beginning"
@@ -1029,9 +1023,6 @@ func TestFileBatching(t *testing.T) {
 	operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
 	operator.persister = testutil.NewUnscopedMockPersister()
 
-	core, observedLogs := observer.New(zap.DebugLevel)
-	operator.SugaredLogger = zap.New(core).Sugar()
-
 	temps := make([]*os.File, 0, files)
 	for i := 0; i < files; i++ {
 		temps = append(temps, openTemp(t, tempDir))
@@ -1054,23 +1045,6 @@ func TestFileBatching(t *testing.T) {
 	actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, len(expectedTokens))...)
 	require.ElementsMatch(t, expectedTokens, actualTokens)
 
-	// During the first poll, we expect one log per batch and one log per file
-	require.Equal(t, files+expectedBatches, observedLogs.Len())
-	logNum := 0
-	for b := 0; b < expectedBatches; b++ {
-		log := observedLogs.All()[logNum]
-		require.Equal(t, "Consuming files", log.Message)
-		require.Equal(t, zapcore.DebugLevel, log.Level)
-		logNum++
-
-		for f := 0; f < maxBatchFiles; f++ {
-			log = observedLogs.All()[logNum]
-			require.Equal(t, "Started watching file", log.Message)
-			require.Equal(t, zapcore.InfoLevel, log.Level)
-			logNum++
-		}
-	}
-
 	// Write more logs to each file so we can validate that all files are still known
 	expectedTokens = make([][]byte, 0, files*linesPerFile)
 	for i, temp := range temps {
@@ -1087,15 +1061,6 @@ func TestFileBatching(t *testing.T) {
 	actualTokens = make([][]byte, 0, files*linesPerFile)
 	actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, len(expectedTokens))...)
 	require.ElementsMatch(t, expectedTokens, actualTokens)
-
-	// During the second poll, we only expect one log per batch
-	require.Equal(t, files+expectedBatches*2, observedLogs.Len())
-	for b := logNum; b < observedLogs.Len(); b++ {
-		log := observedLogs.All()[logNum]
-		require.Equal(t, "Consuming files", log.Message)
-		require.Equal(t, zapcore.DebugLevel, log.Level)
-		logNum++
-	}
 }
 
 func TestFileBatchingRespectsStartAtEnd(t *testing.T) {
@@ -1454,7 +1419,6 @@ func TestMaxBatching(t *testing.T) {
 	maxBatchFiles := maxConcurrentFiles / 2
 	maxBatches := 2
 
-	expectedBatches := maxBatches
 	expectedMaxFilesPerPoll := maxBatches * maxBatchFiles
 
 	tempDir := t.TempDir()
@@ -1466,9 +1430,6 @@ func TestMaxBatching(t *testing.T) {
 	operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))
 	operator.persister = testutil.NewUnscopedMockPersister()
 
-	core, observedLogs := observer.New(zap.DebugLevel)
-	operator.SugaredLogger = zap.New(core).Sugar()
-
 	temps := make([]*os.File, 0, files)
 	for i := 0; i < files; i++ {
 		temps = append(temps, openTemp(t, tempDir))
@@ -1490,23 +1451,6 @@ func TestMaxBatching(t *testing.T) {
 	actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, numExpectedTokens)...)
 	require.Len(t, actualTokens, numExpectedTokens)
 
-	// During the first poll, we expect one log per batch and one log per file
-	require.Equal(t, expectedMaxFilesPerPoll+expectedBatches, observedLogs.Len())
-	logNum := 0
-	for b := 0; b < expectedBatches; b++ {
-		log := observedLogs.All()[logNum]
-		require.Equal(t, "Consuming files", log.Message)
-		require.Equal(t, zapcore.DebugLevel, log.Level)
-		logNum++
-
-		for f := 0; f < maxBatchFiles; f++ {
-			log = observedLogs.All()[logNum]
-			require.Equal(t, "Started watching file", log.Message)
-			require.Equal(t, zapcore.InfoLevel, log.Level)
-			logNum++
-		}
-	}
-
 	// Write more logs to each file so we can validate that all files are still known
 	for i, temp := range temps {
 		for j := 0; j < linesPerFile; j++ {
@@ -1521,15 +1465,6 @@ func TestMaxBatching(t *testing.T) {
 	actualTokens = make([][]byte, 0, numExpectedTokens)
 	actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, numExpectedTokens)...)
 	require.Len(t, actualTokens, numExpectedTokens)
-
-	// During the second poll, we only expect one log per batch
-	require.Equal(t, expectedMaxFilesPerPoll+expectedBatches*2, observedLogs.Len())
-	for b := logNum; b < observedLogs.Len(); b++ {
-		log := observedLogs.All()[logNum]
-		require.Equal(t, "Consuming files", log.Message)
-		require.Equal(t, zapcore.DebugLevel, log.Level)
-		logNum++
-	}
 }
 
 // TestReadExistingLogsWithHeader tests that, when starting from beginning, we
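
For reference: with zap's SugaredLogger, a strongly typed field such as zap.Strings must go through the *w variants (Debugw, Infow, ...), whose variadic keysAndValues parameter accepts zap.Field values; Debugf would treat the field as a printf argument. Below is a minimal, self-contained sketch of the call pattern added in poll; the development logger and the matches slice are stand-ins for illustration, not the Manager's actual setup.

package main

import "go.uber.org/zap"

func main() {
	// Throwaway logger for illustration; zap.NewDevelopment enables
	// debug-level output. In the collector, the Manager embeds a
	// *zap.SugaredLogger configured elsewhere.
	logger, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()
	sugar := logger.Sugar()

	// Hypothetical match results standing in for the poll's matches slice.
	matches := []string{"/var/log/a.log", "/var/log/b.log"}

	// One structured entry per poll listing every matched path, replacing
	// the per-batch and per-file logs the removed test assertions covered.
	sugar.Debugw("matched files", zap.Strings("paths", matches))
}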