diff --git a/.gitignore b/.gitignore
index bf142f7..3ce0214 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,6 @@
 
 # coverage.txt (created by go.test.sh)
 coverage.txt
+
+# vscode settings
+.vscode/settings.json
\ No newline at end of file
diff --git a/opset.go b/opset.go
index b95b606..4a6d957 100644
--- a/opset.go
+++ b/opset.go
@@ -1,13 +1,9 @@
 package inflight
 
-import "time"
-
 // OpSet represents the set of Ops that have been merged in an OpQueue,
 // It provides convenience functions for appending new Ops and for completing them.
 type OpSet struct {
 	set []*Op
-	// used by the opWindow determine when it's ok to dequeue
-	enqueuedAt time.Time
 }
 
 func newOpSet(op *Op) *OpSet {
diff --git a/opwindow.go b/opwindow.go
index 1a55403..44a8697 100644
--- a/opwindow.go
+++ b/opwindow.go
@@ -3,6 +3,7 @@ package inflight
 import (
 	"container/list"
 	"context"
+	"fmt"
 	"sync"
 	"time"
 )
@@ -11,125 +12,114 @@ import (
 // that combines duplicate operations (queue entires) into sets
 // that will be dequeued together.
 //
-// For example, If you enqueue an item with a key that already exists,
+// For example, if you enqueue an item with a key that already exists,
 // then that item will be appended to that key's set of items. Otherwise
-// the item is inserted into the head of the list as a new item.
+// the item is inserted into the back of the list as a new item.
 //
-// On Dequeue a SET is returned of all items that share a key in the
+// On Dequeue the oldest OpSet is returned, containing all items that share a key in the
 // queue. It blocks on dequeue if the queue is empty, but returns an
 // error if the queue is full during enqueue.
 type OpWindow struct {
-	cond *sync.Cond
-	ctx  context.Context
-	can  context.CancelFunc
+	mu        sync.Mutex
+	emptyCond sync.Cond
+	fullCond  sync.Cond
+
+	once sync.Once
+	done chan struct{}
 
 	depth      int
 	width      int
-	windowedBy int64
+	windowedBy time.Duration
 
-	q       *list.List
-	entries map[ID]*OpSet
+	q *list.List // *queueItem
+	m map[ID]*queueItem
 }
 
-// NewOpWindow create a new OpWindow. Close by calling Close or cancaling
-// the provided context.
-func NewOpWindow(ctx context.Context, depth, width int, windowedBy time.Duration) *OpWindow {
-	cond := sync.NewCond(&sync.Mutex{})
-	myctx, can := context.WithCancel(ctx)
+// NewOpWindow creates a new OpWindow.
+func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow {
 	q := &OpWindow{
-		cond:       cond,
-		ctx:        myctx,
-		can:        can,
+		done:       make(chan struct{}),
 		depth:      depth,
 		width:      width,
 		q:          list.New(),
-		entries:    map[ID]*OpSet{},
-		windowedBy: windowedBy.Nanoseconds(),
+		m:          make(map[ID]*queueItem),
+		windowedBy: windowedBy,
 	}
-	go func() {
-		t := time.NewTicker(250 * time.Millisecond)
-		for {
-			select {
-			case <-q.ctx.Done():
-				return
-			case n := <-t.C:
-				// signal someone to wake up incase we have any Items falling
-				// out of the window.
-				q.cond.L.Lock()
-				if q.itemsReady(n) {
-					q.cond.Signal()
-				}
-				q.cond.L.Unlock()
-			}
-		}
-	}()
+	q.emptyCond.L = &q.mu
+	q.fullCond.L = &q.mu
 	return q
 }
 
-// Close releases resources associated with this callgroup, by canceling
-// the context. The owner of this OpWindow should either call Close or cancel
-// the context, both are equivalent.
+// Close provides graceful shutdown: no new ops can be enqueued and,
+// once drained, Dequeue returns ErrQueueClosed.
 func (q *OpWindow) Close() {
-	q.cond.L.Lock()
-	defer q.cond.L.Unlock()
-
-	q.can()
-	// alert all dequeue calls that they should wake up and return.
-	q.windowedBy = 0 // turn off windowing so everything is dequeue
-	q.cond.Broadcast()
-	return
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	q.once.Do(func() {
+		close(q.done)
+		q.windowedBy = 0 // turn off windowing so everything is dequeued
+		// alert all dequeue calls that they should wake up and return.
+		q.emptyCond.Broadcast()
+		q.fullCond.Broadcast()
+	})
 }
 
 // Len returns the number of uniq IDs in the queue, that is the depth of the
 // queue.
 func (q *OpWindow) Len() int {
-	q.cond.L.Lock()
-	defer q.cond.L.Unlock()
+	q.mu.Lock()
+	defer q.mu.Unlock()
 	return q.q.Len()
 }
 
-// Enqueue add the op to the queue. If the ID already exists then the Op
-// is added to the existing OpSet for this ID, otherwise it's inserted as
+// Enqueue adds the op to the queue.
+// If the ID already exists then the Op is added to the existing OpSet for this ID.
+// Otherwise it's inserted as
 // a new OpSet.
 //
 // Enqueue doesn't block if the queue if full, instead it returns a
 // ErrQueueSaturated error.
-func (q *OpWindow) Enqueue(id ID, op *Op) error {
-	q.cond.L.Lock()
-	defer q.cond.L.Unlock()
-
-	if q.q.Len() >= q.depth {
-		return ErrQueueSaturatedDepth
+func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	select {
+	case <-q.done:
+		return ErrQueueClosed
+	default:
 	}
 
-	set, ok := q.entries[id]
+	item, ok := q.m[id]
 	if !ok {
+		for q.q.Len() >= q.depth {
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("%w: %w", ErrQueueSaturatedDepth, ctx.Err())
+			case <-q.done:
+				return ErrQueueClosed
+			default:
+				q.fullCond.Wait()
+			}
+		}
 		// This is a new item, so we need to insert it into the queue.
-		q.newEntry(id, op)
-
-		// Signal one waiting go routine to wake up and Dequeue
-		// I believe we only need to signal if we enqueue a new item.
-		// Consider the following possible states the queue could be in :
-		//   1. if no one is currently waiting in Dequeue, the signal isn't
-		//      needed and all items will be dequeued on the next call to
-		//      Dequeue.
-		//   2. One or Many go-routines are waiting in Dequeue because it's
-		//      empty, and calling Signal will wake up one.  Which will
-		//      dequeue the item and return.
-		//   3. At most One go-routine is in the act of Dequeueing existing
-		//      items from the queue (i.e. only one can have the lock and be
-		//      in the "if OK" condition within the forloop in Dequeue).  In
-		//      which cause the signal is ignored and after returning we
-		//      return to condition (1) above.
-		// Note signaled waiting go-routines will not be able the acquire
-		// the condition lock until this method call returns, finishing
-		// its append of the new operation.
-		q.cond.Signal()
-	} else if len(set.Ops()) >= q.width {
+
+		item := &queueItem{
+			ID:        id,
+			ProcessAt: time.Now().Add(q.windowedBy),
+			OpSet:     newOpSet(op),
+		}
+		q.m[id] = item
+		q.q.PushBack(item)
+
+		q.emptyCond.Signal()
+		return nil
+	}
+	if len(item.OpSet.set) >= q.width {
 		return ErrQueueSaturatedWidth
-	} else {
-		set.append(op)
 	}
+
+	item.OpSet.append(op)
 	return nil
 }
@@ -139,75 +129,51 @@ func (q *OpWindow) Enqueue(id ID, op *Op) error {
 //
 // If the OpWindow is closed, then Dequeue will return false
 // for the second parameter.
-func (q *OpWindow) Dequeue() (*OpSet, bool) {
-	q.cond.L.Lock()
-	defer q.cond.L.Unlock()
+func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	var item *queueItem
+	for item == nil {
+		elem := q.q.Front()
+		if elem == nil {
+			select {
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			case <-q.done:
+				return nil, ErrQueueClosed
+			default:
+				q.emptyCond.Wait()
+				continue
+			}
 
-	for {
-		if set, ok := q.dequeue(); ok {
-			return set, true
 		}
+		item = q.q.Remove(elem).(*queueItem) // next caller will wait for a different item
+	}
 
+	waitFor := time.Until(item.ProcessAt)
+	if waitFor > 0 {
+		q.mu.Unlock() // allow others to add to the OpWindow while we wait
+		timer := time.NewTimer(waitFor)
 		select {
-		case <-q.ctx.Done():
-			return nil, false
-		default:
+		case <-q.done:
+			// process right away
+			timer.Stop()
+		case <-timer.C:
 		}
-		// release the lock and wait until signaled. On awake we'll acquire the lock.
-		// After wait acquires the lock we have to recheck the wait condition,
-		// because it's possible that someone else
-		// drained the queue while, we were reacquiring the lock.
-		q.cond.Wait()
-	}
-}
-
-type queElement struct {
-	id              ID
-	enqueuedAtUnixN int64
-}
-
-func (q *OpWindow) newEntry(id ID, op *Op) {
-	set := newOpSet(op)
-	q.entries[id] = set
-
-	eq := &queElement{id, time.Now().UnixNano()}
-	q.q.PushBack(eq)
-}
-
-func (q *OpWindow) itemsReady(tim time.Time) bool {
-	elem := q.q.Front()
-	if elem == nil {
-		return false
+		q.mu.Lock()
 	}
-	eq := elem.Value.(*queElement)
-	qt := tim.UnixNano() - eq.enqueuedAtUnixN
+	ops := item.OpSet
+	delete(q.m, item.ID)
+	item = nil // gc
 
-	if qt < q.windowedBy {
-		return false
-	}
-	return true
+	q.fullCond.Signal()
+	return ops, nil
 }
 
-func (q *OpWindow) dequeue() (*OpSet, bool) {
-	elem := q.q.Front()
-	if elem == nil {
-		return nil, false
-	}
-
-	eq := elem.Value.(*queElement)
-	qt := time.Now().UnixNano() - eq.enqueuedAtUnixN
-	if qt < q.windowedBy {
-		return nil, false
-	}
-
-	q.q.Remove(elem)
-	id := eq.id
-
-	set, ok := q.entries[id]
-	if !ok {
-		panic("invariant broken: we dequeued a value that isn't in the map")
-	}
-	delete(q.entries, id)
-	return set, true
+type queueItem struct {
+	ID        ID
+	ProcessAt time.Time
+	OpSet     *OpSet
 }
diff --git a/opwindow_test.go b/opwindow_test.go
index e2ef1e8..ecc45b8 100644
--- a/opwindow_test.go
+++ b/opwindow_test.go
@@ -23,10 +23,11 @@ func TestOpWindow(t *testing.T) {
 		1 * time.Second,
 	}
 
-	for _, winTimeT := range winTimes {
-		winTime := winTimeT // scope it locally so it can be correctly captured
+	for _, winTime := range winTimes {
+		winTime := winTime // scope it locally so it can be correctly captured
 		t.Run(fmt.Sprintf("windowed_by_%v", winTime), func(t *testing.T) {
 			t.Parallel()
+			ctx := context.Background()
 			completed1 := 0
 			completed2 := 0
 			cg1 := NewCallGroup(func(finalState map[ID]*Response) {
@@ -44,29 +45,29 @@ func TestOpWindow(t *testing.T) {
 			op2_1 := cg2.Add(1, &tsMsg{123, now})
 			op2_2 := cg2.Add(2, &tsMsg{111, now})
 
-			window := NewOpWindow(context.Background(), 3, 3, winTime)
+			window := NewOpWindow(3, 3, winTime)
 			defer window.Close()
 
 			st := time.Now()
 			{
-				err := window.Enqueue(op1_1.Key, op1_1)
+				err := window.Enqueue(ctx, op1_1.Key, op1_1)
 				assert.Equal(t, nil, err)
-				err = window.Enqueue(op2_1.Key, op2_1)
+				err = window.Enqueue(ctx, op2_1.Key, op2_1)
 				assert.Equal(t, nil, err)
-				err = window.Enqueue(op1_2.Key, op1_2)
+				err = window.Enqueue(ctx, op1_2.Key, op1_2)
 				assert.Equal(t, nil, err)
-				err = window.Enqueue(op2_2.Key, op2_2)
+				err = window.Enqueue(ctx, op2_2.Key, op2_2)
 				assert.Equal(t, nil, err)
 			}
 
 			require.Equal(t, 2, window.Len()) // only 2 unique keys
 
-			_, ok := window.Dequeue()
-			assert.True(t, ok)
-			_, ok = window.Dequeue()
-			assert.True(t, ok)
+			_, err := window.Dequeue(ctx)
+			assert.NoError(t, err)
+			_, err = window.Dequeue(ctx)
+			assert.NoError(t, err)
 
-			rt := time.Now().Sub(st)
+			rt := time.Since(st)
 			assert.Greater(t, rt, winTime)
 		})
 	}
 }
@@ -74,6 +75,7 @@
 
 func TestOpWindowClose(t *testing.T) {
 	t.Parallel()
+	ctx := context.Background()
 
 	winTime := 100 * time.Hour // we want everything to hang until we close the queue.
 
@@ -87,15 +89,15 @@
 	op2_1 := cg2.Add(1, &tsMsg{123, now})
 	op2_2 := cg2.Add(2, &tsMsg{111, now})
 
-	window := NewOpWindow(context.Background(), 3, 3, winTime)
+	window := NewOpWindow(3, 3, winTime)
 
-	err := window.Enqueue(op1_1.Key, op1_1)
+	err := window.Enqueue(ctx, op1_1.Key, op1_1)
 	assert.Equal(t, nil, err)
-	err = window.Enqueue(op2_1.Key, op2_1)
+	err = window.Enqueue(ctx, op2_1.Key, op2_1)
 	assert.Equal(t, nil, err)
-	err = window.Enqueue(op1_2.Key, op1_2)
+	err = window.Enqueue(ctx, op1_2.Key, op1_2)
 	assert.Equal(t, nil, err)
-	err = window.Enqueue(op2_2.Key, op2_2)
+	err = window.Enqueue(ctx, op2_2.Key, op2_2)
 	assert.Equal(t, nil, err)
 
 	var ops uint64
@@ -104,16 +106,13 @@
 	for i := 0; i < workers; i++ {
 		go func() {
 			for {
-				e, ok := window.Dequeue()
-				if e != nil {
-					assert.True(t, ok)
-					atomic.AddUint64(&ops, 1)
-				} else {
-					assert.False(t, ok)
-					break
+				if _, err := window.Dequeue(ctx); err != nil {
+					require.ErrorIs(t, err, ErrQueueClosed)
+					atomic.AddUint64(&closes, 1)
+					return
 				}
+				atomic.AddUint64(&ops, 1)
 			}
-			atomic.AddUint64(&closes, 1)
 		}()
 	}