Skip to content

Commit

Permalink
kafka replay speed: add alert for when we miss records in Kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed Nov 15, 2024
1 parent 70fa210 commit 09b0814
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 11 deletions.
2 changes: 0 additions & 2 deletions development/mimir-ingest-storage/config/mimir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ ingest_storage:
topic: mimir-ingest
last_produced_offset_poll_interval: 500ms
startup_fetch_concurrency: 15
startup_records_per_fetch: 2400
ongoing_fetch_concurrency: 2
ongoing_records_per_fetch: 30

ingester:
track_ingester_owned_series: true
Expand Down
2 changes: 0 additions & 2 deletions development/mimir-ingest-storage/docker-compose.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ std.manifestYamlDoc({
'-ingester.ring.prefix=exclusive-prefix',
'-ingest-storage.kafka.consume-from-position-at-startup=end',
'-ingest-storage.kafka.consume-from-timestamp-at-startup=0',
'-ingest-storage.kafka.ingestion-concurrency=2',
'-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
'-ingest-storage.kafka.startup-fetch-concurrency=15',
'-ingest-storage.kafka.ongoing-fetch-concurrency=2',
],
Expand Down
2 changes: 1 addition & 1 deletion development/mimir-ingest-storage/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@
"command":
- "sh"
- "-c"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2"
- "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2"
"depends_on":
"kafka_1":
"condition": "service_healthy"
Expand Down
19 changes: 19 additions & 0 deletions docs/sources/mimir/manage/mimir-runbooks/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,25 @@ How to **investigate**:
- If the call exists and it's waiting on a lock then there may be a deadlock.
- If the call doesn't exist then it could either mean processing is not stuck (false positive) or the `pushToStorage` wasn't called at all, and so you should investigate the callers in the code.
### MimirIngesterMissedRecordsFromKafka
This alert fires when an ingester has missed processing some records from Kafka. In other words, there has been a gap in offsets.
How it **works**:
- Ingester reads records from Kafka, and processes them sequentially. It keeps track of the offset of the last record it processed.
- Upon fetching the next batch of records, it checks if the first available record has an offset one greater than the last processed offset. If the first available offset is larger than that, then the ingester has missed some records.
- Kafka doesn't guarantee sequential offsets. If a record has been manually deleted from Kafka or the records have been produced in a transaction and the transaction was aborted, then there may be a gap.
- Mimir doesn't produce in transactions and does not delete records.
- When the ingester starts up, it will attempt to resume from the last offset it processed. If the ingester has been unavailable for long enough that the next record is already removed due to retention, then the ingester will miss some records.
How to **investigate**:
- Verify that there have been no deleted records in your Kafka cluster by humans or other applications.
- Verify that the ingester hasn't been down for longer than the retention on the Kafka partition.
- Report a bug.
### MimirStrongConsistencyEnforcementFailed
This alert fires when too many read requests with strong consistency are failing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,15 @@ spec:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
expr: |
# Alert if the ingester missed some records from Kafka.
increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
labels:
severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
Expand Down
9 changes: 9 additions & 0 deletions operations/mimir-mixin-compiled-baremetal/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,15 @@ groups:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
annotations:
message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
expr: |
# Alert if the ingester missed some records from Kafka.
increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
labels:
severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
Expand Down
9 changes: 9 additions & 0 deletions operations/mimir-mixin-compiled/alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,15 @@ groups:
for: 5m
labels:
severity: critical
- alert: MimirIngesterMissedRecordsFromKafka
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
expr: |
# Alert if the ingester missed some records from Kafka.
increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
labels:
severity: critical
- alert: MimirStrongConsistencyEnforcementFailed
annotations:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
Expand Down
15 changes: 15 additions & 0 deletions operations/mimir-mixin/alerts/ingest-storage.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,21 @@
},
},

// Alert firing is an ingester is reading from Kafka, there are buffered records to process, but processing is stuck.
{
alert: $.alertName('IngesterMissedRecordsFromKafka'),
expr: |||
# Alert if the ingester missed some records from Kafka.
increase(cortex_ingest_storage_reader_missed_records_total[%s]) > 0
||| % $.alertRangeInterval(10),
labels: {
severity: 'critical',
},
annotations: {
message: '%(product)s {{ $labels.%(per_instance_label)s }} in %(alert_aggregation_variables)s missed processing records from Kafka. There may be data loss.' % $._config,
},
},

// Alert firing if Mimir is failing to enforce strong read consistency.
{
alert: $.alertName('StrongConsistencyEnforcementFailed'),
Expand Down
20 changes: 14 additions & 6 deletions pkg/storage/ingest/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ type concurrentFetchers struct {
// ordering.
orderedFetches chan fetchResult

lastReturnedRecord int64
lastReturnedOffset int64
startOffsets *genericOffsetReader[int64]

// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
Expand Down Expand Up @@ -283,7 +283,7 @@ func newConcurrentFetchers(
partitionID: partition,
metrics: metrics,
minBytesWaitTime: minBytesWaitTime,
lastReturnedRecord: startOffset - 1,
lastReturnedOffset: startOffset - 1,
startOffsets: startOffsetsReader,
trackCompressedBytes: trackCompressedBytes,
maxBufferedBytesLimit: maxBufferedBytesLimit,
Expand Down Expand Up @@ -343,7 +343,7 @@ func (r *concurrentFetchers) Stop() {
r.bufferedFetchedRecords.Store(0)
r.bufferedFetchedBytes.Store(0)

level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_offset", r.lastReturnedOffset)
}

// Update implements fetcher
Expand All @@ -352,7 +352,7 @@ func (r *concurrentFetchers) Update(ctx context.Context, concurrency int) {
r.done = make(chan struct{})

r.wg.Add(1)
go r.start(ctx, r.lastReturnedRecord+1, concurrency)
go r.start(ctx, r.lastReturnedOffset+1, concurrency)
}

// PollFetches implements fetcher
Expand All @@ -369,12 +369,20 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
// PollFetches() calls).
r.bufferedFetchedRecords.Sub(int64(len(f.Records)))

firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedOffset)
r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)

f.Records = f.Records[firstUnreturnedRecordIdx:]
if len(f.Records) > 0 {
r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
if firstOffset := f.Records[0].Offset; firstOffset > r.lastReturnedOffset+1 {
r.metrics.missedRecords.Add(float64(firstOffset - r.lastReturnedOffset - 1))
level.Error(r.logger).Log(
"msg", "there is a gap in consumed offsets; it is likely that there was data loss; see runbook for MimirIngesterMissedRecordsFromKafka",
"next_available_offset", firstOffset,
"last_returned_offset", r.lastReturnedOffset,
)
}
r.lastReturnedOffset = f.Records[len(f.Records)-1].Offset
}

return kgo.Fetches{{
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,7 @@ type readerMetrics struct {
lastConsumedOffset prometheus.Gauge
consumeLatency prometheus.Histogram
kprom *kprom.Metrics
missedRecords prometheus.Counter
}

type readerMetricsSource interface {
Expand Down Expand Up @@ -1083,6 +1084,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
lastConsumedOffset: lastConsumedOffset,
kprom: NewKafkaReaderClientMetrics(component, reg),
missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_missed_records_total",
Help: "The number of offsets that were never consumed by the reader because they weren't fetched.",
}),
}

m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {
Expand Down

0 comments on commit 09b0814

Please sign in to comment.