-
Notifications
You must be signed in to change notification settings - Fork 5
/
firehose.go
58 lines (47 loc) · 1.42 KB
/
firehose.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main
import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/firehose"
"log"
)
type FirehoseWriter interface {
write(entries [][]byte) (failedPutCount int64, err error)
}
type defaultFirehoseWriter struct {
FirehoseWriter
deliveryStreamName string
firehoseSvc *firehose.Firehose
}
func NewDefaultFirehoseWriter(cfg aws.Config, deliveryStreamName string) FirehoseWriter {
return &defaultFirehoseWriter{
deliveryStreamName: deliveryStreamName,
firehoseSvc: firehose.New(cfg),
}
}
func (writer *defaultFirehoseWriter) write(entries [][]byte) (failedPutCount int64, err error) {
var records []firehose.Record
for _, entry := range entries {
records = append(records, firehose.Record{Data: entry})
}
input := firehose.PutRecordBatchInput{
DeliveryStreamName: &writer.deliveryStreamName,
Records: records,
}
request := writer.firehoseSvc.PutRecordBatchRequest(&input)
output, err := request.Send()
if output != nil && output.FailedPutCount != nil {
failedPutCount = *output.FailedPutCount
}
log.Printf("sent %v records to %v, %v failed", len(entries), writer.deliveryStreamName, failedPutCount)
return
}
type mockFirehoseWriter struct {
records [][]byte
}
func (writer *mockFirehoseWriter) write(records [][]byte) (int64, error) {
writer.records = append(writer.records, records...)
return 0, nil
}
func NewMockFirehoseWriter() *mockFirehoseWriter {
return &mockFirehoseWriter{}
}