Skip to content

Commit 082b7a8

Browse files
authored
chore(v2): improve error handling in compaction-worker (#3860)
* chore(v2): improve error handling in compaction-worker * chore(v2): add some more safety checks
1 parent 1b8eb6f commit 082b7a8

File tree

2 files changed

+49
-27
lines changed

2 files changed

+49
-27
lines changed

pkg/experiment/compactor/compaction_worker.go

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ type compactionJob struct {
7777
compacted *metastorev1.CompactedBlocks
7878
}
7979

80+
// jobStatus is the string representation of a compaction job's outcome.
81+
// It is only used for logging and metrics.
82+
type jobStatus string
83+
84+
const (
85+
statusSuccess jobStatus = "success"
86+
statusFailure jobStatus = "failure"
87+
statusCancelled jobStatus = "cancelled"
88+
statusNoMeta jobStatus = "metadata_not_found"
89+
statusNoBlocks jobStatus = "blocks_not_found"
90+
)
91+
8092
type MetastoreClient interface {
8193
metastorev1.CompactionServiceClient
8294
metastorev1.IndexServiceClient
@@ -232,7 +244,7 @@ func (w *Worker) collectUpdates() []*metastorev1.CompactionJobStatusUpdate {
232244
updates = append(updates, update)
233245

234246
case done && job.compacted == nil:
235-
// We're not sending a status update for the job and expect that the
247+
// We're not sending the status update for the job and expect that the
236248
// assignment is to be revoked. The job is to be removed at the next
237249
// poll response handling: all jobs without assignments are canceled
238250
// and removed.
@@ -322,9 +334,9 @@ func (w *Worker) handleResponse(resp *metastorev1.PollCompactionJobsResponse) (n
322334
func (w *Worker) runCompaction(job *compactionJob) {
323335
start := time.Now()
324336
labels := []string{job.Tenant, strconv.Itoa(int(job.CompactionLevel))}
325-
statusName := "failure"
337+
statusName := statusFailure
326338
defer func() {
327-
labelsWithStatus := append(labels, statusName)
339+
labelsWithStatus := append(labels, string(statusName))
328340
w.metrics.jobDuration.WithLabelValues(labelsWithStatus...).Observe(time.Since(start).Seconds())
329341
w.metrics.jobsCompleted.WithLabelValues(labelsWithStatus...).Inc()
330342
w.metrics.jobsInProgress.WithLabelValues(labels...).Dec()
@@ -342,6 +354,13 @@ func (w *Worker) runCompaction(job *compactionJob) {
342354
defer sp.Finish()
343355

344356
logger := log.With(w.logger, "job", job.Name)
357+
level.Info(logger).Log("msg", "starting compaction job", "source_blocks", strings.Join(job.SourceBlocks, " "))
358+
if err := w.getBlockMetadata(logger, job); err != nil {
359+
// The error is likely to be transient, therefore the job is not failed,
360+
// but just abandoned – another worker will pick it up and try again.
361+
return
362+
}
363+
345364
deleteGroup, deleteCtx := errgroup.WithContext(ctx)
346365
for _, t := range job.Tombstones {
347366
if b := t.GetBlocks(); b != nil {
@@ -357,11 +376,19 @@ func (w *Worker) runCompaction(job *compactionJob) {
357376
}
358377
}
359378

360-
level.Info(logger).Log(
361-
"msg", "starting compaction job",
362-
"source_blocks", strings.Join(job.SourceBlocks, " "),
363-
)
364-
if err := w.getBlockMetadata(logger, job); err != nil {
379+
if len(job.blocks) == 0 {
380+
// This is a very bad situation that we do not expect, unless the
381+
// metastore is restored from a snapshot: no metadata found for the
382+
// job source blocks. There's no point in retrying or failing the
383+
// job (which is likely to be retried by another worker), so we just
384+
// skip it. The same for the situation when no block objects can be
385+
// found in storage, which may happen if the blocks are deleted manually.
386+
level.Error(logger).Log("msg", "no block metadata found; skipping")
387+
job.compacted = &metastorev1.CompactedBlocks{SourceBlocks: new(metastorev1.BlockList)}
388+
statusName = statusNoMeta
389+
// We, however, want to remove the tombstones: those are not the
390+
// blocks we were supposed to compact.
391+
_ = deleteGroup.Wait()
365392
return
366393
}
367394

@@ -387,7 +414,7 @@ func (w *Worker) runCompaction(job *compactionJob) {
387414
"output_blocks", len(compacted),
388415
)
389416
for _, c := range compacted {
390-
level.Info(logger).Log(
417+
level.Debug(logger).Log(
391418
"msg", "new compacted block",
392419
"block_id", c.Id,
393420
"block_tenant", metadata.Tenant(c),
@@ -399,8 +426,6 @@ func (w *Worker) runCompaction(job *compactionJob) {
399426
"datasets", len(c.Datasets),
400427
)
401428
}
402-
403-
statusName = "success"
404429
job.compacted = &metastorev1.CompactedBlocks{
405430
NewBlocks: compacted,
406431
SourceBlocks: &metastorev1.BlockList{
@@ -412,13 +437,20 @@ func (w *Worker) runCompaction(job *compactionJob) {
412437

413438
firstBlock := metadata.Timestamp(job.blocks[0])
414439
w.metrics.timeToCompaction.WithLabelValues(labels...).Observe(time.Since(firstBlock).Seconds())
440+
statusName = statusSuccess
415441

416442
case errors.Is(err, context.Canceled):
417-
level.Warn(logger).Log("msg", "job cancelled")
418-
statusName = "cancelled"
443+
level.Warn(logger).Log("msg", "compaction cancelled")
444+
statusName = statusCancelled
445+
446+
case w.storage.IsObjNotFoundErr(err):
447+
level.Error(logger).Log("msg", "failed to find blocks", "err", err)
448+
job.compacted = &metastorev1.CompactedBlocks{SourceBlocks: new(metastorev1.BlockList)}
449+
statusName = statusNoBlocks
419450

420451
default:
421452
level.Error(logger).Log("msg", "failed to compact blocks", "err", err)
453+
statusName = statusFailure
422454
}
423455

424456
// The only error returned by Wait is the context
@@ -442,23 +474,13 @@ func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error {
442474
return err
443475
}
444476

445-
source := resp.GetBlocks()
446-
if len(source) == 0 {
447-
level.Warn(logger).Log(
448-
"msg", "no block metadata found; skipping",
449-
"blocks", len(job.SourceBlocks),
450-
"blocks_found", len(source),
451-
)
452-
return fmt.Errorf("no blocks to compact")
453-
}
454-
477+
job.blocks = resp.GetBlocks()
455478
// Update the plan to reflect the actual compaction job state.
456479
job.SourceBlocks = job.SourceBlocks[:0]
457-
for _, b := range source {
480+
for _, b := range job.blocks {
458481
job.SourceBlocks = append(job.SourceBlocks, b.Id)
459482
}
460483

461-
job.blocks = source
462484
return nil
463485
}
464486

pkg/experiment/metastore/compaction_raft_handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,8 @@ func (h *CompactionCommandHandler) UpdateCompactionPlan(
163163

164164
for _, job := range req.PlanUpdate.CompletedJobs {
165165
compacted := job.GetCompactedBlocks()
166-
if compacted == nil {
167-
level.Error(h.logger).Log("msg", "compacted blocks are missing", "job", job.State.Name)
166+
if compacted == nil || compacted.SourceBlocks == nil || len(compacted.NewBlocks) == 0 {
167+
level.Warn(h.logger).Log("msg", "compacted blocks are missing; skipping", "job", job.State.Name)
168168
continue
169169
}
170170
if err := h.tombstones.AddTombstones(tx, cmd, blockTombstonesForCompletedJob(job)); err != nil {

0 commit comments

Comments
 (0)