Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpWindow: Redux #10

Merged
merged 8 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@

# coverage.txt (created by go.test.sh)
coverage.txt

# vscode settings
.vscode/settings.json
4 changes: 0 additions & 4 deletions opset.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package inflight

import "time"

// OpSet represents the set of Ops that have been merged in an OpQueue,
// It provides convenience functions for appending new Ops and for completing them.
type OpSet struct {
set []*Op
// used by the opWindow determine when it's ok to dequeue
enqueuedAt time.Time
}

func newOpSet(op *Op) *OpSet {
Expand Down
295 changes: 126 additions & 169 deletions opwindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,211 +3,168 @@ package inflight
import (
"container/list"
"context"
"fmt"
"sync"
"time"
)

// OpWindow is a thread-safe duplicate operation suppression queue,
// that combines duplicate operations (queue entries) into sets
// that will be dequeued together.
//
// For example, If you enqueue an item with a key that already exists,
// then that item will be appended to that key's set of items. Otherwise
// the item is inserted into the head of the list as a new item.
//
// On Dequeue a SET is returned of all items that share a key in the
// queue. It blocks on dequeue if the queue is empty, but returns an
// error if the queue is full during enqueue.
// OpWindow is a windowed, microbatching priority queue.
// Operations for the same ID and time window form a microbatch. Microbatches are dequeued in FIFO order.
// OpWindow provides backpressure for both depth (i.e., number of microbatches in queue) and width (i.e., number of operations in a microbatch).
// OpWindow is safe for concurrent use. Its zero value is not safe to use, use NewOpWindow().
type OpWindow struct {
cond *sync.Cond
ctx context.Context
can context.CancelFunc
mu sync.Mutex
q list.List // *queueItem
m map[ID]*queueItem

// These are selectable sync.Cond: use blocking read for Wait() and non-blocking write for Signal().
queueHasItems chan struct{}
queueHasSpace chan struct{}

once sync.Once
done chan struct{}

depth int
width int
windowedBy int64

q *list.List
entries map[ID]*OpSet
windowedBy time.Duration
}

// NewOpWindow creates a new OpWindow. Close it by calling Close or canceling
// the provided context.
func NewOpWindow(ctx context.Context, depth, width int, windowedBy time.Duration) *OpWindow {
cond := sync.NewCond(&sync.Mutex{})
myctx, can := context.WithCancel(ctx)
// NewOpWindow creates a new OpWindow.
//
// depth: maximum number of entries in a queue
// width: maximum number of entries in a microbatch.
// windowedBy: window size.
func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow {
q := &OpWindow{
cond: cond,
ctx: myctx,
can: can,
depth: depth,
width: width,
q: list.New(),
entries: map[ID]*OpSet{},
windowedBy: windowedBy.Nanoseconds(),
queueHasItems: make(chan struct{}),
queueHasSpace: make(chan struct{}),
done: make(chan struct{}),
depth: depth,
width: width,
windowedBy: windowedBy,
m: make(map[ID]*queueItem),
}
go func() {
t := time.NewTicker(250 * time.Millisecond)
for {
select {
case <-q.ctx.Done():
return
case n := <-t.C:
// signal someone to wake up in case we have any items falling
// out of the window.
q.cond.L.Lock()
if q.itemsReady(n) {
q.cond.Signal()
}
q.cond.L.Unlock()
}
}
}()
q.q.Init()
return q
}

// Close releases resources associated with this callgroup, by canceling
// the context. The owner of this OpWindow should either call Close or cancel
// the context, both are equivalent.
// Close provides graceful shutdown: no new ops will be enqueued.
func (q *OpWindow) Close() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

q.can()
// alert all dequeue calls that they should wake up and return.
q.windowedBy = 0 // turn off windowing so everything is dequeue
q.cond.Broadcast()
return
q.once.Do(func() {
q.mu.Lock()
defer q.mu.Unlock()
close(q.done)
// HACK (2023-12) (mh): Set depth to zero so new entries are rejected.
q.depth = 0
})
}

// Len returns the number of uniq IDs in the queue, that is the depth of the
// queue.
func (q *OpWindow) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.q.Len()
}
// Enqueue op into queue, blocking until first of: op is enqueued, ID has hit max width, context is done, or queue is closed.
func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
q.mu.Lock() // locked on returns below

// Enqueue add the op to the queue. If the ID already exists then the Op
// is added to the existing OpSet for this ID, otherwise it's inserted as
// a new OpSet.
//
// Enqueue doesn't block if the queue if full, instead it returns a
// ErrQueueSaturated error.
func (q *OpWindow) Enqueue(id ID, op *Op) error {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.q.Len() >= q.depth {
return ErrQueueSaturatedDepth
}

set, ok := q.entries[id]
if !ok {
// This is a new item, so we need to insert it into the queue.
q.newEntry(id, op)

// Signal one waiting go routine to wake up and Dequeue
// I believe we only need to signal if we enqueue a new item.
// Consider the following possible states the queue could be in :
// 1. if no one is currently waiting in Dequeue, the signal isn't
// needed and all items will be dequeued on the next call to
// Dequeue.
// 2. One or Many go-routines are waiting in Dequeue because it's
// empty, and calling Signal will wake up one. Which will
// dequeue the item and return.
// 3. At most One go-routine is in the act of Dequeueing existing
// items from the queue (i.e. only one can have the lock and be
// in the "if OK" condition within the forloop in Dequeue). In
// which case the signal is ignored and after returning we
// return to condition (1) above.
// Note signaled waiting go-routines will not be able to acquire
// the condition lock until this method call returns, finishing
// its append of the new operation.
q.cond.Signal()
} else if len(set.Ops()) >= q.width {
return ErrQueueSaturatedWidth
} else {
set.append(op)
}
return nil
}
for {
item, ok := q.m[id]
if ok {
if len(item.OpSet.set) >= q.width {
close(item.IsFull)
q.mu.Unlock()
return ErrQueueSaturatedWidth
}
item.OpSet.append(op)
q.mu.Unlock()
return nil
}

// Dequeue removes the oldest OpSet from the queue and returns it.
// Dequeue will block if the Queue is empty. An Enqueue will wake the
// go routine up and it will continue on.
//
// If the OpWindow is closed, then Dequeue will return false
// for the second parameter.
func (q *OpWindow) Dequeue() (*OpSet, bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.q.Len() >= q.depth {
q.mu.Unlock()
select {
case <-ctx.Done():
return fmt.Errorf("%w: %w", ErrQueueSaturatedDepth, ctx.Err())
case <-q.done:
return ErrQueueClosed
case <-q.queueHasSpace:
q.mu.Lock()
continue
}
}

for {
if set, ok := q.dequeue(); ok {
return set, true
item = &queueItem{
ID: id,
ProcessAt: time.Now().Add(q.windowedBy),
OpSet: newOpSet(op),
IsFull: make(chan struct{}),
}
q.m[id] = item
q.q.PushBack(item)
q.mu.Unlock()

select {
case <-q.ctx.Done():
return nil, false
case q.queueHasItems <- struct{}{}:
default:
}
// release the lock and wait until signaled. On awake we'll acquire the lock.
// After wait acquires the lock we have to recheck the wait condition,
// because it's possible that someone else
// drained the queue while, we were reacquiring the lock.
q.cond.Wait()
}
}

type queElement struct {
id ID
enqueuedAtUnixN int64
return nil
}
}

func (q *OpWindow) newEntry(id ID, op *Op) {
set := newOpSet(op)
q.entries[id] = set
// Dequeue removes and returns the oldest OpSet whose window has passed from the queue,
// blocking until first of: OpSet is ready, context is canceled, or queue is closed.
func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {
q.mu.Lock() // unlocked on returns below

eq := &queElement{id, time.Now().UnixNano()}
q.q.PushBack(eq)
}
var item *queueItem
for item == nil {
elem := q.q.Front()
if elem == nil {
q.mu.Unlock()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
return nil, ErrQueueClosed
case <-q.queueHasItems:
q.mu.Lock()
continue
}

func (q *OpWindow) itemsReady(tim time.Time) bool {
elem := q.q.Front()
if elem == nil {
return false
}
item = q.q.Remove(elem).(*queueItem) // next caller will wait for a different item
}

eq := elem.Value.(*queElement)
qt := tim.UnixNano() - eq.enqueuedAtUnixN

if qt < q.windowedBy {
return false
waitFor := time.Until(item.ProcessAt)
if waitFor > 0 {
q.mu.Unlock() //
// NOTE (2023-12) (mh): Do we need to pool these?
timer := time.NewTimer(waitFor)
defer timer.Stop()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
// process right away
case <-item.IsFull:
// process once full, regardless of windowing
case <-timer.C:
}
q.mu.Lock()
}
return true
}

func (q *OpWindow) dequeue() (*OpSet, bool) {
elem := q.q.Front()
if elem == nil {
return nil, false
}
ops := item.OpSet
delete(q.m, item.ID)
q.mu.Unlock()
item = nil // gc

eq := elem.Value.(*queElement)
qt := time.Now().UnixNano() - eq.enqueuedAtUnixN
if qt < q.windowedBy {
return nil, false
select {
case q.queueHasSpace <- struct{}{}:
default:
}
return ops, nil
}

q.q.Remove(elem)
id := eq.id

set, ok := q.entries[id]
if !ok {
panic("invariant broken: we dequeued a value that isn't in the map")
}
delete(q.entries, id)
return set, true
// queueItem is one entry in the OpWindow queue: the microbatch of Ops
// that share a single ID within the current time window.
type queueItem struct {
ID ID // key the ops are batched by; also the key into OpWindow.m
ProcessAt time.Time // earliest time the batch may be dequeued (enqueue time + window)
OpSet *OpSet // accumulated ops for this ID
IsFull chan struct{} // closed when the set reaches max width, letting Dequeue return early
}
Loading
Loading