Skip to content

Commit 3469b30

Browse files
committed
Thread-safe batch
1 parent 2435da1 commit 3469b30

File tree

1 file changed

+33
-4
lines changed

1 file changed

+33
-4
lines changed

batch.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package worker
22

3-
import "errors"
3+
import (
4+
"errors"
5+
"sync"
6+
)
47

58
type Batch struct {
69
batchPosition int
710
batchSize int
811
itemsToSave []interface{}
912
pushHandler BatchHandler
1013
flushHandler BatchHandler
14+
mutex *sync.Mutex
1115
}
1216

1317
type BatchHandler func([]interface{}) error
@@ -23,26 +27,33 @@ func (b *Batch) Init(batchSize int, pushHandler BatchHandler, flushHandler Batch
2327

2428
b.pushHandler = pushHandler
2529
b.flushHandler = flushHandler
30+
b.mutex = &sync.Mutex{}
2631
}
2732

2833
func (b *Batch) Push(record interface{}) error {
2934
if b.batchSize == 0 {
3035
return errors.New("batch not initialized")
3136
}
3237

38+
// lock around batch processing
39+
b.mutex.Lock()
40+
3341
// allocate the buffer of items to save, if needed
3442
if b.itemsToSave == nil {
3543
b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize)
3644
b.batchPosition = 0
37-
}
38-
39-
if b.batchPosition >= b.batchSize {
45+
b.mutex.Unlock()
46+
} else if b.batchPosition >= b.batchSize {
4047
batch := b.itemsToSave
4148

4249
// allocate a new buffer, put the inbound record as the first item
4350
b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize)
4451
b.itemsToSave[0] = record
4552
b.batchPosition = 1
53+
54+
// release the lock
55+
b.mutex.Unlock()
56+
4657
if err := b.pushHandler(batch); err != nil {
4758
return err
4859
}
@@ -52,24 +63,42 @@ func (b *Batch) Push(record interface{}) error {
5263
} else {
5364
b.itemsToSave[b.batchPosition] = record
5465
b.batchPosition++
66+
b.mutex.Unlock()
5567
}
5668

5769
return nil
5870
}
5971

72+
func (b *Batch) GetPosition() int {
73+
b.mutex.Lock()
74+
pos := b.batchPosition
75+
b.mutex.Unlock()
76+
return pos
77+
}
78+
6079
func (b *Batch) Flush() error {
6180
if b.batchSize == 0 {
6281
return errors.New("batch not initialized")
6382
}
6483

84+
// lock around batch processing
85+
b.mutex.Lock()
6586
if len(b.itemsToSave) > 0 {
87+
88+
// snag the rest of the buffer as a slice, reset buffer
6689
subSlice := (b.itemsToSave)[0:b.batchPosition]
6790
b.itemsToSave = make([]interface{}, b.batchSize, b.batchSize)
6891
b.batchPosition = 0
92+
93+
// we've finished batch processing, unlock
94+
b.mutex.Unlock()
95+
96+
// call the configured flush handler
6997
err := b.flushHandler(subSlice)
7098
subSlice = nil
7199
return err
72100
}
101+
b.mutex.Unlock()
73102

74103
return nil
75104
}

0 commit comments

Comments
 (0)