From 5047763dd891dd1866e5777817527aeb581c9ea2 Mon Sep 17 00:00:00 2001 From: Ethan Feldman Date: Fri, 23 Jul 2021 22:19:34 -0400 Subject: [PATCH 01/14] Fix for leaking publication log buffers after close --- aeron/clientconductor.go | 23 ++++++++++------------- aeron/logbuffer/logbuffers.go | 13 +++++++++++++ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/aeron/clientconductor.go b/aeron/clientconductor.go index f73ff5c0..fd19839d 100644 --- a/aeron/clientconductor.go +++ b/aeron/clientconductor.go @@ -19,6 +19,7 @@ package aeron import ( "errors" "fmt" + "io" "log" "runtime" "sync" @@ -99,7 +100,7 @@ func (sub *subscriptionStateDefn) Init(ch string, regID int64, sID int32, now in type lingerResourse struct { lastTime int64 - resource *Image + resource io.Closer } type ClientConductor struct { @@ -258,12 +259,6 @@ func (cc *ClientConductor) AddPublication(channel string, streamID int32) int64 cc.adminLock.Lock() defer cc.adminLock.Unlock() - for _, pub := range cc.pubs { - if pub.streamID == streamID && pub.channel == channel { - return pub.regID - } - } - now := time.Now().UnixNano() regID := cc.driverProxy.AddPublication(channel, streamID) @@ -285,12 +280,6 @@ func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int3 cc.adminLock.Lock() defer cc.adminLock.Unlock() - for _, pub := range cc.pubs { - if pub.streamID == streamID && pub.channel == channel { - return pub.regID - } - } - now := time.Now().UnixNano() regID := cc.driverProxy.AddExclusivePublication(channel, streamID) @@ -347,6 +336,8 @@ func (cc *ClientConductor) releasePublication(regID int64) { cc.adminLock.Lock() defer cc.adminLock.Unlock() + now := time.Now().UnixNano() + pubcnt := len(cc.pubs) for i, pub := range cc.pubs { if pub != nil && pub.regID == regID { @@ -355,6 +346,10 @@ func (cc *ClientConductor) releasePublication(regID int64) { cc.pubs[i] = cc.pubs[pubcnt-1] cc.pubs[pubcnt-1] = nil pubcnt-- + + if pub.buffers.DecRef() == 0 { + cc.lingeringResources <- lingerResourse{now, pub.buffers} + } } } cc.pubs = cc.pubs[:pubcnt] @@ -470,6 +465,7 @@ func (cc *ClientConductor) OnNewPublication(streamID int32, sessionID int32, pos pubDef.posLimitCounterID = posLimitCounterID pubDef.channelStatusIndicatorID = channelStatusIndicatorID pubDef.buffers = logbuffer.Wrap(logFileName) + pubDef.buffers.IncRef() pubDef.origRegID = origRegID logger.Debugf("Updated publication: %v", pubDef) @@ -498,6 +494,7 @@ func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID i pubDef.posLimitCounterID = posLimitCounterID pubDef.channelStatusIndicatorID = channelStatusIndicatorID pubDef.buffers = logbuffer.Wrap(logFileName) + pubDef.buffers.IncRef() pubDef.origRegID = origRegID logger.Debugf("Updated publication: %v", pubDef) diff --git a/aeron/logbuffer/logbuffers.go b/aeron/logbuffer/logbuffers.go index 3500312b..6db4a71e 100644 --- a/aeron/logbuffer/logbuffers.go +++ b/aeron/logbuffer/logbuffers.go @@ -30,6 +30,7 @@ type LogBuffers struct { mmapFiles []*memmap.File buffers [PartitionCount + 1]atomic.Buffer meta LogBufferMetaData + refCount int } // Wrap is the factory method wrapping the LogBuffers structure around memory mapped file @@ -121,3 +122,15 @@ func (logBuffers *LogBuffers) Close() error { logBuffers.mmapFiles = nil return err } + +// IncRef increments the reference count. Returns the current reference count after increment. +func (logBuffers *LogBuffers) IncRef() int { + logBuffers.refCount++ + return logBuffers.refCount +} + +// DecRef decrements the reference count. Returns the current reference counter after decrement. +func (logBuffers *LogBuffers) DecRef() int { + logBuffers.refCount-- + return logBuffers.refCount +} From f3410b83e8b1c559143322fe91534f2dc217990b Mon Sep 17 00:00:00 2001 From: Ethan Feldman Date: Sat, 14 Aug 2021 16:17:59 -0400 Subject: [PATCH 02/14] Add Header.SetSessionId --- aeron/logbuffer/header.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/aeron/logbuffer/header.go b/aeron/logbuffer/header.go index 207fd7e0..b4d3a257 100644 --- a/aeron/logbuffer/header.go +++ b/aeron/logbuffer/header.go @@ -94,6 +94,11 @@ func (hdr *Header) SetReservedValue(reservedValue int64) *Header { return hdr } +func (hdr *Header) SetSessionId(value int32) *Header { + hdr.buffer.PutInt32(sessionIdOffset(hdr.offset), value) + return hdr +} + // computePosition computes the current position in absolute number of bytes. func computePosition(activeTermId int32, termOffset int32, positionBitsToShift int32, initialTermId int32) int64 { termCount := activeTermId - initialTermId // copes with negative activeTermId on rollover From 9161a4d5703c2e3d422bf359854e4f0b0c2b0a94 Mon Sep 17 00:00:00 2001 From: Ethan Feldman Date: Fri, 8 Oct 2021 05:14:06 -0400 Subject: [PATCH 03/14] Upgrade HdrHstogram and testify dependencies, fixes #43. --- examples/ping/ping.go | 2 +- go.mod | 4 ++-- go.sum | 55 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 56 insertions(+), 5 deletions(-) diff --git a/examples/ping/ping.go b/examples/ping/ping.go index f18210da..a870c0f5 100644 --- a/examples/ping/ping.go +++ b/examples/ping/ping.go @@ -19,7 +19,7 @@ package main import ( "flag" "fmt" - "github.com/codahale/hdrhistogram" + "github.com/HdrHistogram/hdrhistogram-go" "github.com/lirm/aeron-go/aeron" "github.com/lirm/aeron-go/aeron/atomic" "github.com/lirm/aeron-go/aeron/logbuffer" diff --git a/go.mod b/go.mod index e0eb9d8d..ead6d570 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module github.com/lirm/aeron-go go 1.12 require ( - github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd + github.com/HdrHistogram/hdrhistogram-go v1.1.2 github.com/edsrzf/mmap-go v1.0.0 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 - github.com/stretchr/testify v1.3.0 + github.com/stretchr/testify v1.7.0 golang.org/x/sys v0.0.0-20190609082536-301114b31cce // indirect ) diff --git a/go.sum b/go.sum index 565481c9..476e1636 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,24 @@ -github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= -github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= +dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= +github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= +github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= +github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -12,5 +27,41 @@ github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= +golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= +golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= +golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= +golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= +golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190609082536-301114b31cce h1:CQakrGkKbydnUmt7cFIlmQ4lNQiqdTPt6xzXij4nYCc= golang.org/x/sys v0.0.0-20190609082536-301114b31cce/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= +gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= +gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= From 62e2784834851cff1531986d27bc5950590d9fe1 Mon Sep 17 00:00:00 2001 From: billsegall Date: Thu, 16 Dec 2021 15:37:52 +1000 Subject: [PATCH 04/14] Poll() callback changes Switch the poll() context from being an int64 to being an arbitary interface{} Use that to pass a new PollContext struct so that we can reference the individual archive's control structure as well as the correlationID. This will provide us with more ability to pass events to per-archive Listeners in the future amongst other things. --- aeron/image.go | 2 +- aeron/logbuffer/term/reader.go | 4 +- aeron/subscription.go | 2 +- archive/archive.go | 4 +- archive/control.go | 75 +++++++++++++++++++++++----------- 5 files changed, 58 insertions(+), 29 deletions(-) diff --git a/aeron/image.go b/aeron/image.go index ef4e27d1..59030ea0 100644 --- a/aeron/image.go +++ b/aeron/image.go @@ -118,7 +118,7 @@ func (image *Image) Poll(handler term.FragmentHandler, fragmentLimit int) int { return result } -func (image *Image) PollWithContext(handler term.FragmentHandlerWithContext, context int64, fragmentLimit int) int { +func (image *Image) PollWithContext(handler term.FragmentHandlerWithContext, context interface{}, fragmentLimit int) int { if image.IsClosed() { return 0 } diff --git a/aeron/logbuffer/term/reader.go b/aeron/logbuffer/term/reader.go index 60666c46..d39ead79 100644 --- a/aeron/logbuffer/term/reader.go +++ b/aeron/logbuffer/term/reader.go @@ -56,10 +56,10 @@ func Read(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler, } // FragmentHandlerWithContext provides a FragmentHandler with context -type FragmentHandlerWithContext func(context int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) +type FragmentHandlerWithContext func(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) // ReadWithContext as for Read() but woth a contextual argument -func ReadWithContext(context int64, termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandlerWithContext, fragmentsLimit int, +func ReadWithContext(context interface{}, termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandlerWithContext, fragmentsLimit int, header *logbuffer.Header) (int32, int) { capacity := termBuffer.Capacity() diff --git a/aeron/subscription.go b/aeron/subscription.go index 1214aa43..ed33fa96 100644 --- a/aeron/subscription.go +++ b/aeron/subscription.go @@ -103,7 +103,7 @@ func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) i } // PollWithContext as for Poll() but provides an integer argument for passing contextual information -func (sub *Subscription) PollWithContext(handler term.FragmentHandlerWithContext, context int64, fragmentLimit int) int { +func (sub *Subscription) PollWithContext(handler term.FragmentHandlerWithContext, context interface{}, fragmentLimit int) int { img := sub.images.Get() length := len(img) diff --git a/archive/archive.go b/archive/archive.go index 1839fc8d..032430ee 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -283,8 +283,10 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) { } start := time.Now() + pollContext := PollContext{archive.Control, correlationID} + for archive.Control.State.state != ControlStateConnected && archive.Control.State.err == nil { - fragments := archive.Control.PollWithContext(ConnectionControlFragmentHandler, correlationID, 1) + fragments := archive.Control.PollWithContext(ConnectionControlFragmentHandler, &pollContext, 1) if fragments > 0 { logger.Debugf("Read %d fragment(s)\n", fragments) } diff --git a/archive/control.go b/archive/control.go index dff10f2e..d59b252f 100644 --- a/archive/control.go +++ b/archive/control.go @@ -48,6 +48,14 @@ type ControlResults struct { FragmentsReceived int } +// PollContext contains the information we'll need in the image Poll() +// callback to match against our request or for async events to invoke +// the appropriate listener +type PollContext struct { + control *Control + correlationID int64 +} + // Archive Connection State used internally for connection establishment const ( ControlStateError = -1 @@ -102,8 +110,14 @@ func init() { // The current subscription handler doesn't provide a mechanism for passing context // so we return data via the control's Results -func controlFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { - logger.Debugf("controlFragmentHandler: correlationID:%d offset:%d length:%d header:%#v", correlationID, offset, length, header) +func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + pollContext, ok := context.(*PollContext) + if !ok { + logger.Errorf("context conversion failed") + return + } + + logger.Debugf("controlFragmentHandler: correlationID:%d offset:%d length:%d header:%#v", pollContext.correlationID, offset, length, header) var hdr codecs.SbeGoMessageHeader buf := new(bytes.Buffer) @@ -121,13 +135,13 @@ func controlFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset i } // Look up our control - c, ok := correlations.Load(correlationID) + c, ok := correlations.Load(pollContext.correlationID) if !ok { // something has gone horribly wrong and we can't correlate if Listeners.ErrorListener != nil { - Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", correlationID)) + Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", pollContext.correlationID)) } - logger.Debugf("failed to locate control via correlationID %d", correlationID) + logger.Debugf("failed to locate control via correlationID %d", pollContext.correlationID) return } control := c.(*Control) @@ -147,7 +161,7 @@ func controlFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset i } // Check this was for us - if controlResponse.ControlSessionId == control.archive.SessionID && controlResponse.CorrelationId == correlationID { + if controlResponse.ControlSessionId == control.archive.SessionID && controlResponse.CorrelationId == pollContext.correlationID { // Set our state to let the caller of Poll() which triggered this know they have something // We're basically finished so prepare our OOB return values and log some info if we can logger.Debugf("controlFragmentHandler/controlResponse: received for sessionID:%d, correlationID:%d", controlResponse.ControlSessionId, controlResponse.CorrelationId) @@ -185,8 +199,13 @@ func controlFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset i // ConnectionControlFragmentHandler is the connection handling specific fragment handler. // This mechanism only alows us to pass results back via global state which we do in control.State -func ConnectionControlFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { - logger.Debugf("ConnectionControlFragmentHandler: correlationID:%d offset:%d length: %d header: %#v", correlationID, offset, length, header) +func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + pollContext, ok := context.(*PollContext) + if !ok { + logger.Errorf("Context conversion failed") + return + } + logger.Debugf("ConnectionControlFragmentHandler: correlationID:%d offset:%d length: %d header: %#v", pollContext.correlationID, offset, length, header) var hdr codecs.SbeGoMessageHeader @@ -217,7 +236,7 @@ func ConnectionControlFragmentHandler(correlationID int64, buffer *atomic.Buffer return } - // Look it up + // Look this message up c, ok := correlations.Load(controlResponse.CorrelationId) if !ok { logger.Debugf("connectionControlFragmentHandler/controlResponse: ignoring correlationID=%d [%s]\n%#v", controlResponse.CorrelationId, string(controlResponse.ErrorMessage), controlResponse) @@ -226,7 +245,7 @@ func ConnectionControlFragmentHandler(correlationID int64, buffer *atomic.Buffer control := c.(*Control) // Check this was for us - if controlResponse.CorrelationId == correlationID { + if controlResponse.CorrelationId == pollContext.correlationID { // Check result if controlResponse.Code != codecs.ControlResponseCode.OK { control.State.state = ControlStateError @@ -269,7 +288,7 @@ func ConnectionControlFragmentHandler(correlationID int64, buffer *atomic.Buffer logger.Infof("ControlFragmentHandler: challenge:%s, session:%d, correlationID:%d", challenge.EncodedChallenge, challenge.ControlSessionId, challenge.CorrelationId) - // Look it up + // Look this message up c, ok := correlations.Load(challenge.CorrelationId) if !ok { logger.Debugf("connectionControlFragmentHandler/controlResponse: ignoring correlationID=%d", challenge.CorrelationId) @@ -278,7 +297,7 @@ func ConnectionControlFragmentHandler(correlationID int64, buffer *atomic.Buffer control := c.(*Control) // Check this was for us - if challenge.CorrelationId == correlationID { + if challenge.CorrelationId == pollContext.correlationID { // Check the challenge is expected iff our option for this is not nil if control.archive.Options.AuthChallenge != nil { @@ -327,7 +346,7 @@ func (control *Control) Poll(handler term.FragmentHandler, fragmentLimit int) in } // PollWithContext provides a Poll() with a context argument -func (control *Control) PollWithContext(handler term.FragmentHandlerWithContext, context int64, fragmentLimit int) int { +func (control *Control) PollWithContext(handler term.FragmentHandlerWithContext, context *PollContext, fragmentLimit int) int { // Update our globals in case they've changed so we use the current state in our callback rangeChecking = control.archive.Options.RangeChecking @@ -354,9 +373,10 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i // continually poll until we get our response or timeout // without having completed and treat that as an error start := time.Now() + context := PollContext{control, correlationID} for { - ret := control.PollWithContext(controlFragmentHandler, correlationID, 1) + ret := control.PollWithContext(controlFragmentHandler, &context, 1) // Check result if control.Results.IsPollComplete { @@ -390,8 +410,14 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i // DescriptorFragmentHandler is used to poll for descriptors (both recording and subscription) // The current subscription handler doesn't provide a mechanism for passing a context // so we return data via the control's Results -func DescriptorFragmentHandler(correlationID int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { - // logger.Debugf("DescriptorFragmentHandler: correlationID:%d offset:%d length: %d header: %#v\n", correlationID, offset, length, header) +func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + pollContext, ok := context.(*PollContext) + if !ok { + logger.Errorf("context conversion failed") + return + } + + // logger.Debugf("DescriptorFragmentHandler: correlationID:%d offset:%d length: %d header: %#v\n", pollContext.correlationID, offset, length, header) var hdr codecs.SbeGoMessageHeader @@ -410,13 +436,13 @@ func DescriptorFragmentHandler(correlationID int64, buffer *atomic.Buffer, offse } // Look up our control - c, ok := correlations.Load(correlationID) + c, ok := correlations.Load(pollContext.correlationID) if !ok { // something has gone horribly wrong and we can't correlate if Listeners.ErrorListener != nil { - Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", correlationID)) + Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", pollContext.correlationID)) } - logger.Debugf("failed to locate control via correlationID %d", correlationID) + logger.Debugf("failed to locate control via correlationID %d", pollContext.correlationID) return } control := c.(*Control) @@ -436,12 +462,12 @@ func DescriptorFragmentHandler(correlationID int64, buffer *atomic.Buffer, offse logger.Debugf("RecordingDescriptor: %#v", recordingDescriptor) // Check this was for us - if recordingDescriptor.ControlSessionId == control.archive.SessionID && recordingDescriptor.CorrelationId == correlationID { + if recordingDescriptor.ControlSessionId == control.archive.SessionID && recordingDescriptor.CorrelationId == pollContext.correlationID { // Set our state to let the caller of Poll() which triggered this know they have something control.Results.RecordingDescriptors = append(control.Results.RecordingDescriptors, recordingDescriptor) control.Results.FragmentsReceived++ } else { - logger.Debugf("descriptorFragmentHandler/recordingDescriptor ignoring sessionID:%d, correlationID:%d", recordingDescriptor.ControlSessionId, recordingDescriptor.CorrelationId) + logger.Debugf("descriptorFragmentHandler/recordingDescriptor ignoring sessionID:%d, pollContext.correlationID:%d", recordingDescriptor.ControlSessionId, recordingDescriptor.CorrelationId) } case codecIds.recordingSubscriptionDescriptor: @@ -457,7 +483,7 @@ func DescriptorFragmentHandler(correlationID int64, buffer *atomic.Buffer, offse } // Check this was for us - if recordingSubscriptionDescriptor.ControlSessionId == control.archive.SessionID && recordingSubscriptionDescriptor.CorrelationId == correlationID { + if recordingSubscriptionDescriptor.ControlSessionId == control.archive.SessionID && recordingSubscriptionDescriptor.CorrelationId == pollContext.correlationID { // Set our state to let the caller of Poll() which triggered this know they have something control.Results.RecordingSubscriptionDescriptors = append(control.Results.RecordingSubscriptionDescriptors, recordingSubscriptionDescriptor) control.Results.FragmentsReceived++ @@ -478,7 +504,7 @@ func DescriptorFragmentHandler(correlationID int64, buffer *atomic.Buffer, offse } // Check this was for us - if controlResponse.ControlSessionId == control.archive.SessionID && controlResponse.CorrelationId == correlationID { + if controlResponse.ControlSessionId == control.archive.SessionID && controlResponse.CorrelationId == pollContext.correlationID { // Set our state to let the caller of Poll() which triggered this know they have something // We're basically finished so prepare our OOB return values and log some info if we can logger.Debugf("descriptorFragmentHandler/controlResponse: received for sessionID:%d, correlationID:%d", controlResponse.ControlSessionId, controlResponse.CorrelationId) @@ -534,10 +560,11 @@ func (control *Control) PollForDescriptors(correlationID int64, sessionID int64, start := time.Now() descriptorCount := 0 + pollContext := PollContext{control, correlationID} for !control.Results.IsPollComplete { logger.Debugf("PollForDescriptors(%d:%d, %d)", correlationID, sessionID, int(fragmentsWanted)-descriptorCount) - fragments := control.PollWithContext(DescriptorFragmentHandler, correlationID, int(fragmentsWanted)-descriptorCount) + fragments := control.PollWithContext(DescriptorFragmentHandler, &pollContext, int(fragmentsWanted)-descriptorCount) logger.Debugf("PollWithContext(%d:%d) returned %d fragments", correlationID, sessionID, fragments) descriptorCount = len(control.Results.RecordingDescriptors) + len(control.Results.RecordingSubscriptionDescriptors) From e3db2e7a107f46aaffc6d0dede7248fc247f7529 Mon Sep 17 00:00:00 2001 From: billsegall Date: Thu, 16 Dec 2021 16:39:28 +1000 Subject: [PATCH 05/14] Have the Listeners be local to an archive clients. This allows a process to with multiple archive clients to separate their asynchronous callbacks per-client. We do this by switching the recordingevents adapter over to using the new PollWithContext using the ArchiveListeners as the context. --- archive/README.md | 3 +- archive/archive.go | 28 ++++++++-------- archive/control.go | 68 +++++++++++++++++++------------------- archive/recordingevents.go | 42 +++++++++++++---------- 4 files changed, 74 insertions(+), 67 deletions(-) diff --git a/archive/README.md b/archive/README.md index a86d667c..20e7dbf1 100755 --- a/archive/README.md +++ b/archive/README.md @@ -106,4 +106,5 @@ The actual semantics of the security are dependent upon which authenticator supp * Fix a return code error in StopRecordingById() * Fix unused argumentin StopRecording() * Cosmetic improvements for golint and staticcheck - * Improve the logging by downgrading in severity and message some warning only messages \ No newline at end of file + * Improve the logging by downgrading in severity and message some warning only messages + * Make the Listeners used for async events be per archive client instead of global \ No newline at end of file diff --git a/archive/archive.go b/archive/archive.go index 032430ee..4b2ef1d2 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -35,6 +35,7 @@ type Archive struct { Proxy *Proxy // For outgoing protocol messages (publish/request) Control *Control // For incoming protocol messages (subscribe/reponse) Events *RecordingEventsAdapter // For async recording events (must be enabled) + Listeners *ArchiveListeners // Per client event listeners for async callbacks } // Constant values used to control behaviour of StartReplay @@ -54,7 +55,6 @@ const ( // within the the FragmentAssemblers without any user data (or other // context). Listeners.ErrorListener() if set will be called if for // example protocol unmarshalling goes wrong. -var Listeners *ArchiveListeners // ArchiveListeners contains all the callbacks // By default only the ErrorListener is set to a logging listener. If @@ -223,8 +223,8 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) { archive.Events.archive = archive // Create the listeners and populate - Listeners = new(ArchiveListeners) - Listeners.ErrorListener = LoggingErrorListener + archive.Listeners = new(ArchiveListeners) + archive.Listeners.ErrorListener = LoggingErrorListener archive.SetAeronErrorHandler(LoggingErrorListener) // In Debug mode initialize our listeners with simple loggers @@ -232,20 +232,20 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) { if logging.GetLevel("archive") >= logging.DEBUG { logger.Debugf("Setting logging listeners") - Listeners.RecordingEventStartedListener = LoggingRecordingEventStartedListener - Listeners.RecordingEventProgressListener = LoggingRecordingEventProgressListener - Listeners.RecordingEventStoppedListener = LoggingRecordingEventStoppedListener + archive.Listeners.RecordingEventStartedListener = LoggingRecordingEventStartedListener + archive.Listeners.RecordingEventProgressListener = LoggingRecordingEventProgressListener + archive.Listeners.RecordingEventStoppedListener = LoggingRecordingEventStoppedListener - Listeners.RecordingSignalListener = LoggingRecordingSignalListener + archive.Listeners.RecordingSignalListener = LoggingRecordingSignalListener - Listeners.AvailableImageListener = LoggingAvailableImageListener - Listeners.UnavailableImageListener = LoggingUnavailableImageListener + archive.Listeners.AvailableImageListener = LoggingAvailableImageListener + archive.Listeners.UnavailableImageListener = LoggingUnavailableImageListener - Listeners.NewSubscriptionListener = LoggingNewSubscriptionListener - Listeners.NewPublicationListener = LoggingNewPublicationListener + archive.Listeners.NewSubscriptionListener = LoggingNewSubscriptionListener + archive.Listeners.NewPublicationListener = LoggingNewPublicationListener - archive.aeronContext.NewSubscriptionHandler(Listeners.NewSubscriptionListener) - archive.aeronContext.NewPublicationHandler(Listeners.NewPublicationListener) + archive.aeronContext.NewSubscriptionHandler(archive.Listeners.NewSubscriptionListener) + archive.aeronContext.NewPublicationHandler(archive.Listeners.NewPublicationListener) } // Connect the underlying aeron @@ -373,7 +373,7 @@ func (archive *Archive) DisableRecordingEvents() { // RecordingEventsPoll is used to poll for recording events func (archive *Archive) RecordingEventsPoll() int { - return archive.Events.Poll(nil, 1) + return archive.Events.PollWithContext(nil, 1) } // AddSubscription will add a new subscription to the driver. diff --git a/archive/control.go b/archive/control.go index d59b252f..7aa076fe 100644 --- a/archive/control.go +++ b/archive/control.go @@ -128,8 +128,8 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i // Not much to be done here as we can't really tell what went wrong err2 := fmt.Errorf("controlFragmentHandler() failed to decode control message header: %w", err) // Call the global error handler, ugly but it's all we've got - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } return } @@ -138,8 +138,8 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i c, ok := correlations.Load(pollContext.correlationID) if !ok { // something has gone horribly wrong and we can't correlate - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", pollContext.correlationID)) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", pollContext.correlationID)) } logger.Debugf("failed to locate control via correlationID %d", pollContext.correlationID) return @@ -154,8 +154,8 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i // Not much to be done here as we can't see what's gone wrong err2 := fmt.Errorf("controlFragmentHandler failed to decode control response:%w", err) // Call the global error handler, ugly but it's all we've got - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } return } @@ -177,12 +177,12 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i if err := recordingSignalEvent.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't really tell what went wrong err2 := fmt.Errorf("ControlFragmentHandler failed to decode recording signal: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } } - if Listeners.RecordingSignalListener != nil { - Listeners.RecordingSignalListener(recordingSignalEvent) + if pollContext.control.archive.Listeners.RecordingSignalListener != nil { + pollContext.control.archive.Listeners.RecordingSignalListener(recordingSignalEvent) } // These can happen when testing/reconnecting or if multiple clients are on the same channel/stream @@ -217,8 +217,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer // Not much to be done here as we can't correlate err2 := fmt.Errorf("ConnectionControlFragmentHandler() failed to decode control message header: %w", err) // Call the global error handler, ugly but it's all we've got - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } } @@ -229,8 +229,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer if err := controlResponse.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't correlate err2 := fmt.Errorf("ConnectionControlFragmentHandler failed to decode control response: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } logger.Debugf("ConnectionControlFragmentHandler failed to decode control response: %w", err) return @@ -250,8 +250,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer if controlResponse.Code != codecs.ControlResponseCode.OK { control.State.state = ControlStateError control.State.err = fmt.Errorf("Control Response failure: %s", controlResponse.ErrorMessage) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(control.State.err) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(control.State.err) } return } @@ -260,8 +260,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer if control.State.state != ControlStateConnectRequestSent { control.State.state = ControlStateError control.State.err = fmt.Errorf("Control Response not expecting response") - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(control.State.err) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(control.State.err) } } @@ -281,8 +281,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer if err := challenge.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't correlate err2 := fmt.Errorf("ControlFragmentHandler failed to decode challenge: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } } @@ -429,8 +429,8 @@ func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offse // Not much to be done here as we can't correlate err2 := fmt.Errorf("DescriptorFragmentHandler() failed to decode control message header: %w", err) // Call the global error handler, ugly but it's all we've got - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } return } @@ -439,8 +439,8 @@ func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offse c, ok := correlations.Load(pollContext.correlationID) if !ok { // something has gone horribly wrong and we can't correlate - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", pollContext.correlationID)) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(fmt.Errorf("failed to locate control via correlationID %d", pollContext.correlationID)) } logger.Debugf("failed to locate control via correlationID %d", pollContext.correlationID) return @@ -454,8 +454,8 @@ func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offse if err := recordingDescriptor.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't correlate err2 := fmt.Errorf("failed to decode RecordingDescriptor: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } return } @@ -476,8 +476,8 @@ func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offse if err := recordingSubscriptionDescriptor.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't correlate err2 := fmt.Errorf("failed to decode RecordingSubscriptioDescriptor: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } return } @@ -497,8 +497,8 @@ func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offse if err := controlResponse.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't correlate err2 := fmt.Errorf("failed to decode control response: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } return } @@ -531,14 +531,14 @@ func DescriptorFragmentHandler(context interface{}, buffer *atomic.Buffer, offse if err := recordingSignalEvent.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't correlate err2 := fmt.Errorf("failed to decode recording signal: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if pollContext.control.archive.Listeners.ErrorListener != nil { + pollContext.control.archive.Listeners.ErrorListener(err2) } return } - if Listeners.RecordingSignalListener != nil { - Listeners.RecordingSignalListener(recordingSignalEvent) + if pollContext.control.archive.Listeners.RecordingSignalListener != nil { + pollContext.control.archive.Listeners.RecordingSignalListener(recordingSignalEvent) } default: diff --git a/archive/recordingevents.go b/archive/recordingevents.go index cde900d6..69c2bc35 100644 --- a/archive/recordingevents.go +++ b/archive/recordingevents.go @@ -31,10 +31,10 @@ type RecordingEventsAdapter struct { archive *Archive // link to parent } -// Poll the aeron subscription handler. +// PollWithContext the aeron subscription handler. // If you pass it a nil handler it will use the builtin and call the Listeners // If you ask for 0 fragments it will only return one fragment (if available) -func (rea *RecordingEventsAdapter) Poll(handler term.FragmentHandler, fragmentLimit int) int { +func (rea *RecordingEventsAdapter) PollWithContext(handler term.FragmentHandlerWithContext, fragmentLimit int) int { // Update our globals in case they've changed so we use the current state in our callback rangeChecking = rea.archive.Options.RangeChecking @@ -45,10 +45,16 @@ func (rea *RecordingEventsAdapter) Poll(handler term.FragmentHandler, fragmentLi if fragmentLimit == 0 { fragmentLimit = 1 } - return rea.Subscription.Poll(handler, fragmentLimit) + return rea.Subscription.PollWithContext(handler, rea.archive.Listeners, fragmentLimit) } -func reFragmentHandler(buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { +func reFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + listeners, ok := context.(*ArchiveListeners) + if !ok { + logger.Errorf("context conversion failed") + return + } + var hdr codecs.SbeGoMessageHeader buf := new(bytes.Buffer) @@ -60,8 +66,8 @@ func reFragmentHandler(buffer *atomic.Buffer, offset int32, length int32, header // Not much to be done here as we can't correlate err2 := fmt.Errorf("reFragmentHandler() failed to decode control message header: %w", err) // Call the global error handler, ugly but it's all we've got - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if listeners.ErrorListener != nil { + listeners.ErrorListener(err2) } } @@ -71,14 +77,14 @@ func reFragmentHandler(buffer *atomic.Buffer, offset int32, length int32, header logger.Debugf("Received RecordingStarted: length %d", buf.Len()) if err := recordingStarted.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { err2 := fmt.Errorf("Decode() of RecordingStarted failed: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if listeners.ErrorListener != nil { + listeners.ErrorListener(err2) } } else { // logger.Debugf("RecordingStarted: %#v\n", recordingStarted) // Call the Listener - if Listeners.RecordingEventStartedListener != nil { - Listeners.RecordingEventStartedListener(recordingStarted) + if listeners.RecordingEventStartedListener != nil { + listeners.RecordingEventStartedListener(recordingStarted) } } @@ -87,14 +93,14 @@ func reFragmentHandler(buffer *atomic.Buffer, offset int32, length int32, header logger.Debugf("Received RecordingProgress: length %d", buf.Len()) if err := recordingProgress.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { err2 := fmt.Errorf("Decode() of RecordingProgress failed: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if listeners.ErrorListener != nil { + listeners.ErrorListener(err2) } } else { logger.Debugf("RecordingProgress: %#v\n", recordingProgress) // Call the Listener - if Listeners.RecordingEventProgressListener != nil { - Listeners.RecordingEventProgressListener(recordingProgress) + if listeners.RecordingEventProgressListener != nil { + listeners.RecordingEventProgressListener(recordingProgress) } } @@ -103,14 +109,14 @@ func reFragmentHandler(buffer *atomic.Buffer, offset int32, length int32, header logger.Debugf("Received RecordingStopped: length %d", buf.Len()) if err := recordingStopped.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { err2 := fmt.Errorf("Decode() of RecordingStopped failed: %w", err) - if Listeners.ErrorListener != nil { - Listeners.ErrorListener(err2) + if listeners.ErrorListener != nil { + listeners.ErrorListener(err2) } } else { logger.Debugf("RecordingStopped: %#v\n", recordingStopped) // Call the Listener - if Listeners.RecordingEventStoppedListener != nil { - Listeners.RecordingEventStoppedListener(recordingStopped) + if listeners.RecordingEventStoppedListener != nil { + listeners.RecordingEventStoppedListener(recordingStopped) } } From b122e6adb653b21cb64d80422a1d78efec45d90f Mon Sep 17 00:00:00 2001 From: billsegall Date: Fri, 17 Dec 2021 11:03:02 +1000 Subject: [PATCH 06/14] minor clarification --- archive/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/archive/README.md b/archive/README.md index 20e7dbf1..c733c0ad 100755 --- a/archive/README.md +++ b/archive/README.md @@ -101,7 +101,7 @@ The actual semantics of the security are dependent upon which authenticator supp ## Release Notes ### 1.0b2 - * Fix a race condition looking up correlationIDs + * Fix a race condition looking up correlationIDs on ControlResponse * Handle different archive clients using same channel/stream pairing * Fix a return code error in StopRecordingById() * Fix unused argumentin StopRecording() From 54b27a84d5f5885ceb27193e0057f8bdb5098ea9 Mon Sep 17 00:00:00 2001 From: billsegall Date: Fri, 17 Dec 2021 11:04:13 +1000 Subject: [PATCH 07/14] Tweak the fragmentCount for ControlResponses. Set this to ten (matching Java) so we handle any RecordingSignals that appear at the same time. It's not definite this will always work as there may be timing issues but in the general instance this will work as expected. Add a test case which also uses and tests the RecordingEvents which is a reliable way of checking for changes to recordings. --- archive/archive_test.go | 124 ++++++++++++++++++++++++++++++++++++++++ archive/control.go | 2 +- 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/archive/archive_test.go b/archive/archive_test.go index 013bf217..b3931054 100644 --- a/archive/archive_test.go +++ b/archive/archive_test.go @@ -18,6 +18,7 @@ import ( "flag" "github.com/lirm/aeron-go/aeron" "github.com/lirm/aeron-go/aeron/idlestrategy" + "github.com/lirm/aeron-go/archive/codecs" logging "github.com/op/go-logging" "log" "math/rand" @@ -46,6 +47,32 @@ var testCases = []TestCases{ {int32(*TestConfig.SampleStream), *TestConfig.SampleChannel, int32(*TestConfig.ReplayStream), *TestConfig.ReplayChannel}, } +// For testing async events +type Counters struct { + recordingSignalCount int + recordingEventStartedCount int + recordingEventProgressCount int + recordingEventStoppedCount int +} + +var counters Counters + +func RecordingSignalListener(rse *codecs.RecordingSignalEvent) { + counters.recordingSignalCount++ +} + +func RecordingEventStartedListener(rs *codecs.RecordingStarted) { + counters.recordingEventStartedCount++ +} + +func RecordingEventProgressListener(rp *codecs.RecordingProgress) { + counters.recordingEventProgressCount++ +} + +func RecordingEventStoppedListener(rs *codecs.RecordingStopped) { + counters.recordingEventStoppedCount++ +} + func TestMain(m *testing.M) { flag.Parse() @@ -137,6 +164,103 @@ func TestKeepAlive(t *testing.T) { } } +// Helper to check values of counters +func CounterValuesMatch(c Counters, signals int, started int, progress int, stopped int, t *testing.T) bool { + if counters.recordingSignalCount != signals { + t.Logf("counters.recordingSignalCount[%d] != signals[%d]", counters.recordingSignalCount, signals) + return false + } + if counters.recordingEventStartedCount != started { + t.Logf("counters.recordingEventStartedCount[%d] != started[%d]", counters.recordingEventStartedCount, started) + return false + } + if counters.recordingEventProgressCount != progress { + t.Logf("counters.recordingEventProgressCount[%d] != progress[%d]", counters.recordingEventProgressCount, progress) + return false + } + if counters.recordingEventStoppedCount != stopped { + t.Logf("counters.recordingEventStoppedCount[%d] != stopped[%d]", counters.recordingEventStoppedCount, stopped) + return false + } + return true +} + +// Test the recording event signals appear +func TestAsyncEvents(t *testing.T) { + if !haveArchive { + return + } + + if testing.Verbose() && DEBUG { + logging.SetLevel(logging.DEBUG, "archive") + } + + archive.Listeners.RecordingSignalListener = RecordingSignalListener + archive.Listeners.RecordingEventStartedListener = RecordingEventStartedListener + archive.Listeners.RecordingEventProgressListener = RecordingEventProgressListener + archive.Listeners.RecordingEventStoppedListener = RecordingEventStoppedListener + + counters = Counters{0, 0, 0, 0} + if !CounterValuesMatch(counters, 0, 0, 0, 0, t) { + t.Log("Async event counters mismatch") + t.FailNow() + } + + archive.EnableRecordingEvents() + archive.RecordingEventsPoll() + + if !CounterValuesMatch(counters, 0, 0, 0, 0, t) { + t.Log("Async event counters mismatch") + t.FailNow() + } + + publication, err := archive.AddRecordedPublication(testCases[0].sampleChannel, testCases[0].sampleStream) + if err != nil { + t.Log(err) + t.FailNow() + } + + archive.RecordingEventsPoll() + if !CounterValuesMatch(counters, 1, 1, 0, 0, t) { + t.Log("Async event counters mismatch") + t.FailNow() + } + + // Delay a little to get the publication is established + idler := idlestrategy.Sleeping{SleepFor: time.Millisecond * 100} + idler.Idle(0) + + if err := archive.StopRecordingByPublication(*publication); err != nil { + t.Log(err) + t.FailNow() + } + + if !CounterValuesMatch(counters, 2, 1, 0, 0, t) { + t.Log("Async event counters mismatch") + t.FailNow() + } + + archive.RecordingEventsPoll() + if !CounterValuesMatch(counters, 2, 1, 0, 1, t) { + t.Log("Async event counters mismatch") + t.FailNow() + } + + // Cleanup + archive.DisableRecordingEvents() + archive.Listeners.RecordingSignalListener = nil + archive.Listeners.RecordingEventStartedListener = nil + archive.Listeners.RecordingEventProgressListener = nil + archive.Listeners.RecordingEventStoppedListener = nil + counters = Counters{0, 0, 0, 0} + if !CounterValuesMatch(counters, 0, 0, 0, 0, t) { + t.Log("Async event counters mismatch") + t.FailNow() + } + + publication.Close() +} + // Test adding a recording and then removing it - by Publication (session specific) func TestStartStopRecordingByPublication(t *testing.T) { if !haveArchive { diff --git a/archive/control.go b/archive/control.go index 7aa076fe..462ed807 100644 --- a/archive/control.go +++ b/archive/control.go @@ -376,7 +376,7 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i context := PollContext{control, correlationID} for { - ret := control.PollWithContext(controlFragmentHandler, &context, 1) + ret := control.PollWithContext(controlFragmentHandler, &context, 10) // Check result if control.Results.IsPollComplete { From 1cdb423f2e66bddd6fb74d12fd5cf3dd629e6b2c Mon Sep 17 00:00:00 2001 From: billsegall Date: Mon, 20 Dec 2021 17:26:19 +1000 Subject: [PATCH 08/14] Return an error from PollForResponse() if the result != OK This fixes a bug introduced in #0b1a83f when the polling was reworked. --- archive/control.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/archive/control.go b/archive/control.go index 462ed807..531c7519 100644 --- a/archive/control.go +++ b/archive/control.go @@ -358,7 +358,7 @@ func (control *Control) PollWithContext(handler term.FragmentHandlerWithContext, } // PollForResponse polls for a specific correlationID -// Returns nil, relevantId on success, error, 0 failure +// Returns (relevantId, nil) on success, (0 or relevantId, error) on failure // More complex responses are contained in Control.ControlResponse after the call func (control *Control) PollForResponse(correlationID int64, sessionID int64) (int64, error) { logger.Debugf("PollForResponse(%d:%d)", correlationID, sessionID) @@ -383,7 +383,8 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i logger.Debugf("PollForResponse(%d:%d) complete, result is %d", correlationID, sessionID, control.Results.ControlResponse.Code) if control.Results.ControlResponse.Code != codecs.ControlResponseCode.OK { err := fmt.Errorf("Control Response failure: %s", control.Results.ControlResponse.ErrorMessage) - logger.Debug(err) + logger.Debug(err) // log it in debug mode as an aid to diagnosis + return control.Results.ControlResponse.RelevantId, err } // logger.Debugf("PollForResponse(%d:%d) success", correlationID, sessionID) return control.Results.ControlResponse.RelevantId, nil From 49a5fbe5b889556dd4c0173a7a517d486d9ce983 Mon Sep 17 00:00:00 2001 From: billsegall Date: Mon, 10 Jan 2022 09:47:06 +1000 Subject: [PATCH 09/14] Add a test and ensure RPC failures correctly return errors. --- archive/archive.go | 3 +++ archive/archive_test.go | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/archive/archive.go b/archive/archive.go index 4b2ef1d2..6ef78712 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -470,6 +470,9 @@ func (archive *Archive) StopRecordingByIdentity(recordingID int64) (bool, error) logger.Debugf("StopRecordingByIdentity result was %d\n", res) } + if err != nil { + return false, err + } return res >= 0, err } diff --git a/archive/archive_test.go b/archive/archive_test.go index b3931054..23cdbd72 100644 --- a/archive/archive_test.go +++ b/archive/archive_test.go @@ -185,6 +185,25 @@ func CounterValuesMatch(c Counters, signals int, started int, progress int, stop return true } +// Test that Archive RPCs will fail correctly +func TestRPCFailure(t *testing.T) { + if !haveArchive { + return + } + + if testing.Verbose() && DEBUG { + logging.SetLevel(logging.DEBUG, "archive") + } + + // Ask to stop a bogus recording + res, err := archive.StopRecordingByIdentity(0xdeadbeef) + if err == nil || res { + t.Logf("RPC succeeded and should have failed") + t.FailNow() + } + +} + // Test the recording event signals appear func TestAsyncEvents(t *testing.T) { if !haveArchive { From 3319d8714a2386ea758fe7121f9ee4f86fb78776 Mon Sep 17 00:00:00 2001 From: Peter Date: Fri, 17 Dec 2021 16:00:07 -0500 Subject: [PATCH 10/14] Remove FragmentHandlerWithContext and pass in the context early using closures instead. --- aeron/image.go | 4 +-- aeron/logbuffer/term/reader.go | 7 ++--- aeron/subscription.go | 6 ++--- archive/archive.go | 6 ++++- archive/control.go | 47 +++++++++++++++++----------------- archive/recordingevents.go | 19 +++++++------- 6 files changed, 45 insertions(+), 44 deletions(-) diff --git a/aeron/image.go b/aeron/image.go index 59030ea0..7ef2b180 100644 --- a/aeron/image.go +++ b/aeron/image.go @@ -118,7 +118,7 @@ func (image *Image) Poll(handler term.FragmentHandler, fragmentLimit int) int { return result } -func (image *Image) PollWithContext(handler term.FragmentHandlerWithContext, context interface{}, fragmentLimit int) int { +func (image *Image) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int { if image.IsClosed() { return 0 } @@ -128,7 +128,7 @@ func (image *Image) PollWithContext(handler term.FragmentHandlerWithContext, con index := indexByPosition(position, image.positionBitsToShift) termBuffer := image.termBuffers[index] - offset, result := term.ReadWithContext(context, termBuffer, termOffset, handler, fragmentLimit, &image.header) + offset, result := term.ReadWithContext(termBuffer, termOffset, handler, fragmentLimit, &image.header) newPosition := position + int64(offset-termOffset) if newPosition > position { diff --git a/aeron/logbuffer/term/reader.go b/aeron/logbuffer/term/reader.go index d39ead79..6ccdcf66 100644 --- a/aeron/logbuffer/term/reader.go +++ b/aeron/logbuffer/term/reader.go @@ -55,11 +55,8 @@ func Read(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler, return termOffset, fragmentsRead } -// FragmentHandlerWithContext provides a FragmentHandler with context -type FragmentHandlerWithContext func(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) - // ReadWithContext as for Read() but woth a contextual argument -func ReadWithContext(context interface{}, termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandlerWithContext, fragmentsLimit int, +func ReadWithContext(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler, fragmentsLimit int, header *logbuffer.Header) (int32, int) { capacity := termBuffer.Capacity() @@ -77,7 +74,7 @@ func ReadWithContext(context interface{}, termBuffer *atomic.Buffer, termOffset if !logbuffer.IsPaddingFrame(termBuffer, fragmentOffset) { header.Wrap(termBuffer.Ptr(), termBuffer.Capacity()) header.SetOffset(fragmentOffset) - handler(context, termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length, + handler(termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length, frameLength-logbuffer.DataFrameHeader.Length, header) fragmentsRead++ diff --git a/aeron/subscription.go b/aeron/subscription.go index ed33fa96..a39f0931 100644 --- a/aeron/subscription.go +++ b/aeron/subscription.go @@ -103,7 +103,7 @@ func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) i } // PollWithContext as for Poll() but provides an integer argument for passing contextual information -func (sub *Subscription) PollWithContext(handler term.FragmentHandlerWithContext, context interface{}, fragmentLimit int) int { +func (sub *Subscription) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int { img := sub.images.Get() length := len(img) @@ -118,11 +118,11 @@ func (sub *Subscription) PollWithContext(handler term.FragmentHandlerWithContext } for i := startingIndex; i < length && fragmentsRead < fragmentLimit; i++ { - fragmentsRead += img[i].PollWithContext(handler, context, fragmentLimit-fragmentsRead) + fragmentsRead += img[i].PollWithContext(handler, fragmentLimit-fragmentsRead) } for i := 0; i < startingIndex && fragmentsRead < fragmentLimit; i++ { - fragmentsRead += img[i].PollWithContext(handler, context, fragmentLimit-fragmentsRead) + fragmentsRead += img[i].PollWithContext(handler, fragmentLimit-fragmentsRead) } } diff --git a/archive/archive.go b/archive/archive.go index 6ef78712..e40947dc 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/lirm/aeron-go/aeron" "github.com/lirm/aeron-go/aeron/atomic" + "github.com/lirm/aeron-go/aeron/logbuffer" "github.com/lirm/aeron-go/archive/codecs" logging "github.com/op/go-logging" "sync" @@ -286,7 +287,10 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) { pollContext := PollContext{archive.Control, correlationID} for archive.Control.State.state != ControlStateConnected && archive.Control.State.err == nil { - fragments := archive.Control.PollWithContext(ConnectionControlFragmentHandler, &pollContext, 1) + fragments := archive.Control.PollWithContext( + func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + ConnectionControlFragmentHandler(&pollContext, buf, offset, length, header) + }, 1) if fragments > 0 { logger.Debugf("Read %d fragment(s)\n", fragments) } diff --git a/archive/control.go b/archive/control.go index 531c7519..e573c475 100644 --- a/archive/control.go +++ b/archive/control.go @@ -199,13 +199,8 @@ func controlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset i // ConnectionControlFragmentHandler is the connection handling specific fragment handler. // This mechanism only alows us to pass results back via global state which we do in control.State -func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { - pollContext, ok := context.(*PollContext) - if !ok { - logger.Errorf("Context conversion failed") - return - } - logger.Debugf("ConnectionControlFragmentHandler: correlationID:%d offset:%d length: %d header: %#v", pollContext.correlationID, offset, length, header) +func ConnectionControlFragmentHandler(context *PollContext, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + logger.Debugf("ConnectionControlFragmentHandler: correlationID:%d offset:%d length: %d header: %#v", context.correlationID, offset, length, header) var hdr codecs.SbeGoMessageHeader @@ -217,8 +212,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer // Not much to be done here as we can't correlate err2 := fmt.Errorf("ConnectionControlFragmentHandler() failed to decode control message header: %w", err) // Call the global error handler, ugly but it's all we've got - if pollContext.control.archive.Listeners.ErrorListener != nil { - pollContext.control.archive.Listeners.ErrorListener(err2) + if context.control.archive.Listeners.ErrorListener != nil { + context.control.archive.Listeners.ErrorListener(err2) } } @@ -229,8 +224,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer if err := controlResponse.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't correlate err2 := fmt.Errorf("ConnectionControlFragmentHandler failed to decode control response: %w", err) - if pollContext.control.archive.Listeners.ErrorListener != nil { - pollContext.control.archive.Listeners.ErrorListener(err2) + if context.control.archive.Listeners.ErrorListener != nil { + context.control.archive.Listeners.ErrorListener(err2) } logger.Debugf("ConnectionControlFragmentHandler failed to decode control response: %w", err) return @@ -245,13 +240,13 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer control := c.(*Control) // Check this was for us - if controlResponse.CorrelationId == pollContext.correlationID { + if controlResponse.CorrelationId == context.correlationID { // Check result if controlResponse.Code != codecs.ControlResponseCode.OK { control.State.state = ControlStateError control.State.err = fmt.Errorf("Control Response failure: %s", controlResponse.ErrorMessage) - if pollContext.control.archive.Listeners.ErrorListener != nil { - pollContext.control.archive.Listeners.ErrorListener(control.State.err) + if context.control.archive.Listeners.ErrorListener != nil { + context.control.archive.Listeners.ErrorListener(control.State.err) } return } @@ -260,8 +255,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer if control.State.state != ControlStateConnectRequestSent { control.State.state = ControlStateError control.State.err = fmt.Errorf("Control Response not expecting response") - if pollContext.control.archive.Listeners.ErrorListener != nil { - pollContext.control.archive.Listeners.ErrorListener(control.State.err) + if context.control.archive.Listeners.ErrorListener != nil { + context.control.archive.Listeners.ErrorListener(control.State.err) } } @@ -281,8 +276,8 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer if err := challenge.Decode(marshaller, buf, hdr.Version, hdr.BlockLength, rangeChecking); err != nil { // Not much to be done here as we can't correlate err2 := fmt.Errorf("ControlFragmentHandler failed to decode challenge: %w", err) - if pollContext.control.archive.Listeners.ErrorListener != nil { - pollContext.control.archive.Listeners.ErrorListener(err2) + if context.control.archive.Listeners.ErrorListener != nil { + context.control.archive.Listeners.ErrorListener(err2) } } @@ -297,7 +292,7 @@ func ConnectionControlFragmentHandler(context interface{}, buffer *atomic.Buffer control := c.(*Control) // Check this was for us - if challenge.CorrelationId == pollContext.correlationID { + if challenge.CorrelationId == context.correlationID { // Check the challenge is expected iff our option for this is not nil if control.archive.Options.AuthChallenge != nil { @@ -346,7 +341,7 @@ func (control *Control) Poll(handler term.FragmentHandler, fragmentLimit int) in } // PollWithContext provides a Poll() with a context argument -func (control *Control) PollWithContext(handler term.FragmentHandlerWithContext, context *PollContext, fragmentLimit int) int { +func (control *Control) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int { // Update our globals in case they've changed so we use the current state in our callback rangeChecking = control.archive.Options.RangeChecking @@ -354,7 +349,7 @@ func (control *Control) PollWithContext(handler term.FragmentHandlerWithContext, control.Results.ControlResponse = nil // Clear old results control.Results.IsPollComplete = false // Clear completion flag - return control.Subscription.PollWithContext(handler, context, fragmentLimit) + return control.Subscription.PollWithContext(handler, fragmentLimit) } // PollForResponse polls for a specific correlationID @@ -376,7 +371,10 @@ func (control *Control) PollForResponse(correlationID int64, sessionID int64) (i context := PollContext{control, correlationID} for { - ret := control.PollWithContext(controlFragmentHandler, &context, 10) + ret := control.PollWithContext( + func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + controlFragmentHandler(&context, buf, offset, length, header) + }, 10) // Check result if control.Results.IsPollComplete { @@ -565,7 +563,10 @@ func (control *Control) PollForDescriptors(correlationID int64, sessionID int64, for !control.Results.IsPollComplete { logger.Debugf("PollForDescriptors(%d:%d, %d)", correlationID, sessionID, int(fragmentsWanted)-descriptorCount) - fragments := control.PollWithContext(DescriptorFragmentHandler, &pollContext, int(fragmentsWanted)-descriptorCount) + fragments := control.PollWithContext( + func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + DescriptorFragmentHandler(&pollContext, buf, offset, length, header) + }, int(fragmentsWanted)-descriptorCount) logger.Debugf("PollWithContext(%d:%d) returned %d fragments", correlationID, sessionID, fragments) descriptorCount = len(control.Results.RecordingDescriptors) + len(control.Results.RecordingSubscriptionDescriptors) diff --git a/archive/recordingevents.go b/archive/recordingevents.go index 69c2bc35..83409f9f 100644 --- a/archive/recordingevents.go +++ b/archive/recordingevents.go @@ -20,7 +20,6 @@ import ( "github.com/lirm/aeron-go/aeron" "github.com/lirm/aeron-go/aeron/atomic" "github.com/lirm/aeron-go/aeron/logbuffer" - "github.com/lirm/aeron-go/aeron/logbuffer/term" "github.com/lirm/aeron-go/archive/codecs" ) @@ -31,10 +30,13 @@ type RecordingEventsAdapter struct { archive *Archive // link to parent } +// FragmentHandlerWithListeners provides a FragmentHandler with ArchiveListeners +type FragmentHandlerWithListeners func(listeners *ArchiveListeners, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) + // PollWithContext the aeron subscription handler. // If you pass it a nil handler it will use the builtin and call the Listeners // If you ask for 0 fragments it will only return one fragment (if available) -func (rea *RecordingEventsAdapter) PollWithContext(handler term.FragmentHandlerWithContext, fragmentLimit int) int { +func (rea *RecordingEventsAdapter) PollWithContext(handler FragmentHandlerWithListeners, fragmentLimit int) int { // Update our globals in case they've changed so we use the current state in our callback rangeChecking = rea.archive.Options.RangeChecking @@ -45,16 +47,13 @@ func (rea *RecordingEventsAdapter) PollWithContext(handler term.FragmentHandlerW if fragmentLimit == 0 { fragmentLimit = 1 } - return rea.Subscription.PollWithContext(handler, rea.archive.Listeners, fragmentLimit) + return rea.Subscription.PollWithContext( + func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { + handler(rea.archive.Listeners, buf, offset, length, header) + }, fragmentLimit) } -func reFragmentHandler(context interface{}, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { - listeners, ok := context.(*ArchiveListeners) - if !ok { - logger.Errorf("context conversion failed") - return - } - +func reFragmentHandler(listeners *ArchiveListeners, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) { var hdr codecs.SbeGoMessageHeader buf := new(bytes.Buffer) From 0cf17522a6e931708cae4d191183c86a7401f5cc Mon Sep 17 00:00:00 2001 From: Peter Date: Mon, 20 Dec 2021 16:40:09 -0500 Subject: [PATCH 11/14] Add IsConnected() to Subscription. Add IsRecordingEventsConnected() to Archive. --- aeron/subscription.go | 10 ++++++++++ archive/archive.go | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/aeron/subscription.go b/aeron/subscription.go index a39f0931..760639f8 100644 --- a/aeron/subscription.go +++ b/aeron/subscription.go @@ -171,6 +171,16 @@ func (sub *Subscription) RegistrationID() int64 { return sub.registrationID } +// IsConnected returns if this subscription is connected by having at least one open publication Image. +func (sub *Subscription) IsConnected() bool { + for _, image := range sub.images.Get() { + if !image.IsClosed() { + return true + } + } + return false +} + // HasImages is a helper method checking whether this subscription has any images associated with it. func (sub *Subscription) HasImages() bool { images := sub.images.Get() diff --git a/archive/archive.go b/archive/archive.go index e40947dc..d1cb30c8 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -17,6 +17,7 @@ package archive import ( + "errors" "fmt" "github.com/lirm/aeron-go/aeron" "github.com/lirm/aeron-go/aeron/atomic" @@ -368,6 +369,15 @@ func (archive *Archive) EnableRecordingEvents() { logger.Debugf("RecordingEvents subscription: %#v", archive.Events.Subscription) } +// IsRecordingEventsConnected returns true if the recording events subscription +// is connected. +func (archive *Archive) IsRecordingEventsConnected() (bool, error) { + if !archive.Events.Enabled { + return false, errors.New("recording events not enabled") + } + return archive.Events.Subscription.IsConnected(), nil +} + // DisableRecordingEvents stops recording events flowing func (archive *Archive) DisableRecordingEvents() { archive.Events.Subscription.Close() From e9b8ca04fc0a70d8379616e03af3fa29d3984667 Mon Sep 17 00:00:00 2001 From: billsegall Date: Thu, 13 Jan 2022 13:23:42 +1000 Subject: [PATCH 12/14] Add notes on logging changes and IsConnected() --- archive/README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/archive/README.md b/archive/README.md index c733c0ad..40ef44f8 100755 --- a/archive/README.md +++ b/archive/README.md @@ -85,8 +85,6 @@ The actual semantics of the security are dependent upon which authenticator supp * more testing * archive-media-driver mocking/execution * test cleanup in the media driver can be problematic - * The archive state is largely unused. - * Add? and use? IsOpen() * Auth should provide some callout mechanism * various FIXMEs @@ -101,10 +99,13 @@ The actual semantics of the security are dependent upon which authenticator supp ## Release Notes ### 1.0b2 - * Fix a race condition looking up correlationIDs on ControlResponse * Handle different archive clients using same channel/stream pairing + * Provide Subscription.IsConnected() and IsRecordingEventsConnected() + * Replace go-logging with zap to avoid reentrancy crashes in logging library + * Improve the logging by downgrading in severity and message tone some warning level messages + * Fix a race condition looking up correlationIDs on ControlResponse * Fix a return code error in StopRecordingById() * Fix unused argumentin StopRecording() * Cosmetic improvements for golint and staticcheck - * Improve the logging by downgrading in severity and message some warning only messages - * Make the Listeners used for async events be per archive client instead of global \ No newline at end of file + * Make the Listeners used for async events be per archive client instead of global + \ No newline at end of file From 7ea6da37fa3f82eb50fa9025c699d29fd8c85557 Mon Sep 17 00:00:00 2001 From: billsegall Date: Thu, 13 Jan 2022 13:25:54 +1000 Subject: [PATCH 13/14] typo --- archive/archive.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/archive/archive.go b/archive/archive.go index d1cb30c8..138c9e8f 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -47,7 +47,7 @@ const ( RecordingLengthMax = int64(2<<31 - 1) // Replay the whole stream ) -// replication flag used fir duplication instead of extension, see Replicate and variants +// replication flag used for duplication instead of extension, see Replicate and variants const ( RecordingIdNullValue = int32(-1) ) From 25181233c91710375eac7fb519c8a2af8f378f64 Mon Sep 17 00:00:00 2001 From: billsegall Date: Thu, 13 Jan 2022 13:45:30 +1000 Subject: [PATCH 14/14] 2022 copyright updates --- archive/archive.go | 2 +- archive/archive_test.go | 56 +++++++++---------- archive/benchmarks_test.go | 2 +- archive/codecs/encoders.go | 2 +- archive/codecs/encoders_test.go | 2 +- archive/codecs/generate.sh | 2 +- archive/codecs/semanticversion.go | 2 +- archive/config_test.go | 2 +- archive/control.go | 2 +- .../basic_recording_publisher.go | 2 +- .../basic_replayed_subscriber.go | 2 +- archive/examples/examplesconfig.go | 28 +++++----- archive/options.go | 2 +- archive/proxy.go | 2 +- archive/recordingevents.go | 2 +- 15 files changed, 54 insertions(+), 56 deletions(-) diff --git a/archive/archive.go b/archive/archive.go index 138c9e8f..3d68f8db 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // Copyright (C) 2014-2021 Real Logic Limited. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/archive/archive_test.go b/archive/archive_test.go index 23cdbd72..4e442d94 100644 --- a/archive/archive_test.go +++ b/archive/archive_test.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,29 +48,29 @@ var testCases = []TestCases{ } // For testing async events -type Counters struct { +type TestCounters struct { recordingSignalCount int recordingEventStartedCount int recordingEventProgressCount int recordingEventStoppedCount int } -var counters Counters +var testCounters TestCounters func RecordingSignalListener(rse *codecs.RecordingSignalEvent) { - counters.recordingSignalCount++ + testCounters.recordingSignalCount++ } func RecordingEventStartedListener(rs *codecs.RecordingStarted) { - counters.recordingEventStartedCount++ + testCounters.recordingEventStartedCount++ } func RecordingEventProgressListener(rp *codecs.RecordingProgress) { - counters.recordingEventProgressCount++ + testCounters.recordingEventProgressCount++ } func RecordingEventStoppedListener(rs *codecs.RecordingStopped) { - counters.recordingEventStoppedCount++ + testCounters.recordingEventStoppedCount++ } func TestMain(m *testing.M) { @@ -165,21 +165,21 @@ func TestKeepAlive(t *testing.T) { } // Helper to check values of counters -func CounterValuesMatch(c Counters, signals int, started int, progress int, stopped int, t *testing.T) bool { - if counters.recordingSignalCount != signals { - t.Logf("counters.recordingSignalCount[%d] != signals[%d]", counters.recordingSignalCount, signals) +func CounterValuesMatch(c TestCounters, signals int, started int, progress int, stopped int, t *testing.T) bool { + if testCounters.recordingSignalCount != signals { + t.Logf("testCounters.recordingSignalCount[%d] != signals[%d]", testCounters.recordingSignalCount, signals) return false } - if counters.recordingEventStartedCount != started { - t.Logf("counters.recordingEventStartedCount[%d] != started[%d]", counters.recordingEventStartedCount, started) + if testCounters.recordingEventStartedCount != started { + t.Logf("testCounters.recordingEventStartedCount[%d] != started[%d]", testCounters.recordingEventStartedCount, started) return false } - if counters.recordingEventProgressCount != progress { - t.Logf("counters.recordingEventProgressCount[%d] != progress[%d]", counters.recordingEventProgressCount, progress) + if testCounters.recordingEventProgressCount != progress { + t.Logf("testCounters.recordingEventProgressCount[%d] != progress[%d]", testCounters.recordingEventProgressCount, progress) return false } - if counters.recordingEventStoppedCount != stopped { - t.Logf("counters.recordingEventStoppedCount[%d] != stopped[%d]", counters.recordingEventStoppedCount, stopped) + if testCounters.recordingEventStoppedCount != stopped { + t.Logf("testCounters.recordingEventStoppedCount[%d] != stopped[%d]", testCounters.recordingEventStoppedCount, stopped) return false } return true @@ -219,8 +219,8 @@ func TestAsyncEvents(t *testing.T) { archive.Listeners.RecordingEventProgressListener = RecordingEventProgressListener archive.Listeners.RecordingEventStoppedListener = RecordingEventStoppedListener - counters = Counters{0, 0, 0, 0} - if !CounterValuesMatch(counters, 0, 0, 0, 0, t) { + testCounters = TestCounters{0, 0, 0, 0} + if !CounterValuesMatch(testCounters, 0, 0, 0, 0, t) { t.Log("Async event counters mismatch") t.FailNow() } @@ -228,7 +228,7 @@ func TestAsyncEvents(t *testing.T) { archive.EnableRecordingEvents() archive.RecordingEventsPoll() - if !CounterValuesMatch(counters, 0, 0, 0, 0, t) { + if !CounterValuesMatch(testCounters, 0, 0, 0, 0, t) { t.Log("Async event counters mismatch") t.FailNow() } @@ -240,7 +240,7 @@ func TestAsyncEvents(t *testing.T) { } archive.RecordingEventsPoll() - if !CounterValuesMatch(counters, 1, 1, 0, 0, t) { + if !CounterValuesMatch(testCounters, 1, 1, 0, 0, t) { t.Log("Async event counters mismatch") t.FailNow() } @@ -254,13 +254,13 @@ func TestAsyncEvents(t *testing.T) { t.FailNow() } - if !CounterValuesMatch(counters, 2, 1, 0, 0, t) { + if !CounterValuesMatch(testCounters, 2, 1, 0, 0, t) { t.Log("Async event counters mismatch") t.FailNow() } archive.RecordingEventsPoll() - if !CounterValuesMatch(counters, 2, 1, 0, 1, t) { + if !CounterValuesMatch(testCounters, 2, 1, 0, 1, t) { t.Log("Async event counters mismatch") t.FailNow() } @@ -271,8 +271,8 @@ func TestAsyncEvents(t *testing.T) { archive.Listeners.RecordingEventStartedListener = nil archive.Listeners.RecordingEventProgressListener = nil archive.Listeners.RecordingEventStoppedListener = nil - counters = Counters{0, 0, 0, 0} - if !CounterValuesMatch(counters, 0, 0, 0, 0, t) { + testCounters = TestCounters{0, 0, 0, 0} + if !CounterValuesMatch(testCounters, 0, 0, 0, 0, t) { t.Log("Async event counters mismatch") t.FailNow() } @@ -500,9 +500,9 @@ func TestStartStopReplay(t *testing.T) { } t.Logf("ListRecording(%d) returned %#v", recordingID, *recording) - // ListRecording should not find one with a bad Id - badId := int64(-127) - recording, err = archive.ListRecording(badId) + // ListRecording should not find one with a bad ID + badID := int64(-127) + recording, err = archive.ListRecording(badID) if err != nil { t.Log(err) t.FailNow() @@ -511,7 +511,7 @@ func TestStartStopReplay(t *testing.T) { t.Log("ListRecording returned a record descriptor and should not have") t.FailNow() } - t.Logf("ListRecording(%d) correctly returned nil", badId) + t.Logf("ListRecording(%d) correctly returned nil", badID) // While we're here, check ListRecordingSubscription is working descriptors, err := archive.ListRecordingSubscriptions(0, 10, false, 0, "aeron") diff --git a/archive/benchmarks_test.go b/archive/benchmarks_test.go index 3c3b5126..d679e00e 100644 --- a/archive/benchmarks_test.go +++ b/archive/benchmarks_test.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/codecs/encoders.go b/archive/codecs/encoders.go index d669429a..59f3cc75 100644 --- a/archive/codecs/encoders.go +++ b/archive/codecs/encoders.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/codecs/encoders_test.go b/archive/codecs/encoders_test.go index 947e988f..2cd68d0c 100644 --- a/archive/codecs/encoders_test.go +++ b/archive/codecs/encoders_test.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/codecs/generate.sh b/archive/codecs/generate.sh index 6f8628ad..60f39d65 100755 --- a/archive/codecs/generate.sh +++ b/archive/codecs/generate.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -# Copyright (C) 2020-2021 Talos, Inc. +# Copyright (C) 2020-2022 Talos, Inc. # This isn't that useful without modification or an identical layout to the # defaults below so you might consider it more documentation than working diff --git a/archive/codecs/semanticversion.go b/archive/codecs/semanticversion.go index a5d925b0..10e13396 100644 --- a/archive/codecs/semanticversion.go +++ b/archive/codecs/semanticversion.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/config_test.go b/archive/config_test.go index b16d507a..e0def0c7 100644 --- a/archive/config_test.go +++ b/archive/config_test.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/control.go b/archive/control.go index e573c475..fb051e22 100644 --- a/archive/control.go +++ b/archive/control.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/examples/basic_recording_publisher/basic_recording_publisher.go b/archive/examples/basic_recording_publisher/basic_recording_publisher.go index 933b8374..334b40fc 100644 --- a/archive/examples/basic_recording_publisher/basic_recording_publisher.go +++ b/archive/examples/basic_recording_publisher/basic_recording_publisher.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/examples/basic_replayed_subscriber/basic_replayed_subscriber.go b/archive/examples/basic_replayed_subscriber/basic_replayed_subscriber.go index 1214373f..d82a0332 100644 --- a/archive/examples/basic_replayed_subscriber/basic_replayed_subscriber.go +++ b/archive/examples/basic_replayed_subscriber/basic_replayed_subscriber.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/examples/examplesconfig.go b/archive/examples/examplesconfig.go index fe7b70c3..fe96a449 100644 --- a/archive/examples/examplesconfig.go +++ b/archive/examples/examplesconfig.go @@ -1,18 +1,16 @@ -/* -// Copyright (C) 2020-2021 Talos, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ +// Copyright (C) 2021-2022 Talos, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package examples diff --git a/archive/options.go b/archive/options.go index 94aa2506..f31ddca0 100644 --- a/archive/options.go +++ b/archive/options.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/proxy.go b/archive/proxy.go index e7addeb8..37282102 100644 --- a/archive/proxy.go +++ b/archive/proxy.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/archive/recordingevents.go b/archive/recordingevents.go index 83409f9f..483e87d6 100644 --- a/archive/recordingevents.go +++ b/archive/recordingevents.go @@ -1,4 +1,4 @@ -// Copyright (C) 2021 Talos, Inc. +// Copyright (C) 2021-2022 Talos, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.