From 26b27357682530445808d5b901fff29f572b67ab Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Fri, 15 Mar 2024 11:00:48 +0800 Subject: [PATCH] fix: query plan optimisation (#3100) --- pkg/ingester/retention.go | 3 ++ pkg/phlaredb/block_querier.go | 5 --- pkg/querier/replication.go | 62 +++++++++++++++++++++------------ pkg/querier/replication_test.go | 49 +++++++++++++++++++++++--- 4 files changed, 87 insertions(+), 32 deletions(-) diff --git a/pkg/ingester/retention.go b/pkg/ingester/retention.go index b5e14843f6..bdb12baefd 100644 --- a/pkg/ingester/retention.go +++ b/pkg/ingester/retention.go @@ -290,6 +290,9 @@ func (dc *diskCleaner) CleanupBlocksWhenHighDiskUtilization(ctx context.Context) // isBlockDeletable returns true if this block can be deleted. func (dc *diskCleaner) isBlockDeletable(block *tenantBlock) bool { + // TODO(kolesnikovae): + // Expiry defaults to -querier.query-store-after which should be deprecated, + // blocks-storage.bucket-store.ignore-blocks-within can be used instead. expiryTs := time.Now().Add(-dc.policy.Expiry) return block.Uploaded && ulid.Time(block.ID.Time()).Before(expiryTs) } diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 3da1f8cb4e..b2df28ea09 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -285,11 +285,6 @@ type BlockInfo struct { Series uint64 } -func (b *BlockQuerier) BlockInfo() []BlockInfo { - result := make([]BlockInfo, len(b.queriers)) - return result -} - type singleBlockQuerier struct { logger log.Logger metrics *BlocksMetrics diff --git a/pkg/querier/replication.go b/pkg/querier/replication.go index 8f4c6f96a0..2969c9ee0e 100644 --- a/pkg/querier/replication.go +++ b/pkg/querier/replication.go @@ -200,10 +200,10 @@ func (r *replicasPerBlockID) removeBlock(ulid string) { } // this step removes sharded blocks that don't have all the shards present for a time window -func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error { +func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() (bool, error) { type compactionKey struct { - level int32 - minT int64 + level int32 + minTime int64 } compactions := make(map[compactionKey][]string) @@ -211,15 +211,12 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error { for blockID := range r.m { meta, ok := r.meta[blockID] if !ok { - return fmt.Errorf("meta missing for block id %s", blockID) - } - if !ok { - continue + return false, fmt.Errorf("meta missing for block id %s", blockID) } key := compactionKey{ - level: 0, - minT: meta.MinTime, + level: 0, + minTime: meta.MinTime, } if meta.Compaction != nil { @@ -230,8 +227,9 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error { // now we go through every group and check if we see at least a block for each shard var ( - shardsSeen []bool - shardedBlocks []string + shardsSeen []bool + shardedBlocks []string + hasShardedBlocks bool ) for _, blocks := range compactions { shardsSeen = shardsSeen[:0] @@ -239,7 +237,7 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error { for _, block := range blocks { meta, ok := r.meta[block] if !ok { - return fmt.Errorf("meta missing for block id %s", block) + return false, fmt.Errorf("meta missing for block id %s", block) } shardIdx, shards, ok := shardFromBlock(meta) @@ -247,6 +245,7 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error { // not a sharded block continue continue } + hasShardedBlocks = true shardedBlocks = append(shardedBlocks, block) if len(shardsSeen) == 0 { @@ -261,7 +260,7 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error { } if len(shardsSeen) != int(shards) { - return fmt.Errorf("shard length mismatch, shards seen: %d, shards as per label: %d", len(shardsSeen), shards) + return false, fmt.Errorf("shard length mismatch, shards seen: %d, shards as per label: %d", len(shardsSeen), shards) } shardsSeen[shardIdx] = true @@ -285,17 +284,15 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error { } } - return nil + return hasShardedBlocks, nil } // prunes blocks that are contained by a higher compaction level block -func (r *replicasPerBlockID) pruneSupersededBlocks() error { +func (r *replicasPerBlockID) pruneSupersededBlocks(sharded bool) error { for blockID := range r.m { meta, ok := r.meta[blockID] if !ok { - if !ok { - return fmt.Errorf("meta missing for block id %s", blockID) - } + return fmt.Errorf("meta missing for block id %s", blockID) } if meta.Compaction == nil { continue @@ -303,6 +300,16 @@ func (r *replicasPerBlockID) pruneSupersededBlocks() error { if meta.Compaction.Level < 2 { continue } + // At split phase of compaction, L2 is an intermediate step where we + // split each group into split_shards parts, thus there will be up to + // groups_num * split_shards blocks, which is typically _significantly_ + // greater that the number of source blocks. Moreover, these blocks are + // not yet deduplicated, therefore we should prefer L1 blocks over them. + // As an optimisation, we drop all L2 blocks. + if sharded && meta.Compaction.Level == 2 { + r.removeBlock(blockID) + continue + } for _, blockID := range meta.Compaction.Parents { r.removeBlock(blockID) } @@ -331,11 +338,21 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*ingestv1 smallestCompactionLevel = int32(0) ) - if err := r.pruneIncompleteShardedBlocks(); err != nil { + sharded, err := r.pruneIncompleteShardedBlocks() + if err != nil { level.Warn(r.logger).Log("msg", "block planning failed to prune incomplete sharded blocks", "err", err) return nil } - if err := r.pruneSupersededBlocks(); err != nil { + + // Depending on whether split sharding is used, the compaction level at + // which the data gets deduplicated differs: if split sharding is enabled, + // we deduplicate at level 3, and at level 2 otherwise. + var deduplicationLevel int32 = 2 + if sharded { + deduplicationLevel = 3 + } + + if err := r.pruneSupersededBlocks(sharded); err != nil { level.Warn(r.logger).Log("msg", "block planning failed to prune superseded blocks", "err", err) return nil } @@ -351,8 +368,9 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*ingestv1 if !ok { continue } - // when we see a block with CompactionLevel <=1 or a block without compaction section, we want the queriers to deduplicate - if meta.Compaction == nil || meta.Compaction.Level <= 1 { + // when we see a block with CompactionLevel less than the level at which data is deduplicated, + // or a block without compaction section, we want the queriers to deduplicate + if meta.Compaction == nil || meta.Compaction.Level < deduplicationLevel { deduplicate = true } diff --git a/pkg/querier/replication_test.go b/pkg/querier/replication_test.go index a2f5b36871..79abd2b2d1 100644 --- a/pkg/querier/replication_test.go +++ b/pkg/querier/replication_test.go @@ -190,9 +190,28 @@ func Test_replicasPerBlockID_blockPlan(t *testing.T) { { addr: "store-gateway-0", response: []*typesv1.BlockInfo{ - newBlockInfo("a-1").withCompactionLevel(2).withCompactionSources("a").withCompactionParents("a").withCompactorShard(1, 2).withMinTime(t1, time.Hour-time.Second).info(), - newBlockInfo("b-1").withCompactionLevel(2).withCompactionSources("b").withCompactionParents("b").withCompactorShard(2, 2).withMinTime(t2, time.Hour-(500*time.Millisecond)).info(), - newBlockInfo("b-2").withCompactionLevel(2).withCompactionSources("b").withCompactionParents("b").withCompactorShard(2, 2).withMinTime(t2, time.Hour-time.Second).info(), + newBlockInfo("a-1"). + withCompactionLevel(3). + withCompactionSources("a"). + withCompactionParents("a"). + withCompactorShard(0, 2). + withMinTime(t1, time.Hour-time.Second). + info(), + + newBlockInfo("b-1"). + withCompactionLevel(3). + withCompactionSources("b"). + withCompactionParents("b"). + withCompactorShard(0, 2). + withMinTime(t2, time.Hour-(500*time.Millisecond)).info(), + + newBlockInfo("b-2"). + withCompactionLevel(3). + withCompactionSources("b"). + withCompactionParents("b"). + withCompactorShard(1, 2). + withMinTime(t2, time.Hour-time.Second). + info(), }, }, }, storeGatewayInstance) @@ -205,8 +224,9 @@ func Test_replicasPerBlockID_blockPlan(t *testing.T) { }, }, { - // Using a split-and-merge compactor, the level 2 will be referencing sources but will not guarantee that all the source is in the - // TEST if all splits are there before erasing the level 1 before. + // Using a split-and-merge compactor, deduplication happens at level 3, + // level 2 is intermediate step, where series distributed among shards + // but not yet deduplicated. name: "ignore blocks which are sharded and in level 2", inputs: func(r *replicasPerBlockID) { r.add([]ResponseFromReplica[[]*typesv1.BlockInfo]{ @@ -223,6 +243,25 @@ func Test_replicasPerBlockID_blockPlan(t *testing.T) { addr: "store-gateway-0", response: []*typesv1.BlockInfo{ newBlockInfo("a").info(), + newBlockInfo("a-1"). + withCompactionLevel(2). + withCompactionSources("a"). + withCompactionParents("a"). + withCompactorShard(0, 2). + info(), + + newBlockInfo("a-2"). + withCompactionLevel(2). + withCompactionSources("a"). + withCompactionParents("a"). + withCompactorShard(1, 2). + info(), + + newBlockInfo("a-3"). + withCompactionLevel(3). + withCompactionSources("a-2"). + withCompactorShard(0, 3). + info(), }, }, }, storeGatewayInstance)