Skip to content

Commit

Permalink
Enqueue offers wait or drop option
Browse files Browse the repository at this point in the history
  • Loading branch information
markdicksonjr committed May 26, 2019
1 parent dbebe79 commit 578bce6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
21 changes: 18 additions & 3 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,15 @@ func (d *Dispatcher) RunCount() int32 {
}

// allows users to enqueue jobs into the work queue
func (d *Dispatcher) EnqueueJob(job Job) {
func (d *Dispatcher) EnqueueJobAllowWait(job Job) {
if blocked := d.BlockWhileQueueFull(); blocked {
_, _ = d.logFn("blocked during enqueue because queue full")
}
d.jobQueue <- job
}

func (d *Dispatcher) EnqueueJobAllowDrop(job Job) {
// TODO COUNT DROPS?
d.jobQueue <- job
}

Expand Down Expand Up @@ -149,12 +157,15 @@ func (d *Dispatcher) WaitUntilIdle() {
// pulls a job from the job queue and adds it to the worker's job queue - a worker will grab it in the worker logic
func (d *Dispatcher) dispatch() {
for {

// if there are no workers ready to receive the job, let the job queue fill up
if int(d.RunCount()) == cap(d.workerPool) {
if !d.IsAnyWorkerIdle() {
time.Sleep(30 * time.Millisecond)
continue
}

_, _ = d.logFn("during round-robin enqueueing: %d running vs %d total\n", int(d.RunCount()), cap(d.workerPool))

select {
case job := <-d.jobQueue:
go func() {
Expand Down Expand Up @@ -186,3 +197,7 @@ func (d *Dispatcher) sample() {
}
}()
}

func (d *Dispatcher) IsAnyWorkerIdle() bool {
return int(d.RunCount()) < cap(d.workerPool)
}
4 changes: 2 additions & 2 deletions xml/batch/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func (a *Reader) Init(
a.jobName = jobName
a.batch = &workers.Batch{}
a.batch.Init(batchSize, func(i []interface{}) error {
a.dispatcher.EnqueueJob(workers.Job{Name: a.jobName, Context: workersXml.RecordArrayFromInterfaceArray(i, false)})
a.dispatcher.EnqueueJobAllowWait(workers.Job{Name: a.jobName, Context: workersXml.RecordArrayFromInterfaceArray(i, false)})
return nil
}, func(i []interface{}) error {
a.dispatcher.EnqueueJob(workers.Job{Name: a.jobName, Context: workersXml.RecordArrayFromInterfaceArray(i, true)})
a.dispatcher.EnqueueJobAllowWait(workers.Job{Name: a.jobName, Context: workersXml.RecordArrayFromInterfaceArray(i, true)})
return nil
})

Expand Down

0 comments on commit 578bce6

Please sign in to comment.