Skip to content

Commit

Permalink
Cherry-pick fixes to release 1.15 branch (#5241)
Browse files Browse the repository at this point in the history
* Batch Iterator optimization (#5237)

* Batch Opmization

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* Add test bacj

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* Testing Multiples scrape intervals

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* no assimption

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* Using max chunk ts

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* test with scrape 10

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* rename method

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* comments

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* using next

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* change test name

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* changelog/comments

Signed-off-by: Alan Protasio <alanprot@gmail.com>

---------

Signed-off-by: Alan Protasio <alanprot@gmail.com>
Signed-off-by: Ben Ye <benye@amazon.com>

* Store Gateway: Convert metrics from summary to histograms (#5239)

* Convert following metrics from summary to histogram

cortex_bucket_store_series_blocks_queried
cortex_bucket_store_series_data_fetched
cortex_bucket_store_series_data_size_touched_bytes
cortex_bucket_store_series_data_size_fetched_bytes
cortex_bucket_store_series_data_touched
cortex_bucket_store_series_result_series

Signed-off-by: Friedrich Gonzalez <friedrichg@gmail.com>

* Update changelog

Signed-off-by: Friedrich Gonzalez <friedrichg@gmail.com>

* fix changelog

Signed-off-by: Friedrich Gonzalez <friedrichg@gmail.com>

---------

Signed-off-by: Friedrich Gonzalez <friedrichg@gmail.com>
Signed-off-by: Ben Ye <benye@amazon.com>

* update changelog

Signed-off-by: Ben Ye <benye@amazon.com>

* Catch context error in the s3 bucket client (#5240)

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
Signed-off-by: Ben Ye <benye@amazon.com>

* bump RC version

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Alan Protasio <alanprot@gmail.com>
Signed-off-by: Ben Ye <benye@amazon.com>
Signed-off-by: Friedrich Gonzalez <friedrichg@gmail.com>
Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
Co-authored-by: Alan Protasio <approtas@amazon.com>
Co-authored-by: Friedrich Gonzalez <friedrichg@gmail.com>
Co-authored-by: Xiaochao Dong <the.xcdong@gmail.com>
  • Loading branch information
4 people authored Apr 1, 2023
1 parent ebb1835 commit af49d70
Show file tree
Hide file tree
Showing 12 changed files with 372 additions and 46 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [CHANGE] Tracing: Use the default OTEL trace sampler when `-tracing.otel.exporter-type` is set to `awsxray`. #5141
* [CHANGE] Ingester partial error log line to debug level. #5192
* [CHANGE] Change HTTP status code from 503/422 to 499 if a request is canceled. #5220
* [CHANGE] Store gateways summary metrics have been converted to histograms `cortex_bucket_store_series_blocks_queried`, `cortex_bucket_store_series_data_fetched`, `cortex_bucket_store_series_data_size_touched_bytes`, `cortex_bucket_store_series_data_size_fetched_bytes`, `cortex_bucket_store_series_data_touched`, `cortex_bucket_store_series_result_series` #5239
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
Expand Down Expand Up @@ -44,6 +45,7 @@
* [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193
* [ENHANCEMENT] Query Frontend: Add number of chunks and samples fetched in query stats. #5198
* [ENHANCEMENT] Implement grpc.Compressor.DecompressedSize for snappy to optimize memory allocations. #5213
* [ENHANCEMENT] Querier: Batch Iterator optimization to prevent transversing it multiple times query ranges steps does not overlap. #5237
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
* [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055
Expand All @@ -57,6 +59,7 @@
* [BUGFIX] Compactor: Fix issue that shuffle sharding planner return error if block is under visit by other compactor. #5188
* [BUGFIX] Fix S3 BucketWithRetries upload empty content issue #5217
* [BUGFIX] Query Frontend: Disable `absent`, `absent_over_time` and `scalar` for vertical sharding. #5221
* [BUGFIX] Catch context error in the s3 bucket client. #5240

## 1.14.0 2022-12-02

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.15.0-rc.0
1.15.0-rc.1
14 changes: 14 additions & 0 deletions pkg/querier/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type iterator interface {
// Seek or Next have returned true.
AtTime() int64

// MaxCurrentChunkTime returns the max time on the current chunk.
MaxCurrentChunkTime() int64

// Batch returns the current batch. Must only be called after Seek or Next
// have returned true.
Batch() promchunk.Batch
Expand Down Expand Up @@ -98,6 +101,17 @@ func (a *iteratorAdapter) Seek(t int64) bool {
a.curr.Index++
}
return true
} else if t <= a.underlying.MaxCurrentChunkTime() {
// In this case, some timestamp inside the current underlying chunk can fulfill the seek.
// In this case we will call next until we find the sample as it will be faster than calling
// `a.underlying.Seek` directly as this would cause the iterator to start from the beginning of the chunk.
// See: https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/querier/batch/chunk.go#L26-L45
// https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/chunk/encoding/prometheus_chunk.go#L90-L95
for a.Next() {
if t <= a.curr.Timestamps[a.curr.Index] {
return true
}
}
}
}

Expand Down
59 changes: 54 additions & 5 deletions pkg/querier/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
scenario.duplicationFactor,
scenario.enc.String())

chunks := createChunks(b, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)
chunks := createChunks(b, step, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)

b.Run(name, func(b *testing.B) {
b.ReportAllocs()
Expand All @@ -55,10 +55,59 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
}
}

func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) {
scenarios := []struct {
numChunks int
numSamplesPerChunk int
duplicationFactor int
seekStep time.Duration
scrapeInterval time.Duration
enc promchunk.Encoding
}{
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second / 2, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 2, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 10, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 30, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 50, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 100, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 200, enc: promchunk.PrometheusXorChunk},

{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second / 2, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 2, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 10, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 30, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 50, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 100, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 200, enc: promchunk.PrometheusXorChunk},
}

for _, scenario := range scenarios {
name := fmt.Sprintf("scrapeInterval %vs seekStep: %vs",
scenario.scrapeInterval.Seconds(),
scenario.seekStep.Seconds())

chunks := createChunks(b, scenario.scrapeInterval, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)

b.Run(name, func(b *testing.B) {
b.ReportAllocs()

for n := 0; n < b.N; n++ {
it := NewChunkMergeIterator(chunks, 0, 0)
i := int64(0)
for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone {
i++
}
}
})
}
}

func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
t.Parallel()
chunkOne := mkChunk(t, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
chunkTwo := mkChunk(t, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
chunkOne := mkChunk(t, step, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
chunkTwo := mkChunk(t, step, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
chunks := []chunk.Chunk{chunkOne, chunkTwo}

sut := NewChunkMergeIterator(chunks, 0, 0)
Expand All @@ -72,13 +121,13 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
}

func createChunks(b *testing.B, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
func createChunks(b *testing.B, step time.Duration, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
result := make([]chunk.Chunk, 0, numChunks)

for d := 0; d < duplicationFactor; d++ {
for c := 0; c < numChunks; c++ {
minTime := step * time.Duration(c*numSamplesPerChunk)
result = append(result, mkChunk(b, model.Time(minTime.Milliseconds()), numSamplesPerChunk, enc))
result = append(result, mkChunk(b, step, model.Time(minTime.Milliseconds()), numSamplesPerChunk, enc))
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/batch/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (i *chunkIterator) reset(chunk GenericChunk) {
i.batch.Index = 0
}

func (i *chunkIterator) MaxCurrentChunkTime() int64 {
return i.chunk.MaxTime
}

// Seek advances the iterator forward to the value at or after
// the given timestamp.
func (i *chunkIterator) Seek(t int64, size int) bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/batch/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) {
}
}

func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk {
func mkChunk(t require.TestingT, step time.Duration, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk {
metric := labels.Labels{
{Name: model.MetricNameLabel, Value: "foo"},
}
Expand All @@ -65,7 +65,7 @@ func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Enco
}

func mkGenericChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) GenericChunk {
ck := mkChunk(t, from, points, enc)
ck := mkChunk(t, step, from, points, enc)
return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.Data.NewIterator)
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/querier/batch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ func (c *mergeIterator) AtTime() int64 {
return c.batches[0].Timestamps[0]
}

func (c *mergeIterator) MaxCurrentChunkTime() int64 {
if len(c.h) < 1 {
return -1
}

return c.h[0].MaxCurrentChunkTime()
}

func (c *mergeIterator) Batch() promchunk.Batch {
return c.batches[0]
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/batch/non_overlapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (it *nonOverlappingIterator) Seek(t int64, size int) bool {
}
}

func (it *nonOverlappingIterator) MaxCurrentChunkTime() int64 {
return it.iter.MaxCurrentChunkTime()
}

func (it *nonOverlappingIterator) Next(size int) bool {
for {
if it.iter.Next(size) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error, operation
level.Error(b.logger).Log("msg", "bucket operation fail after retries", "err", lastErr, "operation", operationInfo)
return lastErr
}
return nil
return retries.Err()
}

func (b *BucketWithRetries) Name() string {
Expand Down
21 changes: 20 additions & 1 deletion pkg/storage/bucket/s3/bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,25 @@ func TestBucketWithRetries_UploadFailed(t *testing.T) {
require.ErrorContains(t, err, "failed upload: ")
}

func TestBucketWithRetries_ContextCanceled(t *testing.T) {
t.Parallel()

m := mockBucket{}
b := BucketWithRetries{
logger: log.NewNopLogger(),
bucket: &m,
operationRetries: 5,
retryMinBackoff: 10 * time.Millisecond,
retryMaxBackoff: time.Second,
}

ctx, cancel := context.WithCancel(context.Background())
cancel()
obj, err := b.GetRange(ctx, "dummy", 0, 10)
require.ErrorIs(t, err, context.Canceled)
require.Nil(t, obj)
}

type fakeReader struct {
}

Expand Down Expand Up @@ -121,7 +140,7 @@ func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error

// GetRange mocks objstore.Bucket.GetRange()
func (m *mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return nil, nil
return io.NopCloser(bytes.NewBuffer(bytes.Repeat([]byte{1}, int(length)))), nil
}

// Exists mocks objstore.Bucket.Exists()
Expand Down
12 changes: 6 additions & 6 deletions pkg/storegateway/bucket_store_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,16 @@ func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric) {

data.SendSumOfGaugesPerUser(out, m.blocksLoaded, "thanos_bucket_store_blocks_loaded")

data.SendSumOfSummariesWithLabels(out, m.seriesDataTouched, "thanos_bucket_store_series_data_touched", "data_type")
data.SendSumOfSummariesWithLabels(out, m.seriesDataFetched, "thanos_bucket_store_series_data_fetched", "data_type")
data.SendSumOfSummariesWithLabels(out, m.seriesDataSizeTouched, "thanos_bucket_store_series_data_size_touched_bytes", "data_type")
data.SendSumOfSummariesWithLabels(out, m.seriesDataSizeFetched, "thanos_bucket_store_series_data_size_fetched_bytes", "data_type")
data.SendSumOfSummariesWithLabels(out, m.seriesBlocksQueried, "thanos_bucket_store_series_blocks_queried")
data.SendSumOfHistogramsWithLabels(out, m.seriesDataTouched, "thanos_bucket_store_series_data_touched", "data_type")
data.SendSumOfHistogramsWithLabels(out, m.seriesDataFetched, "thanos_bucket_store_series_data_fetched", "data_type")
data.SendSumOfHistogramsWithLabels(out, m.seriesDataSizeTouched, "thanos_bucket_store_series_data_size_touched_bytes", "data_type")
data.SendSumOfHistogramsWithLabels(out, m.seriesDataSizeFetched, "thanos_bucket_store_series_data_size_fetched_bytes", "data_type")
data.SendSumOfHistogramsWithLabels(out, m.seriesBlocksQueried, "thanos_bucket_store_series_blocks_queried")

data.SendSumOfHistograms(out, m.seriesGetAllDuration, "thanos_bucket_store_series_get_all_duration_seconds")
data.SendSumOfHistograms(out, m.seriesMergeDuration, "thanos_bucket_store_series_merge_duration_seconds")
data.SendSumOfCounters(out, m.seriesRefetches, "thanos_bucket_store_series_refetches_total")
data.SendSumOfSummaries(out, m.resultSeriesCount, "thanos_bucket_store_series_result_series")
data.SendSumOfHistograms(out, m.resultSeriesCount, "thanos_bucket_store_series_result_series")
data.SendSumOfCounters(out, m.queriesDropped, "thanos_bucket_store_queries_dropped_total")

data.SendSumOfCountersWithLabels(out, m.cachedPostingsCompressions, "thanos_bucket_store_cached_postings_compressions_total", "op")
Expand Down
Loading

0 comments on commit af49d70

Please sign in to comment.