Skip to content

Commit

Permalink
Block builder scheduler startup: learn state from worker updates (#9897)
Browse files Browse the repository at this point in the history
* Add a startup period where we're learning the state of the world. This includes:
   * committed offsets learned from Kafka
   * any job progress/completion updates we've learned from workers given `block-builder-scheduler.startup-observe-time`
* When the startup period if over, we compute the starting state from the observations and begin normal operation.
* Add a job assignment `epoch` mechanism to break ties when multiple workers report updates about the same job. Clients include the epoch in any job-related communications.
  • Loading branch information
seizethedave authored Nov 21, 2024
1 parent 92ebc23 commit 4f36235
Show file tree
Hide file tree
Showing 4 changed files with 576 additions and 140 deletions.
110 changes: 87 additions & 23 deletions pkg/blockbuilder/scheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,92 @@ import (
var (
errNoJobAvailable = errors.New("no job available")
errJobNotFound = errors.New("job not found")
errJobNotAssigned = errors.New("job not assigned to worker")
errJobNotAssigned = errors.New("job not assigned to given worker")
errBadEpoch = errors.New("bad epoch")
)

type jobQueue struct {
leaseTime time.Duration
logger log.Logger
leaseExpiry time.Duration
logger log.Logger

mu sync.Mutex
epoch int64
jobs map[string]*job
unassigned jobHeap
}

func newJobQueue(leaseTime time.Duration, logger log.Logger) *jobQueue {
func newJobQueue(leaseExpiry time.Duration, logger log.Logger) *jobQueue {
return &jobQueue{
leaseTime: leaseTime,
logger: logger,
leaseExpiry: leaseExpiry,
logger: logger,

jobs: make(map[string]*job),
}
}

// assign assigns the highest-priority unassigned job to the given worker.
func (s *jobQueue) assign(workerID string) (string, jobSpec, error) {
func (s *jobQueue) assign(workerID string) (jobKey, jobSpec, error) {
if workerID == "" {
return "", jobSpec{}, errors.New("workerID cannot not be empty")
return jobKey{}, jobSpec{}, errors.New("workerID cannot be empty")
}

s.mu.Lock()
defer s.mu.Unlock()

if s.unassigned.Len() == 0 {
return "", jobSpec{}, errNoJobAvailable
return jobKey{}, jobSpec{}, errNoJobAvailable
}

j := heap.Pop(&s.unassigned).(*job)
j.key.epoch = s.epoch
s.epoch++
j.assignee = workerID
j.leaseExpiry = time.Now().Add(s.leaseTime)
return j.id, j.spec, nil
j.leaseExpiry = time.Now().Add(s.leaseExpiry)
return j.key, j.spec, nil
}

// importJob imports a job with the given ID and spec into the jobQueue. This is
// meant to be used during recovery, when we're reconstructing the jobQueue from
// worker updates.
func (s *jobQueue) importJob(key jobKey, workerID string, spec jobSpec) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
if workerID == "" {
return errors.New("workerID cannot be empty")
}

s.mu.Lock()
defer s.mu.Unlock()

// When we start assigning new jobs, the epochs need to be compatible with
// these "imported" jobs.
s.epoch = max(s.epoch, key.epoch+1)

j, ok := s.jobs[key.id]
if ok {
if key.epoch < j.key.epoch {
return errBadEpoch
} else if key.epoch == j.key.epoch {
if j.assignee != workerID {
return errJobNotAssigned
}
} else {
// Otherwise, this caller is the new authority, so we accept the update.
j.assignee = workerID
j.key = key
j.spec = spec
}
} else {
s.jobs[key.id] = &job{
key: key,
assignee: workerID,
leaseExpiry: time.Now().Add(s.leaseExpiry),
failCount: 0,
spec: spec,
}
}
return nil
}

// addOrUpdate adds a new job or updates an existing job with the given spec.
Expand All @@ -60,18 +108,21 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) {
defer s.mu.Unlock()

if j, ok := s.jobs[id]; ok {
// We can only update an unassigned job.
if j.assignee == "" {
// We can only update an unassigned job.
j.spec = spec
}
return
}

// Otherwise, add a new job.
j := &job{
id: id,
key: jobKey{
id: id,
epoch: 0,
},
assignee: "",
leaseExpiry: time.Now().Add(s.leaseTime),
leaseExpiry: time.Now().Add(s.leaseExpiry),
failCount: 0,
spec: spec,
}
Expand All @@ -81,8 +132,8 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) {

// renewLease renews the lease of the job with the given ID for the given
// worker.
func (s *jobQueue) renewLease(jobID, workerID string) error {
if jobID == "" {
func (s *jobQueue) renewLease(key jobKey, workerID string) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
if workerID == "" {
Expand All @@ -92,22 +143,25 @@ func (s *jobQueue) renewLease(jobID, workerID string) error {
s.mu.Lock()
defer s.mu.Unlock()

j, ok := s.jobs[jobID]
j, ok := s.jobs[key.id]
if !ok {
return errJobNotFound
}
if j.assignee != workerID {
return errJobNotAssigned
}
if j.key.epoch != key.epoch {
return errBadEpoch
}

j.leaseExpiry = time.Now().Add(s.leaseTime)
j.leaseExpiry = time.Now().Add(s.leaseExpiry)
return nil
}

// completeJob completes the job with the given ID for the given worker,
// removing it from the jobQueue.
func (s *jobQueue) completeJob(jobID, workerID string) error {
if jobID == "" {
func (s *jobQueue) completeJob(key jobKey, workerID string) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
if workerID == "" {
Expand All @@ -117,15 +171,18 @@ func (s *jobQueue) completeJob(jobID, workerID string) error {
s.mu.Lock()
defer s.mu.Unlock()

j, ok := s.jobs[jobID]
j, ok := s.jobs[key.id]
if !ok {
return errJobNotFound
}
if j.assignee != workerID {
return errJobNotAssigned
}
if j.key.epoch != key.epoch {
return errBadEpoch
}

delete(s.jobs, jobID)
delete(s.jobs, key.id)
return nil
}

Expand All @@ -147,7 +204,7 @@ func (s *jobQueue) clearExpiredLeases() {
}

type job struct {
id string
key jobKey

assignee string
leaseExpiry time.Time
Expand All @@ -157,6 +214,13 @@ type job struct {
spec jobSpec
}

type jobKey struct {
id string
// The assignment epoch. This is used to break ties when multiple workers
// have knowledge of the same job.
epoch int64
}

type jobSpec struct {
topic string
partition int32
Expand Down
86 changes: 56 additions & 30 deletions pkg/blockbuilder/scheduler/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,117 +17,143 @@ import (
func TestAssign(t *testing.T) {
s := newJobQueue(988*time.Hour, test.NewTestingLogger(t))

j0id, j0spec, err := s.assign("w0")
require.Empty(t, j0id)
j0, j0spec, err := s.assign("w0")
require.Empty(t, j0.id)
require.Zero(t, j0spec)
require.ErrorIs(t, err, errNoJobAvailable)

s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()})
j1id, j1spec, err := s.assign("w0")
require.NotEmpty(t, j1id)
j1, j1spec, err := s.assign("w0")
require.NotEmpty(t, j1.id)
require.NotZero(t, j1spec)
require.NoError(t, err)
require.Equal(t, "w0", s.jobs[j1id].assignee)
require.Equal(t, "w0", s.jobs[j1.id].assignee)

j2id, j2spec, err := s.assign("w0")
require.Zero(t, j2id)
j2, j2spec, err := s.assign("w0")
require.Zero(t, j2.id)
require.Zero(t, j2spec)
require.ErrorIs(t, err, errNoJobAvailable)

s.addOrUpdate("job2", jobSpec{topic: "hello2", commitRecTs: time.Now()})
j3id, j3spec, err := s.assign("w0")
require.NotZero(t, j3id)
j3, j3spec, err := s.assign("w0")
require.NotZero(t, j3.id)
require.NotZero(t, j3spec)
require.NoError(t, err)
require.Equal(t, "w0", s.jobs[j3id].assignee)
require.Equal(t, "w0", s.jobs[j3.id].assignee)
}

func TestAssignComplete(t *testing.T) {
s := newJobQueue(988*time.Hour, test.NewTestingLogger(t))

{
err := s.completeJob("rando job", "w0")
err := s.completeJob(jobKey{"rando job", 965}, "w0")
require.ErrorIs(t, err, errJobNotFound)
}

s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()})
jid, jspec, err := s.assign("w0")
require.NotZero(t, jid)
jk, jspec, err := s.assign("w0")
require.NotZero(t, jk)
require.NotZero(t, jspec)
require.NoError(t, err)
j, ok := s.jobs[jid]
j, ok := s.jobs[jk.id]
require.True(t, ok)
require.Equal(t, "w0", j.assignee)

{
err := s.completeJob("rando job", "w0")
err := s.completeJob(jobKey{"rando job", 64}, "w0")
require.ErrorIs(t, err, errJobNotFound)
}
{
err := s.completeJob(j.id, "rando worker")
err := s.completeJob(jk, "rando worker")
require.ErrorIs(t, err, errJobNotAssigned)
}
{
err := s.completeJob(jobKey{jk.id, 9999}, "w0")
require.ErrorIs(t, err, errBadEpoch)
}

{
err := s.completeJob(j.id, "w0")
err := s.completeJob(jk, "w0")
require.NoError(t, err)

err2 := s.completeJob(j.id, "w0")
err2 := s.completeJob(jk, "w0")
require.ErrorIs(t, err2, errJobNotFound)
}

j2id, j2spec, err := s.assign("w0")
require.Zero(t, j2id, "should be no job available")
j2k, j2spec, err := s.assign("w0")
require.Zero(t, j2k.id, "should be no job available")
require.Zero(t, j2spec, "should be no job available")
require.ErrorIs(t, err, errNoJobAvailable)
}

func TestLease(t *testing.T) {
s := newJobQueue(988*time.Hour, test.NewTestingLogger(t))
s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()})
jid, jspec, err := s.assign("w0")
require.NotZero(t, jid)
jk, jspec, err := s.assign("w0")
require.NotZero(t, jk.id)
require.NotZero(t, jspec)
require.NoError(t, err)

j, ok := s.jobs[jid]
j, ok := s.jobs[jk.id]
require.True(t, ok)
require.Equal(t, "w0", j.assignee)

// Expire the lease.
j.leaseExpiry = time.Now().Add(-1 * time.Minute)
s.clearExpiredLeases()

j2id, j2spec, err := s.assign("w1")
require.NotZero(t, j2id, "should be able to assign a job whose lease was invalidated")
j2k, j2spec, err := s.assign("w1")
require.NotZero(t, j2k.id, "should be able to assign a job whose lease was invalidated")
require.NotZero(t, j2spec, "should be able to assign a job whose lease was invalidated")
require.Equal(t, j.spec, j2spec)
require.NoError(t, err)
j2, ok := s.jobs[j2id]
j2, ok := s.jobs[j2k.id]
require.True(t, ok)
require.Equal(t, "w1", j2.assignee)

t.Run("renewals", func(t *testing.T) {
prevExpiry := j2.leaseExpiry
e1 := s.renewLease(j2.id, "w1")
e1 := s.renewLease(j2k, "w1")
require.NoError(t, e1)
require.True(t, j2.leaseExpiry.After(prevExpiry))

e2 := s.renewLease(j2.id, "w0")
e2 := s.renewLease(j2k, "w0")
require.ErrorIs(t, e2, errJobNotAssigned)

e3 := s.renewLease("job_404", "w0")
e3 := s.renewLease(jobKey{"job_404", 1}, "w0")
require.ErrorIs(t, e3, errJobNotFound)
})
}

// TestImportJob tests the importJob method - the method that is called to learn
// about jobs in-flight from a previous scheduler instance.
func TestImportJob(t *testing.T) {
s := newJobQueue(988*time.Hour, test.NewTestingLogger(t))
spec := jobSpec{commitRecTs: time.Now().Add(-1 * time.Hour)}
require.NoError(t, s.importJob(jobKey{"job1", 122}, "w0", spec))
require.NoError(t, s.importJob(jobKey{"job1", 123}, "w2", spec))
require.ErrorIs(t, errBadEpoch, s.importJob(jobKey{"job1", 122}, "w0", spec))
require.ErrorIs(t, errBadEpoch, s.importJob(jobKey{"job1", 60}, "w98", spec))
require.ErrorIs(t, errJobNotAssigned, s.importJob(jobKey{"job1", 123}, "w512", spec))
require.NoError(t, s.importJob(jobKey{"job1", 123}, "w2", spec))

j, ok := s.jobs["job1"]
require.True(t, ok)
require.Equal(t, jobKey{"job1", 123}, j.key)
require.Equal(t, spec, j.spec)
require.Equal(t, "w2", j.assignee)
}

func TestMinHeap(t *testing.T) {
n := 517
jobs := make([]*job, n)
order := make([]int, n)
for i := 0; i < n; i++ {
jobs[i] = &job{
id: fmt.Sprintf("job%d", i),
key: jobKey{
id: fmt.Sprintf("job%d", i),
epoch: 0,
},
spec: jobSpec{topic: "hello", commitRecTs: time.Unix(int64(i), 0)},
}
order[i] = i
Expand Down
Loading

0 comments on commit 4f36235

Please sign in to comment.