diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go
index ebff69810647c..b19b14397d92b 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go
@@ -20,6 +20,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	"github.com/grafana/loki/v3/pkg/storage/config"
 	shipperindex "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/index"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
 	tsdbindex "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
 )
 
@@ -205,6 +206,42 @@ func (t *tableCompactor) CompactTable() error {
 	return nil
 }
 
+// processSourceIndex processes a single source index file by downloading it,
+// reading its series, and adding them to the builder. The file is closed and
+// removed via defer statements, so cleanup happens even if an error occurs.
+func processSourceIndex(ctx context.Context, sourceIndex storage.IndexFile, sourceIndexSet compactor.IndexSet, builder *Builder) error {
+	path, err := sourceIndexSet.GetSourceFile(sourceIndex)
+	if err != nil {
+		return err
+	}
+
+	// Ensure the downloaded file is removed when this function returns
+	defer func() {
+		if removeErr := os.Remove(path); removeErr != nil {
+			level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file", "err", removeErr)
+		}
+	}()
+
+	indexFile, err := OpenShippableTSDB(path)
+	if err != nil {
+		return err
+	}
+
+	// Ensure the index file is closed when this function returns
+	defer func() {
+		if closeErr := indexFile.Close(); closeErr != nil {
+			level.Error(sourceIndexSet.GetLogger()).Log("msg", "failed to close index file", "err", closeErr)
+		}
+	}()
+
+	err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, "", nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []tsdbindex.ChunkMeta) (stop bool) {
+		builder.AddSeries(lbls.Copy(), fp, chks)
+		return false
+	}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
+
+	return err
+}
+
 // setupBuilder creates a Builder for a single user.
 // It combines the users index from multiTenantIndexes and its existing compacted index(es)
 func setupBuilder(ctx context.Context, indexType int, userID string, sourceIndexSet compactor.IndexSet, multiTenantIndexes []Index) (*Builder, error) {
@@ -224,33 +261,7 @@ func setupBuilder(ctx context.Context, indexType int, userID string, sourceIndex
 
 	// download all the existing compacted indexes and add them to the builder
 	for _, sourceIndex := range sourceIndexes {
-		path, err := sourceIndexSet.GetSourceFile(sourceIndex)
-		if err != nil {
-			return nil, err
-		}
-
-		defer func() {
-			if err := os.Remove(path); err != nil {
-				level.Error(sourceIndexSet.GetLogger()).Log("msg", "error removing source index file", "err", err)
-			}
-		}()
-
-		indexFile, err := OpenShippableTSDB(path)
-		if err != nil {
-			return nil, err
-		}
-
-		defer func() {
-			if err := indexFile.Close(); err != nil {
-				level.Error(sourceIndexSet.GetLogger()).Log("msg", "failed to close index file", "err", err)
-			}
-		}()
-
-		err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, "", nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []tsdbindex.ChunkMeta) (stop bool) {
-			builder.AddSeries(lbls.Copy(), fp, chks)
-			return false
-		}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
-		if err != nil {
+		if err := processSourceIndex(ctx, sourceIndex, sourceIndexSet, builder); err != nil {
 			return nil, err
 		}
 	}
diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go
index 99804831deb81..ded9344c9353a 100644
--- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go
+++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go
@@ -997,3 +997,59 @@ func (d dummyChunkData) UncompressedSize() int {
 func (d dummyChunkData) Entries() int {
 	return 1
 }
+
+// TestSetupBuilder_ManyFiles verifies that setupBuilder can process many
+// source index files without running into resource exhaustion issues.
+func TestSetupBuilder_ManyFiles(t *testing.T) {
+	now := model.Now()
+	periodConfig := config.PeriodConfig{
+		IndexTables: config.IndexPeriodicTableConfig{
+			PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}},
+		Schema: "v12",
+	}
+	indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
+	tableName := indexBkts[0]
+
+	tempDir := t.TempDir()
+	objectStoragePath := filepath.Join(tempDir, "objects")
+	tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix)
+	tableWorkingDirectory := filepath.Join(tempDir, "working-dir", tableName.Prefix)
+
+	require.NoError(t, util.EnsureDirectory(objectStoragePath))
+	require.NoError(t, util.EnsureDirectory(tablePathInStorage))
+	require.NoError(t, util.EnsureDirectory(tableWorkingDirectory))
+
+	// Create a large number of files
+	numFiles := 100
+	indexFormat, err := periodConfig.TSDBFormat()
+	require.NoError(t, err)
+
+	// Create per-tenant index files in the user's directory
+	userTablePath := filepath.Join(tablePathInStorage, "user1")
+	require.NoError(t, util.EnsureDirectory(userTablePath))
+
+	lbls := mustParseLabels(`{foo="bar"}`)
+	for i := 0; i < numFiles; i++ {
+		streams := []stream{
+			buildStream(lbls, buildChunkMetas(int64(i*1000), int64(i*1000+100)), ""),
+		}
+		setupPerTenantIndex(t, indexFormat, streams, userTablePath, time.Unix(int64(i), 0))
+	}
+
+	objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
+	require.NoError(t, err)
+
+	idxSet, err := newMockIndexSet("user1", tableName.Prefix, filepath.Join(tableWorkingDirectory, "user1"), objectClient)
+	require.NoError(t, err)
+
+	// This should complete without errors even with many files
+	// because files are closed immediately after processing
+	ctx := context.Background()
+	builder, err := setupBuilder(ctx, indexFormat, "user1", idxSet, []Index{})
+	require.NoError(t, err)
+	require.NotNil(t, builder)
+
+	// Verify builder has the expected data
+	builder.FinalizeChunks()
+	require.Greater(t, len(builder.streams), 0)
+}
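
The fix works because Go runs deferred calls when the enclosing function returns, not at the end of each loop iteration. In the old setupBuilder, the os.Remove and indexFile.Close defers registered inside the for-loop over sourceIndexes all stayed pending until setupBuilder returned, so every downloaded index file remained open (and on disk) at once; with enough source indexes this exhausts the process's file-descriptor limit. Extracting the loop body into processSourceIndex scopes each pair of defers to a single iteration. A minimal standalone sketch of the pattern, with illustrative names rather than Loki APIs:

    package main

    import (
    	"fmt"
    	"os"
    )

    // leaky keeps every file open until the function returns, because
    // deferred calls only run at function exit. With enough paths this
    // fails with "too many open files".
    func leaky(paths []string) error {
    	for _, p := range paths {
    		f, err := os.Open(p)
    		if err != nil {
    			return err
    		}
    		defer f.Close() // pending until leaky returns: all files open at once
    		// ... process f ...
    	}
    	return nil
    }

    // processOne owns the lifetime of a single file; its defer runs as soon
    // as it returns, so the caller holds at most one file open at a time.
    // This is the shape of the processSourceIndex extraction above.
    func processOne(p string) error {
    	f, err := os.Open(p)
    	if err != nil {
    		return err
    	}
    	defer f.Close() // runs at the end of each iteration of the caller's loop
    	// ... process f ...
    	return nil
    }

    func fixed(paths []string) error {
    	for _, p := range paths {
    		if err := processOne(p); err != nil {
    			return err
    		}
    	}
    	return nil
    }

    func main() {
    	fmt.Println(leaky([]string{os.DevNull}), fixed([]string{os.DevNull}))
    }

TestSetupBuilder_ManyFiles exercises exactly this path: 100 per-tenant index files flow through setupBuilder, which before this change would have accumulated 200 pending defers (one close and one remove per file) for its entire lifetime.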