Skip to content

Commit

Permalink
Added lock to the progress bar update function, and made the worker p…
Browse files Browse the repository at this point in the history
…ool call the callback both when jobs are submitted and when jobs are finished.
  • Loading branch information
Martin Bruse committed Apr 19, 2024
1 parent b6f13f2 commit 4d3e9e9
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 8 deletions.
4 changes: 2 additions & 2 deletions go/dataset/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,8 @@ func (d *Dataset) Calculate(metrics []MetricRunner, updateProgress func(submitte
}

pool := &worker.Pool[any]{
Workers: maxWorkers,
OnComplete: updateProgress,
Workers: maxWorkers,
OnChange: updateProgress,
}

for _, loopRef := range d.References {
Expand Down
4 changes: 2 additions & 2 deletions go/dataset/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s Setup) getOptimizeComparisons() (map[string]comparisonSlice, error) {
defer fmt.Println()
pool := &worker.Pool[prepareResult]{
Workers: safeInt(s.MaxWorkers),
OnComplete: func(submitted, completed int) {
OnChange: func(submitted, completed int) {
bar.Update(submitted, completed)
},
}
Expand Down Expand Up @@ -255,7 +255,7 @@ func (s Setup) optimize() error {
defer fmt.Println()
pool := &worker.Pool[compareResult]{
Workers: safeInt(s.MaxWorkers),
OnComplete: func(submitted, completed int) {
OnChange: func(submitted, completed int) {
bar.Update(submitted, completed)
},
}
Expand Down
5 changes: 5 additions & 0 deletions go/progress/bar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"fmt"
"math"
"sync"
"syscall"
"time"
"unsafe"
Expand Down Expand Up @@ -59,6 +60,7 @@ type Bar struct {
total int
emaSpeed float64
lastRender time.Time
lock sync.Mutex
}

// AddCompleted adds completed tasks to the bar and renders it.
Expand All @@ -68,6 +70,9 @@ func (b *Bar) AddCompleted(num int) {

// Update update completed and total tasks to the bar and updates it.
func (b *Bar) Update(total, completed int) {
b.lock.Lock()
defer b.lock.Unlock()

prefix := fmt.Sprintf("%s, %d/%d ", b.name, completed, total)

now := time.Now()
Expand Down
11 changes: 7 additions & 4 deletions go/worker/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

// Pool is a pool of workers.
type Pool[T any] struct {
Workers int
OnComplete func(submitted, completed int)
Workers int
OnChange func(submitted, completed int)

startOnce sync.Once

Expand Down Expand Up @@ -62,8 +62,8 @@ func (p *Pool[T]) init() {
}
p.jobsWaitGroup.Done()
atomic.AddUint32(&p.completedJobs, 1)
if p.OnComplete != nil {
p.OnComplete(int(atomic.LoadUint32(&p.submittedJobs)), int(atomic.LoadUint32(&p.completedJobs)))
if p.OnChange != nil {
p.OnChange(int(atomic.LoadUint32(&p.submittedJobs)), int(atomic.LoadUint32(&p.completedJobs)))
}
}
}()
Expand All @@ -77,6 +77,9 @@ func (p *Pool[T]) Submit(job func(func(T)) error) error {

p.jobsWaitGroup.Add(1)
atomic.AddUint32(&p.submittedJobs, 1)
if p.OnChange != nil {
p.OnChange(int(atomic.LoadUint32(&p.submittedJobs)), int(atomic.LoadUint32(&p.completedJobs)))
}

go func() {
p.jobs <- job
Expand Down

0 comments on commit 4d3e9e9

Please sign in to comment.