diff --git a/README.md b/README.md index 75c176a..7283267 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS. | PluginTagAttribute | attribute name of the message tag | no | | QueueMessageGroupId | the group id required for fifo queues | fifo-only | | ProxyUrl | the proxy address between fluentbit and sqs (if exists) | no | +| BatchSize | set amount of messages to be sent in a batch request | yes | ```conf [SERVICE] @@ -32,6 +33,7 @@ FluntBit custom output plugin which allows sending messages to AWS-SQS. Match * QueueUrl http://aws-sqs-url.com QueueRegion eu-central-1 + BatchSize 10 ``` ## Installation diff --git a/out_sqs.go b/out_sqs.go index 33d9081..05604a2 100644 --- a/out_sqs.go +++ b/out_sqs.go @@ -4,9 +4,10 @@ import ( "C" "errors" "fmt" + "os" + "strconv" "time" "unsafe" - "os" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -24,7 +25,7 @@ import ( // integer representation for this plugin log level // 0 - debug // 1 - info -// 2 - error +// 2 - error var sqsOutLogLevel int // MessageCounter is used for count the current SQS Batch messages @@ -39,6 +40,7 @@ type sqsConfig struct { mySQS *sqs.SQS pluginTagAttribute string proxyURL string + batchSize int } //export FLBPluginRegister @@ -54,12 +56,14 @@ func FLBPluginInit(plugin unsafe.Pointer) int { queueMessageGroupID := output.FLBPluginConfigKey(plugin, "QueueMessageGroupId") pluginTagAttribute := output.FLBPluginConfigKey(plugin, "PluginTagAttribute") proxyURL := output.FLBPluginConfigKey(plugin, "ProxyUrl") + batchSizeString := output.FLBPluginConfigKey(plugin, "BatchSize") writeInfoLog(fmt.Sprintf("QueueUrl is: %s", queueURL)) writeInfoLog(fmt.Sprintf("QueueRegion is: %s", queueRegion)) writeInfoLog(fmt.Sprintf("QueueMessageGroupId is: %s", queueMessageGroupID)) writeInfoLog(fmt.Sprintf("pluginTagAttribute is: %s", pluginTagAttribute)) writeInfoLog(fmt.Sprintf("ProxyUrl is: %s", proxyURL)) + writeInfoLog(fmt.Sprintf("BatchSize is: %s", batchSizeString)) if queueURL == "" { writeErrorLog(errors.New("QueueUrl configuration key is mandatory")) @@ -78,6 +82,12 @@ func FLBPluginInit(plugin unsafe.Pointer) int { } } + batchSize, err := strconv.Atoi(batchSizeString) + if err != nil || (0 > batchSize && batchSize > 10) { + writeErrorLog(errors.New("BatchSize should be integer value between 1 and 10")) + return output.FLB_ERROR + } + writeInfoLog("retrieving aws credentials from environment variables") awsCredentials := credentials.NewEnvCredentials() var myAWSSession *session.Session @@ -126,6 +136,7 @@ func FLBPluginInit(plugin unsafe.Pointer) int { queueMessageGroupID: queueMessageGroupID, mySQS: sqs.New(myAWSSession), pluginTagAttribute: pluginTagAttribute, + batchSize: batchSize, }) return output.FLB_OK @@ -212,7 +223,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int SqsRecords = append(SqsRecords, sqsRecord) - if MessageCounter == 10 { + if MessageCounter == sqsConf.batchSize { err := sendBatchToSqs(sqsConf, SqsRecords) SqsRecords = nil