Skip to content

Commit

Permalink
Moved worker ID to worker context
Browse files Browse the repository at this point in the history
This lets dispatch handlers get to the ID while keeping the model somewhat lean (consolidating the root-level id into context)
  • Loading branch information
markdicksonjr committed Feb 5, 2020
1 parent 04b8156 commit dc6be6e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
2 changes: 1 addition & 1 deletion dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (d *Dispatcher) GetUtilization() Utilization {
for _, v := range d.workers {
results = append(results, WorkerUtilization{
PercentUtilization: v.GetPercentUtilization(),
Id: v.id,
Id: v.workerContext.Id,
})
}

Expand Down
30 changes: 15 additions & 15 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,24 @@ type Job struct {
// provides a mechanism for shared data and context across calls to the work function
type Context struct {
Data interface{}
Id int
}

func NewWorker(id int, workerPool chan chan Job, workFn WorkFunction, jobErrorFn JobErrorFunction, logFn LogFunction) Worker {
return Worker{
id: id,
jobQueue: make(chan Job),
workerPool: workerPool,
quitChan: make(chan bool),
workFn: workFn,
jobErrorFn: jobErrorFn,
logFn: logFn,
workerContext: Context{},
jobQueue: make(chan Job),
workerPool: workerPool,
quitChan: make(chan bool),
workFn: workFn,
jobErrorFn: jobErrorFn,
logFn: logFn,
workerContext: Context{
Id: id,
},
}
}

type Worker struct {
id int
jobQueue chan Job
workerPool chan chan Job
quitChan chan bool
Expand Down Expand Up @@ -80,22 +81,22 @@ func (w *Worker) start() {
case job := <-w.jobQueue:
workFnStart := time.Now()
atomic.AddInt32(&w.runningCount, 1)
_, _ = w.log("worker%d: started %s\n", w.id, job.Name)
_, _ = w.log("worker%d: started %s\n", w.workerContext.Id, job.Name)
err := w.workFn(job, &w.workerContext)
atomic.AddInt32(&w.runningCount, -1)
atomic.AddInt64(&w.totalProcessingTimeNs, time.Now().Sub(workFnStart).Nanoseconds())

if err != nil {
_, _ = w.log("worker%d: had error in %s: %s!\n", w.id, job.Name, err.Error())
_, _ = w.log("worker%d: had error in %s: %s!\n", w.workerContext.Id, job.Name, err.Error())
w.error(job, &w.workerContext, err)
}

// nil out data to clue GC
job.Context = nil

_, _ = w.log("worker%d: completed %s!\n", w.id, job.Name)
_, _ = w.log("worker%d: completed %s!\n", w.workerContext.Id, job.Name)
case <-w.quitChan:
_, _ = w.log("worker%d stopping\n", w.id)
_, _ = w.log("worker%d stopping\n", w.workerContext.Id)
return
}
}
Expand All @@ -108,7 +109,6 @@ func (w Worker) stop() {
}()
}


func (w Worker) log(format string, a ...interface{}) (n int, err error) {
if w.logFn != nil {
return w.logFn(format, a)
Expand All @@ -121,4 +121,4 @@ func (w Worker) error(job Job, workerContext *Context, err error) {
if w.jobErrorFn != nil {
w.jobErrorFn(job, workerContext, err)
}
}
}

0 comments on commit dc6be6e

Please sign in to comment.