Commit
fix NaN record marshaling; skip records with marshaling error
alexei-led committed May 21, 2023
1 parent 868c050 commit 6307b58
Showing 3 changed files with 26 additions and 48 deletions.
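For context (not part of the commit): Go's encoding/json refuses to encode NaN or Inf float64 values, so a record whose allocation percentages came out as NaN (from dividing by a zero allocatable value) made json.Marshal fail and, before this change, aborted the whole upload batch. A minimal, self-contained reproduction (the field name is illustrative):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    func main() {
        var allocatable float64 // node reports zero allocatable capacity
        requested := 0.0

        pct := requested / allocatable * 100 // 0/0 at runtime yields NaN (Go does not panic)

        // encoding/json refuses to encode NaN (and Inf) float64 values.
        _, err := json.Marshal(map[string]float64{"cpuRequestPct": pct})
        fmt.Println(err) // json: unsupported value: NaN
    }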
cmd/main.go: 2 changes (1 addition, 1 deletion)
@@ -58,7 +58,7 @@ func run(ctx context.Context, log *logrus.Entry, cfg config.Config) error {
return errors.Wrap(err, "initializing kubernetes client")
}

-uploader, err := firehose.NewUploader(ctx, cfg.StreamName)
+uploader, err := firehose.NewUploader(ctx, log, cfg.StreamName)
if err != nil {
return errors.Wrap(err, "initializing firehose uploader")
}
internal/aws/firehose/stream.go: 47 changes (14 additions, 33 deletions)
@@ -11,30 +11,32 @@ import (
"github.com/aws/aws-sdk-go-v2/service/firehose/types"
"github.com/doitintl/eks-lens-agent/internal/usage"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const maxBatchSize = 500

type Uploader interface {
Upload(ctx context.Context, records []*usage.PodInfo) error
-UploadOne(ctx context.Context, record *usage.PodInfo) error
}

type firehoseUploader struct {
-Client *firehose.Client
-StreamName string
+client *firehose.Client
+log *logrus.Entry
+stream string
}

-func NewUploader(ctx context.Context, streamName string) (Uploader, error) {
+func NewUploader(ctx context.Context, log *logrus.Entry, streamName string) (Uploader, error) {
// create a new Amazon Kinesis Data Firehose client
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, errors.Wrap(err, "loading AWS config")
}
client := firehose.NewFromConfig(cfg)
return &firehoseUploader{
-Client: client,
-StreamName: streamName,
+client: client,
+log: log,
+stream: streamName,
}, nil
}

@@ -53,12 +55,14 @@ func (u *firehoseUploader) Upload(ctx context.Context, records []*usage.PodInfo)
for k := i; k < j; k++ {
buffer, err := json.Marshal(records[k])
if err != nil {
return errors.Wrap(err, "marshaling record")
u.log.WithField("pod", records[k].Name).WithError(err).Error("marshaling record")
continue
}
dst := &bytes.Buffer{}
err = json.Compact(dst, buffer)
if err != nil {
return errors.Wrap(err, "compacting record")
u.log.WithField("pod", records[k].Name).WithError(err).Error("compacting record")
continue
}
batch = append(batch, types.Record{
Data: dst.Bytes(),
Expand All @@ -67,36 +71,13 @@ func (u *firehoseUploader) Upload(ctx context.Context, records []*usage.PodInfo)

// send records[i:j] to Amazon Kinesis Data Firehose
input := &firehose.PutRecordBatchInput{
-DeliveryStreamName: aws.String(u.StreamName),
+DeliveryStreamName: aws.String(u.stream),
Records: batch,
}
-_, err := u.Client.PutRecordBatch(ctx, input)
+_, err := u.client.PutRecordBatch(ctx, input)
if err != nil {
return errors.Wrap(err, "putting record batch to Amazon Kinesis Data Firehose")
}
}
return nil
}

-func (u *firehoseUploader) UploadOne(ctx context.Context, record *usage.PodInfo) error {
-buffer, err := json.Marshal(record)
-if err != nil {
-return errors.Wrap(err, "marshaling record")
-}
-dst := &bytes.Buffer{}
-err = json.Compact(dst, buffer)
-if err != nil {
-return errors.Wrap(err, "compacting record")
-}
-input := &firehose.PutRecordInput{
-DeliveryStreamName: aws.String(u.StreamName),
-Record: &types.Record{
-Data: dst.Bytes(),
-},
-}
-_, err = u.Client.PutRecord(ctx, input)
-if err != nil {
-return errors.Wrap(err, "putting single record to Amazon Kinesis Data Firehose")
-}
-return nil
-}
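The revised Upload logs and skips a record it cannot marshal instead of failing the entire PutRecordBatch call; the now-unused UploadOne is removed. A standalone sketch of the same log-and-skip pattern (names here are illustrative, not the repository's types):

    package sketch

    import (
        "encoding/json"

        "github.com/sirupsen/logrus"
    )

    // marshalOrSkip illustrates the log-and-skip pattern: encode each record to
    // JSON, and log and skip any record that fails instead of aborting the batch.
    func marshalOrSkip(log *logrus.Entry, records []any) [][]byte {
        out := make([][]byte, 0, len(records))
        for _, r := range records {
            buf, err := json.Marshal(r)
            if err != nil {
                log.WithError(err).Error("marshaling record, skipping")
                continue
            }
            out = append(out, buf)
        }
        return out
    }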
internal/usage/record.go: 25 changes (11 additions, 14 deletions)
@@ -230,28 +230,25 @@ func GetPodInfo(log *logrus.Entry, pod *v1.Pod, beginTime, endTime time.Time, no
}
record.Node = *node
// calculate pod's allocation requests as a percentage of node's allocatable resources
-record.Allocations.Requests.CPU = float64(record.Resources.Requests.CPU) / float64(node.Allocatable.CPU) * 100 //nolint:gomnd
-record.Allocations.Requests.Memory = float64(record.Resources.Requests.Memory) / float64(node.Allocatable.Memory) * 100 //nolint:gomnd
+if node.Allocatable.CPU > 0 {
+record.Allocations.Requests.CPU = float64(record.Resources.Requests.CPU) / float64(node.Allocatable.CPU) * 100 //nolint:gomnd
+record.Allocations.Limits.CPU = float64(record.Resources.Limits.CPU) / float64(node.Allocatable.CPU) * 100 //nolint:gomnd
+}
+if node.Allocatable.Memory > 0 {
+record.Allocations.Requests.Memory = float64(record.Resources.Requests.Memory) / float64(node.Allocatable.Memory) * 100 //nolint:gomnd
+record.Allocations.Limits.Memory = float64(record.Resources.Limits.Memory) / float64(node.Allocatable.Memory) * 100 //nolint:gomnd
+}
if node.Allocatable.GPU > 0 {
record.Allocations.Requests.GPU = float64(record.Resources.Requests.GPU) / float64(node.Allocatable.GPU) * 100 //nolint:gomnd
+record.Allocations.Limits.GPU = float64(record.Resources.Limits.GPU) / float64(node.Allocatable.GPU) * 100 //nolint:gomnd
}
if node.Allocatable.Storage > 0 {
record.Allocations.Requests.Storage = float64(record.Resources.Requests.Storage) / float64(node.Allocatable.Storage) * 100 //nolint:gomnd
+record.Allocations.Limits.Storage = float64(record.Resources.Limits.Storage) / float64(node.Allocatable.Storage) * 100 //nolint:gomnd
}
if node.Allocatable.StorageEphemeral > 0 {
record.Allocations.Requests.StorageEphemeral = float64(record.Resources.Requests.StorageEphemeral) / float64(node.Allocatable.StorageEphemeral) * 100 //nolint:gomnd
-}
-// calculate pod's allocation limits as a percentage of node's allocatable resources
-record.Allocations.Limits.CPU = float64(record.Resources.Limits.CPU) / float64(node.Allocatable.CPU) * 100 //nolint:gomnd
-record.Allocations.Limits.Memory = float64(record.Resources.Limits.Memory) / float64(node.Allocatable.Memory) * 100 //nolint:gomnd
-if node.Allocatable.GPU > 0 {
-record.Allocations.Limits.GPU = float64(record.Resources.Limits.GPU) / float64(node.Allocatable.GPU) * 100 //nolint:gomnd
-}
-if node.Allocatable.Storage > 0 {
-record.Allocations.Limits.Storage = float64(record.Resources.Limits.Storage) / float64(node.Allocatable.Storage) * 100 //nolint:gomnd
-}
-if node.Allocatable.StorageEphemeral > 0 {
-record.Allocations.Limits.StorageEphemeral = float64(record.Resources.Limits.StorageEphemeral) / float64(node.Allocatable.StorageEphemeral) * 100 //nolint:gomnd
+record.Allocations.Limits.StorageEphemeral = float64(record.Resources.Limits.StorageEphemeral) / float64(node.Allocatable.StorageEphemeral) * 100 //nolint:gomnd
}
}
return record
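Why the guards work (context, not part of the commit): in Go, float64 division by zero does not panic; x/0 yields +Inf and 0/0 yields NaN, both of which encoding/json cannot encode. Guarding each division on a positive allocatable value leaves the percentage at its zero default instead. A small illustrative helper, hypothetical and not the repository's code:

    // allocationPct mirrors the guarded calculation: it returns 0 when the node
    // reports no allocatable capacity, avoiding NaN (0/0) and +Inf (x/0).
    func allocationPct(requested, allocatable float64) float64 {
        if allocatable <= 0 {
            return 0
        }
        return requested / allocatable * 100
    }

    // allocationPct(250, 4000) == 6.25; allocationPct(250, 0) == 0 instead of +Inf.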
