-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatch.go
163 lines (126 loc) · 3.86 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package work
import (
"errors"
"sync"
)
// Batch collects records pushed one at a time and hands them to handler
// callbacks as slices. Push delivers a full buffer to pushHandler when the next
// record arrives; Flush delivers whatever is buffered to flushHandler.
type Batch struct {
// index of the next free slot in itemsToSave, i.e. the number of buffered records
batchPosition int
// number of records the buffer holds; Push and Flush treat 0 as "not initialized"
batchSize int
// buffer of records waiting to be handed to a handler (allocated lazily by Push)
itemsToSave []interface{}
// called by Push with a full buffer (or with a single record when batchSize is 1)
pushHandler BatchHandler
// called by Flush with the buffered records; Init defaults it to pushHandler
flushHandler BatchHandler
// held while batchPosition and itemsToSave are read or modified
mutex *sync.Mutex
}
// BatchSource describes a producer that hands its data to a callback as slices
// of items. It is a convenience interface - not used directly by this module.
type BatchSource interface {
// GetBatches is for callers that want to process slices of data: onBatch
// receives each batch plus some context about where it sits in the whole set
// (batchIndex, batchSize, totalItemCount).
// NOTE(review): whether batchIndex is zero-based, and whether an error from
// onBatch stops iteration, is decided by each implementation - confirm there.
GetBatches(
onBatch func(batch []interface{}, batchIndex, batchSize, totalItemCount int) error,
) error
// Finalize is for callers that want to close/finalize assets and resources.
Finalize() error
}
// BatchSourceFactory is a function that returns a BatchSource.
// It is a convenience type - not used directly by this module.
type BatchSourceFactory func() BatchSource
// BatchDestination describes a consumer that accepts data as slices of items.
// It is a convenience interface - not used directly by this module.
type BatchDestination interface {
// PutBatch is for callers that want to put a slice of data somewhere.
PutBatch([]interface{}) error
// Finalize is for callers that want to close/finalize assets and resources.
Finalize() error
}
// BytesSource describes a producer that hands its data to a callback as raw
// bytes, one batch at a time. It is a convenience interface - not used directly
// by this module.
type BytesSource interface {
// GetBatches is for callers that want to process bytes of data per batch:
// onBatch receives the bytes of each batch plus some context about where it
// sits in the whole set (batchIndex, batchSize, totalItemCount).
// NOTE(review): how the bytes of a batch are encoded is not specified here -
// confirm against the implementation in use.
GetBatches(
onBatch func(bytes []byte, batchIndex, batchSize, totalItemCount int) error,
) error
// Finalize is for callers that want to close/finalize assets and resources.
Finalize() error
}
// note: io.WriteCloser makes a convenient alternative to a "BytesDestination" interface

// BatchHandler is the callback signature Batch uses to deliver a slice of
// records. The error it returns is passed back to the caller of Push or Flush.
type BatchHandler func([]interface{}) error
// NewBatch allocates a Batch and configures it through Init.
//
// batchSize, pushHandler and the optional flushHandler are forwarded to Init
// unchanged; see Init for how they are interpreted. The returned pointer refers
// to the newly initialized Batch.
func NewBatch(batchSize int, pushHandler BatchHandler, flushHandler ...BatchHandler) *Batch {
	batch := new(Batch)
	batch.Init(batchSize, pushHandler, flushHandler...)
	return batch
}
// Init configures (or reconfigures) the batch and discards any buffered state.
//
// batchSize is the number of records collected before the push handler is
// invoked; a value of zero or less selects the default of 100.
// pushHandler receives full batches from Push.
// flushHandler is optional: when supplied (and non-nil) its first element
// receives the records delivered by Flush, otherwise pushHandler is used for
// flushing as well.
//
// Init does not take the mutex, so it must not run concurrently with other
// methods of the same Batch.
func (b *Batch) Init(batchSize int, pushHandler BatchHandler, flushHandler ...BatchHandler) {
	// Start from an empty buffer. Dropping a previously allocated buffer matters
	// when Init is called again with a different size: Push indexes the buffer
	// by batchSize, so a stale, shorter buffer would be indexed out of range.
	b.batchPosition = 0
	b.itemsToSave = nil

	// grab the batch size - default to 100. Negative sizes get the default too,
	// because allocating a buffer with a negative length panics.
	b.batchSize = batchSize
	if b.batchSize <= 0 {
		b.batchSize = 100
	}

	b.pushHandler = pushHandler

	// Flushing falls back to the push handler unless a usable flush handler was
	// given; a nil flush handler would make Flush call a nil function.
	b.flushHandler = pushHandler
	if len(flushHandler) > 0 && flushHandler[0] != nil {
		b.flushHandler = flushHandler[0]
	}

	b.mutex = &sync.Mutex{}
}
// Push adds record to the batch.
//
// With a batch size of 1 the record is passed straight to the push handler.
// Otherwise the record is stored in the buffer; when the buffer is already
// full, the full buffer is handed to the push handler and record becomes the
// first entry of a fresh buffer.
//
// It returns an error when the batch has not been initialized (batch size 0)
// or when the push handler reports one.
func (b *Batch) Push(record interface{}) error {
	switch b.batchSize {
	case 0:
		return errors.New("batch not initialized")
	case 1:
		// a one-item batch never needs buffering - deliver it directly
		return b.pushHandler([]interface{}{record})
	}

	// everything that touches the buffer happens under the lock
	b.mutex.Lock()

	// the buffer is created on first use
	if b.itemsToSave == nil {
		b.itemsToSave = make([]interface{}, b.batchSize)
	}

	// room left: store the record and we are done
	if b.batchPosition < b.batchSize {
		b.itemsToSave[b.batchPosition] = record
		b.batchPosition++
		b.mutex.Unlock()
		return nil
	}

	// buffer is full: swap in a fresh one that starts with the inbound record
	full := b.itemsToSave
	fresh := make([]interface{}, b.batchSize)
	fresh[0] = record
	b.itemsToSave = fresh
	b.batchPosition = 1
	b.mutex.Unlock()

	// The handler runs after the lock is released.
	// TODO: review impact of making this call from a goroutine - definitely
	// faster, but would bugs arise from timing changes?
	return b.pushHandler(full)
}
// GetPosition returns the current buffer position, i.e. the number of records
// that are buffered and have not yet been handed to a handler.
//
// On a Batch that has not been initialized (no mutex yet) it returns the stored
// position - zero for a zero-value Batch - instead of dereferencing the nil
// mutex, matching how Push and Flush tolerate an uninitialized Batch.
func (b *Batch) GetPosition() int {
	if b.mutex == nil {
		return b.batchPosition
	}

	b.mutex.Lock()
	defer b.mutex.Unlock()
	return b.batchPosition
}
// Flush hands every buffered record to the flush handler and empties the
// buffer. When nothing is buffered the handler is not called.
//
// It returns an error when the batch has not been initialized (batch size 0),
// otherwise whatever the flush handler returns (nil when there was nothing to
// flush).
func (b *Batch) Flush() error {
	if b.batchSize == 0 {
		return errors.New("batch not initialized")
	}

	// the buffer is only inspected and swapped while holding the lock
	b.mutex.Lock()

	count := b.batchPosition
	if count <= 0 {
		// nothing buffered - nothing to deliver
		b.mutex.Unlock()
		return nil
	}

	// keep the filled part of the old buffer and start over with a new one
	pending := b.itemsToSave[:count]
	b.itemsToSave = make([]interface{}, b.batchSize)
	b.batchPosition = 0
	b.mutex.Unlock()

	// the configured flush handler runs after the lock is released
	return b.flushHandler(pending)
}