From ebdb465cddaa6659599d34d17f7027590887ddcf Mon Sep 17 00:00:00 2001 From: Matt Braymer-Hayes Date: Thu, 21 Dec 2023 16:05:53 -0500 Subject: [PATCH] Add fullness as dequeue factor --- opwindow.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/opwindow.go b/opwindow.go index 4567d56..7483553 100644 --- a/opwindow.go +++ b/opwindow.go @@ -9,7 +9,7 @@ import ( ) // OpWindow is a windowed, microbatching priority queue. -// Operations for the same ID and time window form a microbatch. Microbatches whose windows have passed are dequeued in FIFO order. +// Operations for the same ID and time window form a microbatch. Microbatches are dequeued in FIFO order. // OpWindow provides back-pressure for both depth (i.e., number of entries in queue) and width (i.e., number of entries in a microbatch). // OpWindow is safe for concurrent use. Its zero value is not safe to use, use NewOpWindow(). type OpWindow struct { @@ -67,6 +67,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error { item, ok := q.m[id] if ok { if len(item.OpSet.set) >= q.width { + close(item.IsFull) q.mu.Unlock() return ErrQueueSaturatedWidth } @@ -92,6 +93,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error { ID: id, ProcessAt: time.Now().Add(q.windowedBy), OpSet: newOpSet(op), + IsFull: make(chan struct{}), } q.m[id] = item q.q.PushBack(item) @@ -132,13 +134,17 @@ func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) { waitFor := time.Until(item.ProcessAt) if waitFor > 0 { - q.mu.Unlock() // allow others to add to OpQueue while we wait + q.mu.Unlock() // // NOTE (2023-12) (mh): Do we need to pool these? timer := time.NewTimer(waitFor) + defer timer.Stop() select { + case <-ctx.Done(): + return nil, ctx.Err() case <-q.done: // process right away - timer.Stop() + case <-item.IsFull: + // process once full, regardless of windowing case <-timer.C: } q.mu.Lock() @@ -160,4 +166,5 @@ type queueItem struct { ID ID ProcessAt time.Time OpSet *OpSet + IsFull chan struct{} }