From 80a745208382cc31f501f935ac66a3c4d1917e9e Mon Sep 17 00:00:00 2001 From: Mark Dickson Jr Date: Tue, 25 Jun 2019 18:12:56 -0500 Subject: [PATCH] Ran dispatcher at init time I can't see why anyone would wait --- README.md | 26 +++++++++++++++++++++----- dispatcher.go | 15 ++++++++++----- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index b5d1b4a..c133bc3 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,43 @@ # Work (for Go) -A worker pool and batch processing library. Ships with a few utilities for common use-cases (e.g. XML processing, +A worker pool and batch processing library. Worker pools provide a simple mechanism for +parallel processing. The batch utility provides a mechanism for collecting +individual records until a size threshold is hit (or it is flushed). Once the batch +is cleared, some function is applied to the collection. + +This library also ships with a few utilities for common use-cases (e.g. XML processing, splitting inputs, ensuring a single writer at a time for file output). +## Data Processing Patterns + +### Scrolled Input, Split Processing, Single Unordered Output + +Examples: Process a (large?) chunk of an Elastic Search index to perform some sort of transformation, save transformed +documents to another index + +### Scrolled Input, Split Processing, Single Ordered Output + +Examples: Process a (large?) chunk of an Elastic Search index in a specific order to perform some sort of transformation, +write results to a file in the correct order. 
+ ## Job Queue Usage ```go -// start the dispatcher, using 8 workers and up to 100 queued jobs +// start the dispatcher, using 8 parallel workers and up to 100 queued jobs maxWorkers := 8 maxJobQueueSize := 100 dispatcher := work.NewDispatcher(maxJobQueueSize, maxWorkers, doWork) -dispatcher.Run() // do something that loads up the jobs repeatedly (here, we use a for loop) // for example, until EOF is reached while reading a file for someCondition { // do some task to get something to put on the queue - data, isEndOfStream, err := BogusDataSource(...) + ... // put the thing on the queue, wait if the queue is full - dispatcher.EnqueueJobAllowWait(work.Job{Name: "address processing", Context: &data}) + dispatcher.EnqueueJobAllowWait(work.Job{Context: &data}) } // let all jobs finish before proceeding diff --git a/dispatcher.go b/dispatcher.go index b59e510..f775965 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -10,7 +10,7 @@ func NewDispatcher( maxWorkers int, workFn WorkFunction, ) *Dispatcher { - return &Dispatcher{ + d := &Dispatcher{ idlenessSamplerInterval: 100 * time.Millisecond, jobQueue: make(chan Job, maxJobQueueSize), maxWorkers: maxWorkers, @@ -20,6 +20,8 @@ func NewDispatcher( workerLogFn: NoLogFunction, waitLogFn: NoLogFunction, } + d.run() + return d } type Dispatcher struct { @@ -40,12 +42,15 @@ type Dispatcher struct { idlenessIntervals int64 } -// start the dispatcher -// note that this will in no way block the app from proceeding -func (d *Dispatcher) Run() { +func (d *Dispatcher) run() { + if len(d.workers) != 0 { + return + } + + d.workers = make([]*Worker, d.maxWorkers) for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(i+1, d.workerPool, d.workFn, d.jobErrorFn, d.workerLogFn) - d.workers = append(d.workers, &worker) + d.workers[i] = &worker worker.start()