From 4d3e9e986e837e22ebe910f50508ab4b2098386f Mon Sep 17 00:00:00 2001 From: Martin Bruse Date: Fri, 19 Apr 2024 10:54:16 +0000 Subject: [PATCH] Added lock to the progress bar update function, and made the worker pool call the callback both when jobs are submitted and when jobs are finished. --- go/dataset/dataset.go | 4 ++-- go/dataset/runner/runner.go | 4 ++-- go/progress/bar.go | 5 +++++ go/worker/pool.go | 11 +++++++---- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/go/dataset/dataset.go b/go/dataset/dataset.go index 671201b..244280b 100644 --- a/go/dataset/dataset.go +++ b/go/dataset/dataset.go @@ -532,8 +532,8 @@ func (d *Dataset) Calculate(metrics []MetricRunner, updateProgress func(submitte } pool := &worker.Pool[any]{ - Workers: maxWorkers, - OnComplete: updateProgress, + Workers: maxWorkers, + OnChange: updateProgress, } for _, loopRef := range d.References { diff --git a/go/dataset/runner/runner.go b/go/dataset/runner/runner.go index 041aa97..3e219d2 100644 --- a/go/dataset/runner/runner.go +++ b/go/dataset/runner/runner.go @@ -160,7 +160,7 @@ func (s Setup) getOptimizeComparisons() (map[string]comparisonSlice, error) { defer fmt.Println() pool := &worker.Pool[prepareResult]{ Workers: safeInt(s.MaxWorkers), - OnComplete: func(submitted, completed int) { + OnChange: func(submitted, completed int) { bar.Update(submitted, completed) }, } @@ -255,7 +255,7 @@ func (s Setup) optimize() error { defer fmt.Println() pool := &worker.Pool[compareResult]{ Workers: safeInt(s.MaxWorkers), - OnComplete: func(submitted, completed int) { + OnChange: func(submitted, completed int) { bar.Update(submitted, completed) }, } diff --git a/go/progress/bar.go b/go/progress/bar.go index 9ffc5ce..eec5098 100644 --- a/go/progress/bar.go +++ b/go/progress/bar.go @@ -19,6 +19,7 @@ import ( "bytes" "fmt" "math" + "sync" "syscall" "time" "unsafe" @@ -59,6 +60,7 @@ type Bar struct { total int emaSpeed float64 lastRender time.Time + lock sync.Mutex } // AddCompleted adds completed tasks to the bar and renders it. @@ -68,6 +70,9 @@ func (b *Bar) AddCompleted(num int) { // Update update completed and total tasks to the bar and updates it. func (b *Bar) Update(total, completed int) { + b.lock.Lock() + defer b.lock.Unlock() + prefix := fmt.Sprintf("%s, %d/%d ", b.name, completed, total) now := time.Now() diff --git a/go/worker/pool.go b/go/worker/pool.go index 40377a5..561dfb5 100644 --- a/go/worker/pool.go +++ b/go/worker/pool.go @@ -23,8 +23,8 @@ import ( // Pool is a pool of workers. type Pool[T any] struct { - Workers int - OnComplete func(submitted, completed int) + Workers int + OnChange func(submitted, completed int) startOnce sync.Once @@ -62,8 +62,8 @@ func (p *Pool[T]) init() { } p.jobsWaitGroup.Done() atomic.AddUint32(&p.completedJobs, 1) - if p.OnComplete != nil { - p.OnComplete(int(atomic.LoadUint32(&p.submittedJobs)), int(atomic.LoadUint32(&p.completedJobs))) + if p.OnChange != nil { + p.OnChange(int(atomic.LoadUint32(&p.submittedJobs)), int(atomic.LoadUint32(&p.completedJobs))) } } }() @@ -77,6 +77,9 @@ func (p *Pool[T]) Submit(job func(func(T)) error) error { p.jobsWaitGroup.Add(1) atomic.AddUint32(&p.submittedJobs, 1) + if p.OnChange != nil { + p.OnChange(int(atomic.LoadUint32(&p.submittedJobs)), int(atomic.LoadUint32(&p.completedJobs))) + } go func() { p.jobs <- job