Make code asynchronous (#2)
* Made code asynchronous

* Make close block until finished processing messages

* Protect CloudWatchWriter.Err from race conditions

* make tests more readable

* made tests more reliable
mec07 authored Aug 14, 2020
1 parent 63fb86e commit 733dd57
Showing 7 changed files with 508 additions and 69 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.md
@@ -2,9 +2,20 @@

All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.0] - 2020-08-14

### Changed

- Now sends logs in batches with a default batch interval of 5 seconds.

### Added

- Added Close method to CloudWatchWriter which blocks until all the messages have been sent.
- Added SetBatchInterval method which easily allows the user to change the interval between sending batches of logs to CloudWatch.

## [0.0.1] - 2020-08-12

### Added

- Basic synchronous implementation of NewWriter and NewWriterWithClient
- Basic synchronous implementation of NewWriter and NewWriterWithClient.

42 changes: 31 additions & 11 deletions README.md
@@ -34,23 +34,29 @@ const (
logStreamName = "log-stream-name"
)
func setupZerolog(accessKeyID, secretKey string) error {
// setupZerolog sets up the main zerolog logger to write to CloudWatch instead
// of stdout. It returns an error or the CloudWatchWriter.Close method which
// blocks until all the logs have been processed.
func setupZerolog(accessKeyID, secretKey string) (func(), error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretKey, ""),
})
if err != nil {
return log.Logger, fmt.Errorf("session.NewSession: %w", err)
return nil, fmt.Errorf("session.NewSession: %w", err)
}
cloudwatchWriter, err := zerolog2cloudwatch.NewWriter(sess, logGroupName, logStreamName)
cloudWatchWriter, err := zerolog2cloudwatch.NewWriter(sess, logGroupName, logStreamName)
if err != nil {
return log.Logger, fmt.Errorf("zerolog2cloudwatch.NewWriter: %w", err)
return nil, fmt.Errorf("zerolog2cloudwatch.NewWriter: %w", err)
}
log.Logger = log.Output(cloudwatchWriter)
log.Logger = log.Output(cloudWatchWriter)
return cloudWatchWriter.Close, nil
}
```
If you want to ensure that all your logs are sent to CloudWatch during the shutdown sequence of your program, you can `defer` a call to `cloudWatchWriter.Close()` in `main`.
The `Close()` function blocks until all the logs have been processed.
If you prefer to use AWS IAM credentials that are saved in the usual location on your computer then you don't have to specify the credentials, e.g.:
```
sess, err := session.NewSession(&aws.Config{
@@ -63,33 +69,47 @@ See the example directory for a working example.
### Write to CloudWatch and the console
What I personally prefer is to write to both CloudWatch and the console, e.g.
```
cloudwatchWriter, err := zerolog2cloudwatch.NewWriter(sess, logGroupName, logStreamName)
cloudWatchWriter, err := zerolog2cloudwatch.NewWriter(sess, logGroupName, logStreamName)
if err != nil {
return fmt.Errorf("zerolog2cloudwatch.NewWriter: %w", err)
}
consoleWriter := zerolog.ConsoleWriter{Out: os.Stdout}
log.Logger = log.Output(zerolog.MultiLevelWriter(consoleWriter, cloudwatchWriter))
log.Logger = log.Output(zerolog.MultiLevelWriter(consoleWriter, cloudWatchWriter))
```

### Create a new zerolog Logger
Of course, you can create a new `zerolog.Logger` using this too:
```
cloudwatchWriter, err := zerolog2cloudwatch.NewWriter(sess, logGroupName, logStreamName)
cloudWatchWriter, err := zerolog2cloudwatch.NewWriter(sess, logGroupName, logStreamName)
if err != nil {
return fmt.Errorf("zerolog2cloudwatch.NewWriter: %w", err)
}
logger := zerolog.New(cloudwatchWriter).With().Timestamp().Logger()
logger := zerolog.New(cloudWatchWriter).With().Timestamp().Logger()
```
and of course you can create a new `zerolog.Logger` which can write to both CloudWatch and the console:
```
cloudwatchWriter, err := zerolog2cloudwatch.NewWriter(sess, logGroupName, logStreamName)
cloudWatchWriter, err := zerolog2cloudwatch.NewWriter(sess, logGroupName, logStreamName)
if err != nil {
return fmt.Errorf("zerolog2cloudwatch.NewWriter: %w", err)
}
consoleWriter := zerolog.ConsoleWriter{Out: os.Stdout}
logger := zerolog.New(zerolog.MultiLevelWriter(consoleWriter, cloudwatchWriter)).With().Timestamp().Logger()
logger := zerolog.New(zerolog.MultiLevelWriter(consoleWriter, cloudWatchWriter)).With().Timestamp().Logger()
```

### Changing the default settings

#### Batch interval
The logs are sent in batches because AWS has a maximum of 5 PutLogEvents requests per second per log stream (https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html).
The default batch interval is 5 seconds, which means a batch of logs is sent at least once every 5 seconds.
Batches of logs will be sent earlier if the size of the collected logs exceeds 1MB (another AWS restriction).
To change the batch frequency, you can set the time interval between batches to a smaller or larger value, e.g. 1 second:
```
err := cloudWatchWriter.SetBatchInterval(time.Second)
```
If you set it below 200 milliseconds it will return an error.
This interval is not guaranteed, as a long-running request to CloudWatch could delay the next batch.
This is because CloudWatch expects to receive logs in sequence and not in parallel, so the writer sends batches one at a time.


## Acknowledgements
Many thanks go to the creator of `zerolog` (https://github.com/rs/zerolog), for creating such a good logger.
185 changes: 166 additions & 19 deletions cloudwatch_writer.go
@@ -1,13 +1,29 @@
package zerolog2cloudwatch

import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/pkg/errors"
"gopkg.in/oleiade/lane.v1"
)

const (
// minBatchInterval is 200 ms as the maximum rate of PutLogEvents is 5
// requests per second.
minBatchInterval time.Duration = 200 * time.Millisecond
// defaultBatchInterval is 5 seconds.
defaultBatchInterval time.Duration = 5 * time.Second
// batchSizeLimit is 1MB in bytes, the limit imposed by AWS CloudWatch Logs
// on the size the batch of logs we send.
batchSizeLimit = 1048576
// additionalBytesPerLogEvent is the number of additional bytes per log
// event, other than the length of the log message.
additionalBytesPerLogEvent = 36
)

// CloudWatchLogsClient represents the AWS cloudwatchlogs client that we need to talk to CloudWatch
@@ -20,61 +36,192 @@ type CloudWatchLogsClient interface {

// CloudWatchWriter can be inserted into zerolog to send logs to CloudWatch.
type CloudWatchWriter struct {
client CloudWatchLogsClient
logGroupName *string
logStreamName *string
sequenceToken *string
sync.RWMutex
client CloudWatchLogsClient
batchInterval time.Duration
queue *lane.Queue
err error
logGroupName *string
logStreamName *string
nextSequenceToken *string
closing bool
done chan struct{}
}

// NewWriter returns a pointer to a CloudWatchWriter struct, or an error.
func NewWriter(sess *session.Session, logGroupName, logStreamName string) (*CloudWatchWriter, error) {
return NewWriterWithClient(cloudwatchlogs.New(sess), logGroupName, logStreamName)
return NewWriterWithClient(cloudwatchlogs.New(sess), defaultBatchInterval, logGroupName, logStreamName)
}

// NewWriterWithClient returns a pointer to a CloudWatchWriter struct, or an error.
func NewWriterWithClient(client CloudWatchLogsClient, logGroupName, logStreamName string) (*CloudWatchWriter, error) {
// NewWriterWithClient returns a pointer to a CloudWatchWriter struct, or an
// error.
func NewWriterWithClient(client CloudWatchLogsClient, batchInterval time.Duration, logGroupName, logStreamName string) (*CloudWatchWriter, error) {
writer := &CloudWatchWriter{
client: client,
queue: lane.NewQueue(),
logGroupName: aws.String(logGroupName),
logStreamName: aws.String(logStreamName),
done: make(chan struct{}),
}

err := writer.SetBatchInterval(batchInterval)
if err != nil {
return nil, errors.Wrapf(err, "set batch interval: %v", batchInterval)
}

logStream, err := writer.getOrCreateLogStream()
if err != nil {
return nil, err
}
writer.nextSequenceToken = logStream.UploadSequenceToken

writer.sequenceToken = logStream.UploadSequenceToken
go writer.queueMonitor()

return writer, nil
}

// SetBatchInterval sets the maximum time between batches of logs sent to
// CloudWatch.
func (c *CloudWatchWriter) SetBatchInterval(interval time.Duration) error {
if interval < minBatchInterval {
return errors.New("supplied batch interval is less than the minimum")
}

c.setBatchInterval(interval)
return nil
}

func (c *CloudWatchWriter) setBatchInterval(interval time.Duration) {
c.Lock()
defer c.Unlock()

c.batchInterval = interval
}

func (c *CloudWatchWriter) getBatchInterval() time.Duration {
c.RLock()
defer c.RUnlock()

return c.batchInterval
}

func (c *CloudWatchWriter) setErr(err error) {
c.Lock()
defer c.Unlock()

c.err = err
}

func (c *CloudWatchWriter) getErr() error {
c.RLock()
defer c.RUnlock()

return c.err
}

// Write implements the io.Writer interface.
func (c *CloudWatchWriter) Write(log []byte) (int, error) {
var logEvents []*cloudwatchlogs.InputLogEvent
logEvents = append(logEvents, &cloudwatchlogs.InputLogEvent{
event := &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(log)),
// Timestamp has to be in milliseconds since the epoch
Timestamp: aws.Int64(time.Now().UTC().UnixNano() / int64(time.Millisecond)),
})
}
c.queue.Enqueue(event)

// report last sending error
lastErr := c.getErr()
if lastErr != nil {
c.setErr(nil)
return 0, lastErr
}
return len(log), nil
}

func (c *CloudWatchWriter) queueMonitor() {
var batch []*cloudwatchlogs.InputLogEvent
batchSize := 0
nextSendTime := time.Now().Add(c.getBatchInterval())

for {
if time.Now().After(nextSendTime) {
c.sendBatch(batch)
batch = nil
batchSize = 0
// time.Time.Add returns a new value, so the result must be assigned.
nextSendTime = nextSendTime.Add(c.getBatchInterval())
}

item := c.queue.Dequeue()
if item == nil {
// Empty queue, means no logs to process
if c.isClosing() && batch == nil {
// at this point we've processed all the logs and can safely
// close.
close(c.done)
return
}
time.Sleep(time.Millisecond)
continue
}

logEvent, ok := item.(*cloudwatchlogs.InputLogEvent)
if !ok || logEvent.Message == nil {
// This should not happen!
continue
}

messageSize := len(*logEvent.Message) + additionalBytesPerLogEvent
// Send the batch before adding the next message, if the message would
// push it over the 1MB limit on batch size.
if batchSize+messageSize > batchSizeLimit {
c.sendBatch(batch)
batch = nil
batchSize = 0
}

batch = append(batch, logEvent)
batchSize += messageSize
}
}

func (c *CloudWatchWriter) sendBatch(batch []*cloudwatchlogs.InputLogEvent) {
if len(batch) == 0 {
return
}

input := &cloudwatchlogs.PutLogEventsInput{
LogEvents: logEvents,
LogEvents: batch,
LogGroupName: c.logGroupName,
LogStreamName: c.logStreamName,
SequenceToken: c.sequenceToken,
SequenceToken: c.nextSequenceToken,
}

resp, err := c.client.PutLogEvents(input)
output, err := c.client.PutLogEvents(input)
if err != nil {
return 0, errors.Wrap(err, "cloudwatchlogs.Client.PutLogEvents")
c.setErr(err)
return
}
c.nextSequenceToken = output.NextSequenceToken
}

if resp != nil {
c.sequenceToken = resp.NextSequenceToken
}
// Close blocks until the writer has completed writing the logs to CloudWatch.
func (c *CloudWatchWriter) Close() {
c.setClosing()
// block until the done channel is closed
<-c.done
}

return len(log), nil
func (c *CloudWatchWriter) isClosing() bool {
c.RLock()
defer c.RUnlock()

return c.closing
}

func (c *CloudWatchWriter) setClosing() {
c.Lock()
defer c.Unlock()

c.closing = true
}

// getOrCreateLogStream gets info on the log stream for the log group and log