Skip to content

Commit 7da6924

Browse files
committed
feat(ARCO-291): Ordered send manager with batch callbacks
1 parent 6efb866 commit 7da6924

File tree

2 files changed

+290
-69
lines changed

2 files changed

+290
-69
lines changed

internal/callbacker/send_manager/ordered/send_manager.go

Lines changed: 160 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,30 @@ type SendManager struct {
3838
cancelAll context.CancelFunc
3939
ctx context.Context
4040

41-
singleSendInterval time.Duration
41+
queueProcessInterval time.Duration
4242
backfillQueueInterval time.Duration
4343
sortByTimestampInterval time.Duration
44+
batchSendInterval time.Duration
45+
batchSize int
4446

45-
bufferSize int
46-
callbackList *list.List
47+
bufferSize int
48+
callbackQueue *list.List
4749

4850
now func() time.Time
4951
}
5052

5153
const (
52-
entriesBufferSize = 10000
53-
54-
singleSendIntervalDefault = 5 * time.Second
54+
entriesBufferSize = 10000
55+
batchSizeDefault = 50
56+
queueProcessIntervalDefault = 5 * time.Second
5557
backfillQueueIntervalDefault = 5 * time.Second
5658
expirationDefault = 24 * time.Hour
5759
sortByTimestampIntervalDefault = 10 * time.Second
60+
batchSendIntervalDefault = 5 * time.Second
61+
)
62+
63+
var (
64+
ErrSendBatchedCallbacks = errors.New("failed to send batched callback")
5865
)
5966

6067
func WithNow(nowFunc func() time.Time) func(*SendManager) {
@@ -69,9 +76,9 @@ func WithBufferSize(size int) func(*SendManager) {
6976
}
7077
}
7178

72-
func WithSingleSendInterval(d time.Duration) func(*SendManager) {
79+
func WithQueueProcessInterval(d time.Duration) func(*SendManager) {
7380
return func(m *SendManager) {
74-
m.singleSendInterval = d
81+
m.queueProcessInterval = d
7582
}
7683
}
7784

@@ -93,20 +100,34 @@ func WithSortByTimestampInterval(d time.Duration) func(*SendManager) {
93100
}
94101
}
95102

103+
func WithBatchSendInterval(d time.Duration) func(*SendManager) {
104+
return func(m *SendManager) {
105+
m.batchSendInterval = d
106+
}
107+
}
108+
109+
func WithBatchSize(size int) func(*SendManager) {
110+
return func(m *SendManager) {
111+
m.batchSize = size
112+
}
113+
}
114+
96115
func New(url string, sender callbacker.SenderI, store SendManagerStore, logger *slog.Logger, opts ...func(*SendManager)) *SendManager {
97116
m := &SendManager{
98117
url: url,
99118
sender: sender,
100119
store: store,
101120
logger: logger,
102121

103-
singleSendInterval: singleSendIntervalDefault,
122+
queueProcessInterval: queueProcessIntervalDefault,
104123
expiration: expirationDefault,
105124
backfillQueueInterval: backfillQueueIntervalDefault,
106125
sortByTimestampInterval: sortByTimestampIntervalDefault,
126+
batchSendInterval: batchSendIntervalDefault,
127+
batchSize: batchSizeDefault,
107128

108-
callbackList: list.New(),
109-
bufferSize: entriesBufferSize,
129+
callbackQueue: list.New(),
130+
bufferSize: entriesBufferSize,
110131
}
111132

112133
for _, opt := range opts {
@@ -121,17 +142,17 @@ func New(url string, sender callbacker.SenderI, store SendManagerStore, logger *
121142
}
122143

123144
func (m *SendManager) Enqueue(entry callbacker.CallbackEntry) {
124-
if m.callbackList.Len() >= m.bufferSize {
145+
if m.callbackQueue.Len() >= m.bufferSize {
125146
m.storeToDB(entry)
126147
return
127148
}
128149

129-
m.callbackList.PushBack(entry)
150+
m.callbackQueue.PushBack(entry)
130151
}
131152

132153
func (m *SendManager) sortByTimestamp() error {
133-
current := m.callbackList.Front()
134-
if m.callbackList.Front() == nil {
154+
current := m.callbackQueue.Front()
155+
if m.callbackQueue.Front() == nil {
135156
return nil
136157
}
137158
for current != nil {
@@ -160,20 +181,31 @@ func (m *SendManager) sortByTimestamp() error {
160181
}
161182

162183
func (m *SendManager) CallbacksQueued() int {
163-
return m.callbackList.Len()
184+
return m.callbackQueue.Len()
164185
}
165186

166187
func (m *SendManager) Start() {
167-
queueTicker := time.NewTicker(m.singleSendInterval)
188+
queueTicker := time.NewTicker(m.queueProcessInterval)
168189
sortQueueTicker := time.NewTicker(m.sortByTimestampInterval)
169190
backfillQueueTicker := time.NewTicker(m.backfillQueueInterval)
191+
batchSendTicker := time.NewTicker(m.batchSendInterval)
170192

171193
m.entriesWg.Add(1)
194+
var callbackBatch []*list.Element
195+
172196
go func() {
173197
var err error
174198
defer func() {
175199
// read all from callback queue and store in database
176-
data := make([]*store.CallbackData, m.callbackList.Len())
200+
data := make([]*store.CallbackData, m.callbackQueue.Len()+len(callbackBatch))
201+
202+
for _, callbackElement := range callbackBatch {
203+
entry, ok := callbackElement.Value.(callbacker.CallbackEntry)
204+
if !ok {
205+
continue
206+
}
207+
m.callbackQueue.PushBack(entry)
208+
}
177209

178210
for i, entry := range m.dequeueAll() {
179211
data[i] = toStoreDto(m.url, entry)
@@ -189,6 +221,8 @@ func (m *SendManager) Start() {
189221
m.entriesWg.Done()
190222
}()
191223

224+
lastIterationWasBatch := false
225+
192226
for {
193227
select {
194228
case <-m.ctx.Done():
@@ -202,54 +236,132 @@ func (m *SendManager) Start() {
202236
case <-backfillQueueTicker.C:
203237
m.backfillQueue()
204238

239+
m.logger.Debug("Callback queue backfilled", slog.Int("callback elements", len(callbackBatch)), slog.Int("queue length", m.CallbacksQueued()), slog.String("url", m.url))
240+
case <-batchSendTicker.C:
241+
if len(callbackBatch) == 0 {
242+
continue
243+
}
244+
245+
err = m.sendElementBatch(callbackBatch)
246+
if err != nil {
247+
m.logger.Error("Failed to send batch of callbacks", slog.String("url", m.url))
248+
continue
249+
}
250+
251+
callbackBatch = callbackBatch[:0]
252+
m.logger.Debug("Batched callbacks sent on interval", slog.Int("callback elements", len(callbackBatch)), slog.Int("queue length", m.CallbacksQueued()), slog.String("url", m.url))
205253
case <-queueTicker.C:
206-
m.processQueueSingle()
254+
front := m.callbackQueue.Front()
255+
if front == nil {
256+
continue
257+
}
258+
259+
callbackEntry, ok := front.Value.(callbacker.CallbackEntry)
260+
if !ok {
261+
continue
262+
}
263+
264+
// If item is expired - dequeue without storing
265+
if m.now().Sub(callbackEntry.Data.Timestamp) > m.expiration {
266+
m.logger.Warn("Callback expired", slog.Time("timestamp", callbackEntry.Data.Timestamp), slog.String("hash", callbackEntry.Data.TxID), slog.String("status", callbackEntry.Data.TxStatus))
267+
m.callbackQueue.Remove(front)
268+
continue
269+
}
270+
271+
if callbackEntry.AllowBatch {
272+
lastIterationWasBatch = true
273+
274+
if len(callbackBatch) < m.batchSize {
275+
callbackBatch = append(callbackBatch, front)
276+
queueTicker.Reset(m.queueProcessInterval)
277+
m.callbackQueue.Remove(front)
278+
continue
279+
}
280+
281+
err = m.sendElementBatch(callbackBatch)
282+
if err != nil {
283+
m.logger.Error("Failed to send batch of callbacks", slog.String("url", m.url))
284+
continue
285+
}
286+
287+
callbackBatch = callbackBatch[:0]
288+
m.logger.Debug("Batched callbacks sent", slog.Int("callback elements", len(callbackBatch)), slog.Int("queue length", m.CallbacksQueued()), slog.String("url", m.url))
289+
continue
290+
}
291+
292+
if lastIterationWasBatch {
293+
lastIterationWasBatch = false
294+
if len(callbackBatch) > 0 {
295+
// if entry is not a batched entry, but last one was, send batch to keep the order
296+
err = m.sendElementBatch(callbackBatch)
297+
if err != nil {
298+
m.logger.Error("Failed to send batch of callbacks", slog.String("url", m.url))
299+
continue
300+
}
301+
callbackBatch = callbackBatch[:0]
302+
m.logger.Debug("Batched callbacks sent before sending single callback", slog.Int("callback elements", len(callbackBatch)), slog.Int("queue length", m.CallbacksQueued()), slog.String("url", m.url))
303+
}
304+
}
305+
306+
success, retry := m.sender.Send(m.url, callbackEntry.Token, callbackEntry.Data)
307+
if !retry || success {
308+
m.callbackQueue.Remove(front)
309+
m.logger.Debug("Single callback sent", slog.Int("callback elements", len(callbackBatch)), slog.Int("queue length", m.CallbacksQueued()), slog.String("url", m.url))
310+
continue
311+
}
312+
m.logger.Error("Failed to send single callback", slog.String("url", m.url))
207313
}
208314
}
209315
}()
210316
}
211317

212-
func (m *SendManager) backfillQueue() {
213-
capacityLeft := m.bufferSize - m.callbackList.Len()
214-
if capacityLeft == 0 {
215-
return
318+
func (m *SendManager) sendElementBatch(callbackElements []*list.Element) error {
319+
var callbackElement *list.Element
320+
callbackBatch := make([]callbacker.CallbackEntry, 0, len(callbackElements))
321+
for _, element := range callbackElements {
322+
callback, ok := element.Value.(callbacker.CallbackEntry)
323+
if !ok {
324+
continue
325+
}
326+
callbackBatch = append(callbackBatch, callback)
216327
}
328+
success, retry := m.sendBatch(callbackBatch)
329+
if !retry || success {
330+
for _, callbackElement = range callbackElements {
331+
m.callbackQueue.Remove(callbackElement)
332+
}
217333

218-
callbacks, err := m.store.GetAndDelete(m.ctx, m.url, capacityLeft)
219-
if err != nil {
220-
m.logger.Error("Failed to load callbacks", slog.String("err", err.Error()))
221-
return
334+
return nil
222335
}
223336

224-
for _, callback := range callbacks {
225-
m.Enqueue(toEntry(callback))
226-
}
337+
return ErrSendBatchedCallbacks
227338
}
228339

229-
func (m *SendManager) processQueueSingle() {
230-
front := m.callbackList.Front()
231-
if front == nil {
232-
return
340+
func (m *SendManager) sendBatch(batch []callbacker.CallbackEntry) (success, retry bool) {
341+
token := batch[0].Token
342+
callbacks := make([]*callbacker.Callback, len(batch))
343+
for i, e := range batch {
344+
callbacks[i] = e.Data
233345
}
234346

235-
callbackEntry, ok := front.Value.(callbacker.CallbackEntry)
236-
if !ok {
347+
return m.sender.SendBatch(m.url, token, callbacks)
348+
}
349+
350+
func (m *SendManager) backfillQueue() {
351+
capacityLeft := m.bufferSize - m.callbackQueue.Len()
352+
if capacityLeft == 0 {
237353
return
238354
}
239355

240-
// If item is expired - dequeue without storing
241-
if m.now().Sub(callbackEntry.Data.Timestamp) > m.expiration {
242-
m.logger.Warn("callback expired", slog.Time("timestamp", callbackEntry.Data.Timestamp), slog.String("hash", callbackEntry.Data.TxID), slog.String("status", callbackEntry.Data.TxStatus))
243-
m.callbackList.Remove(front)
356+
callbacks, err := m.store.GetAndDelete(m.ctx, m.url, capacityLeft)
357+
if err != nil {
358+
m.logger.Error("Failed to load callbacks", slog.String("err", err.Error()))
244359
return
245360
}
246361

247-
success, retry := m.sender.Send(m.url, callbackEntry.Token, callbackEntry.Data)
248-
if !retry || success {
249-
m.callbackList.Remove(front)
250-
return
362+
for _, callback := range callbacks {
363+
m.Enqueue(toEntry(callback))
251364
}
252-
m.logger.Error("failed to send single callback", slog.String("url", m.url))
253365
}
254366

255367
func (m *SendManager) storeToDB(entry callbacker.CallbackEntry) {
@@ -297,18 +409,18 @@ func toEntry(callbackData *store.CallbackData) callbacker.CallbackEntry {
297409
}
298410

299411
func (m *SendManager) dequeueAll() []callbacker.CallbackEntry {
300-
callbacks := make([]callbacker.CallbackEntry, 0, m.callbackList.Len())
412+
callbacks := make([]callbacker.CallbackEntry, 0, m.callbackQueue.Len())
301413

302414
var next *list.Element
303-
for front := m.callbackList.Front(); front != nil; front = next {
415+
for front := m.callbackQueue.Front(); front != nil; front = next {
304416
next = front.Next()
305417
entry, ok := front.Value.(callbacker.CallbackEntry)
306418
if !ok {
307419
continue
308420
}
309421
callbacks = append(callbacks, entry)
310422

311-
m.callbackList.Remove(front)
423+
m.callbackQueue.Remove(front)
312424
}
313425

314426
return callbacks

0 commit comments

Comments
 (0)