From 578bce672bb0ba9efa5a093c3b5122ea0d2c4253 Mon Sep 17 00:00:00 2001 From: Mark Dickson Jr Date: Sun, 26 May 2019 13:10:45 -0400 Subject: [PATCH] Enqueue offers wait or drop option --- dispatcher.go | 21 ++++++++++++++++++--- xml/batch/reader.go | 4 ++-- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/dispatcher.go b/dispatcher.go index 0c5e0ad..5ef7f12 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -69,7 +69,15 @@ func (d *Dispatcher) RunCount() int32 { } // allows users to enqueue jobs into the work queue -func (d *Dispatcher) EnqueueJob(job Job) { +func (d *Dispatcher) EnqueueJobAllowWait(job Job) { + if blocked := d.BlockWhileQueueFull(); blocked { + _, _ = d.logFn("blocked during enqueue because queue full") + } + d.jobQueue <- job +} + +func (d *Dispatcher) EnqueueJobAllowDrop(job Job) { + // TODO COUNT DROPS? d.jobQueue <- job } @@ -149,12 +157,15 @@ func (d *Dispatcher) WaitUntilIdle() { // pulls a job from the job queue and adds it to the worker's job queue - a worker will grab it in the worker logic func (d *Dispatcher) dispatch() { for { - + // if there are no workers ready to receive the job, let the job queue fill up - if int(d.RunCount()) == cap(d.workerPool) { + if !d.IsAnyWorkerIdle() { + time.Sleep(30 * time.Millisecond) continue } + _, _ = d.logFn("during round-robin enqueueing: %d running vs %d total\n", int(d.RunCount()), cap(d.workerPool)) + select { case job := <-d.jobQueue: go func() { @@ -186,3 +197,7 @@ func (d *Dispatcher) sample() { } }() } + +func (d *Dispatcher) IsAnyWorkerIdle() bool { + return int(d.RunCount()) < cap(d.workerPool) +} diff --git a/xml/batch/reader.go b/xml/batch/reader.go index c229fdd..8d1a05e 100644 --- a/xml/batch/reader.go +++ b/xml/batch/reader.go @@ -30,10 +30,10 @@ func (a *Reader) Init( a.jobName = jobName a.batch = &workers.Batch{} a.batch.Init(batchSize, func(i []interface{}) error { - a.dispatcher.EnqueueJob(workers.Job{Name: a.jobName, Context: workersXml.RecordArrayFromInterfaceArray(i, false)}) + a.dispatcher.EnqueueJobAllowWait(workers.Job{Name: a.jobName, Context: workersXml.RecordArrayFromInterfaceArray(i, false)}) return nil }, func(i []interface{}) error { - a.dispatcher.EnqueueJob(workers.Job{Name: a.jobName, Context: workersXml.RecordArrayFromInterfaceArray(i, true)}) + a.dispatcher.EnqueueJobAllowWait(workers.Job{Name: a.jobName, Context: workersXml.RecordArrayFromInterfaceArray(i, true)}) return nil })