Skip to content

Commit

Permalink
Fix the invalid sequence token exception (#7)
Browse files Browse the repository at this point in the history
* Fix the invalid sequence token exception

* Update changelog

* Put in a proper test for an invalid sequence token exception
  • Loading branch information
mec07 authored Aug 17, 2020
1 parent e19201c commit 044b560
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.2.3] - 2020-08-17

### Fixed

- Fix the problem of getting out of sequence from the next sequence token.

## [0.2.2] - 2020-08-14

### Changed
Expand Down
25 changes: 22 additions & 3 deletions cloudwatch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewWithClient(client CloudWatchLogsClient, batchInterval time.Duration, log
if err != nil {
return nil, err
}
writer.nextSequenceToken = logStream.UploadSequenceToken
writer.setNextSequenceToken(logStream.UploadSequenceToken)

go writer.queueMonitor()

Expand Down Expand Up @@ -124,6 +124,20 @@ func (c *CloudWatchWriter) getErr() error {
return c.err
}

func (c *CloudWatchWriter) setNextSequenceToken(next *string) {
c.Lock()
defer c.Unlock()

c.nextSequenceToken = next
}

func (c *CloudWatchWriter) getNextSequenceToken() *string {
c.RLock()
defer c.RUnlock()

return c.nextSequenceToken
}

// Write implements the io.Writer interface.
func (c *CloudWatchWriter) Write(log []byte) (int, error) {
event := &cloudwatchlogs.InputLogEvent{
Expand Down Expand Up @@ -206,15 +220,20 @@ func (c *CloudWatchWriter) sendBatch(batch []*cloudwatchlogs.InputLogEvent) {
LogEvents: batch,
LogGroupName: c.logGroupName,
LogStreamName: c.logStreamName,
SequenceToken: c.nextSequenceToken,
SequenceToken: c.getNextSequenceToken(),
}

output, err := c.client.PutLogEvents(input)
if err != nil {
if invalidSequenceTokenErr, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok {
c.setNextSequenceToken(invalidSequenceTokenErr.ExpectedSequenceToken)
c.sendBatch(batch)
return
}
c.setErr(err)
return
}
c.nextSequenceToken = output.NextSequenceToken
c.setNextSequenceToken(output.NextSequenceToken)
}

// Close blocks until the writer has completed writing the logs to CloudWatch.
Expand Down
70 changes: 68 additions & 2 deletions cloudwatch_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import (
"github.com/stretchr/testify/assert"
)

const sequenceToken = "next-sequence-token"

type mockClient struct {
sync.RWMutex
putLogEventsShouldError bool
logEvents []*cloudwatchlogs.InputLogEvent
logGroupName *string
logStreamName *string
expectedSequenceToken *string
}

func (c *mockClient) DescribeLogStreams(*cloudwatchlogs.DescribeLogStreamsInput) (*cloudwatchlogs.DescribeLogStreamsOutput, error) {
Expand All @@ -35,7 +38,8 @@ func (c *mockClient) DescribeLogStreams(*cloudwatchlogs.DescribeLogStreamsInput)
var streams []*cloudwatchlogs.LogStream
if c.logStreamName != nil {
streams = append(streams, &cloudwatchlogs.LogStream{
LogStreamName: c.logStreamName,
LogStreamName: c.logStreamName,
UploadSequenceToken: c.expectedSequenceToken,
})
}

Expand Down Expand Up @@ -71,9 +75,22 @@ func (c *mockClient) PutLogEvents(putLogEvents *cloudwatchlogs.PutLogEventsInput
return nil, errors.New("received nil *cloudwatchlogs.PutLogEventsInput")
}

// At the first PutLogEvents c.expectedSequenceToken should be nil, as we
// set it in this call. If it is not nil then we can compare the received
// sequence token and the expected one.
if c.expectedSequenceToken != nil {
if putLogEvents.SequenceToken == nil || *putLogEvents.SequenceToken != *c.expectedSequenceToken {
return nil, &cloudwatchlogs.InvalidSequenceTokenException{
ExpectedSequenceToken: c.expectedSequenceToken,
}
}
} else {
c.expectedSequenceToken = aws.String(sequenceToken)
}

c.logEvents = append(c.logEvents, putLogEvents.LogEvents...)
output := &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String("next sequence token"),
NextSequenceToken: c.expectedSequenceToken,
}
return output, nil
}
Expand All @@ -88,6 +105,13 @@ func (c *mockClient) getLogEvents() []*cloudwatchlogs.InputLogEvent {
return logEvents
}

func (c *mockClient) setExpectedSequenceToken(token *string) {
c.Lock()
defer c.Unlock()

c.expectedSequenceToken = token
}

func (c *mockClient) waitForLogs(numberOfLogs int, timeout time.Duration) error {
endTime := time.Now().Add(timeout)
for {
Expand Down Expand Up @@ -462,3 +486,45 @@ func TestCloudWatchWriterReportError(t *testing.T) {
t.Fatal("expected the last error from PutLogEvents to appear here")
}
}

func TestCloudWatchWriterReceiveInvalidSequenceTokenException(t *testing.T) {
// Setup
client := &mockClient{}

cloudWatchWriter, err := cloudwatchwriter.NewWithClient(client, 200*time.Millisecond, "logGroup", "logStream")
if err != nil {
t.Fatalf("NewWithClient: %v", err)
}
defer cloudWatchWriter.Close()

// give the queueMonitor goroutine time to start up
time.Sleep(time.Millisecond)

// At this point the cloudWatchWriter should have the normal sequence token.
// So we change the mock cloudwatch client to expect a different sequence
// token:
client.setExpectedSequenceToken(aws.String("new sequence token"))

// Action
logs := logsContainer{}
log := exampleLog{
Time: "2009-11-10T23:00:02.043123061Z",
Message: "Test message 1",
Filename: "filename",
Port: 666,
}

helperWriteLogs(t, cloudWatchWriter, log)
logs.addLog(log)

// Result
if err = client.waitForLogs(1, 201*time.Millisecond); err != nil {
t.Fatal(err)
}

expectedLogs, err := logs.getLogEvents()
if err != nil {
t.Fatal(err)
}
assertEqualLogMessages(t, expectedLogs, client.getLogEvents())
}

0 comments on commit 044b560

Please sign in to comment.