Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/lirm/aeron-go into loggin…
Browse files Browse the repository at this point in the history
…g-migration
  • Loading branch information
billsegall committed Jan 21, 2022
2 parents d416f5e + 083126f commit c5633e8
Show file tree
Hide file tree
Showing 25 changed files with 414 additions and 162 deletions.
23 changes: 10 additions & 13 deletions aeron/clientconductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package aeron
import (
"errors"
"fmt"
"io"
"log"
"runtime"
"sync"
Expand Down Expand Up @@ -99,7 +100,7 @@ func (sub *subscriptionStateDefn) Init(ch string, regID int64, sID int32, now in

type lingerResourse struct {
lastTime int64
resource *Image
resource io.Closer
}

type ClientConductor struct {
Expand Down Expand Up @@ -258,12 +259,6 @@ func (cc *ClientConductor) AddPublication(channel string, streamID int32) int64
cc.adminLock.Lock()
defer cc.adminLock.Unlock()

for _, pub := range cc.pubs {
if pub.streamID == streamID && pub.channel == channel {
return pub.regID
}
}

now := time.Now().UnixNano()

regID := cc.driverProxy.AddPublication(channel, streamID)
Expand All @@ -285,12 +280,6 @@ func (cc *ClientConductor) AddExclusivePublication(channel string, streamID int3
cc.adminLock.Lock()
defer cc.adminLock.Unlock()

for _, pub := range cc.pubs {
if pub.streamID == streamID && pub.channel == channel {
return pub.regID
}
}

now := time.Now().UnixNano()

regID := cc.driverProxy.AddExclusivePublication(channel, streamID)
Expand Down Expand Up @@ -347,6 +336,8 @@ func (cc *ClientConductor) releasePublication(regID int64) {
cc.adminLock.Lock()
defer cc.adminLock.Unlock()

now := time.Now().UnixNano()

pubcnt := len(cc.pubs)
for i, pub := range cc.pubs {
if pub != nil && pub.regID == regID {
Expand All @@ -355,6 +346,10 @@ func (cc *ClientConductor) releasePublication(regID int64) {
cc.pubs[i] = cc.pubs[pubcnt-1]
cc.pubs[pubcnt-1] = nil
pubcnt--

if pub.buffers.DecRef() == 0 {
cc.lingeringResources <- lingerResourse{now, pub.buffers}
}
}
}
cc.pubs = cc.pubs[:pubcnt]
Expand Down Expand Up @@ -470,6 +465,7 @@ func (cc *ClientConductor) OnNewPublication(streamID int32, sessionID int32, pos
pubDef.posLimitCounterID = posLimitCounterID
pubDef.channelStatusIndicatorID = channelStatusIndicatorID
pubDef.buffers = logbuffer.Wrap(logFileName)
pubDef.buffers.IncRef()
pubDef.origRegID = origRegID

logger.Debugf("Updated publication: %v", pubDef)
Expand Down Expand Up @@ -498,6 +494,7 @@ func (cc *ClientConductor) OnNewExclusivePublication(streamID int32, sessionID i
pubDef.posLimitCounterID = posLimitCounterID
pubDef.channelStatusIndicatorID = channelStatusIndicatorID
pubDef.buffers = logbuffer.Wrap(logFileName)
pubDef.buffers.IncRef()
pubDef.origRegID = origRegID

logger.Debugf("Updated publication: %v", pubDef)
Expand Down
4 changes: 2 additions & 2 deletions aeron/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (image *Image) Poll(handler term.FragmentHandler, fragmentLimit int) int {
return result
}

func (image *Image) PollWithContext(handler term.FragmentHandlerWithContext, context int64, fragmentLimit int) int {
func (image *Image) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int {
if image.IsClosed() {
return 0
}
Expand All @@ -128,7 +128,7 @@ func (image *Image) PollWithContext(handler term.FragmentHandlerWithContext, con
index := indexByPosition(position, image.positionBitsToShift)
termBuffer := image.termBuffers[index]

offset, result := term.ReadWithContext(context, termBuffer, termOffset, handler, fragmentLimit, &image.header)
offset, result := term.ReadWithContext(termBuffer, termOffset, handler, fragmentLimit, &image.header)

newPosition := position + int64(offset-termOffset)
if newPosition > position {
Expand Down
5 changes: 5 additions & 0 deletions aeron/logbuffer/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (hdr *Header) SetReservedValue(reservedValue int64) *Header {
return hdr
}

func (hdr *Header) SetSessionId(value int32) *Header {
hdr.buffer.PutInt32(sessionIdOffset(hdr.offset), value)
return hdr
}

// computePosition computes the current position in absolute number of bytes.
func computePosition(activeTermId int32, termOffset int32, positionBitsToShift int32, initialTermId int32) int64 {
termCount := activeTermId - initialTermId // copes with negative activeTermId on rollover
Expand Down
13 changes: 13 additions & 0 deletions aeron/logbuffer/logbuffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type LogBuffers struct {
mmapFiles []*memmap.File
buffers [PartitionCount + 1]atomic.Buffer
meta LogBufferMetaData
refCount int
}

// Wrap is the factory method wrapping the LogBuffers structure around memory mapped file
Expand Down Expand Up @@ -121,3 +122,15 @@ func (logBuffers *LogBuffers) Close() error {
logBuffers.mmapFiles = nil
return err
}

// IncRef increments the reference count. Returns the current reference count after increment.
func (logBuffers *LogBuffers) IncRef() int {
logBuffers.refCount++
return logBuffers.refCount
}

// DecRef decrements the reference count. Returns the current reference counter after decrement.
func (logBuffers *LogBuffers) DecRef() int {
logBuffers.refCount--
return logBuffers.refCount
}
7 changes: 2 additions & 5 deletions aeron/logbuffer/term/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,8 @@ func Read(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler,
return termOffset, fragmentsRead
}

// FragmentHandlerWithContext provides a FragmentHandler with context
type FragmentHandlerWithContext func(context int64, buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)

// ReadWithContext as for Read() but woth a contextual argument
func ReadWithContext(context int64, termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandlerWithContext, fragmentsLimit int,
func ReadWithContext(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler, fragmentsLimit int,
header *logbuffer.Header) (int32, int) {

capacity := termBuffer.Capacity()
Expand All @@ -77,7 +74,7 @@ func ReadWithContext(context int64, termBuffer *atomic.Buffer, termOffset int32,
if !logbuffer.IsPaddingFrame(termBuffer, fragmentOffset) {
header.Wrap(termBuffer.Ptr(), termBuffer.Capacity())
header.SetOffset(fragmentOffset)
handler(context, termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length,
handler(termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length,
frameLength-logbuffer.DataFrameHeader.Length, header)

fragmentsRead++
Expand Down
16 changes: 13 additions & 3 deletions aeron/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) i
}

// PollWithContext as for Poll() but provides an integer argument for passing contextual information
func (sub *Subscription) PollWithContext(handler term.FragmentHandlerWithContext, context int64, fragmentLimit int) int {
func (sub *Subscription) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int {

img := sub.images.Get()
length := len(img)
Expand All @@ -118,11 +118,11 @@ func (sub *Subscription) PollWithContext(handler term.FragmentHandlerWithContext
}

for i := startingIndex; i < length && fragmentsRead < fragmentLimit; i++ {
fragmentsRead += img[i].PollWithContext(handler, context, fragmentLimit-fragmentsRead)
fragmentsRead += img[i].PollWithContext(handler, fragmentLimit-fragmentsRead)
}

for i := 0; i < startingIndex && fragmentsRead < fragmentLimit; i++ {
fragmentsRead += img[i].PollWithContext(handler, context, fragmentLimit-fragmentsRead)
fragmentsRead += img[i].PollWithContext(handler, fragmentLimit-fragmentsRead)
}
}

Expand Down Expand Up @@ -171,6 +171,16 @@ func (sub *Subscription) RegistrationID() int64 {
return sub.registrationID
}

// IsConnected returns if this subscription is connected by having at least one open publication Image.
func (sub *Subscription) IsConnected() bool {
for _, image := range sub.images.Get() {
if !image.IsClosed() {
return true
}
}
return false
}

// HasImages is a helper method checking whether this subscription has any images associated with it.
func (sub *Subscription) HasImages() bool {
images := sub.images.Get()
Expand Down
10 changes: 6 additions & 4 deletions archive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ The actual semantics of the security are dependent upon which authenticator supp
* more testing
* archive-media-driver mocking/execution
* test cleanup in the media driver can be problematic
* The archive state is largely unused.
* Add? and use? IsOpen()
* Auth should provide some callout mechanism
* various FIXMEs

Expand All @@ -101,9 +99,13 @@ The actual semantics of the security are dependent upon which authenticator supp
## Release Notes

### 1.0b2
* Fix a race condition looking up correlationIDs
* Handle different archive clients using same channel/stream pairing
* Provide Subscription.IsConnected() and IsRecordingEventsConnected()
* Replace go-logging with zap to avoid reentrancy crashes in logging library
* Improve the logging by downgrading in severity and message tone some warning level messages
* Fix a race condition looking up correlationIDs on ControlResponse
* Fix a return code error in StopRecordingById()
* Fix unused argumentin StopRecording()
* Cosmetic improvements for golint and staticcheck
* Improve the logging by downgrading in severity and message some warning only messages
* Make the Listeners used for async events be per archive client instead of global

53 changes: 36 additions & 17 deletions archive/archive.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2021 Talos, Inc.
// Copyright (C) 2021-2022 Talos, Inc.
// Copyright (C) 2014-2021 Real Logic Limited.
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,9 +17,11 @@
package archive

import (
"errors"
"fmt"
"github.com/lirm/aeron-go/aeron"
"github.com/lirm/aeron-go/aeron/atomic"
"github.com/lirm/aeron-go/aeron/logbuffer"
"github.com/lirm/aeron-go/aeron/logging"
"github.com/lirm/aeron-go/archive/codecs"
"sync"
Expand All @@ -35,6 +37,7 @@ type Archive struct {
Proxy *Proxy // For outgoing protocol messages (publish/request)
Control *Control // For incoming protocol messages (subscribe/reponse)
Events *RecordingEventsAdapter // For async recording events (must be enabled)
Listeners *ArchiveListeners // Per client event listeners for async callbacks
}

// Constant values used to control behaviour of StartReplay
Expand All @@ -44,7 +47,7 @@ const (
RecordingLengthMax = int64(2<<31 - 1) // Replay the whole stream
)

// replication flag used fir duplication instead of extension, see Replicate and variants
// replication flag used for duplication instead of extension, see Replicate and variants
const (
RecordingIdNullValue = int32(-1)
)
Expand All @@ -54,7 +57,6 @@ const (
// within the the FragmentAssemblers without any user data (or other
// context). Listeners.ErrorListener() if set will be called if for
// example protocol unmarshalling goes wrong.
var Listeners *ArchiveListeners

// ArchiveListeners contains all the callbacks
// By default only the ErrorListener is set to a logging listener. If
Expand Down Expand Up @@ -223,29 +225,29 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) {
archive.Events.archive = archive

// Create the listeners and populate
Listeners = new(ArchiveListeners)
Listeners.ErrorListener = LoggingErrorListener
archive.Listeners = new(ArchiveListeners)
archive.Listeners.ErrorListener = LoggingErrorListener
archive.SetAeronErrorHandler(LoggingErrorListener)

// In Debug mode initialize our listeners with simple loggers
// Note that these actually log at INFO so you can do this manually for INFO if you like
if logging.GetLevel("archive") >= logging.DEBUG {
logger.Debugf("Setting logging listeners")

Listeners.RecordingEventStartedListener = LoggingRecordingEventStartedListener
Listeners.RecordingEventProgressListener = LoggingRecordingEventProgressListener
Listeners.RecordingEventStoppedListener = LoggingRecordingEventStoppedListener
archive.Listeners.RecordingEventStartedListener = LoggingRecordingEventStartedListener
archive.Listeners.RecordingEventProgressListener = LoggingRecordingEventProgressListener
archive.Listeners.RecordingEventStoppedListener = LoggingRecordingEventStoppedListener

Listeners.RecordingSignalListener = LoggingRecordingSignalListener
archive.Listeners.RecordingSignalListener = LoggingRecordingSignalListener

Listeners.AvailableImageListener = LoggingAvailableImageListener
Listeners.UnavailableImageListener = LoggingUnavailableImageListener
archive.Listeners.AvailableImageListener = LoggingAvailableImageListener
archive.Listeners.UnavailableImageListener = LoggingUnavailableImageListener

Listeners.NewSubscriptionListener = LoggingNewSubscriptionListener
Listeners.NewPublicationListener = LoggingNewPublicationListener
archive.Listeners.NewSubscriptionListener = LoggingNewSubscriptionListener
archive.Listeners.NewPublicationListener = LoggingNewPublicationListener

archive.aeronContext.NewSubscriptionHandler(Listeners.NewSubscriptionListener)
archive.aeronContext.NewPublicationHandler(Listeners.NewPublicationListener)
archive.aeronContext.NewSubscriptionHandler(archive.Listeners.NewSubscriptionListener)
archive.aeronContext.NewPublicationHandler(archive.Listeners.NewPublicationListener)
}

// Connect the underlying aeron
Expand Down Expand Up @@ -283,8 +285,13 @@ func NewArchive(options *Options, context *aeron.Context) (*Archive, error) {
}

start := time.Now()
pollContext := PollContext{archive.Control, correlationID}

for archive.Control.State.state != ControlStateConnected && archive.Control.State.err == nil {
fragments := archive.Control.PollWithContext(ConnectionControlFragmentHandler, correlationID, 1)
fragments := archive.Control.PollWithContext(
func(buf *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) {
ConnectionControlFragmentHandler(&pollContext, buf, offset, length, header)
}, 1)
if fragments > 0 {
logger.Debugf("Read %d fragment(s)\n", fragments)
}
Expand Down Expand Up @@ -362,6 +369,15 @@ func (archive *Archive) EnableRecordingEvents() {
logger.Debugf("RecordingEvents subscription: %#v", archive.Events.Subscription)
}

// IsRecordingEventsConnected returns true if the recording events subscription
// is connected.
func (archive *Archive) IsRecordingEventsConnected() (bool, error) {
if !archive.Events.Enabled {
return false, errors.New("recording events not enabled")
}
return archive.Events.Subscription.IsConnected(), nil
}

// DisableRecordingEvents stops recording events flowing
func (archive *Archive) DisableRecordingEvents() {
archive.Events.Subscription.Close()
Expand All @@ -371,7 +387,7 @@ func (archive *Archive) DisableRecordingEvents() {

// RecordingEventsPoll is used to poll for recording events
func (archive *Archive) RecordingEventsPoll() int {
return archive.Events.Poll(nil, 1)
return archive.Events.PollWithContext(nil, 1)
}

// AddSubscription will add a new subscription to the driver.
Expand Down Expand Up @@ -468,6 +484,9 @@ func (archive *Archive) StopRecordingByIdentity(recordingID int64) (bool, error)
logger.Debugf("StopRecordingByIdentity result was %d\n", res)
}

if err != nil {
return false, err
}
return res >= 0, err
}

Expand Down
Loading

0 comments on commit c5633e8

Please sign in to comment.