Skip to content

Commit fba0faa

Browse files
authored
fix(v2): avoid starvation of failed jobs in the compaction scheduler (#3732)
1 parent c51531c commit fba0faa

File tree

7 files changed

+138
-39
lines changed

7 files changed

+138
-39
lines changed

pkg/experiment/metastore/compaction/compaction.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,5 +55,6 @@ type Schedule interface {
5555
// AssignJob is called on behalf of the worker to request a new job.
5656
AssignJob() (*raft_log.AssignedCompactionJob, error)
5757
// AddJob is called on behalf of the planner to add a new job to the schedule.
58+
// The scheduler may decline the job by returning a nil state.
5859
AddJob(*raft_log.CompactionJobPlan) *raft_log.CompactionJobState
5960
}

pkg/experiment/metastore/compaction/compactor/compactor_strategy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func DefaultStrategy() Strategy {
3030
MaxBlocksPerLevel: []uint{20, 10, 10},
3131
MaxBlocksDefault: 10,
3232
MaxLevel: 3,
33-
MaxBatchAge: 3 * time.Minute.Nanoseconds(), //defaultMaxBlockBatchAge,
33+
MaxBatchAge: defaultMaxBlockBatchAge,
3434
CleanupBatchSize: 2,
3535
CleanupDelay: 15 * time.Minute,
3636
CleanupJobMaxLevel: 1,

pkg/experiment/metastore/compaction/scheduler/schedule.go

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ type schedule struct {
2121
// Read-only.
2222
scheduler *Scheduler
2323
// Uncommitted schedule updates.
24-
updates map[string]*raft_log.CompactionJobState
24+
updates map[string]*raft_log.CompactionJobState
25+
addedJobs int
2526
// Modified copy of the job queue.
2627
copied []priorityJobQueue
2728
level int
@@ -90,10 +91,29 @@ func (p *schedule) newStateForStatusReport(status *raft_log.CompactionJobStatusU
9091
return nil
9192
}
9293

93-
// AddJob creates a state for the new plan. The method must be called
94-
// after the last AssignJob and UpdateJob calls.
94+
// AddJob creates a state for the newly planned job.
95+
//
96+
// The method must be called after the last AssignJob and UpdateJob calls.
97+
// It returns a nil state if the queue size limit is reached.
98+
//
99+
// TODO(kolesnikovae): Implement displacement policy.
100+
// When the scheduler queue is full, no new jobs can be added. Currently,
101+
// it's possible that all jobs fail and can't be retried, and consequently,
102+
// can't leave the queue, blocking the entire compaction process until the
103+
// failure or queue limit is increased. Additionally, it's possible for a
104+
// job to never be completed and thus remain in the queue indefinitely.
105+
//
106+
// One way to implement this is to evict the job with the highest number of
107+
// failures (exceeding a configurable threshold, in addition to MaxFailures).
108+
// This way, we can easily remove the job least likely to succeed.
109+
// However, this needs to be handled explicitly in UpdateSchedule; at this
110+
// point, we can only identify candidates for eviction.
95111
func (p *schedule) AddJob(plan *raft_log.CompactionJobPlan) *raft_log.CompactionJobState {
96-
// TODO(kolesnikovae): Job queue size limit.
112+
if limit := p.scheduler.config.MaxQueueSize; limit > 0 {
113+
if size := uint64(p.addedJobs + p.scheduler.queue.size()); size >= limit {
114+
return nil
115+
}
116+
}
97117
state := &raft_log.CompactionJobState{
98118
Name: plan.Name,
99119
CompactionLevel: plan.CompactionLevel,
@@ -102,13 +122,19 @@ func (p *schedule) AddJob(plan *raft_log.CompactionJobPlan) *raft_log.Compaction
102122
Token: p.token,
103123
}
104124
p.updates[state.Name] = state
125+
p.addedJobs++
105126
return state
106127
}
107128

108129
func (p *schedule) nextAssignment() *raft_log.CompactionJobState {
109130
// We don't need to check the job ownership here: the worker asks
110131
// for a job assignment (new ownership).
111132
for p.level < len(p.scheduler.queue.levels) {
133+
// We evict the job from our copy of the queue: each job is only
134+
// accessible once. When we reach the bottom of the queue (the first
135+
// failed job, or the last job in the queue), we move to the next
136+
// level. Note that we check all in-progress jobs if there are not
137+
// enough unassigned jobs in the queue.
112138
pq := p.queueLevelCopy(p.level)
113139
if pq.Len() == 0 {
114140
p.level++
@@ -130,15 +156,17 @@ func (p *schedule) nextAssignment() *raft_log.CompactionJobState {
130156
return p.assignJob(job)
131157

132158
case metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS:
133-
if p.shouldReassign(job) {
159+
if p.isFailed(job) {
160+
// We reached the bottom of the queue: only failed jobs left.
161+
p.level++
162+
continue
163+
}
164+
if p.isAbandoned(job) {
134165
state := p.assignJob(job)
135166
state.Failures++
136167
return state
137168
}
138169
}
139-
140-
// If no jobs can be assigned at this level.
141-
p.level++
142170
}
143171

144172
return nil
@@ -156,11 +184,13 @@ func (p *schedule) assignJob(e *jobEntry) *raft_log.CompactionJobState {
156184
return job
157185
}
158186

159-
func (p *schedule) shouldReassign(job *jobEntry) bool {
160-
abandoned := p.now.UnixNano() > job.LeaseExpiresAt
187+
func (p *schedule) isAbandoned(job *jobEntry) bool {
188+
return p.now.UnixNano() > job.LeaseExpiresAt
189+
}
190+
191+
func (p *schedule) isFailed(job *jobEntry) bool {
161192
limit := p.scheduler.config.MaxFailures
162-
faulty := limit > 0 && uint64(job.Failures) >= limit
163-
return abandoned && !faulty
193+
return limit > 0 && uint64(job.Failures) >= limit
164194
}
165195

166196
// The queue must not be modified by the assigner. Therefore, we're copying the

pkg/experiment/metastore/compaction/scheduler/schedule_test.go

Lines changed: 72 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,12 @@ func TestSchedule_Assign(t *testing.T) {
123123
}
124124

125125
scheduler := NewScheduler(config, store, nil)
126+
// The job plans are accessed when the jobs are being assigned.
127+
// Their content is not important for the test.
126128
plans := []*raft_log.CompactionJobPlan{
127-
{Name: "2", Tenant: "A", Shard: 1, CompactionLevel: 0, SourceBlocks: []string{"d", "e", "f"}},
128-
{Name: "3", Tenant: "A", Shard: 1, CompactionLevel: 0, SourceBlocks: []string{"j", "h", "i"}},
129-
{Name: "1", Tenant: "A", Shard: 1, CompactionLevel: 1, SourceBlocks: []string{"a", "b", "c"}},
129+
{Name: "2", CompactionLevel: 0},
130+
{Name: "3", CompactionLevel: 0},
131+
{Name: "1", CompactionLevel: 1},
130132
}
131133
for _, p := range plans {
132134
store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil)
@@ -169,37 +171,55 @@ func TestSchedule_ReAssign(t *testing.T) {
169171

170172
scheduler := NewScheduler(config, store, nil)
171173
plans := []*raft_log.CompactionJobPlan{
172-
{Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}},
173-
{Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}},
174-
{Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}},
174+
{Name: "1"},
175+
{Name: "2"},
176+
{Name: "3"},
177+
{Name: "4"},
178+
{Name: "5"},
179+
{Name: "6"},
175180
}
176181
for _, p := range plans {
177182
store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil)
178183
}
179184

185+
now := int64(5)
180186
states := []*raft_log.CompactionJobState{
181-
{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
182-
{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
183-
{Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0},
187+
// Jobs with expired leases (now > LeaseExpiresAt).
188+
{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
189+
{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
190+
{Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1},
191+
// This job can't be reassigned as its lease is still valid.
192+
{Name: "4", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 10},
193+
// The job has already failed in the past.
194+
{Name: "5", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1, Failures: 1},
195+
// The job has already failed in the past and exceeded the error threshold.
196+
{Name: "6", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 1, Failures: 3},
184197
}
185198
for _, s := range states {
186199
scheduler.queue.put(s)
187200
}
188201

202+
lease := now + int64(config.LeaseDuration)
203+
expected := []*raft_log.CompactionJobState{
204+
{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1},
205+
{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1},
206+
{Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 1},
207+
{Name: "5", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 2, LeaseExpiresAt: lease, Failures: 2},
208+
}
209+
189210
test.AssertIdempotent(t, func(t *testing.T) {
190-
s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, 1)})
191-
for j := range plans {
211+
s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, now)})
212+
assigned := make([]*raft_log.CompactionJobState, 0, len(expected))
213+
for {
192214
update, err := s.AssignJob()
193215
require.NoError(t, err)
194-
assert.Equal(t, plans[j], update.Plan)
195-
assert.Equal(t, metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, update.State.Status)
196-
assert.Equal(t, int64(config.LeaseDuration)+1, update.State.LeaseExpiresAt)
197-
assert.Equal(t, uint64(2), update.State.Token)
216+
if update == nil {
217+
break
218+
}
219+
assigned = append(assigned, update.State)
198220
}
199221

200-
update, err := s.AssignJob()
201-
require.NoError(t, err)
202-
assert.Nil(t, update)
222+
assert.Equal(t, expected, assigned)
203223
})
204224
}
205225

@@ -212,9 +232,9 @@ func TestSchedule_UpdateAssign(t *testing.T) {
212232

213233
scheduler := NewScheduler(config, store, nil)
214234
plans := []*raft_log.CompactionJobPlan{
215-
{Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}},
216-
{Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}},
217-
{Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}},
235+
{Name: "1"},
236+
{Name: "2"},
237+
{Name: "3"},
218238
}
219239
for _, p := range plans {
220240
store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil)
@@ -301,9 +321,9 @@ func TestSchedule_Add(t *testing.T) {
301321

302322
scheduler := NewScheduler(config, store, nil)
303323
plans := []*raft_log.CompactionJobPlan{
304-
{Name: "1", Tenant: "A", Shard: 1, SourceBlocks: []string{"a", "b", "c"}},
305-
{Name: "2", Tenant: "A", Shard: 1, SourceBlocks: []string{"d", "e", "f"}},
306-
{Name: "3", Tenant: "A", Shard: 1, SourceBlocks: []string{"j", "h", "i"}},
324+
{Name: "1"},
325+
{Name: "2"},
326+
{Name: "3"},
307327
}
308328

309329
states := []*raft_log.CompactionJobState{
@@ -319,3 +339,31 @@ func TestSchedule_Add(t *testing.T) {
319339
}
320340
})
321341
}
342+
343+
func TestSchedule_QueueSizeLimit(t *testing.T) {
344+
store := new(mockscheduler.MockJobStore)
345+
config := Config{
346+
MaxFailures: 3,
347+
LeaseDuration: 10 * time.Second,
348+
MaxQueueSize: 2,
349+
}
350+
351+
scheduler := NewScheduler(config, store, nil)
352+
plans := []*raft_log.CompactionJobPlan{
353+
{Name: "1"},
354+
{Name: "2"},
355+
{Name: "3"},
356+
}
357+
358+
states := []*raft_log.CompactionJobState{
359+
{Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_UNSPECIFIED, AddedAt: 1, Token: 1},
360+
{Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_UNSPECIFIED, AddedAt: 1, Token: 1},
361+
}
362+
363+
test.AssertIdempotent(t, func(t *testing.T) {
364+
s := scheduler.NewSchedule(nil, &raft.Log{Index: 1, AppendedAt: time.Unix(0, 1)})
365+
assert.Equal(t, states[0], s.AddJob(plans[0]))
366+
assert.Equal(t, states[1], s.AddJob(plans[1]))
367+
assert.Nil(t, s.AddJob(plans[2]))
368+
})
369+
}

pkg/experiment/metastore/compaction/scheduler/scheduler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@ type JobStore interface {
4545
type Config struct {
4646
MaxFailures uint64 `yaml:"compaction_max_failures" doc:""`
4747
LeaseDuration time.Duration `yaml:"compaction_job_lease_duration" doc:""`
48+
MaxQueueSize uint64 `yaml:"compaction_max_job_queue_size" doc:""`
4849
}
4950

5051
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
5152
f.Uint64Var(&c.MaxFailures, prefix+"compaction-max-failures", 3, "")
5253
f.DurationVar(&c.LeaseDuration, prefix+"compaction-job-lease-duration", 15*time.Second, "")
54+
f.Uint64Var(&c.MaxQueueSize, prefix+"compaction-max-job-queue-size", 2000, "")
5355
}
5456

5557
type Scheduler struct {

pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ func (q *schedulerQueue) delete(name string) *raft_log.CompactionJobState {
4545
return nil
4646
}
4747

48+
func (q *schedulerQueue) size() int {
49+
var size int
50+
for _, level := range q.levels {
51+
if level != nil {
52+
size += level.jobs.Len()
53+
}
54+
}
55+
return size
56+
}
57+
4858
func (q *schedulerQueue) level(x uint32) *jobQueue {
4959
s := x + 1 // Levels are 0-based.
5060
if s >= uint32(len(q.levels)) {
@@ -136,7 +146,9 @@ func compareJobs(a, b *jobEntry) int {
136146
if a.Status != b.Status {
137147
return int(a.Status) - int(b.Status)
138148
}
139-
// Faulty jobs should wait.
149+
// Faulty jobs should wait. Our aim is to put them at the
150+
// end of the queue, after all the jobs we may consider
151+
// for assignment.
140152
if a.Failures != b.Failures {
141153
return int(a.Failures) - int(b.Failures)
142154
}

pkg/experiment/metastore/compaction_raft_handler.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,14 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate(
117117
// No more jobs to create.
118118
break
119119
}
120+
state := scheduler.AddJob(plan)
121+
if state == nil {
122+
// Scheduler declined the job. The only case when this may happen
123+
// is when the scheduler queue is full.
124+
break
125+
}
120126
p.NewJobs = append(p.NewJobs, &raft_log.NewCompactionJob{
121-
State: scheduler.AddJob(plan),
127+
State: state,
122128
Plan: plan,
123129
})
124130
}

0 commit comments

Comments
 (0)