Skip to content

Commit

Permalink
Swtiched batch position to atomic value
Browse files Browse the repository at this point in the history
  • Loading branch information
markdicksonjr committed Nov 13, 2019
1 parent 22becd6 commit 240c4e1
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package work
import (
"errors"
"sync"
"sync/atomic"
)

type Batch struct {
batchPosition int
batchPosition atomic.Value
batchSize int
itemsToSave []interface{}
pushHandler BatchHandler
Expand Down Expand Up @@ -46,7 +47,7 @@ type BytesSource interface {
type BatchHandler func([]interface{}) error

func (b *Batch) Init(batchSize int, pushHandler BatchHandler, flushHandler ...BatchHandler) {
b.batchPosition = 0
b.batchPosition.Store(0)

// grab the batch size - default to 100
b.batchSize = batchSize
Expand Down Expand Up @@ -82,19 +83,20 @@ func (b *Batch) Push(record interface{}) error {
b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize)
}

batchPosition, _ := b.batchPosition.Load().(int)

// if our batch is full
if b.batchPosition >= b.batchSize {
if batchPosition >= b.batchSize {
batch := b.itemsToSave

// allocate a new buffer, put the inbound record as the first item
b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize)
b.itemsToSave[0] = record
b.batchPosition = 1
b.batchPosition.Store(1)

// release the lock
b.mutex.Unlock()

// TODO: review impact of making this call from a goroutine - definitely faster, but would bugs arise from timing changes?
if err := b.pushHandler(batch); err != nil {
return err
}
Expand All @@ -103,8 +105,9 @@ func (b *Batch) Push(record interface{}) error {
} else {

// our batch is not full - if the batch size
b.itemsToSave[b.batchPosition] = record
b.batchPosition++
b.itemsToSave[batchPosition] = record
batchPosition++
b.batchPosition.Store(batchPosition)
b.mutex.Unlock()
}

Expand All @@ -113,7 +116,7 @@ func (b *Batch) Push(record interface{}) error {

func (b *Batch) GetPosition() int {
b.mutex.Lock()
pos := b.batchPosition
pos, _ := b.batchPosition.Load().(int)
b.mutex.Unlock()
return pos
}
Expand All @@ -125,12 +128,13 @@ func (b *Batch) Flush() error {

// lock around batch processing
b.mutex.Lock()
if b.batchPosition > 0 {
batchPosition, _ := b.batchPosition.Load().(int)
if batchPosition > 0 {

// snag the rest of the buffer as a slice, reset buffer
subSlice := (b.itemsToSave)[0:b.batchPosition]
subSlice := (b.itemsToSave)[0:batchPosition]
b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize)
b.batchPosition = 0
b.batchPosition.Store(0)

// we've finished batch processing, unlock
b.mutex.Unlock()
Expand Down

0 comments on commit 240c4e1

Please sign in to comment.