Ran dispatcher at init time
I can't see why anyone would wait
markdicksonjr committed Jun 25, 2019
1 parent 483c335 commit 80a7452
Showing 2 changed files with 31 additions and 10 deletions.
26 changes: 21 additions & 5 deletions README.md
@@ -1,27 +1,43 @@
# Work (for Go)

- A worker pool and batch processing library. Ships with a few utilities for common use-cases (e.g. XML processing,
+ A worker pool and batch processing library. Worker pools provide a simple mechanism for
+ parallel processing. The batch utility provides a mechanism for collecting
+ individual records until a size threshold is hit (or it is flushed). Once the batch
+ is cleared, some function is applied to the collection.
+
+ This library also ships with a few utilities for common use-cases (e.g. XML processing,
splitting inputs, ensuring a single writer at a time for file output).
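
As a rough sketch of the batch idea described above (collect records until a size threshold is hit, then apply a function to the whole batch), a collector might look like the following; the `batcher` type and its method names are invented for illustration and are not part of this library's API:

```go
package main

import "fmt"

// batcher collects records until a size threshold is reached, then
// applies a callback to the collected batch and starts a new one.
type batcher struct {
	threshold int
	items     []interface{}
	apply     func([]interface{})
}

func newBatcher(threshold int, apply func([]interface{})) *batcher {
	return &batcher{threshold: threshold, apply: apply}
}

// add appends one record and flushes automatically once the threshold is hit.
func (b *batcher) add(item interface{}) {
	b.items = append(b.items, item)
	if len(b.items) >= b.threshold {
		b.flush()
	}
}

// flush applies the callback to whatever has accumulated, if anything.
func (b *batcher) flush() {
	if len(b.items) == 0 {
		return
	}
	b.apply(b.items)
	b.items = nil
}

func main() {
	b := newBatcher(3, func(batch []interface{}) {
		fmt.Println("processing batch of", len(batch), "records:", batch)
	})
	for i := 0; i < 7; i++ {
		b.add(i)
	}
	b.flush() // pick up the remainder below the threshold
}
```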

## Data Processing Patterns

### Scrolled Input, Split Processing, Single Unordered Output

Example: process a (possibly large) chunk of an Elasticsearch index, perform some transformation, and save the
transformed documents to another index.

### Scrolled Input, Split Processing, Single Ordered Output

Example: process a (possibly large) chunk of an Elasticsearch index in a specific order, perform some transformation,
and write the results to a file in the correct order.
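
One generic way to get ordered output from parallel workers (not specific to this library) is to tag each input with its sequence number, let the workers finish in any order, and reorder the results before writing. A minimal sketch, with all names invented for illustration:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// result pairs a worker's output with the sequence number of its input,
// so the original order can be restored after parallel processing.
type result struct {
	seq int
	val string
}

func main() {
	inputs := []string{"a", "b", "c", "d", "e"}

	out := make(chan result, len(inputs))
	var wg sync.WaitGroup

	// fan out: each input is processed independently, in any order
	for i, in := range inputs {
		wg.Add(1)
		go func(seq int, in string) {
			defer wg.Done()
			out <- result{seq: seq, val: "processed-" + in}
		}(i, in)
	}
	wg.Wait()
	close(out)

	// reassemble: collect everything, then sort by sequence number before writing
	var results []result
	for r := range out {
		results = append(results, r)
	}
	sort.Slice(results, func(i, j int) bool { return results[i].seq < results[j].seq })
	for _, r := range results {
		fmt.Println(r.seq, r.val)
	}
}
```

The unordered variant of the pattern simply drops the sequence number and the final sort.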

## Job Queue Usage

```go

- // start the dispatcher, using 8 workers and up to 100 queued jobs
+ // start the dispatcher, using 8 parallel workers and up to 100 queued jobs
maxWorkers := 8
maxJobQueueSize := 100
dispatcher := work.NewDispatcher(maxJobQueueSize, maxWorkers, doWork)
- dispatcher.Run()

// do something that loads up the jobs repeatedly (here, we use a for loop)
// for example, until EOF is reached while reading a file
for someCondition {

// do some task to get something to put on the queue
data, isEndOfStream, err := BogusDataSource(...)
...

// put the thing on the queue, wait if the queue is full
- dispatcher.EnqueueJobAllowWait(work.Job{Name: "address processing", Context: &data})
+ dispatcher.EnqueueJobAllowWait(work.Job{Context: &data})
}

// let all jobs finish before proceeding
15 changes: 10 additions & 5 deletions dispatcher.go
@@ -10,7 +10,7 @@ func NewDispatcher(
maxWorkers int,
workFn WorkFunction,
) *Dispatcher {
- return &Dispatcher{
+ d := &Dispatcher{
idlenessSamplerInterval: 100 * time.Millisecond,
jobQueue: make(chan Job, maxJobQueueSize),
maxWorkers: maxWorkers,
@@ -20,6 +20,8 @@ func NewDispatcher(
workerLogFn: NoLogFunction,
waitLogFn: NoLogFunction,
}
+ d.run()
+ return d
}

type Dispatcher struct {
@@ -40,12 +42,15 @@ type Dispatcher struct {
idlenessIntervals int64
}

- // start the dispatcher
- // note that this will in no way block the app from proceeding
- func (d *Dispatcher) Run() {
+ func (d *Dispatcher) run() {
+ if len(d.workers) == 0 {
+ return
+ }
+
+ d.workers = make([]*Worker, d.maxWorkers)
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(i+1, d.workerPool, d.workFn, d.jobErrorFn, d.workerLogFn)
- d.workers = append(d.workers, &worker)
+ d.workers[i] = &worker
worker.start()
}
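
The change above starts the workers inside NewDispatcher, so callers no longer call Run themselves. In generic terms, a constructor that immediately spins up its workers looks roughly like the sketch below; the `pool` type and its names are illustrative, not this package's code:

```go
package main

import (
	"fmt"
	"sync"
)

// pool starts its worker goroutines as soon as it is constructed,
// so callers can enqueue work immediately without a separate Run step.
type pool struct {
	jobs chan int
	wg   sync.WaitGroup
}

func newPool(workers, queueSize int, work func(int)) *pool {
	p := &pool{jobs: make(chan int, queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for j := range p.jobs {
				work(j)
			}
		}()
	}
	return p
}

// enqueue blocks if the job queue is full.
func (p *pool) enqueue(j int) { p.jobs <- j }

// shutdown stops accepting jobs and waits for in-flight work to finish.
func (p *pool) shutdown() {
	close(p.jobs)
	p.wg.Wait()
}

func main() {
	p := newPool(4, 16, func(j int) { fmt.Println("handled job", j) })
	for i := 0; i < 10; i++ {
		p.enqueue(i)
	}
	p.shutdown()
}
```

With this shape, a caller only constructs the pool, enqueues work, and shuts it down, which is roughly the simplified usage the README shows after this commit.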

