OpWindow: Fix bugs, cleanup
mattayes committed Dec 21, 2023
1 parent 8d7d896 commit bf6bae1
Showing 4 changed files with 135 additions and 171 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -15,3 +15,6 @@

# coverage.txt (created by go.test.sh)
coverage.txt

# vscode settings
.vscode/settings.json
4 changes: 0 additions & 4 deletions opset.go
@@ -1,13 +1,9 @@
package inflight

import "time"

// OpSet represents the set of Ops that have been merged in an OpQueue,
// It provides convenience functions for appending new Ops and for completing them.
type OpSet struct {
set []*Op
// used by the opWindow determine when it's ok to dequeue
enqueuedAt time.Time
}

func newOpSet(op *Op) *OpSet {
250 changes: 108 additions & 142 deletions opwindow.go
@@ -3,6 +3,7 @@ package inflight
import (
"container/list"
"context"
"fmt"
"sync"
"time"
)
@@ -11,125 +12,114 @@ import (
// that combines duplicate operations (queue entries) into sets
// that will be dequeued together.
//
// For example, If you enqueue an item with a key that already exists,
// For example, if you enqueue an item with a key that already exists,
// then that item will be appended to that key's set of items. Otherwise
// the item is inserted into the head of the list as a new item.
// the item is inserted into the back of the list as a new item.
//
// On Dequeue a SET is returned of all items that share a key in the
// On Dequeue the oldest OpSet is returned, containing all items that share a key in the
// queue. It blocks on dequeue if the queue is empty, but returns an
// error if the queue is full during enqueue.
type OpWindow struct {
cond *sync.Cond
ctx context.Context
can context.CancelFunc
mu sync.Mutex
emptyCond sync.Cond
fullCond sync.Cond

once sync.Once
done chan struct{}

depth int
width int
windowedBy int64
windowedBy time.Duration

q *list.List
entries map[ID]*OpSet
q *list.List // *queueItem
m map[ID]*queueItem
}

// NewOpWindow create a new OpWindow. Close by calling Close or cancaling
// the provided context.
func NewOpWindow(ctx context.Context, depth, width int, windowedBy time.Duration) *OpWindow {
cond := sync.NewCond(&sync.Mutex{})
myctx, can := context.WithCancel(ctx)
// NewOpWindow creates a new OpWindow.
func NewOpWindow(depth, width int, windowedBy time.Duration) *OpWindow {
q := &OpWindow{
cond: cond,
ctx: myctx,
can: can,
done: make(chan struct{}),
depth: depth,
width: width,
q: list.New(),
entries: map[ID]*OpSet{},
windowedBy: windowedBy.Nanoseconds(),
m: make(map[ID]*queueItem),
windowedBy: windowedBy,
}
go func() {
t := time.NewTicker(250 * time.Millisecond)
for {
select {
case <-q.ctx.Done():
return
case n := <-t.C:
// signal someone to wake up incase we have any Items falling
// out of the window.
q.cond.L.Lock()
if q.itemsReady(n) {
q.cond.Signal()
}
q.cond.L.Unlock()
}
}
}()
q.emptyCond.L = &q.mu
q.fullCond.L = &q.mu
return q
}

// Close releases resources associated with this callgroup, by canceling
// the context. The owner of this OpWindow should either call Close or cancel
// the context, both are equivalent.
// Close provides graceful shutdown: no new ops can be enqueued and,
// once drained, Dequeue returns ErrQueueClosed.
func (q *OpWindow) Close() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

q.can()
// alert all dequeue calls that they should wake up and return.
q.windowedBy = 0 // turn off windowing so everything is dequeue
q.cond.Broadcast()
return
q.mu.Lock()
defer q.mu.Unlock()

q.once.Do(func() {
close(q.done)
q.windowedBy = 0 // turn off windowing so everything is dequeued
// alert all dequeue calls that they should wake up and return.
q.emptyCond.Broadcast()
q.fullCond.Broadcast()
})
}

// Len returns the number of unique IDs in the queue, that is, the depth of the
// queue.
func (q *OpWindow) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.mu.Lock()
defer q.mu.Unlock()
return q.q.Len()
}

// Enqueue add the op to the queue. If the ID already exists then the Op
// is added to the existing OpSet for this ID, otherwise it's inserted as
// Enqueue adds the op to the queue.
// If the ID already exists then the Op is added to the existing OpSet for this ID.
// Otherwise it's inserted as
// a new OpSet.
//
// If the queue is already at depth, Enqueue waits for room, returning
// ErrQueueSaturatedDepth (wrapping ctx.Err()) if ctx is canceled while waiting,
// or ErrQueueClosed if the window is closed. If this ID's OpSet is already at
// width, it returns ErrQueueSaturatedWidth.
func (q *OpWindow) Enqueue(id ID, op *Op) error {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.q.Len() >= q.depth {
return ErrQueueSaturatedDepth
func (q *OpWindow) Enqueue(ctx context.Context, id ID, op *Op) error {
q.mu.Lock()
defer q.mu.Unlock()

select {
case <-q.done:
return ErrQueueClosed
default:
}

set, ok := q.entries[id]
item, ok := q.m[id]
if !ok {
for q.q.Len() >= q.depth {
select {
case <-ctx.Done():
return fmt.Errorf("%w: %w", ErrQueueSaturatedDepth, ctx.Err())
case <-q.done:
return ErrQueueClosed
default:
q.fullCond.Wait()
}
}
// This is a new item, so we need to insert it into the queue.
q.newEntry(id, op)

// Signal one waiting go routine to wake up and Dequeue
// I believe we only need to signal if we enqueue a new item.
// Consider the following possible states the queue could be in :
// 1. if no one is currently waiting in Dequeue, the signal isn't
// needed and all items will be dequeued on the next call to
// Dequeue.
// 2. One or Many go-routines are waiting in Dequeue because it's
// empty, and calling Signal will wake up one. Which will
// dequeue the item and return.
// 3. At most One go-routine is in the act of Dequeueing existing
// items from the queue (i.e. only one can have the lock and be
// in the "if OK" condition within the forloop in Dequeue). In
// which cause the signal is ignored and after returning we
// return to condition (1) above.
// Note signaled waiting go-routines will not be able the acquire
// the condition lock until this method call returns, finishing
// its append of the new operation.
q.cond.Signal()
} else if len(set.Ops()) >= q.width {

item := &queueItem{
ID: id,
ProcessAt: time.Now().Add(q.windowedBy),
OpSet: newOpSet(op),
}
q.m[id] = item
q.q.PushBack(item)

q.emptyCond.Signal()
return nil
}
if len(item.OpSet.set) >= q.width {
return ErrQueueSaturatedWidth
} else {
set.append(op)
}

item.OpSet.append(op)
return nil
}
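
Since Enqueue now distinguishes three failure modes (depth saturation wrapping the context error, width saturation, and a closed queue), callers can branch on the sentinels with errors.Is. The wrapper below is an illustrative in-package sketch, not part of this commit, and assumes the Err* values are plain sentinel errors:

package inflight

import (
	"context"
	"errors"
)

// handleEnqueue is a hypothetical helper showing how a caller might react to
// each Enqueue failure mode after this change.
func handleEnqueue(ctx context.Context, q *OpWindow, id ID, op *Op) error {
	err := q.Enqueue(ctx, id, op)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, ErrQueueClosed):
		// The window has been closed; stop producing.
	case errors.Is(err, ErrQueueSaturatedWidth):
		// Too many ops are already merged under this ID; drop or retry later.
	case errors.Is(err, ErrQueueSaturatedDepth):
		// Gave up waiting for depth capacity; err wraps ctx.Err().
	}
	return err
}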

@@ -139,75 +129,51 @@ func (q *OpWindow) Enqueue(id ID, op *Op) error {
//
// If the OpWindow is closed, then Dequeue returns ErrQueueClosed.
func (q *OpWindow) Dequeue() (*OpSet, bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
func (q *OpWindow) Dequeue(ctx context.Context) (*OpSet, error) {
q.mu.Lock()
defer q.mu.Unlock()

var item *queueItem
for item == nil {
elem := q.q.Front()
if elem == nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-q.done:
return nil, ErrQueueClosed
default:
q.emptyCond.Wait()
continue
}

for {
if set, ok := q.dequeue(); ok {
return set, true
}
item = q.q.Remove(elem).(*queueItem) // next caller will wait for a different item
}

waitFor := time.Until(item.ProcessAt)
if waitFor > 0 {
q.mu.Unlock() // allow others to add to OpQueue while we wait
timer := time.NewTimer(waitFor)
select {
case <-q.ctx.Done():
return nil, false
default:
case <-q.done:
// process right away
timer.Stop()
case <-timer.C:
}
// release the lock and wait until signaled. On awake we'll acquire the lock.
// After wait acquires the lock we have to recheck the wait condition,
// because it's possible that someone else
// drained the queue while, we were reacquiring the lock.
q.cond.Wait()
}
}

type queElement struct {
id ID
enqueuedAtUnixN int64
}

func (q *OpWindow) newEntry(id ID, op *Op) {
set := newOpSet(op)
q.entries[id] = set

eq := &queElement{id, time.Now().UnixNano()}
q.q.PushBack(eq)
}

func (q *OpWindow) itemsReady(tim time.Time) bool {
elem := q.q.Front()
if elem == nil {
return false
q.mu.Lock()
}

eq := elem.Value.(*queElement)
qt := tim.UnixNano() - eq.enqueuedAtUnixN
ops := item.OpSet
delete(q.m, item.ID)
item = nil // gc

if qt < q.windowedBy {
return false
}
return true
q.fullCond.Signal()
return ops, nil
}

func (q *OpWindow) dequeue() (*OpSet, bool) {
elem := q.q.Front()
if elem == nil {
return nil, false
}

eq := elem.Value.(*queElement)
qt := time.Now().UnixNano() - eq.enqueuedAtUnixN
if qt < q.windowedBy {
return nil, false
}

q.q.Remove(elem)
id := eq.id

set, ok := q.entries[id]
if !ok {
panic("invariant broken: we dequeued a value that isn't in the map")
}
delete(q.entries, id)
return set, true
type queueItem struct {
ID ID
ProcessAt time.Time
OpSet *OpSet
}
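
Taken together, the commit moves OpWindow to a context-aware API: the constructor no longer takes a context, Enqueue and Dequeue accept one, and shutdown is an explicit Close. A minimal in-package usage sketch follows; exampleWindowFlow is illustrative only and assumes the caller already holds an ID and *Op, since their construction is not shown in this diff:

package inflight

import (
	"context"
	"time"
)

// exampleWindowFlow is a hypothetical end-to-end flow over the reworked API.
func exampleWindowFlow(ctx context.Context, id ID, op *Op) (*OpSet, error) {
	// Depth of 64 unique IDs, up to 8 merged ops per ID, 100ms window.
	q := NewOpWindow(64, 8, 100*time.Millisecond)
	defer q.Close()

	// Ops enqueued under the same ID within the window are merged into one OpSet.
	if err := q.Enqueue(ctx, id, op); err != nil {
		return nil, err
	}

	// Dequeue blocks until the oldest OpSet has aged past the window, then
	// returns every op merged under that ID.
	return q.Dequeue(ctx)
}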