Skip to content

Commit

Permalink
Add fullness as dequeue factor
Browse files Browse the repository at this point in the history
  • Loading branch information
mattayes committed Dec 21, 2023
1 parent 94b5ecf commit ebdb465
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions opwindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// OpWindow is a windowed, microbatching priority queue.
// Operations for the same ID and time window form a microbatch. Microbatches whose windows have passed are dequeued in FIFO order.
// Operations for the same ID and time window form a microbatch. Microbatches are dequeued in FIFO order.
// OpWindow provides back-pressure for both depth (i.e., number of entries in queue) and width (i.e., number of entries in a microbatch).
// OpWindow is safe for concurrent use. Its zero value is not safe to use, use NewOpWindow().
type OpWindow struct {
Expand Down Expand Up @@ -67,6 +67,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
item, ok := q.m[id]
if ok {
if len(item.OpSet.set) >= q.width {
close(item.IsFull)
q.mu.Unlock()
return ErrQueueSaturatedWidth
}
Expand All @@ -92,6 +93,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
ID: id,
ProcessAt: time.Now().Add(q.windowedBy),
OpSet: newOpSet(op),
IsFull: make(chan struct{}),
}
q.m[id] = item
q.q.PushBack(item)
Expand Down Expand Up @@ -132,13 +134,17 @@ func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {

waitFor := time.Until(item.ProcessAt)
if waitFor > 0 {
q.mu.Unlock() // allow others to add to OpQueue while we wait
q.mu.Unlock() //
// NOTE (2023-12) (mh): Do we need to pool these?
timer := time.NewTimer(waitFor)
defer timer.Stop()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
// process right away
timer.Stop()
case <-item.IsFull:
// process once full, regardless of windowing
case <-timer.C:
}
q.mu.Lock()
Expand All @@ -160,4 +166,5 @@ type queueItem struct {
ID ID
ProcessAt time.Time
OpSet *OpSet
IsFull chan struct{}
}

0 comments on commit ebdb465

Please sign in to comment.