diff --git a/development/mimir-ingest-storage/config/mimir.yaml b/development/mimir-ingest-storage/config/mimir.yaml index d9acd954384..2d36ab22bc8 100644 --- a/development/mimir-ingest-storage/config/mimir.yaml +++ b/development/mimir-ingest-storage/config/mimir.yaml @@ -16,9 +16,7 @@ ingest_storage: topic: mimir-ingest last_produced_offset_poll_interval: 500ms startup_fetch_concurrency: 15 - startup_records_per_fetch: 2400 ongoing_fetch_concurrency: 2 - ongoing_records_per_fetch: 30 ingester: track_ingester_owned_series: true diff --git a/development/mimir-ingest-storage/docker-compose.jsonnet b/development/mimir-ingest-storage/docker-compose.jsonnet index 1a54972f61f..61858d37d9d 100644 --- a/development/mimir-ingest-storage/docker-compose.jsonnet +++ b/development/mimir-ingest-storage/docker-compose.jsonnet @@ -54,8 +54,6 @@ std.manifestYamlDoc({ '-ingester.ring.prefix=exclusive-prefix', '-ingest-storage.kafka.consume-from-position-at-startup=end', '-ingest-storage.kafka.consume-from-timestamp-at-startup=0', - '-ingest-storage.kafka.ingestion-concurrency=2', - '-ingest-storage.kafka.ingestion-concurrency-batch-size=150', '-ingest-storage.kafka.startup-fetch-concurrency=15', '-ingest-storage.kafka.ongoing-fetch-concurrency=2', ], diff --git a/development/mimir-ingest-storage/docker-compose.yml b/development/mimir-ingest-storage/docker-compose.yml index 37f8d9bef54..e87c8f47712 100644 --- a/development/mimir-ingest-storage/docker-compose.yml +++ b/development/mimir-ingest-storage/docker-compose.yml @@ -322,7 +322,7 @@ "command": - "sh" - "-c" - - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2" + - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2" "depends_on": "kafka_1": "condition": "service_healthy" diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md index 56f7d246a3b..b3783301dfc 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -1529,6 +1529,25 @@ How to **investigate**: - If the call exists and it's waiting on a lock then there may be a deadlock. - If the call doesn't exist then it could either mean processing is not stuck (false positive) or the `pushToStorage` wasn't called at all, and so you should investigate the callers in the code. + +### MimirIngesterMissedRecordsFromKafka + +This alert fires when an ingester has missed processing some records from Kafka. In other words, there has been a gap in offsets. + +How it **works**: + +- Ingester reads records from Kafka, and processes them sequentially. It keeps track of the offset of the last record it processed. +- Upon fetching the next batch of records, it checks if the first available record has an offset one greater than the last processed offset. If the first available offset is larger than that, then the ingester has missed some records. +- Kafka doesn't guarantee sequential offsets. If a record has been manually deleted from Kafka or the records have been produced in a transaction and the transaction was aborted, then there may be a gap. +- Mimir doesn't produce in transactions and does not delete records. +- When the ingester starts up, it will attempt to resume from the last offset it processed. If the ingester has been unavailable for long enough that the next record is already removed due to retention, then the ingester will miss some records. + +How to **investigate**: + +- Verify that there have been no deleted records in your Kafka cluster by humans or other applications. +- Verify that the ingester hasn't been down for longer than the retention on the Kafka partition. +- Report a bug. + ### MimirStrongConsistencyEnforcementFailed This alert fires when too many read requests with strong consistency are failing. diff --git a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml index ebb3c5fd135..929e6927b57 100644 --- a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml +++ b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml @@ -1133,6 +1133,15 @@ spec: for: 5m labels: severity: critical + - alert: MimirIngesterMissedRecordsFromKafka + annotations: + message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss. + runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka + expr: | + # Alert if the ingester missed some records from Kafka. + increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0 + labels: + severity: critical - alert: MimirStrongConsistencyEnforcementFailed annotations: message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path. diff --git a/operations/mimir-mixin-compiled-baremetal/alerts.yaml b/operations/mimir-mixin-compiled-baremetal/alerts.yaml index 77846f2f2d6..9283e7e8eea 100644 --- a/operations/mimir-mixin-compiled-baremetal/alerts.yaml +++ b/operations/mimir-mixin-compiled-baremetal/alerts.yaml @@ -1107,6 +1107,15 @@ groups: for: 5m labels: severity: critical + - alert: MimirIngesterMissedRecordsFromKafka + annotations: + message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss. + runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka + expr: | + # Alert if the ingester missed some records from Kafka. + increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0 + labels: + severity: critical - alert: MimirStrongConsistencyEnforcementFailed annotations: message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path. diff --git a/operations/mimir-mixin-compiled/alerts.yaml b/operations/mimir-mixin-compiled/alerts.yaml index cef80213c98..be782a551da 100644 --- a/operations/mimir-mixin-compiled/alerts.yaml +++ b/operations/mimir-mixin-compiled/alerts.yaml @@ -1121,6 +1121,15 @@ groups: for: 5m labels: severity: critical + - alert: MimirIngesterMissedRecordsFromKafka + annotations: + message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss. + runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka + expr: | + # Alert if the ingester missed some records from Kafka. + increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0 + labels: + severity: critical - alert: MimirStrongConsistencyEnforcementFailed annotations: message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path. diff --git a/operations/mimir-mixin/alerts/ingest-storage.libsonnet b/operations/mimir-mixin/alerts/ingest-storage.libsonnet index 3cca0186d4b..b223a0514b7 100644 --- a/operations/mimir-mixin/alerts/ingest-storage.libsonnet +++ b/operations/mimir-mixin/alerts/ingest-storage.libsonnet @@ -161,6 +161,21 @@ }, }, + // Alert firing is an ingester is reading from Kafka, there are buffered records to process, but processing is stuck. + { + alert: $.alertName('IngesterMissedRecordsFromKafka'), + expr: ||| + # Alert if the ingester missed some records from Kafka. + increase(cortex_ingest_storage_reader_missed_records_total[%s]) > 0 + ||| % $.alertRangeInterval(10), + labels: { + severity: 'critical', + }, + annotations: { + message: '%(product)s {{ $labels.%(per_instance_label)s }} in %(alert_aggregation_variables)s missed processing records from Kafka. There may be data loss.' % $._config, + }, + }, + // Alert firing if Mimir is failing to enforce strong read consistency. { alert: $.alertName('StrongConsistencyEnforcementFailed'), diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 710c7f5a6d4..2fe21c960ea 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -227,7 +227,7 @@ type concurrentFetchers struct { // ordering. orderedFetches chan fetchResult - lastReturnedRecord int64 + lastReturnedOffset int64 startOffsets *genericOffsetReader[int64] // trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes. @@ -283,7 +283,7 @@ func newConcurrentFetchers( partitionID: partition, metrics: metrics, minBytesWaitTime: minBytesWaitTime, - lastReturnedRecord: startOffset - 1, + lastReturnedOffset: startOffset - 1, startOffsets: startOffsetsReader, trackCompressedBytes: trackCompressedBytes, maxBufferedBytesLimit: maxBufferedBytesLimit, @@ -343,7 +343,7 @@ func (r *concurrentFetchers) Stop() { r.bufferedFetchedRecords.Store(0) r.bufferedFetchedBytes.Store(0) - level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) + level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_offset", r.lastReturnedOffset) } // Update implements fetcher @@ -352,7 +352,7 @@ func (r *concurrentFetchers) Update(ctx context.Context, concurrency int) { r.done = make(chan struct{}) r.wg.Add(1) - go r.start(ctx, r.lastReturnedRecord+1, concurrency) + go r.start(ctx, r.lastReturnedOffset+1, concurrency) } // PollFetches implements fetcher @@ -369,12 +369,20 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont // PollFetches() calls). r.bufferedFetchedRecords.Sub(int64(len(f.Records))) - firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord) + firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedOffset) r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime) f.Records = f.Records[firstUnreturnedRecordIdx:] if len(f.Records) > 0 { - r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset + if firstOffset := f.Records[0].Offset; firstOffset > r.lastReturnedOffset+1 { + r.metrics.missedRecords.Add(float64(firstOffset - r.lastReturnedOffset - 1)) + level.Error(r.logger).Log( + "msg", "there is a gap in consumed offsets; it is likely that there was data loss; see runbook for MimirIngesterMissedRecordsFromKafka", + "next_available_offset", firstOffset, + "last_returned_offset", r.lastReturnedOffset, + ) + } + r.lastReturnedOffset = f.Records[len(f.Records)-1].Offset } return kgo.Fetches{{ diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 9a4bf73426b..009b4fe039e 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -1007,6 +1007,7 @@ type readerMetrics struct { lastConsumedOffset prometheus.Gauge consumeLatency prometheus.Histogram kprom *kprom.Metrics + missedRecords prometheus.Counter } type readerMetricsSource interface { @@ -1083,6 +1084,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg), lastConsumedOffset: lastConsumedOffset, kprom: NewKafkaReaderClientMetrics(component, reg), + missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_missed_records_total", + Help: "The number of offsets that were never consumed by the reader because they weren't fetched.", + }), } m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {