From d175ffff7265b35668ea404a03011dea2d1d9472 Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Wed, 16 Oct 2024 15:02:59 -0700 Subject: [PATCH] Add reducer to inflight queues --- opqueue.go | 12 +++++++++++- opwindow.go | 11 ++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/opqueue.go b/opqueue.go index 1d6dd66..5f64f47 100644 --- a/opqueue.go +++ b/opqueue.go @@ -36,6 +36,8 @@ type OpQueue struct { q *list.List entries map[ID]*OpSet closed bool + + reducer func(opset *OpSet, op *Op) } // NewOpQueue create a new OpQueue. @@ -45,11 +47,19 @@ func NewOpQueue(depth, width int) *OpQueue { width: width, q: list.New(), entries: map[ID]*OpSet{}, + + reducer: func(opset *OpSet, op *Op) { + opset.append(op) + }, } q.cond.L = &q.mu return &q } +func (q *OpQueue) SetReducer(fn func(opset *OpSet, op *Op)) { + q.reducer = fn +} + // Close releases resources associated with this callgroup, by canceling the context. // The owner of this OpQueue should either call Close or cancel the context, both are // equivalent. @@ -112,7 +122,7 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error { return ErrQueueSaturatedWidth } - set.append(op) + q.reducer(set, op) return nil } diff --git a/opwindow.go b/opwindow.go index 6e9ff79..19f5c16 100644 --- a/opwindow.go +++ b/opwindow.go @@ -27,6 +27,8 @@ type OpWindow struct { depth int width int windowedBy time.Duration + + reducer func(opset *OpSet, op *Op) } // NewOpWindow creates a new OpWindow. @@ -43,11 +45,18 @@ func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow { width: width, windowedBy: windowedBy, m: make(map[ID]*queueItem), + reducer: func(opset *OpSet, op *Op) { + opset.append(op) + }, } q.q.Init() return q } +func (q *OpWindow) SetReducer(fn func(opset *OpSet, op *Op)) { + q.reducer = fn +} + // Close provides graceful shutdown: no new ops will be enqueued. func (q *OpWindow) Close() { q.once.Do(func() { @@ -74,7 +83,7 @@ func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error { q.mu.Unlock() return ErrQueueSaturatedWidth } - item.OpSet.append(op) + q.reducer(item.OpSet, op) q.mu.Unlock() return nil }