From 0062ff83a36fe334fc03001e18fe12f536229799 Mon Sep 17 00:00:00 2001 From: David Grant Date: Thu, 2 Jan 2025 15:43:32 -0800 Subject: [PATCH] Cleanup of var names/comments. --- pkg/blockbuilder/blockbuilder.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go index 15fa842a36b..9b3389b17f5 100644 --- a/pkg/blockbuilder/blockbuilder.go +++ b/pkg/blockbuilder/blockbuilder.go @@ -198,11 +198,9 @@ func (b *BlockBuilder) runningPullMode(ctx context.Context) error { if _, err := b.consumeJob(ctx, key, spec); err != nil { level.Error(b.logger).Log("msg", "failed to consume job", "job_id", key.Id, "epoch", key.Epoch, "err", err) - continue // ? + continue } - // TODO: CompleteJob needs to accept the new state returned from consumeJob. - if err := b.scheduler.CompleteJob(key); err != nil { level.Error(b.logger).Log("msg", "failed to complete job", "job_id", key.Id, "epoch", key.Epoch, "err", err) } @@ -212,20 +210,20 @@ func (b *BlockBuilder) runningPullMode(ctx context.Context) error { } // consumeJob performs block consumption from Kafka into object storage based on the given job spec. -func (b *BlockBuilder) consumeJob(ctx context.Context, k schedulerpb.JobKey, jobSpec schedulerpb.JobSpec) (PartitionState, error) { +func (b *BlockBuilder) consumeJob(ctx context.Context, key schedulerpb.JobKey, spec schedulerpb.JobSpec) (PartitionState, error) { state := PartitionState{ Commit: kadm.Offset{ - Topic: jobSpec.Topic, - Partition: jobSpec.Partition, - At: jobSpec.StartOffset, + Topic: spec.Topic, + Partition: spec.Partition, + At: spec.StartOffset, }, - CommitRecordTimestamp: jobSpec.CommitRecTs, - LastSeenOffset: jobSpec.LastSeenOffset, - LastBlockEnd: jobSpec.LastBlockEndTs, + CommitRecordTimestamp: spec.CommitRecTs, + LastSeenOffset: spec.LastSeenOffset, + LastBlockEnd: spec.LastBlockEndTs, } - logger := log.With(b.logger, "job_id", k.Id, "job_epoch", k.Epoch) - return b.consumePartition(ctx, jobSpec.Partition, state, jobSpec.CycleEndTs, jobSpec.CycleEndOffset, logger) + logger := log.With(b.logger, "job_id", key.Id, "job_epoch", key.Epoch) + return b.consumePartition(ctx, spec.Partition, state, spec.CycleEndTs, spec.CycleEndOffset, logger) } // runningStandaloneMode is a service `running` function for standalone mode,