Skip to content

Commit

Permalink
add in max number of log events per batch (#3)
Browse files Browse the repository at this point in the history
* add back in max number of log events per batch

* hit 10k limit

* Fix test

* update nextSendTime in queueMonitor after sending a batch

* Make the tests more robust

* update changelog

* Improve 1MB test case

* Update filename in test
  • Loading branch information
mec07 authored Aug 14, 2020
1 parent 733dd57 commit 7f49945
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 11 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.1] - 2020-08-14

### Changed

- The writer now respects the 10k limit set by AWS for batch size (as well as the 1MB limit).

## [0.1.0] - 2020-08-14

### Changed
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ logger := zerolog.New(zerolog.MultiLevelWriter(consoleWriter, cloudWatchWriter))
### Changing the default settings

#### Batch interval
The logs are sent in batches because AWS has a maximum of 5 PutLogEvents requests per second per log stream (https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html).
The logs are sent in batches because AWS has a maximum of 5 PutLogEvents requests per second per log stream (https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html).
The default value of the batch period is 5 seconds, which means it will send a batch of logs at least once every 5 seconds.
Batches of logs will be sent earlier if the size of the collected logs exceeds 1MB (another AWS restriction).
To change the batch frequency, you can set the time interval between batches to a smaller or larger value, e.g. 1 second:
Expand Down
20 changes: 17 additions & 3 deletions cloudwatch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ const (
// defaultBatchInterval is 5 seconds.
defaultBatchInterval time.Duration = 5000000000
// batchSizeLimit is 1MB in bytes, the limit imposed by AWS CloudWatch Logs
// on the size the batch of logs we send.
// on the size the batch of logs we send, see:
// https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
batchSizeLimit = 1048576
// maxNumLogEvents is the maximum number of messages that can be sent in one
// batch, also an AWS limitation, see:
// https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
maxNumLogEvents = 10000
// additionalBytesPerLogEvent is the number of additional bytes per log
// event, other than the length of the log message.
additionalBytesPerLogEvent = 36
// event, other than the length of the log message, see:
// https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
additionalBytesPerLogEvent = 26
)

// CloudWatchLogsClient represents the AWS cloudwatchlogs client that we need to talk to CloudWatch
Expand Down Expand Up @@ -176,10 +182,18 @@ func (c *CloudWatchWriter) queueMonitor() {
c.sendBatch(batch)
batch = nil
batchSize = 0
nextSendTime = time.Now().Add(c.getBatchInterval())
}

batch = append(batch, logEvent)
batchSize += messageSize

if len(batch) >= maxNumLogEvents {
c.sendBatch(batch)
batch = nil
batchSize = 0
nextSendTime = time.Now().Add(c.getBatchInterval())
}
}
}

Expand Down
60 changes: 53 additions & 7 deletions cloudwatch_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ func TestCloudWatchWriterBatchInterval(t *testing.T) {
assert.Equal(t, 1, client.numLogs())
}

// Hit the 1MB log limit to trigger an earlier batch
func TestCloudWatchWriterBulk(t *testing.T) {
// Hit the 1MB limit on batch size of logs to trigger an earlier batch
func TestCloudWatchWriterHit1MBLimit(t *testing.T) {
client := &mockClient{}

cloudWatchWriter, err := zerolog2cloudwatch.NewWriterWithClient(client, 200*time.Millisecond, "logGroup", "logStream")
Expand All @@ -290,20 +290,24 @@ func TestCloudWatchWriterBulk(t *testing.T) {
}
defer cloudWatchWriter.Close()

// give the queueMonitor goroutine time to start up
time.Sleep(time.Millisecond)

logs := logsContainer{}
numLogs := 10000
numLogs := 9999
for i := 0; i < numLogs; i++ {
aLog := exampleLog{
Time: "2009-11-10T23:00:02.043123061Z",
Message: fmt.Sprintf("Test message %d", i),
Filename: "filename",
Message: fmt.Sprintf("longggggggggggggggggggggggggggg test message %d", i),
Filename: "/home/deadpool/src/github.com/superlongfilenameblahblahblahblah.txt",
Port: 666,
}
helperWriteLogs(t, cloudWatchWriter, aLog)
logs.addLog(aLog)
}

// Main assertion is that we are triggering a batch early as we're sending so much data
// Main assertion is that we are triggering a batch early as we're sending
// so much data
assert.True(t, client.numLogs() > 0)

if err = client.waitForLogs(numLogs, 200*time.Millisecond); err != nil {
Expand All @@ -318,6 +322,48 @@ func TestCloudWatchWriterBulk(t *testing.T) {
assertEqualLogMessages(t, expectedLogs, client.getLogEvents())
}

// TestCloudWatchWriterHit10kLimit checks that queueing 10,000 log events
// forces the writer to flush a batch before the batch interval elapses,
// since AWS caps a PutLogEvents call at 10k events.
func TestCloudWatchWriterHit10kLimit(t *testing.T) {
	client := &mockClient{}

	cloudWatchWriter, err := zerolog2cloudwatch.NewWriterWithClient(client, 200*time.Millisecond, "logGroup", "logStream")
	if err != nil {
		t.Fatalf("NewWriterWithClient: %v", err)
	}
	defer cloudWatchWriter.Close()

	// Allow the queueMonitor goroutine a moment to start up.
	time.Sleep(time.Millisecond)

	const numLogs = 10000
	var expectedLogs []*cloudwatchlogs.InputLogEvent
	for i := 0; i < numLogs; i++ {
		msg := fmt.Sprintf("hello %d", i)
		if _, writeErr := cloudWatchWriter.Write([]byte(msg)); writeErr != nil {
			t.Fatalf("cloudWatchWriter.Write: %v", writeErr)
		}
		expectedLogs = append(expectedLogs, &cloudwatchlogs.InputLogEvent{
			Message:   aws.String(msg),
			Timestamp: aws.Int64(time.Now().UTC().UnixNano() / int64(time.Millisecond)),
		})
	}

	// Let the queueMonitor goroutine catch up. This sleep is far shorter
	// than the 200ms batch interval, so any logs visible now must have
	// been flushed by the 10k-event limit rather than the timer.
	time.Sleep(10 * time.Millisecond)

	// Main assertion: an early batch was triggered by the sheer number of
	// logs queued.
	assert.True(t, client.numLogs() > 0)

	if err = client.waitForLogs(numLogs, 200*time.Millisecond); err != nil {
		t.Fatal(err)
	}

	assertEqualLogMessages(t, expectedLogs, client.getLogEvents())
}

func TestCloudWatchWriterParallel(t *testing.T) {
client := &mockClient{}

Expand All @@ -343,7 +389,7 @@ func TestCloudWatchWriterParallel(t *testing.T) {
}

// allow more time as there are a lot of goroutines to set off!
if err = client.waitForLogs(numLogs, 2*time.Second); err != nil {
if err = client.waitForLogs(numLogs, 4*time.Second); err != nil {
t.Fatal(err)
}

Expand Down

0 comments on commit 7f49945

Please sign in to comment.