Skip to content

Commit

Permalink
Cleanup of var names/comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
seizethedave committed Jan 2, 2025
1 parent 6206183 commit 0062ff8
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions pkg/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,9 @@ func (b *BlockBuilder) runningPullMode(ctx context.Context) error {

if _, err := b.consumeJob(ctx, key, spec); err != nil {
level.Error(b.logger).Log("msg", "failed to consume job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
continue // ?
continue
}

// TODO: CompleteJob needs to accept the new state returned from consumeJob.

if err := b.scheduler.CompleteJob(key); err != nil {
level.Error(b.logger).Log("msg", "failed to complete job", "job_id", key.Id, "epoch", key.Epoch, "err", err)
}
Expand All @@ -212,20 +210,20 @@ func (b *BlockBuilder) runningPullMode(ctx context.Context) error {
}

// consumeJob performs block consumption from Kafka into object storage based on the given job spec.
func (b *BlockBuilder) consumeJob(ctx context.Context, k schedulerpb.JobKey, jobSpec schedulerpb.JobSpec) (PartitionState, error) {
func (b *BlockBuilder) consumeJob(ctx context.Context, key schedulerpb.JobKey, spec schedulerpb.JobSpec) (PartitionState, error) {
state := PartitionState{
Commit: kadm.Offset{
Topic: jobSpec.Topic,
Partition: jobSpec.Partition,
At: jobSpec.StartOffset,
Topic: spec.Topic,
Partition: spec.Partition,
At: spec.StartOffset,
},
CommitRecordTimestamp: jobSpec.CommitRecTs,
LastSeenOffset: jobSpec.LastSeenOffset,
LastBlockEnd: jobSpec.LastBlockEndTs,
CommitRecordTimestamp: spec.CommitRecTs,
LastSeenOffset: spec.LastSeenOffset,
LastBlockEnd: spec.LastBlockEndTs,
}

logger := log.With(b.logger, "job_id", k.Id, "job_epoch", k.Epoch)
return b.consumePartition(ctx, jobSpec.Partition, state, jobSpec.CycleEndTs, jobSpec.CycleEndOffset, logger)
logger := log.With(b.logger, "job_id", key.Id, "job_epoch", key.Epoch)
return b.consumePartition(ctx, spec.Partition, state, spec.CycleEndTs, spec.CycleEndOffset, logger)
}

// runningStandaloneMode is a service `running` function for standalone mode,
Expand Down

0 comments on commit 0062ff8

Please sign in to comment.