Skip to content

Commit 083126f

Browse files
authored
Merge pull request #46 from billsegall/archive-beta2
Archive beta2
2 parents ed3a94e + 2518123 commit 083126f

21 files changed

+1228
-901
lines changed

aeron/image.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,25 @@ func (image *Image) Poll(handler term.FragmentHandler, fragmentLimit int) int {
118118
return result
119119
}
120120

121+
func (image *Image) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int {
122+
if image.IsClosed() {
123+
return 0
124+
}
125+
126+
position := image.subscriberPosition.get()
127+
termOffset := int32(position) & image.termLengthMask
128+
index := indexByPosition(position, image.positionBitsToShift)
129+
termBuffer := image.termBuffers[index]
130+
131+
offset, result := term.ReadWithContext(termBuffer, termOffset, handler, fragmentLimit, &image.header)
132+
133+
newPosition := position + int64(offset-termOffset)
134+
if newPosition > position {
135+
image.subscriberPosition.set(newPosition)
136+
}
137+
return result
138+
}
139+
121140
// Position returns the position this Image has been consumed to by the subscriber.
122141
func (image *Image) Position() int64 {
123142
if image.IsClosed() {

aeron/logbuffer/term/reader.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,32 @@ func Read(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler,
5454

5555
return termOffset, fragmentsRead
5656
}
57+
58+
// ReadWithContext as for Read() but woth a contextual argument
59+
func ReadWithContext(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler, fragmentsLimit int,
60+
header *logbuffer.Header) (int32, int) {
61+
62+
capacity := termBuffer.Capacity()
63+
64+
var fragmentsRead int
65+
for fragmentsRead < fragmentsLimit && termOffset < capacity {
66+
frameLength := logbuffer.GetFrameLength(termBuffer, termOffset)
67+
if frameLength <= 0 {
68+
break
69+
}
70+
71+
fragmentOffset := termOffset
72+
termOffset += util.AlignInt32(frameLength, logbuffer.FrameAlignment)
73+
74+
if !logbuffer.IsPaddingFrame(termBuffer, fragmentOffset) {
75+
header.Wrap(termBuffer.Ptr(), termBuffer.Capacity())
76+
header.SetOffset(fragmentOffset)
77+
handler(termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length,
78+
frameLength-logbuffer.DataFrameHeader.Length, header)
79+
80+
fragmentsRead++
81+
}
82+
}
83+
84+
return termOffset, fragmentsRead
85+
}

aeron/subscription.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,33 @@ func (sub *Subscription) Poll(handler term.FragmentHandler, fragmentLimit int) i
102102
return fragmentsRead
103103
}
104104

105+
// PollWithContext as for Poll() but provides an integer argument for passing contextual information
106+
func (sub *Subscription) PollWithContext(handler term.FragmentHandler, fragmentLimit int) int {
107+
108+
img := sub.images.Get()
109+
length := len(img)
110+
var fragmentsRead int
111+
112+
if length > 0 {
113+
startingIndex := sub.roundRobinIndex
114+
sub.roundRobinIndex++
115+
if startingIndex >= length {
116+
sub.roundRobinIndex = 0
117+
startingIndex = 0
118+
}
119+
120+
for i := startingIndex; i < length && fragmentsRead < fragmentLimit; i++ {
121+
fragmentsRead += img[i].PollWithContext(handler, fragmentLimit-fragmentsRead)
122+
}
123+
124+
for i := 0; i < startingIndex && fragmentsRead < fragmentLimit; i++ {
125+
fragmentsRead += img[i].PollWithContext(handler, fragmentLimit-fragmentsRead)
126+
}
127+
}
128+
129+
return fragmentsRead
130+
}
131+
105132
func (sub *Subscription) hasImage(sessionID int32) bool {
106133
img := sub.images.Get()
107134
for _, image := range img {
@@ -144,6 +171,16 @@ func (sub *Subscription) RegistrationID() int64 {
144171
return sub.registrationID
145172
}
146173

174+
// IsConnected returns if this subscription is connected by having at least one open publication Image.
175+
func (sub *Subscription) IsConnected() bool {
176+
for _, image := range sub.images.Get() {
177+
if !image.IsClosed() {
178+
return true
179+
}
180+
}
181+
return false
182+
}
183+
147184
// HasImages is a helper method checking whether this subscription has any images associated with it.
148185
func (sub *Subscription) HasImages() bool {
149186
images := sub.images.Get()

archive/README.md

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ protocol](http://github.com/real-logic/aeron/blob/master/aeron-archive/src/main/
77
is specified in xml using the [Simple Binary Encoding (SBE)](https://github.com/real-logic/simple-binary-encoding)
88

99
## Current State
10-
The implementation is the first beta release. The API will be changed only if required for bugfixes.
10+
The implementation is the second beta release. The API may still be changed for bug fixes or significant issues.
1111

1212
# Design
1313

@@ -22,21 +22,21 @@ the archive library is a layering on top of that.
2222

2323
Finally golang idioms are used where reasonable.
2424

25-
The archive library does not lock and concurrent calls to archive
26-
library calls that invoke the aeron-archive protocol calls should be
27-
externally locked to ensure only one concuurrent access.
25+
The archive library does not lock and concurrent calls on an archive
26+
client to invoke the aeron-archive protocol calls should be externally
27+
locked to ensure only one concurrent access.
2828

2929
### Naming and other choices
3030

3131
Function names used in archive.go which contains the main API are
3232
based on the Java names so that developers can more easily switch
33-
between langugages and so that any API documentation is more useful
33+
between languages and so that any API documentation is more useful
3434
across implementations. Some differences exist due to capitalization
3535
requirements, lack of polymorphism, etc.
3636

3737
Function names used in encoders.go and proxy.go are based on the
38-
protocol specification. Where the protocol specifies a type that cab
39-
ne naturally repreented in golang, the golang type is used used where
38+
protocol specification. Where the protocol specifies a type that can
39+
be naturally represented in golang, the golang type is used used where
4040
possible until encoding. Examples include the use of `bool` rather than
4141
`BooleanType` and `string` over `[]uint8`
4242

@@ -72,28 +72,40 @@ operations in progress when polling.
7272

7373
## Examples
7474

75-
Examples are provided for a [basic_recording_publisher](examples/basic_recording_publisher/basic_recording_publisher.go) and [basic_replayed_subscriber](examples/basic_replayed_subscriber/basic_replayed_subscriber.go) that interoperate with the Java examples
75+
Examples are provided for a [basic_recording_publisher](examples/basic_recording_publisher/basic_recording_publisher.go) and [basic_replayed_subscriber](examples/basic_replayed_subscriber/basic_replayed_subscriber.go) that interoperate with the Java examples.
7676

7777
## Security
7878

7979
Enabling security is done via setting the various auth options. [config_test.go](config_test.go) and [archive_test.go](archive_test.go) provide an example.
8080

81-
The actual semantics of the security are dependent upon which authenticator aupplier you use and is tested agains [secure-logging-archiving-media-driver](secure-logging-archiving-media-driver).
81+
The actual semantics of the security are dependent upon which authenticator supplier you use and is tested against [secure-logging-archiving-media-driver](secure-logging-archiving-media-driver).
8282

8383
# Backlog
8484
* godoc improvements
8585
* more testing
8686
* archive-media-driver mocking/execution
8787
* test cleanup in the media driver can be problematic
88-
* The archive state is largely unused.
89-
* Add? and use? IsOpen()
9088
* Auth should provide some callout mechanism
9189
* various FIXMEs
9290

9391
# Bigger picture issues
9492
* Decided not to do locking in sync api, could subsequently add locks, or just async with locks.
9593
It may be that the control marshaller should be parameterized for this.
9694
* Java and C++ poll the counters to determine when a recording has actually started but the counters are not
97-
availabe in go. As a result we use delays and 'hope' which isn't ideal.
95+
available in go. As a result we use delays and 'hope' which isn't ideal.
9896
* It would be nice to silence the OnAvailableCounter noise
9997
* Within aeron-go there are cases of Log.Fatalf(), see for example trying to add a publication on a "bogus" channel.
98+
99+
## Release Notes
100+
101+
### 1.0b2
102+
* Handle different archive clients using same channel/stream pairing
103+
* Provide Subscription.IsConnected() and IsRecordingEventsConnected()
104+
* Replace go-logging with zap to avoid reentrancy crashes in logging library
105+
* Improve the logging by downgrading in severity and message tone some warning level messages
106+
* Fix a race condition looking up correlationIDs on ControlResponse
107+
* Fix a return code error in StopRecordingById()
108+
* Fix unused argumentin StopRecording()
109+
* Cosmetic improvements for golint and staticcheck
110+
* Make the Listeners used for async events be per archive client instead of global
111+

0 commit comments

Comments
 (0)