From 065b592e156c2d32c2e747d1b258701c7a5fc667 Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Tue, 2 Dec 2025 11:39:21 +0100 Subject: [PATCH 1/3] fix compactor file descriptor leak --- .../shipper/indexshipper/tsdb/compactor.go | 29 ++- .../tsdb/compactor_fd_leak_test.go | 242 ++++++++++++++++++ 2 files changed, 258 insertions(+), 13 deletions(-) create mode 100644 pkg/storage/stores/shipper/indexshipper/tsdb/compactor_fd_leak_test.go diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go index ebff69810647c..0abb075e42c5e 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go @@ -229,27 +229,30 @@ func setupBuilder(ctx context.Context, indexType int, userID string, sourceIndex return nil, err } - defer func() { - if err := os.Remove(path); err != nil { - level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file", "err", err) - } - }() - indexFile, err := OpenShippableTSDB(path) if err != nil { + // Clean up the downloaded file if we can't open it + if removeErr := os.Remove(path); removeErr != nil { + level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file after open failure", "err", removeErr) + } return nil, err } - defer func() { - if err := indexFile.Close(); err != nil { - level.Error(sourceIndexSet.GetLogger()).Log("msg", "failed to close index file", "err", err) - } - }() - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, "", nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []tsdbindex.ChunkMeta) (stop bool) { - builder.AddSeries(lbls.Copy(), fp, chks) + builder.AddSeries(withoutTenantLabel(lbls.Copy()), fp, chks) return false }, labels.MustNewMatcher(labels.MatchEqual, "", "")) + + // Close the file immediately after use to avoid keeping file descriptors open + if closeErr := indexFile.Close(); closeErr != nil { + level.Error(sourceIndexSet.GetLogger()).Log("msg", "failed to close index file", "err", closeErr) + } + + // Remove the downloaded file immediately after closing + if removeErr := os.Remove(path); removeErr != nil { + level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file", "err", removeErr) + } + if err != nil { return nil, err } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_fd_leak_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_fd_leak_test.go new file mode 100644 index 0000000000000..4be375328d33d --- /dev/null +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_fd_leak_test.go @@ -0,0 +1,242 @@ +package tsdb + +import ( + "context" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" + "github.com/grafana/loki/v3/pkg/storage/config" +) + +// TestSetupBuilder_FileDescriptorLeak tests that setupBuilder properly closes +// files immediately after use, preventing file descriptor leaks when processing +// many source files. +// +// This test detects the leak by monitoring file descriptors DURING the execution +// of setupBuilder, not after it returns. The bug causes files to accumulate during +// the loop because defer statements keep them open until the function returns. +func TestSetupBuilder_FileDescriptorLeak(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("File descriptor monitoring requires Linux /proc filesystem") + } + + now := model.Now() + periodConfig := config.PeriodConfig{ + IndexTables: config.IndexPeriodicTableConfig{ + PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}}, + Schema: "v12", + } + indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) + tableName := indexBkts[0] + + tempDir := t.TempDir() + objectStoragePath := filepath.Join(tempDir, "objects") + tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix) + tableWorkingDirectory := filepath.Join(tempDir, "working-dir", tableName.Prefix) + + require.NoError(t, util.EnsureDirectory(objectStoragePath)) + require.NoError(t, util.EnsureDirectory(tablePathInStorage)) + require.NoError(t, util.EnsureDirectory(tableWorkingDirectory)) + + // Create many per-tenant index files to simulate the scenario where + // many files need to be processed + // We need enough files to make the leak visible + numFiles := 50 // Use enough files to detect the leak + indexFormat, err := periodConfig.TSDBFormat() + require.NoError(t, err) + + // Create per-tenant index files in the user's directory + userTablePath := filepath.Join(tablePathInStorage, "user1") + require.NoError(t, util.EnsureDirectory(userTablePath)) + + lbls := mustParseLabels(`{foo="bar"}`) + for i := 0; i < numFiles; i++ { + streams := []stream{ + buildStream(lbls, buildChunkMetas(int64(i*1000), int64(i*1000+100)), ""), + } + setupPerTenantIndex(t, indexFormat, streams, userTablePath, time.Unix(int64(i), 0)) + } + + // Build the client and index set + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) + + idxSet, err := newMockIndexSet("user1", tableName.Prefix, filepath.Join(tableWorkingDirectory, "user1"), objectClient) + require.NoError(t, err) + + // Get initial file descriptor count + initialFDCount := getFDCount(t) + + // Channel to signal when setupBuilder starts and ends + started := make(chan struct{}) + done := make(chan struct{}) + maxFDCount := 0 + var maxFDCountMutex sync.Mutex + + // Monitor file descriptors in a separate goroutine + // We need to monitor DURING the execution to catch the leak + monitorDone := make(chan struct{}) + fdSamples := make([]int, 0, 1000) + go func() { + defer close(monitorDone) + <-started + for { + select { + case <-done: + return + default: + count := getFDCount(t) + fdSamples = append(fdSamples, count) + maxFDCountMutex.Lock() + if count > maxFDCount { + maxFDCount = count + } + maxFDCountMutex.Unlock() + time.Sleep(5 * time.Millisecond) // Check every 5ms for better detection + } + } + }() + + // Call setupBuilder in a goroutine so we can monitor during execution + ctx := context.Background() + var builder *Builder + var builderErr error + go func() { + close(started) + builder, builderErr = setupBuilder(ctx, indexFormat, "user1", idxSet, []Index{}) + close(done) + }() + + // Wait for completion + <-done + <-monitorDone + + require.NoError(t, builderErr) + require.NotNil(t, builder) + + // Get final file descriptor count + finalFDCount := getFDCount(t) + + maxFDCountMutex.Lock() + peakFDCount := maxFDCount + maxFDCountMutex.Unlock() + + // Calculate the increase in file descriptors + // With the bug: file descriptors accumulate during the loop (peak will be high) + // With the fix: file descriptors are closed immediately (peak should be low) + fdIncrease := peakFDCount - initialFDCount + + // Analyze FD samples to see if there's a pattern of accumulation + var avgFDCount int + if len(fdSamples) > 0 { + sum := 0 + for _, sample := range fdSamples { + sum += sample + } + avgFDCount = sum / len(fdSamples) + } + + t.Logf("File descriptor stats: initial=%d, peak=%d, avg=%d, final=%d, increase=%d, numFiles=%d, samples=%d", + initialFDCount, peakFDCount, avgFDCount, finalFDCount, fdIncrease, numFiles, len(fdSamples)) + + // The key test: with the bug, file descriptors accumulate during the loop. + // Each file opened adds ~1-2 FDs (one for the file, possibly one for mmap). + // With 30 files, we'd expect to see at least 20+ FDs accumulated if the bug exists. + // With the fix, files are closed immediately, so peak should be much lower. + + // Calculate expected accumulation: each file might use 1-2 FDs + // With the bug, we'd see accumulation proportional to numFiles + // With the fix, we should see a constant overhead (maybe 5-10 FDs) + expectedLeakFDs := numFiles * 1 // At least 1 FD per file if leak exists + maxAllowedIncrease := 20 // Allow some overhead, but not proportional to numFiles + + if fdIncrease >= expectedLeakFDs/2 { + // If we see accumulation proportional to numFiles, it's a leak + t.Errorf("File descriptor leak detected! Peak increase: %d (expected leak: ~%d, allowed: %d). "+ + "This indicates files are not being closed immediately during processing. "+ + "With the fix, file descriptors should be closed right after use, not deferred until function return. "+ + "FD samples show accumulation pattern.", + fdIncrease, expectedLeakFDs, maxAllowedIncrease) + } + + // Verify the builder was populated correctly + require.NotNil(t, builder.streams) + require.Greater(t, len(builder.streams), 0, "Builder should have streams") +} + +// getFDCount returns the current number of open file descriptors +func getFDCount(t *testing.T) int { + if runtime.GOOS != "linux" { + return 0 + } + fds, err := os.ReadDir("/proc/self/fd") + if err != nil { + t.Fatalf("Failed to read /proc/self/fd: %v", err) + } + return len(fds) +} + +// TestSetupBuilder_ManyFiles verifies that setupBuilder can handle processing +// many files without running into resource exhaustion issues. +func TestSetupBuilder_ManyFiles(t *testing.T) { + now := model.Now() + periodConfig := config.PeriodConfig{ + IndexTables: config.IndexPeriodicTableConfig{ + PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}}, + Schema: "v12", + } + indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) + tableName := indexBkts[0] + + tempDir := t.TempDir() + objectStoragePath := filepath.Join(tempDir, "objects") + tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix) + tableWorkingDirectory := filepath.Join(tempDir, "working-dir", tableName.Prefix) + + require.NoError(t, util.EnsureDirectory(objectStoragePath)) + require.NoError(t, util.EnsureDirectory(tablePathInStorage)) + require.NoError(t, util.EnsureDirectory(tableWorkingDirectory)) + + // Create a large number of files + numFiles := 100 + indexFormat, err := periodConfig.TSDBFormat() + require.NoError(t, err) + + // Create per-tenant index files in the user's directory + userTablePath := filepath.Join(tablePathInStorage, "user1") + require.NoError(t, util.EnsureDirectory(userTablePath)) + + lbls := mustParseLabels(`{foo="bar"}`) + for i := 0; i < numFiles; i++ { + streams := []stream{ + buildStream(lbls, buildChunkMetas(int64(i*1000), int64(i*1000+100)), ""), + } + setupPerTenantIndex(t, indexFormat, streams, userTablePath, time.Unix(int64(i), 0)) + } + + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) + + idxSet, err := newMockIndexSet("user1", tableName.Prefix, filepath.Join(tableWorkingDirectory, "user1"), objectClient) + require.NoError(t, err) + + // This should complete without errors even with many files + // because files are closed immediately after processing + ctx := context.Background() + builder, err := setupBuilder(ctx, indexFormat, "user1", idxSet, []Index{}) + require.NoError(t, err) + require.NotNil(t, builder) + + // Verify builder has the expected data + builder.FinalizeChunks() + require.Greater(t, len(builder.streams), 0) +} From 58fd2510c19764cdc79fa4c9c5a29c25328f27fa Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Mon, 8 Dec 2025 09:50:31 +0100 Subject: [PATCH 2/3] processSourceIndex function --- .../shipper/indexshipper/tsdb/compactor.go | 68 +++++++++++-------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go index 0abb075e42c5e..39c9e8a556bfa 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/config" shipperindex "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/index" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage" tsdbindex "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) @@ -205,6 +206,42 @@ func (t *tableCompactor) CompactTable() error { return nil } +// processSourceIndex processes a single source index file by downloading it, +// reading its series, and adding them to the builder. The file is closed and removed +// using defer statements to ensure cleanup happens even if errors occur. +func processSourceIndex(ctx context.Context, sourceIndex storage.IndexFile, sourceIndexSet compactor.IndexSet, builder *Builder) error { + path, err := sourceIndexSet.GetSourceFile(sourceIndex) + if err != nil { + return err + } + + indexFile, err := OpenShippableTSDB(path) + if err != nil { + // Clean up the downloaded file if we can't open it + if removeErr := os.Remove(path); removeErr != nil { + level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file after open failure", "err", removeErr) + } + return err + } + + // Ensure file is closed and removed when this function returns + defer func() { + if closeErr := indexFile.Close(); closeErr != nil { + level.Error(sourceIndexSet.GetLogger()).Log("msg", "failed to close index file", "err", closeErr) + } + if removeErr := os.Remove(path); removeErr != nil { + level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file", "err", removeErr) + } + }() + + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, "", nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []tsdbindex.ChunkMeta) (stop bool) { + builder.AddSeries(withoutTenantLabel(lbls.Copy()), fp, chks) + return false + }, labels.MustNewMatcher(labels.MatchEqual, "", "")) + + return err +} + // setupBuilder creates a Builder for a single user. // It combines the users index from multiTenantIndexes and its existing compacted index(es) func setupBuilder(ctx context.Context, indexType int, userID string, sourceIndexSet compactor.IndexSet, multiTenantIndexes []Index) (*Builder, error) { @@ -224,36 +261,7 @@ func setupBuilder(ctx context.Context, indexType int, userID string, sourceIndex // download all the existing compacted indexes and add them to the builder for _, sourceIndex := range sourceIndexes { - path, err := sourceIndexSet.GetSourceFile(sourceIndex) - if err != nil { - return nil, err - } - - indexFile, err := OpenShippableTSDB(path) - if err != nil { - // Clean up the downloaded file if we can't open it - if removeErr := os.Remove(path); removeErr != nil { - level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file after open failure", "err", removeErr) - } - return nil, err - } - - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, "", nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []tsdbindex.ChunkMeta) (stop bool) { - builder.AddSeries(withoutTenantLabel(lbls.Copy()), fp, chks) - return false - }, labels.MustNewMatcher(labels.MatchEqual, "", "")) - - // Close the file immediately after use to avoid keeping file descriptors open - if closeErr := indexFile.Close(); closeErr != nil { - level.Error(sourceIndexSet.GetLogger()).Log("msg", "failed to close index file", "err", closeErr) - } - - // Remove the downloaded file immediately after closing - if removeErr := os.Remove(path); removeErr != nil { - level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file", "err", removeErr) - } - - if err != nil { + if err := processSourceIndex(ctx, sourceIndex, sourceIndexSet, builder); err != nil { return nil, err } } From fe8f0690fd50220816ebbf5808cee965c9e7874e Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Tue, 9 Dec 2025 10:48:37 +0100 Subject: [PATCH 3/3] address feedback --- .../shipper/indexshipper/tsdb/compactor.go | 16 +- .../tsdb/compactor_fd_leak_test.go | 242 ------------------ .../indexshipper/tsdb/compactor_test.go | 56 ++++ 3 files changed, 64 insertions(+), 250 deletions(-) delete mode 100644 pkg/storage/stores/shipper/indexshipper/tsdb/compactor_fd_leak_test.go diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go index 39c9e8a556bfa..b19b14397d92b 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go @@ -215,23 +215,23 @@ func processSourceIndex(ctx context.Context, sourceIndex storage.IndexFile, sour return err } - indexFile, err := OpenShippableTSDB(path) - if err != nil { - // Clean up the downloaded file if we can't open it + // Ensure the downloaded file is removed when this function returns + defer func() { if removeErr := os.Remove(path); removeErr != nil { - level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file after open failure", "err", removeErr) + level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file", "err", removeErr) } + }() + + indexFile, err := OpenShippableTSDB(path) + if err != nil { return err } - // Ensure file is closed and removed when this function returns + // Ensure file is closed when this function returns defer func() { if closeErr := indexFile.Close(); closeErr != nil { level.Error(sourceIndexSet.GetLogger()).Log("msg", "failed to close index file", "err", closeErr) } - if removeErr := os.Remove(path); removeErr != nil { - level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file", "err", removeErr) - } }() err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, "", nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []tsdbindex.ChunkMeta) (stop bool) { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_fd_leak_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_fd_leak_test.go deleted file mode 100644 index 4be375328d33d..0000000000000 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_fd_leak_test.go +++ /dev/null @@ -1,242 +0,0 @@ -package tsdb - -import ( - "context" - "os" - "path/filepath" - "runtime" - "sync" - "testing" - "time" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" - "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" - "github.com/grafana/loki/v3/pkg/storage/config" -) - -// TestSetupBuilder_FileDescriptorLeak tests that setupBuilder properly closes -// files immediately after use, preventing file descriptor leaks when processing -// many source files. -// -// This test detects the leak by monitoring file descriptors DURING the execution -// of setupBuilder, not after it returns. The bug causes files to accumulate during -// the loop because defer statements keep them open until the function returns. -func TestSetupBuilder_FileDescriptorLeak(t *testing.T) { - if runtime.GOOS != "linux" { - t.Skip("File descriptor monitoring requires Linux /proc filesystem") - } - - now := model.Now() - periodConfig := config.PeriodConfig{ - IndexTables: config.IndexPeriodicTableConfig{ - PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}}, - Schema: "v12", - } - indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) - tableName := indexBkts[0] - - tempDir := t.TempDir() - objectStoragePath := filepath.Join(tempDir, "objects") - tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix) - tableWorkingDirectory := filepath.Join(tempDir, "working-dir", tableName.Prefix) - - require.NoError(t, util.EnsureDirectory(objectStoragePath)) - require.NoError(t, util.EnsureDirectory(tablePathInStorage)) - require.NoError(t, util.EnsureDirectory(tableWorkingDirectory)) - - // Create many per-tenant index files to simulate the scenario where - // many files need to be processed - // We need enough files to make the leak visible - numFiles := 50 // Use enough files to detect the leak - indexFormat, err := periodConfig.TSDBFormat() - require.NoError(t, err) - - // Create per-tenant index files in the user's directory - userTablePath := filepath.Join(tablePathInStorage, "user1") - require.NoError(t, util.EnsureDirectory(userTablePath)) - - lbls := mustParseLabels(`{foo="bar"}`) - for i := 0; i < numFiles; i++ { - streams := []stream{ - buildStream(lbls, buildChunkMetas(int64(i*1000), int64(i*1000+100)), ""), - } - setupPerTenantIndex(t, indexFormat, streams, userTablePath, time.Unix(int64(i), 0)) - } - - // Build the client and index set - objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) - require.NoError(t, err) - - idxSet, err := newMockIndexSet("user1", tableName.Prefix, filepath.Join(tableWorkingDirectory, "user1"), objectClient) - require.NoError(t, err) - - // Get initial file descriptor count - initialFDCount := getFDCount(t) - - // Channel to signal when setupBuilder starts and ends - started := make(chan struct{}) - done := make(chan struct{}) - maxFDCount := 0 - var maxFDCountMutex sync.Mutex - - // Monitor file descriptors in a separate goroutine - // We need to monitor DURING the execution to catch the leak - monitorDone := make(chan struct{}) - fdSamples := make([]int, 0, 1000) - go func() { - defer close(monitorDone) - <-started - for { - select { - case <-done: - return - default: - count := getFDCount(t) - fdSamples = append(fdSamples, count) - maxFDCountMutex.Lock() - if count > maxFDCount { - maxFDCount = count - } - maxFDCountMutex.Unlock() - time.Sleep(5 * time.Millisecond) // Check every 5ms for better detection - } - } - }() - - // Call setupBuilder in a goroutine so we can monitor during execution - ctx := context.Background() - var builder *Builder - var builderErr error - go func() { - close(started) - builder, builderErr = setupBuilder(ctx, indexFormat, "user1", idxSet, []Index{}) - close(done) - }() - - // Wait for completion - <-done - <-monitorDone - - require.NoError(t, builderErr) - require.NotNil(t, builder) - - // Get final file descriptor count - finalFDCount := getFDCount(t) - - maxFDCountMutex.Lock() - peakFDCount := maxFDCount - maxFDCountMutex.Unlock() - - // Calculate the increase in file descriptors - // With the bug: file descriptors accumulate during the loop (peak will be high) - // With the fix: file descriptors are closed immediately (peak should be low) - fdIncrease := peakFDCount - initialFDCount - - // Analyze FD samples to see if there's a pattern of accumulation - var avgFDCount int - if len(fdSamples) > 0 { - sum := 0 - for _, sample := range fdSamples { - sum += sample - } - avgFDCount = sum / len(fdSamples) - } - - t.Logf("File descriptor stats: initial=%d, peak=%d, avg=%d, final=%d, increase=%d, numFiles=%d, samples=%d", - initialFDCount, peakFDCount, avgFDCount, finalFDCount, fdIncrease, numFiles, len(fdSamples)) - - // The key test: with the bug, file descriptors accumulate during the loop. - // Each file opened adds ~1-2 FDs (one for the file, possibly one for mmap). - // With 30 files, we'd expect to see at least 20+ FDs accumulated if the bug exists. - // With the fix, files are closed immediately, so peak should be much lower. - - // Calculate expected accumulation: each file might use 1-2 FDs - // With the bug, we'd see accumulation proportional to numFiles - // With the fix, we should see a constant overhead (maybe 5-10 FDs) - expectedLeakFDs := numFiles * 1 // At least 1 FD per file if leak exists - maxAllowedIncrease := 20 // Allow some overhead, but not proportional to numFiles - - if fdIncrease >= expectedLeakFDs/2 { - // If we see accumulation proportional to numFiles, it's a leak - t.Errorf("File descriptor leak detected! Peak increase: %d (expected leak: ~%d, allowed: %d). "+ - "This indicates files are not being closed immediately during processing. "+ - "With the fix, file descriptors should be closed right after use, not deferred until function return. "+ - "FD samples show accumulation pattern.", - fdIncrease, expectedLeakFDs, maxAllowedIncrease) - } - - // Verify the builder was populated correctly - require.NotNil(t, builder.streams) - require.Greater(t, len(builder.streams), 0, "Builder should have streams") -} - -// getFDCount returns the current number of open file descriptors -func getFDCount(t *testing.T) int { - if runtime.GOOS != "linux" { - return 0 - } - fds, err := os.ReadDir("/proc/self/fd") - if err != nil { - t.Fatalf("Failed to read /proc/self/fd: %v", err) - } - return len(fds) -} - -// TestSetupBuilder_ManyFiles verifies that setupBuilder can handle processing -// many files without running into resource exhaustion issues. -func TestSetupBuilder_ManyFiles(t *testing.T) { - now := model.Now() - periodConfig := config.PeriodConfig{ - IndexTables: config.IndexPeriodicTableConfig{ - PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}}, - Schema: "v12", - } - indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) - tableName := indexBkts[0] - - tempDir := t.TempDir() - objectStoragePath := filepath.Join(tempDir, "objects") - tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix) - tableWorkingDirectory := filepath.Join(tempDir, "working-dir", tableName.Prefix) - - require.NoError(t, util.EnsureDirectory(objectStoragePath)) - require.NoError(t, util.EnsureDirectory(tablePathInStorage)) - require.NoError(t, util.EnsureDirectory(tableWorkingDirectory)) - - // Create a large number of files - numFiles := 100 - indexFormat, err := periodConfig.TSDBFormat() - require.NoError(t, err) - - // Create per-tenant index files in the user's directory - userTablePath := filepath.Join(tablePathInStorage, "user1") - require.NoError(t, util.EnsureDirectory(userTablePath)) - - lbls := mustParseLabels(`{foo="bar"}`) - for i := 0; i < numFiles; i++ { - streams := []stream{ - buildStream(lbls, buildChunkMetas(int64(i*1000), int64(i*1000+100)), ""), - } - setupPerTenantIndex(t, indexFormat, streams, userTablePath, time.Unix(int64(i), 0)) - } - - objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) - require.NoError(t, err) - - idxSet, err := newMockIndexSet("user1", tableName.Prefix, filepath.Join(tableWorkingDirectory, "user1"), objectClient) - require.NoError(t, err) - - // This should complete without errors even with many files - // because files are closed immediately after processing - ctx := context.Background() - builder, err := setupBuilder(ctx, indexFormat, "user1", idxSet, []Index{}) - require.NoError(t, err) - require.NotNil(t, builder) - - // Verify builder has the expected data - builder.FinalizeChunks() - require.Greater(t, len(builder.streams), 0) -} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go index 99804831deb81..ded9344c9353a 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go @@ -997,3 +997,59 @@ func (d dummyChunkData) UncompressedSize() int { func (d dummyChunkData) Entries() int { return 1 } + +// TestSetupBuilder_ManyFiles verifies that setupBuilder can handle processing +// many files without running into resource exhaustion issues. +func TestSetupBuilder_ManyFiles(t *testing.T) { + now := model.Now() + periodConfig := config.PeriodConfig{ + IndexTables: config.IndexPeriodicTableConfig{ + PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}}, + Schema: "v12", + } + indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) + tableName := indexBkts[0] + + tempDir := t.TempDir() + objectStoragePath := filepath.Join(tempDir, "objects") + tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix) + tableWorkingDirectory := filepath.Join(tempDir, "working-dir", tableName.Prefix) + + require.NoError(t, util.EnsureDirectory(objectStoragePath)) + require.NoError(t, util.EnsureDirectory(tablePathInStorage)) + require.NoError(t, util.EnsureDirectory(tableWorkingDirectory)) + + // Create a large number of files + numFiles := 100 + indexFormat, err := periodConfig.TSDBFormat() + require.NoError(t, err) + + // Create per-tenant index files in the user's directory + userTablePath := filepath.Join(tablePathInStorage, "user1") + require.NoError(t, util.EnsureDirectory(userTablePath)) + + lbls := mustParseLabels(`{foo="bar"}`) + for i := 0; i < numFiles; i++ { + streams := []stream{ + buildStream(lbls, buildChunkMetas(int64(i*1000), int64(i*1000+100)), ""), + } + setupPerTenantIndex(t, indexFormat, streams, userTablePath, time.Unix(int64(i), 0)) + } + + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) + + idxSet, err := newMockIndexSet("user1", tableName.Prefix, filepath.Join(tableWorkingDirectory, "user1"), objectClient) + require.NoError(t, err) + + // This should complete without errors even with many files + // because files are closed immediately after processing + ctx := context.Background() + builder, err := setupBuilder(ctx, indexFormat, "user1", idxSet, []Index{}) + require.NoError(t, err) + require.NotNil(t, builder) + + // Verify builder has the expected data + builder.FinalizeChunks() + require.Greater(t, len(builder.streams), 0) +}