From 435fa22dd40b4860ec06d1074dc3c864f8968416 Mon Sep 17 00:00:00 2001 From: Marc Coury Date: Fri, 7 May 2021 12:57:27 +0100 Subject: [PATCH] only retry sending a batch after an invalid sequence token once (#8) * only retry sending a batch after an invalid sequence token once * minor tweak * Try editing github workflow yml * try one more time to fix linter * Update changelog --- .github/workflows/golangci-lint.yml | 4 ++-- CHANGELOG.md | 6 ++++++ cloudwatch_writer.go | 15 ++++++++------- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 94acc07..90026fc 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -22,6 +22,6 @@ jobs: - name: Report coverage run: bash <(curl -s https://codecov.io/bash) -t 50f54c52-6302-41a7-a8f7-9835c21b53f6 - name: golangci-lint - uses: golangci/golangci-lint-action@v1 + uses: golangci/golangci-lint-action@v2.3.0 with: - version: v1.27 \ No newline at end of file + version: v1.35 \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index c125806..4361a56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.2.4] - 2021-05-07 + +### Fixed + +- Only allow one retry after an out of sequence token error. + ## [0.2.3] - 2020-08-17 ### Fixed diff --git a/cloudwatch_writer.go b/cloudwatch_writer.go index fa4a613..f23a3f6 100644 --- a/cloudwatch_writer.go +++ b/cloudwatch_writer.go @@ -163,7 +163,7 @@ func (c *CloudWatchWriter) queueMonitor() { for { if time.Now().After(nextSendTime) { - c.sendBatch(batch) + c.sendBatch(batch, 0) batch = nil batchSize = 0 nextSendTime.Add(c.getBatchInterval()) @@ -173,7 +173,7 @@ func (c *CloudWatchWriter) queueMonitor() { if item == nil { // Empty queue, means no logs to process if c.isClosing() { - c.sendBatch(batch) + c.sendBatch(batch, 0) // At this point we've processed all the logs and can safely // close. close(c.done) @@ -193,7 +193,7 @@ func (c *CloudWatchWriter) queueMonitor() { // Send the batch before adding the next message, if the message would // push it over the 1MB limit on batch size. if batchSize+messageSize > batchSizeLimit { - c.sendBatch(batch) + c.sendBatch(batch, 0) batch = nil batchSize = 0 nextSendTime = time.Now().Add(c.getBatchInterval()) @@ -203,7 +203,7 @@ func (c *CloudWatchWriter) queueMonitor() { batchSize += messageSize if len(batch) >= maxNumLogEvents { - c.sendBatch(batch) + c.sendBatch(batch, 0) batch = nil batchSize = 0 nextSendTime = time.Now().Add(c.getBatchInterval()) @@ -211,8 +211,9 @@ func (c *CloudWatchWriter) queueMonitor() { } } -func (c *CloudWatchWriter) sendBatch(batch []*cloudwatchlogs.InputLogEvent) { - if len(batch) == 0 { +// Only allow 1 retry of an invalid sequence token. +func (c *CloudWatchWriter) sendBatch(batch []*cloudwatchlogs.InputLogEvent, retryNum int) { + if retryNum > 1 || len(batch) == 0 { return } @@ -227,7 +228,7 @@ func (c *CloudWatchWriter) sendBatch(batch []*cloudwatchlogs.InputLogEvent) { if err != nil { if invalidSequenceTokenErr, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok { c.setNextSequenceToken(invalidSequenceTokenErr.ExpectedSequenceToken) - c.sendBatch(batch) + c.sendBatch(batch, retryNum+1) return } c.setErr(err)