Skip to content

Commit

Permalink
Add SetErrorHandler for user defined error handling (#11)
Browse files Browse the repository at this point in the history
* Add SetErrorHandler for user defined error handling

* Make the tests more robust

* Update the github action

* Remove check of error being nil as it's unnecessary

* Make tests even more robust

* Make tests even more robust as race can really mess with them
  • Loading branch information
mec07 authored Sep 16, 2022
1 parent 435fa22 commit 5c55755
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 25 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Install Go
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: 1.14.x
go-version: 1.17
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v3
- name: Coverage
run: go test -v -race -covermode=atomic -coverprofile=cover.out -timeout 10s ./...
- name: Report coverage
run: bash <(curl -s https://codecov.io/bash) -t 50f54c52-6302-41a7-a8f7-9835c21b53f6
- name: golangci-lint
uses: golangci/golangci-lint-action@v2.3.0
uses: golangci/golangci-lint-action@v3
with:
version: v1.35
version: v1.49
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.2.5] - 2022-09-16

### Added

- Added a `SetErrorHandler` method on the `CloudWatchWriter` to allow user-defined error handling.

## [0.2.4] - 2021-05-07

### Fixed
Expand Down
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,17 @@ The batch interval is not guaranteed as two things can alter how often the batch
- as soon as 1MB of logs or 10k logs have accumulated, they are sent (due to AWS restrictions on batch size);
- we have to send the batches in sequence (an AWS restriction) so a long running request to CloudWatch can delay the next batch.

#### Error Handler
Sometimes there are problems communicating with CloudWatch, and you may want to react when that happens (for example, you could create a new writer, `Close()` the old one, and attach the new one to your logger).
To facilitate user-defined error handling, you can set an error handler on the `CloudWatchWriter`:
```
cloudWatchWriter.SetErrorHandler(func(err error) {
// do something
})
```


## Acknowledgements
Many thanks go to the creator of `zerolog` (https://github.com/rs/zerolog), for creating such a good logger.
Thanks must go to the writer of `logrus-cloudwatchlogs` (https://github.com/kdar/logrus-cloudwatchlogs) as I found it a helpful resource for interfacing with `cloudwatchlogs`.
Thanks also go to the writer of this: https://gist.github.com/asdine/f821abe6189a04250ae61b77a3048bd9, which I also found helpful for extracting logs from `zerolog`.
Thanks also go to the writer of this: https://gist.github.com/asdine/f821abe6189a04250ae61b77a3048bd9, which I also found helpful for extracting logs from `zerolog`.
39 changes: 36 additions & 3 deletions cloudwatch_writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloudwatchwriter

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -52,6 +53,7 @@ type CloudWatchWriter struct {
nextSequenceToken *string
closing bool
done chan struct{}
errorHandler func(error)
}

// New returns a pointer to a CloudWatchWriter struct, or an error.
Expand Down Expand Up @@ -89,13 +91,19 @@ func NewWithClient(client CloudWatchLogsClient, batchInterval time.Duration, log
// CloudWatch.
func (c *CloudWatchWriter) SetBatchInterval(interval time.Duration) error {
if interval < minBatchInterval {
return errors.New("supplied batch interval is less than the minimum")
return fmt.Errorf("supplied batch interval, %dms, is less than the minimum, %dms", interval.Milliseconds(), minBatchInterval.Milliseconds())
}

c.setBatchInterval(interval)
return nil
}

// SetErrorHandler adds a function to be run every time there is an error
// sending logs to CloudWatch.
//
// The handler is stored through setErrorHandler, which takes the writer's
// lock, so a later call replaces whatever handler was registered before.
// Passing nil removes the handler: handleError only invokes a non-nil one.
func (c *CloudWatchWriter) SetErrorHandler(handler func(error)) {
	c.setErrorHandler(handler)
}

func (c *CloudWatchWriter) setBatchInterval(interval time.Duration) {
c.Lock()
defer c.Unlock()
Expand All @@ -110,6 +118,27 @@ func (c *CloudWatchWriter) getBatchInterval() time.Duration {
return c.batchInterval
}

// setErrorHandler stores handler on the writer while holding the write lock.
// A nil handler clears any previously stored one.
func (c *CloudWatchWriter) setErrorHandler(handler func(error)) {
	c.Lock()
	c.errorHandler = handler
	c.Unlock()
}

// handleError passes err to the user-supplied error handler, if one has been
// set, and afterwards records err on the writer via setErr. The handler is
// fetched through getErrorHandler and is called before setErr.
func (c *CloudWatchWriter) handleError(err error) {
	notify := c.getErrorHandler()
	if notify != nil {
		notify(err)
	}
	c.setErr(err)
}

// getErrorHandler returns the currently stored error handler, read while
// holding the read lock. The result is nil when no handler has been set.
func (c *CloudWatchWriter) getErrorHandler() func(error) {
	c.RLock()
	current := c.errorHandler
	c.RUnlock()

	return current
}

func (c *CloudWatchWriter) setErr(err error) {
c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -213,7 +242,7 @@ func (c *CloudWatchWriter) queueMonitor() {

// Only allow 1 retry of an invalid sequence token.
func (c *CloudWatchWriter) sendBatch(batch []*cloudwatchlogs.InputLogEvent, retryNum int) {
if retryNum > 1 || len(batch) == 0 {
if len(batch) == 0 {
return
}

Expand All @@ -228,10 +257,14 @@ func (c *CloudWatchWriter) sendBatch(batch []*cloudwatchlogs.InputLogEvent, retr
if err != nil {
if invalidSequenceTokenErr, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok {
c.setNextSequenceToken(invalidSequenceTokenErr.ExpectedSequenceToken)
if retryNum >= 1 {
c.handleError(err)
return
}
c.sendBatch(batch, retryNum+1)
return
}
c.setErr(err)
c.handleError(err)
return
}
c.setNextSequenceToken(output.NextSequenceToken)
Expand Down
84 changes: 68 additions & 16 deletions cloudwatch_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestCloudWatchWriter(t *testing.T) {
t.Fatal(err)
}

if err = client.waitForLogs(2, 201*time.Millisecond); err != nil {
if err = client.waitForLogs(2, 210*time.Millisecond); err != nil {
t.Fatal(err)
}

Expand All @@ -274,7 +274,7 @@ func TestCloudWatchWriterBatchInterval(t *testing.T) {
}

// setting it to a value greater than or equal to 200 is OK
err = cloudWatchWriter.SetBatchInterval(200 * time.Millisecond)
err = cloudWatchWriter.SetBatchInterval(300 * time.Millisecond)
if err != nil {
t.Fatalf("CloudWatchWriter.SetBatchInterval: %v", err)
}
Expand All @@ -291,17 +291,14 @@ func TestCloudWatchWriterBatchInterval(t *testing.T) {

helperWriteLogs(t, cloudWatchWriter, aLog)

// The client shouldn't have received any logs at this time
assert.Equal(t, 0, client.numLogs())

// Still no logs after 100 milliseconds
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 0, client.numLogs())

// The client should have received the log after another 101 milliseconds
// (as that is a total sleep time of 201 milliseconds)
time.Sleep(101 * time.Millisecond)
assert.Equal(t, 1, client.numLogs())
startTime := time.Now()
if err := client.waitForLogs(1, 310*time.Millisecond); err != nil {
t.Fatal(err)
}
timeTaken := time.Since(startTime)
if timeTaken < 290*time.Millisecond {
t.Fatalf("expected batch interval time to be approximately 300 milliseconds, found: %dms", timeTaken.Milliseconds())
}
}

// Hit the 1MB limit on batch size of logs to trigger an earlier batch
Expand Down Expand Up @@ -334,7 +331,7 @@ func TestCloudWatchWriterHit1MBLimit(t *testing.T) {
// so much data
assert.True(t, client.numLogs() > 0)

if err = client.waitForLogs(numLogs, 400*time.Millisecond); err != nil {
if err = client.waitForLogs(numLogs, 210*time.Millisecond); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -381,7 +378,7 @@ func TestCloudWatchWriterHit10kLimit(t *testing.T) {
// so many logs
assert.True(t, client.numLogs() > 0)

if err = client.waitForLogs(numLogs, 200*time.Millisecond); err != nil {
if err = client.waitForLogs(numLogs, 210*time.Millisecond); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -518,7 +515,7 @@ func TestCloudWatchWriterReceiveInvalidSequenceTokenException(t *testing.T) {
logs.addLog(log)

// Result
if err = client.waitForLogs(1, 201*time.Millisecond); err != nil {
if err = client.waitForLogs(1, 210*time.Millisecond); err != nil {
t.Fatal(err)
}

Expand All @@ -528,3 +525,58 @@ func TestCloudWatchWriterReceiveInvalidSequenceTokenException(t *testing.T) {
}
assertEqualLogMessages(t, expectedLogs, client.getLogEvents())
}

// protectedObject is a mutex-guarded boolean flag. The test uses it to record,
// safely across goroutines, whether a callback has run.
type protectedObject struct {
	sync.Mutex
	haveBeenCalled bool
}

// setCalled marks the object as having been called.
func (p *protectedObject) setCalled() {
	p.Lock()
	p.haveBeenCalled = true
	p.Unlock()
}

// getCalled reports whether setCalled has been invoked.
func (p *protectedObject) getCalled() bool {
	p.Lock()
	called := p.haveBeenCalled
	p.Unlock()

	return called
}

// TestCloudWatchWriterErrorHandler checks that a handler registered with
// SetErrorHandler is run when sending logs fails. The mock client is created
// with putLogEventsShouldError set (presumably making its PutLogEvents call
// return an error; see mockClient).
func TestCloudWatchWriterErrorHandler(t *testing.T) {
	objectUnderObservation := protectedObject{}
	handler := func(error) {
		objectUnderObservation.setCalled()
	}

	client := &mockClient{
		putLogEventsShouldError: true,
	}

	cloudWatchWriter, err := cloudwatchwriter.NewWithClient(client, 200*time.Millisecond, "logGroup", "logStream")
	if err != nil {
		t.Fatalf("NewWithClient: %v", err)
	}
	defer cloudWatchWriter.Close()

	cloudWatchWriter.SetErrorHandler(handler)

	// give the queueMonitor goroutine time to start up
	time.Sleep(time.Millisecond)

	aLog := exampleLog{
		Time:     "2009-11-10T23:00:02.043123061Z",
		Message:  "Test message",
		Filename: "filename",
		Port:     666,
	}

	helperWriteLogs(t, cloudWatchWriter, aLog)

	// Wait for the cloudWatchWriter to call PutLogEvents. A single fixed sleep
	// of 201ms leaves only 1ms of slack over the 200ms batch interval, which is
	// easily exceeded under the race detector, so poll with a generous upper
	// bound instead and stop as soon as the handler has fired.
	deadline := time.Now().Add(time.Second)
	for !objectUnderObservation.getCalled() && time.Now().Before(deadline) {
		time.Sleep(5 * time.Millisecond)
	}

	assert.True(t, objectUnderObservation.getCalled())
}

0 comments on commit 5c55755

Please sign in to comment.