diff --git a/.travis.yml b/.travis.yml
index cfeae2b..69c0f96 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,8 +1,7 @@
 language: go
 
 go:
-  - 1.9.x
-  - 1.10.x
+  - master
 
 before_install:
   - go get -t -v ./...
diff --git a/callgroup_test.go b/callgroup_test.go
index 5d9c0a1..82eb95f 100644
--- a/callgroup_test.go
+++ b/callgroup_test.go
@@ -4,6 +4,7 @@ import (
 	"runtime"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -17,8 +18,10 @@ func TestCompletion(t *testing.T) {
 		reslen += len(finalState)
 	})
 
-	op1 := cg.Add(1, &tsMsg{123, 5, "user", 1234567})
-	op2 := cg.Add(2, &tsMsg{123, 5, "user", 2222222})
+	now := time.Now()
+
+	op1 := cg.Add(1, &tsMsg{123, now})
+	op2 := cg.Add(2, &tsMsg{123, now})
 
 	assert.Equal(t, 0, completed)
 	assert.Equal(t, 0, reslen)
@@ -41,8 +44,10 @@ func TestConcurrentDone(t *testing.T) {
 	})
 
 	ops := []*Op{}
+	now := time.Now()
+
 	for i := 0; i < 1000; i++ {
-		ops = append(ops, cg.Add(uint64(i), &tsMsg{123, 5, "user", uint64(i)}))
+		ops = append(ops, cg.Add(uint64(i), &tsMsg{123, now}))
 	}
 
 	wgend := sync.WaitGroup{}
@@ -65,8 +70,6 @@ func TestConcurrentDone(t *testing.T) {
 }
 
 type tsMsg struct {
-	Aid    int
-	Gen    int
-	Table  string
-	RefsID uint64
+	ID   uint64
+	Time time.Time
 }
diff --git a/go.test.sh b/go.test.sh
index 80d33f9..ddb6844 100755
--- a/go.test.sh
+++ b/go.test.sh
@@ -1,10 +1,10 @@
 #!/usr/bin/env bash
 
 set -e
-echo "" > coverage.txt
+echo "" > coverage.txt
 
 for d in $(go list ./... | grep -v vendor); do
-    go test -race -coverprofile=profile.out -covermode=atomic $d
+    go test -count=5 -v -race -coverprofile=profile.out -covermode=atomic $d
     if [ -f profile.out ]; then
         cat profile.out >> coverage.txt
         rm profile.out
diff --git a/opqueue.go b/opqueue.go
index 286b2e2..e3956af 100644
--- a/opqueue.go
+++ b/opqueue.go
@@ -18,46 +18,13 @@ var (
 	ErrQueueSaturatedWidth = fmt.Errorf("queue is saturated (width)")
 )
 
-// OpSet represents the set of Ops that have been merged in an OpQueue,
-// It provides convenience functions for appending new Ops and for completing them.
-type OpSet struct {
-	set []*Op
-}
-
-func newOpSet() *OpSet {
-	return &OpSet{
-		set: []*Op{},
-	}
-}
-
-func (os *OpSet) append(op *Op) {
-	os.set = append(os.set, op)
-}
-
-// Ops get the list of ops in this set.
-func (os *OpSet) Ops() []*Op {
-	return os.set
-}
-
-// FinishAll a convenience func that calls finish on each Op in the set, passing the
-// results or error to all the Ops in the OpSet.
-//
-// NOTE: The call group that owns this OP will not call it's finish function until all
-// Ops are complete. And one callgroup could be spread over multiple op sets or
-// multiple op queues.
-func (os *OpSet) FinishAll(err error, resp interface{}) {
-	for _, op := range os.set {
-		op.Finish(err, resp)
-	}
-}
-
 // OpQueue is a thread-safe duplicate operation suppression queue, that combines
 // duplicate operations (queue entries) into sets that will be dequeued together.
-
+//
 // For example, if you enqueue an item with a key that already exists, then that
 // item will be appended to that key's set of items. Otherwise the item is
 // inserted into the head of the list as a new item.
-
+//
 // On Dequeue a SET is returned of all items that share a key in the queue.
 // It blocks on dequeue if the queue is empty, but returns an error if the
 // queue is full during enqueue.
@@ -72,6 +39,7 @@ type OpQueue struct {
 	entries map[ID]*OpSet
 }
 
+// NewOpQueue creates a new OpQueue.
 func NewOpQueue(depth, width int) *OpQueue {
 	cond := sync.NewCond(&sync.Mutex{})
 	myctx, can := context.WithCancel(context.Background())
@@ -123,7 +91,9 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error {
 
 	set, ok := q.entries[id]
 	if !ok {
-		set = newOpSet()
+		set = newOpSet(op)
+		q.entries[id] = set
+
 		// This is a new item, so we need to insert it into the queue.
 		q.enqueue(id)
@@ -144,15 +114,11 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error {
 		// the condition lock until this method call returns, finishing
 		// its append of the new operation.
 		q.cond.Signal()
-	}
-
-	if len(set.Ops()) >= q.width {
+	} else if len(set.Ops()) >= q.width {
 		return ErrQueueSaturatedWidth
+	} else {
+		set.append(op)
 	}
-
-	set.append(op)
-	q.entries[id] = set
-
 	return nil
 }
@@ -176,16 +142,11 @@ func (q *OpQueue) Dequeue() (*OpSet, bool) {
 			return nil, false
 		default:
 		}
-		// release the lock and wait until signaled. On awake we'll require the lock.
-		// After wait requires the lock we have to recheck the wait condition
-		// (calling q.dequeue), because it's possible that someone else
+		// release the lock and wait until signaled. On awake we'll acquire the lock.
+		// After wait acquires the lock we have to recheck the wait condition,
+		// because it's possible that someone else
 		// drained the queue while we were reacquiring the lock.
 		q.cond.Wait()
-		select {
-		case <-q.ctx.Done():
-			return nil, false
-		default:
-		}
 	}
 }
diff --git a/opqueue_test.go b/opqueue_test.go
index f3a59bc..731346e 100644
--- a/opqueue_test.go
+++ b/opqueue_test.go
@@ -3,6 +3,7 @@ package inflight
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -29,10 +30,11 @@ func TestOpQueue(t *testing.T) {
 		completed2++
 	})
 
-	op1_1 := cg1.Add(1, &tsMsg{123, 5, "user", 1234567})
-	op1_2 := cg1.Add(2, &tsMsg{111, 6, "user", 2222222})
-	op2_1 := cg2.Add(1, &tsMsg{123, 5, "user", 1234567})
-	op2_2 := cg2.Add(2, &tsMsg{111, 6, "user", 2222222})
+	now := time.Now()
+	op1_1 := cg1.Add(1, &tsMsg{123, now})
+	op1_2 := cg1.Add(2, &tsMsg{111, now})
+	op2_1 := cg2.Add(1, &tsMsg{123, now})
+	op2_2 := cg2.Add(2, &tsMsg{111, now})
 
 	opq := NewOpQueue(10, 10)
 	defer opq.Close()
@@ -85,9 +87,10 @@ func TestOpQueueClose(t *testing.T) {
 	})
 
 	opq := NewOpQueue(10, 10)
+	now := time.Now()
 
 	for i := 0; i < 9; i++ {
-		op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
+		op := cg1.Add(uint64(i), &tsMsg{uint64(i), now})
 		err := opq.Enqueue(op.Key, op)
 		assert.Equal(t, nil, err)
 	}
@@ -127,8 +130,10 @@ func TestOpQueueFullDepth(t *testing.T) {
 	succuess := 0
 	depthErrors := 0
 	widthErrors := 0
+	now := time.Now()
+
 	for i := 0; i < 100; i++ {
-		op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
+		op := cg1.Add(uint64(i), &tsMsg{uint64(i), now})
 		err := opq.Enqueue(op.Key, op)
 		switch err {
 		case nil:
@@ -171,8 +176,10 @@ func TestOpQueueFullWidth(t *testing.T) {
 	succuess := 0
 	depthErrors := 0
 	widthErrors := 0
+	now := time.Now()
+
 	for i := 0; i < 100; i++ {
-		op := cg1.Add(1, &tsMsg{i, i, "user", 2222222})
+		op := cg1.Add(1, &tsMsg{uint64(i), now})
 		err := opq.Enqueue(op.Key, op)
 		switch err {
 		case nil:
@@ -227,6 +234,8 @@ func TestOpQueueForRaceDetection(t *testing.T) {
 	finishLine, finish := context.WithCancel(context.Background())
 	dequeFinishLine, deqFinish := context.WithCancel(context.Background())
 	const concurrency = 2
+	now := time.Now()
+
 	for w := 0; w < concurrency; w++ {
 		go func(w int) {
 			startingLine1.Wait()
@@ -237,7 +246,7 @@ func TestOpQueueForRaceDetection(t *testing.T) {
 				return
 			default:
 			}
-			op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
+			op := cg1.Add(uint64(i), &tsMsg{uint64(i), now})
 			err := opq.Enqueue(op.Key, op)
 			switch err {
 			case nil:
@@ -302,5 +311,58 @@ func TestOpQueueForRaceDetection(t *testing.T) {
 	// NOTE: I get the following performance on my laptop:
 	//   opqueue_test.go:275: enqueue errors: 137075 mergedMsgs:2553 enqueueCnt:231437 dequeueCnt:231437 rate:115718 msgs/sec
 	// Over 100k msg a sec is more than fast enough for linkgrid...
-	t.Logf("enqueue errors: [depth:%v width:%v] mergedMsgs:%v enqueueCnt:%v dequeueCnt:%v rate:%v msgs/sec", depthErrorCnt.Get(), widthErrorCnt.Get(), mergeCnt.Get(), enq, deq, enq/runtime)
+	t.Logf("Run Stats [note errors are expected for this test]")
+	t.Logf("   enqueue errors:[depth-errs:%v width-errs:%v]", depthErrorCnt.Get(), widthErrorCnt.Get())
+	t.Logf("   mergedMsgs:%v enqueueCnt:%v dequeueCnt:%v rate:%v msgs/sec", mergeCnt.Get(), enq, deq, enq/runtime)
 }
+
+func TestOpWindowCloseConcurrent(t *testing.T) {
+	t.Parallel()
+
+	cg1 := NewCallGroup(func(finalState map[ID]*Response) {})
+	cg2 := NewCallGroup(func(finalState map[ID]*Response) {})
+
+	now := time.Now()
+
+	op1 := cg1.Add(1, &tsMsg{123, now})
+	op2 := cg2.Add(2, &tsMsg{321, now})
+
+	oq := NewOpQueue(300, 500)
+
+	var ops uint64
+	var closes uint64
+	const workers int = 12
+	for i := 0; i < workers; i++ {
+		go func() {
+			for {
+				e, ok := oq.Dequeue()
+				if e != nil {
+					assert.True(t, ok)
+					atomic.AddUint64(&ops, 1)
+				} else {
+					assert.False(t, ok)
+					break
+				}
+			}
+			atomic.AddUint64(&closes, 1)
+		}()
+	}
+
+	time.Sleep(100 * time.Millisecond)
+	assert.Equal(t, uint64(0), atomic.LoadUint64(&ops)) // nothing should have been dequeued yet
+	assert.Equal(t, uint64(0), atomic.LoadUint64(&closes))
+
+	err := oq.Enqueue(op1.Key, op1)
+	assert.Equal(t, nil, err)
+	err = oq.Enqueue(op2.Key, op2)
+	assert.Equal(t, nil, err)
+
+	time.Sleep(100 * time.Millisecond)
+	assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // 2 unique keys are enqueued
+	assert.Equal(t, uint64(0), atomic.LoadUint64(&closes))
+
+	oq.Close()
+	time.Sleep(100 * time.Millisecond)
+	assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // still only 2 unique keys seen
+	assert.Equal(t, uint64(workers), atomic.LoadUint64(&closes))
+}
diff --git a/opset.go b/opset.go
new file mode 100644
index 0000000..b95b606
--- /dev/null
+++ b/opset.go
@@ -0,0 +1,38 @@
+package inflight
+
+import "time"
+
+// OpSet represents the set of Ops that have been merged in an OpQueue.
+// It provides convenience functions for appending new Ops and for completing them.
+type OpSet struct {
+	set []*Op
+	// used by the OpWindow to determine when it's OK to dequeue
+	enqueuedAt time.Time
+}
+
+func newOpSet(op *Op) *OpSet {
+	return &OpSet{
+		set: []*Op{op},
+	}
+}
+
+func (os *OpSet) append(op *Op) {
+	os.set = append(os.set, op)
+}
+
+// Ops gets the list of ops in this set.
+func (os *OpSet) Ops() []*Op {
+	return os.set
+}
+
+// FinishAll is a convenience func that calls Finish on each Op in the set, passing the
+// results or error to all the Ops in the OpSet.
+//
+// NOTE: The call group that owns this Op will not call its finish function until all
+// Ops are complete. And one callgroup could be spread over multiple op sets or
+// multiple op queues.
+func (os *OpSet) FinishAll(err error, resp interface{}) {
+	for _, op := range os.set {
+		op.Finish(err, resp)
+	}
+}
diff --git a/opwindow.go b/opwindow.go
new file mode 100644
index 0000000..d77899f
--- /dev/null
+++ b/opwindow.go
@@ -0,0 +1,213 @@
+package inflight
+
+import (
+	"container/list"
+	"context"
+	"sync"
+	"time"
+)
+
+// OpWindow is a thread-safe duplicate operation suppression queue,
+// that combines duplicate operations (queue entries) into sets
+// that will be dequeued together.
+//
+// For example, if you enqueue an item with a key that already exists,
+// then that item will be appended to that key's set of items. Otherwise
+// the item is inserted into the head of the list as a new item.
+//
+// On Dequeue a SET is returned of all items that share a key in the
+// queue. It blocks on dequeue if the queue is empty, but returns an
+// error if the queue is full during enqueue.
+type OpWindow struct {
+	cond *sync.Cond
+	ctx  context.Context
+	can  context.CancelFunc
+
+	depth      int
+	width      int
+	windowedBy int64
+
+	q       *list.List
+	entries map[ID]*OpSet
+}
+
+// NewOpWindow creates a new OpWindow. Close it by calling Close or canceling
+// the provided context.
+func NewOpWindow(ctx context.Context, depth, width int, windowedBy time.Duration) *OpWindow {
+	cond := sync.NewCond(&sync.Mutex{})
+	myctx, can := context.WithCancel(ctx)
+	q := &OpWindow{
+		cond:       cond,
+		ctx:        myctx,
+		can:        can,
+		depth:      depth,
+		width:      width,
+		q:          list.New(),
+		entries:    map[ID]*OpSet{},
+		windowedBy: windowedBy.Nanoseconds(),
+	}
+	go func() {
+		t := time.NewTicker(250 * time.Millisecond)
+		for {
+			select {
+			case <-q.ctx.Done():
+				return
+			case n := <-t.C:
+				// signal someone to wake up in case we have any items falling
+				// out of the window.
+				q.cond.L.Lock()
+				if q.itemsReady(n) {
+					q.cond.Signal()
+				}
+				q.cond.L.Unlock()
+			}
+		}
+	}()
+	return q
+}
+
+// Close releases resources associated with this OpWindow, by canceling
+// the context. The owner of this OpWindow should either call Close or cancel
+// the context; both are equivalent.
+func (q *OpWindow) Close() {
+	q.cond.L.Lock()
+	defer q.cond.L.Unlock()
+
+	q.can()
+	// alert all dequeue calls that they should wake up and return.
+	q.windowedBy = 0 // turn off windowing so everything is dequeued
+	q.cond.Broadcast()
+	return
+}
+
+// Len returns the number of unique IDs in the queue, that is, the depth of
+// the queue.
+func (q *OpWindow) Len() int {
+	q.cond.L.Lock()
+	defer q.cond.L.Unlock()
+	return q.q.Len()
+}
+
+// Enqueue adds the op to the queue. If the ID already exists then the Op
+// is added to the existing OpSet for this ID, otherwise it's inserted as
+// a new OpSet.
+//
+// Enqueue doesn't block if the queue is full; instead it returns an
+// ErrQueueSaturated error.
+func (q *OpWindow) Enqueue(id ID, op *Op) error {
+	q.cond.L.Lock()
+	defer q.cond.L.Unlock()
+
+	if q.q.Len() >= q.depth {
+		return ErrQueueSaturatedDepth
+	}
+
+	set, ok := q.entries[id]
+	if !ok {
+		set = newOpSet(op)
+		q.entries[id] = set
+
+		// This is a new item, so we need to insert it into the queue.
+		q.enqueue(id)
+
+		// Signal one waiting go routine to wake up and Dequeue.
+		// I believe we only need to signal if we enqueue a new item.
+		// Consider the following possible states the queue could be in:
+		//   1. If no one is currently waiting in Dequeue, the signal isn't
+		//      needed and all items will be dequeued on the next call to
+		//      Dequeue.
+		//   2. One or many go-routines are waiting in Dequeue because it's
+		//      empty, and calling Signal will wake up one, which will
+		//      dequeue the item and return.
+		//   3. At most one go-routine is in the act of Dequeueing existing
+		//      items from the queue (i.e. only one can have the lock and be
+		//      in the "if OK" condition within the for loop in Dequeue). In
+		//      which case the signal is ignored and after returning we
+		//      return to condition (1) above.
+		// Note signaled waiting go-routines will not be able to acquire
+		// the condition lock until this method call returns, finishing
+		// its append of the new operation.
+		q.cond.Signal()
+	} else if len(set.Ops()) >= q.width {
+		return ErrQueueSaturatedWidth
+	} else {
+		set.append(op)
+	}
+	return nil
+}
+
+// Dequeue removes the oldest OpSet from the queue and returns it.
+// Dequeue will block if the queue is empty. An Enqueue will wake the
+// go routine up and it will continue on.
+//
+// If the OpWindow is closed, then Dequeue will return false
+// for the second parameter.
+func (q *OpWindow) Dequeue() (*OpSet, bool) {
+	q.cond.L.Lock()
+	defer q.cond.L.Unlock()
+
+	for {
+		if set, ok := q.dequeue(); ok {
+			return set, true
+		}
+
+		select {
+		case <-q.ctx.Done():
+			return nil, false
+		default:
+		}
+		// release the lock and wait until signaled. On awake we'll acquire the lock.
+		// After wait acquires the lock we have to recheck the wait condition,
+		// because it's possible that someone else drained the queue while
+		// we were reacquiring the lock.
+		q.cond.Wait()
+	}
+}
+
+type queElement struct {
+	id              ID
+	enqueuedAtUnixN int64
+}
+
+func (q *OpWindow) enqueue(id ID) {
+	eq := &queElement{id, time.Now().UnixNano()}
+	q.q.PushBack(eq)
+}
+
+func (q *OpWindow) itemsReady(tim time.Time) bool {
+	elem := q.q.Front()
+	if elem == nil {
+		return false
+	}
+
+	eq := elem.Value.(*queElement)
+	qt := tim.UnixNano() - eq.enqueuedAtUnixN
+
+	if qt < q.windowedBy {
+		return false
+	}
+	return true
+}
+
+func (q *OpWindow) dequeue() (*OpSet, bool) {
+	elem := q.q.Front()
+	if elem == nil {
+		return nil, false
+	}
+
+	eq := elem.Value.(*queElement)
+	qt := time.Now().UnixNano() - eq.enqueuedAtUnixN
+	if qt < q.windowedBy {
+		return nil, false
+	}
+
+	q.q.Remove(elem)
+	id := eq.id
+
+	set, ok := q.entries[id]
+	if !ok {
+		panic("invariant broken: we dequeued a value that isn't in the map")
+	}
+	delete(q.entries, id)
+	return set, true
+}
diff --git a/opwindow_test.go b/opwindow_test.go
new file mode 100644
index 0000000..e2ef1e8
--- /dev/null
+++ b/opwindow_test.go
@@ -0,0 +1,127 @@
+package inflight
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestOpWindow(t *testing.T) {
+	t.Parallel()
+
+	winTimes := []time.Duration{
+		time.Duration(0),
+		1 * time.Millisecond,
+		10 * time.Millisecond,
+		100 * time.Millisecond,
+		500 * time.Millisecond,
+		1 * time.Second,
+	}
+
+	for _, winTimeT := range winTimes {
+		winTime := winTimeT // scope it locally so it can be correctly captured
+		t.Run(fmt.Sprintf("windowed_by_%v", winTime), func(t *testing.T) {
+			t.Parallel()
+			completed1 := 0
+			completed2 := 0
+			cg1 := NewCallGroup(func(finalState map[ID]*Response) {
+				completed1++
+			})
+
+			cg2 := NewCallGroup(func(finalState map[ID]*Response) {
+				completed2++
+			})
+
+			now := time.Now()
+
+			op1_1 := cg1.Add(1, &tsMsg{123, now})
+			op1_2 := cg1.Add(2, &tsMsg{111, now})
+			op2_1 := cg2.Add(1, &tsMsg{123, now})
+			op2_2 := cg2.Add(2, &tsMsg{111, now})
+
+			window := NewOpWindow(context.Background(), 3, 3, winTime)
+
+			defer window.Close()
+			st := time.Now()
+			{
+				err := window.Enqueue(op1_1.Key, op1_1)
+				assert.Equal(t, nil, err)
+				err = window.Enqueue(op2_1.Key, op2_1)
+				assert.Equal(t, nil, err)
+				err = window.Enqueue(op1_2.Key, op1_2)
+				assert.Equal(t, nil, err)
+				err = window.Enqueue(op2_2.Key, op2_2)
+				assert.Equal(t, nil, err)
+			}
+
+			require.Equal(t, 2, window.Len()) // only 2 unique keys
+
+			_, ok := window.Dequeue()
+			assert.True(t, ok)
+			_, ok = window.Dequeue()
+			assert.True(t, ok)
+
+			rt := time.Now().Sub(st)
+			assert.Greater(t, rt, winTime)
+		})
+	}
+}
+
+func TestOpWindowClose(t *testing.T) {
+	t.Parallel()
+
+	winTime := 100 * time.Hour // we want everything to hang until we close the queue.
+
+	cg1 := NewCallGroup(func(finalState map[ID]*Response) {})
+	cg2 := NewCallGroup(func(finalState map[ID]*Response) {})
+
+	now := time.Now()
+
+	op1_1 := cg1.Add(1, &tsMsg{123, now})
+	op1_2 := cg1.Add(2, &tsMsg{111, now})
+	op2_1 := cg2.Add(1, &tsMsg{123, now})
+	op2_2 := cg2.Add(2, &tsMsg{111, now})
+
+	window := NewOpWindow(context.Background(), 3, 3, winTime)
+
+	err := window.Enqueue(op1_1.Key, op1_1)
+	assert.Equal(t, nil, err)
+	err = window.Enqueue(op2_1.Key, op2_1)
+	assert.Equal(t, nil, err)
+	err = window.Enqueue(op1_2.Key, op1_2)
+	assert.Equal(t, nil, err)
+	err = window.Enqueue(op2_2.Key, op2_2)
+	assert.Equal(t, nil, err)
+
+	var ops uint64
+	var closes uint64
+	const workers int = 12
+	for i := 0; i < workers; i++ {
+		go func() {
+			for {
+				e, ok := window.Dequeue()
+				if e != nil {
+					assert.True(t, ok)
+					atomic.AddUint64(&ops, 1)
+				} else {
+					assert.False(t, ok)
+					break
+				}
+			}
+			atomic.AddUint64(&closes, 1)
+		}()
+	}
+
+	time.Sleep(1000 * time.Millisecond)
+	assert.Equal(t, uint64(0), atomic.LoadUint64(&ops)) // nothing should have been dequeued yet
+
+	window.Close()
+	time.Sleep(1000 * time.Millisecond)
+	assert.Equal(t, uint64(workers), atomic.LoadUint64(&closes))
+	assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // 2 unique keys are enqueued
+}
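
Reviewer note: a minimal sketch (not part of the patch) of the duplicate-suppression behavior the reworked OpQueue.Enqueue implements. It is hypothetical: it assumes it sits in package inflight (say, an example_test.go) so it can reuse the tsMsg helper from callgroup_test.go, with fmt and time imported.

// ExampleOpQueue sketches how Enqueue merges duplicate keys: the second
// op added under key 1 is appended to the first op's OpSet instead of
// being queued separately, so one Dequeue returns both together.
func ExampleOpQueue() {
	q := NewOpQueue(10, 10) // depth: 10 unique keys, width: 10 ops per key
	defer q.Close()

	cg := NewCallGroup(func(finalState map[ID]*Response) {})
	now := time.Now()

	opA := cg.Add(1, &tsMsg{1, now})
	opB := cg.Add(1, &tsMsg{2, now}) // same key as opA

	_ = q.Enqueue(opA.Key, opA) // new key: inserted into the queue
	_ = q.Enqueue(opB.Key, opB) // duplicate key: merged into opA's set

	set, ok := q.Dequeue() // returns the whole set for key 1
	fmt.Println(ok, len(set.Ops()))
	set.FinishAll(nil, "shared response") // completes every merged op at once
	// Output: true 2
}

The rewritten else-if chain in Enqueue rejects an op only when its key's set is already at width, and appends otherwise; the second Enqueue above exercises that append path.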
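A similar hedged sketch for the new OpWindow, under the same assumptions (package inflight, hypothetical example file, context/fmt/time imported). The 50ms window shown is an arbitrary illustrative value:

// ExampleOpWindow sketches the windowed variant: ops sharing a key merge
// exactly as in OpQueue, but Dequeue additionally holds each set back
// until it has aged past the configured window.
func ExampleOpWindow() {
	// depth: 100 unique keys, width: 10 ops per key, 50ms window.
	w := NewOpWindow(context.Background(), 100, 10, 50*time.Millisecond)
	defer w.Close()

	cg := NewCallGroup(func(finalState map[ID]*Response) {
		fmt.Println("callgroup complete")
	})

	op := cg.Add(1, &tsMsg{1, time.Now()})
	if err := w.Enqueue(op.Key, op); err != nil {
		// Enqueue never blocks; a saturated queue reports
		// ErrQueueSaturatedDepth or ErrQueueSaturatedWidth instead.
		fmt.Println("enqueue failed:", err)
		return
	}

	// Blocks here until the set falls out of the 50ms window; the 250ms
	// ticker inside NewOpWindow wakes waiting goroutines, so the actual
	// wait is bounded by the ticker interval, not just the window.
	if set, ok := w.Dequeue(); ok {
		set.FinishAll(nil, "response payload")
	}
}

One design consequence worth noting: Close sets windowedBy to 0 and broadcasts, so at shutdown any still-windowed sets become immediately dequeueable rather than waiting out their window, which is what lets the workers in TestOpWindowClose drain and exit.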