Skip to content

Commit

Permalink
Keep duplicate chunks if there are fewer than 3 identical ones
Browse files Browse the repository at this point in the history
  • Loading branch information
hczhu-db committed Aug 7, 2024
1 parent 759d333 commit 149ecdc
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 5 deletions.
5 changes: 5 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func registerQuery(app *extkingpin.App) {

enableDedupMerge := cmd.Flag("query.dedup-merge", "Enable deduplication merge of multiple time series with the same labels.").
Default("false").Bool()
enableQuorumChunkDedup := cmd.Flag("query.quorum-chunk-dedup", "Enable quorum-based deduplication for chunks from replicas.").
Default("false").Bool()

instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

Expand Down Expand Up @@ -378,6 +380,7 @@ func registerQuery(app *extkingpin.App) {
*tenantLabel,
*enableGroupReplicaPartialStrategy,
*enableDedupMerge,
*enableQuorumChunkDedup,
)
})
}
Expand Down Expand Up @@ -462,6 +465,7 @@ func runQuery(
tenantLabel string,
groupReplicaPartialResponseStrategy bool,
enableDedupMerge bool,
enableQuorumChunkDedup bool,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -536,6 +540,7 @@ func runQuery(
options := []store.ProxyStoreOption{
store.WithTSDBSelector(tsdbSelector),
store.WithProxyStoreDebugLogging(debugLogging),
store.WithQuorumChunkDedup(enableQuorumChunkDedup),
}

var (
Expand Down
10 changes: 10 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type ProxyStore struct {
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
quorumChunkDedup bool
}

type proxyStoreMetrics struct {
Expand Down Expand Up @@ -127,6 +128,12 @@ func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption {
}
}

func WithQuorumChunkDedup(enable bool) ProxyStoreOption {
return func(s *ProxyStore) {
s.quorumChunkDedup = enable
}
}

// WithTSDBSelector sets the TSDB selector for the proxy.
func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
return func(s *ProxyStore) {
Expand Down Expand Up @@ -449,6 +456,9 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";"))

respHeap := NewResponseDeduplicator(NewProxyResponseLoserTree(storeResponses...))
if s.quorumChunkDedup {
respHeap.quorumChunkDedup = true
}
for respHeap.Next() {
resp := respHeap.At()

Expand Down
31 changes: 26 additions & 5 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type responseDeduplicator struct {

prev *storepb.SeriesResponse
ok bool
quorumChunkDedup bool
}

// NewResponseDeduplicator returns a wrapper around a loser tree that merges duplicated series messages into one.
Expand Down Expand Up @@ -73,7 +74,7 @@ func (d *responseDeduplicator) Next() bool {
d.ok = d.h.Next()
if !d.ok {
if len(d.bufferedSameSeries) > 0 {
d.bufferedResp = append(d.bufferedResp, chainSeriesAndRemIdenticalChunks(d.bufferedSameSeries))
d.bufferedResp = append(d.bufferedResp, chainSeriesAndRemIdenticalChunks(d.bufferedSameSeries, d.quorumChunkDedup))
}
return len(d.bufferedResp) > 0
}
Expand Down Expand Up @@ -101,15 +102,16 @@ func (d *responseDeduplicator) Next() bool {
continue
}

d.bufferedResp = append(d.bufferedResp, chainSeriesAndRemIdenticalChunks(d.bufferedSameSeries))
d.bufferedResp = append(d.bufferedResp, chainSeriesAndRemIdenticalChunks(d.bufferedSameSeries, d.quorumChunkDedup))
d.prev = s

return true
}
}

func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb.SeriesResponse {
func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse, quorum bool) *storepb.SeriesResponse {
chunkDedupMap := map[uint64]*storepb.AggrChunk{}
chunckCountMap := map[uint64]int{}

for _, s := range series {
for _, chk := range s.GetSeries().Chunks {
Expand All @@ -127,7 +129,10 @@ func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb
if _, ok := chunkDedupMap[hash]; !ok {
chk := chk
chunkDedupMap[hash] = &chk
chunckCountMap[hash] = 1
break
} else {
chunckCountMap[hash]++
}
}
}
Expand All @@ -139,8 +144,24 @@ func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb
}

finalChunks := make([]storepb.AggrChunk, 0, len(chunkDedupMap))
for _, chk := range chunkDedupMap {
finalChunks = append(finalChunks, *chk)
for hash, chk := range chunkDedupMap {
if quorum {
// NB: this is specific to Databricks' setup where each time series is written to at least 2 out of 3 replicas.
// Each chunk should have 3 replicas in most cases, and 2 replicas in the worst acceptable cases.
// Quorum-based deduplication is used to pick the majority value among 3 replicas.
// If a chunk has only 2 identical replicas, there might be another chunk with corrupt data.
// We want to send those two identical replicas to the later quorum-based deduplication process to dominate any corrupt third replica.
if chunckCountMap[hash] >= 3 {
// Most cases should hit this branch.
finalChunks = append(finalChunks, *chk)
} else {
for i := 0; i < chunckCountMap[hash]; i++ {
finalChunks = append(finalChunks, *chk)
}
}
} else {
finalChunks = append(finalChunks, *chk)
}
}

sort.Slice(finalChunks, func(i, j int) bool {
Expand Down
152 changes: 152 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2295,3 +2295,155 @@ func TestDedupRespHeap_Deduplication(t *testing.T) {
}

}

func TestDedupRespHeap_QuorumChunkDedup(t *testing.T) {
t.Parallel()

for _, tcase := range []struct {
responses []*storepb.SeriesResponse
testFn func(responses []*storepb.SeriesResponse, h *responseDeduplicator)
tname string
}{
{
tname: "edge case with only one response",
responses: []*storepb.SeriesResponse{
{
Result: &storepb.SeriesResponse_Series{
Series: &storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
Chunks: []storepb.AggrChunk{
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Data: []byte(`abcdefgh`),
},
},
},
},
},
},
},
testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) {
testutil.Equals(t, true, h.Next())
resp := h.At()
testutil.Equals(t, responses[0], resp)
testutil.Equals(t, false, h.Next())
},
},
{
tname: "keep 2 identical series",
responses: []*storepb.SeriesResponse{
{
Result: &storepb.SeriesResponse_Series{
Series: &storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
Chunks: []storepb.AggrChunk{
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Data: []byte(`abcdefgh`),
},
},
},
},
},
},
{
Result: &storepb.SeriesResponse_Series{
Series: &storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
Chunks: []storepb.AggrChunk{
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Hash: xxhash.Sum64([]byte(`abcdefgh`)),
Data: []byte(`abcdefgh`),
},
},
},
},
},
},
},
testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) {
testutil.Equals(t, true, h.Next())
resp := h.At()
testutil.Equals(t, 2, len(resp.GetSeries().Chunks))
testutil.Equals(t, false, h.Next())
},
},
{
tname: "dedup 3 identical series",
responses: []*storepb.SeriesResponse{
{
Result: &storepb.SeriesResponse_Series{
Series: &storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
Chunks: []storepb.AggrChunk{
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Data: []byte(`abcdefgh`),
},
},
},
},
},
},
{
Result: &storepb.SeriesResponse_Series{
Series: &storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
Chunks: []storepb.AggrChunk{
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Hash: xxhash.Sum64([]byte(`abcdefgh`)),
Data: []byte(`abcdefgh`),
},
},
},
},
},
},
{
Result: &storepb.SeriesResponse_Series{
Series: &storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
Chunks: []storepb.AggrChunk{
{
Raw: &storepb.Chunk{
Type: storepb.Chunk_XOR,
Hash: xxhash.Sum64([]byte(`abcdefgh`)),
Data: []byte(`abcdefgh`),
},
},
},
},
},
},
},
testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) {
testutil.Equals(t, true, h.Next())
resp := h.At()
testutil.Equals(t, responses[0], resp)
testutil.Equals(t, 1, len(resp.GetSeries().Chunks))
testutil.Equals(t, false, h.Next())
},
},
} {
t.Run(tcase.tname, func(t *testing.T) {
h := NewResponseDeduplicator(NewProxyResponseLoserTree(
&eagerRespSet{
closeSeries: func() {},
wg: &sync.WaitGroup{},
bufferedResponses: tcase.responses,
},
))
h.quorumChunkDedup = true
tcase.testFn(tcase.responses, h)
})
}

}

0 comments on commit 149ecdc

Please sign in to comment.