From 240c4e1985604497e405d3d81f5c7c0f1cb467bb Mon Sep 17 00:00:00 2001 From: Mark Dickson Jr Date: Wed, 13 Nov 2019 12:18:35 -0500 Subject: [PATCH] Swtiched batch position to atomic value --- batch.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/batch.go b/batch.go index ab9ebc8..8efef01 100644 --- a/batch.go +++ b/batch.go @@ -3,10 +3,11 @@ package work import ( "errors" "sync" + "sync/atomic" ) type Batch struct { - batchPosition int + batchPosition atomic.Value batchSize int itemsToSave []interface{} pushHandler BatchHandler @@ -46,7 +47,7 @@ type BytesSource interface { type BatchHandler func([]interface{}) error func (b *Batch) Init(batchSize int, pushHandler BatchHandler, flushHandler ...BatchHandler) { - b.batchPosition = 0 + b.batchPosition.Store(0) // grab the batch size - default to 100 b.batchSize = batchSize @@ -82,19 +83,20 @@ func (b *Batch) Push(record interface{}) error { b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize) } + batchPosition, _ := b.batchPosition.Load().(int) + // if our batch is full - if b.batchPosition >= b.batchSize { + if batchPosition >= b.batchSize { batch := b.itemsToSave // allocate a new buffer, put the inbound record as the first item b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize) b.itemsToSave[0] = record - b.batchPosition = 1 + b.batchPosition.Store(1) // release the lock b.mutex.Unlock() - // TODO: review impact of making this call from a goroutine - definitely faster, but would bugs arise from timing changes? if err := b.pushHandler(batch); err != nil { return err } @@ -103,8 +105,9 @@ func (b *Batch) Push(record interface{}) error { } else { // our batch is not full - if the batch size - b.itemsToSave[b.batchPosition] = record - b.batchPosition++ + b.itemsToSave[batchPosition] = record + batchPosition++ + b.batchPosition.Store(batchPosition) b.mutex.Unlock() } @@ -113,7 +116,7 @@ func (b *Batch) Push(record interface{}) error { func (b *Batch) GetPosition() int { b.mutex.Lock() - pos := b.batchPosition + pos, _ := b.batchPosition.Load().(int) b.mutex.Unlock() return pos } @@ -125,12 +128,13 @@ func (b *Batch) Flush() error { // lock around batch processing b.mutex.Lock() - if b.batchPosition > 0 { + batchPosition, _ := b.batchPosition.Load().(int) + if batchPosition > 0 { // snag the rest of the buffer as a slice, reset buffer - subSlice := (b.itemsToSave)[0:b.batchPosition] + subSlice := (b.itemsToSave)[0:batchPosition] b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize) - b.batchPosition = 0 + b.batchPosition.Store(0) // we've finished batch processing, unlock b.mutex.Unlock()