[v15] Scaling improvements for Athena event consumption and s3sessions (#42795)

* Avoid potential deadlock in athenaevents consumer

* Better HTTP client behavior in athena consumer

* Better HTTP client behavior in s3sessions

* Undo unintended change to the number of workers
espadolini authored Jun 12, 2024
1 parent 85db324 commit e552d48
Showing 2 changed files with 35 additions and 8 deletions.
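
Both "Better HTTP client behavior" commits target the same bottleneck: Go's net/http.DefaultTransport keeps at most two idle connections per host (DefaultMaxIdleConnsPerHost = 2), so bursts of concurrent SQS/S3 requests against a single endpoint keep redoing TCP and TLS handshakes instead of reusing warm connections. A minimal sketch of the tuning, assuming pool sizes in the spirit of Teleport's lib/defaults constants (the concrete values are not shown in this diff):

```go
package main

import (
	"net/http"
	"time"
)

// newTunedClient clones the default transport and widens its idle-connection
// pool so repeated requests to the same host reuse connections instead of
// paying for a fresh TLS handshake each time.
func newTunedClient() *http.Client {
	t := http.DefaultTransport.(*http.Transport).Clone()
	t.MaxIdleConns = 2048        // assumed value; net/http's default is 100
	t.MaxIdleConnsPerHost = 1024 // assumed value; net/http's default is 2
	return &http.Client{
		Transport: t,
		Timeout:   30 * time.Second, // illustrative request timeout
	}
}
```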
37 changes: 29 additions & 8 deletions lib/events/athena/consumer.go
@@ -24,12 +24,14 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net/http"
 	"slices"
 	"strconv"
 	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
+	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
@@ -44,6 +46,7 @@ import (
 	apievents "github.com/gravitational/teleport/api/types/events"
 	"github.com/gravitational/teleport/api/utils/retryutils"
 	"github.com/gravitational/teleport/lib/backend"
+	"github.com/gravitational/teleport/lib/defaults"
 	awsutils "github.com/gravitational/teleport/lib/utils/aws"
 )

@@ -111,13 +114,26 @@ type s3downloader interface {
 }
 
 func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) {
-	sqsClient := sqs.NewFromConfig(*cfg.PublisherConsumerAWSConfig)
+	// aggressively reuse connections to avoid choking up on TLS handshakes (the
+	// default value for MaxIdleConnsPerHost is 2)
+	sqsHTTPClient := awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) {
+		t.MaxIdleConns = defaults.HTTPMaxIdleConns
+		t.MaxIdleConnsPerHost = defaults.HTTPMaxIdleConnsPerHost
+	})
+	sqsClient := sqs.NewFromConfig(*cfg.PublisherConsumerAWSConfig, func(o *sqs.Options) { o.HTTPClient = sqsHTTPClient })
+
+	s3HTTPClient := awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) {
+		t.MaxIdleConns = defaults.HTTPMaxIdleConns
+		t.MaxIdleConnsPerHost = defaults.HTTPMaxIdleConnsPerHost
+	})
+	publisherS3Client := s3.NewFromConfig(*cfg.PublisherConsumerAWSConfig, func(o *s3.Options) { o.HTTPClient = s3HTTPClient })
+	storerS3Client := s3.NewFromConfig(*cfg.StorerQuerierAWSConfig, func(o *s3.Options) { o.HTTPClient = s3HTTPClient })
 
 	collectCfg := sqsCollectConfig{
 		sqsReceiver: sqsClient,
 		queueURL:    cfg.QueueURL,
 		// TODO(nklaassen): use s3 manager from teleport observability.
-		payloadDownloader: manager.NewDownloader(s3.NewFromConfig(*cfg.PublisherConsumerAWSConfig)),
+		payloadDownloader: manager.NewDownloader(publisherS3Client),
 		payloadBucket:     cfg.largeEventsBucket,
 		visibilityTimeout: int32(cfg.BatchMaxInterval.Seconds()),
 		batchMaxItems:     cfg.BatchMaxItems,
@@ -146,7 +162,7 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) {
 		queueURL: cfg.QueueURL,
 		perDateFileParquetWriter: func(ctx context.Context, date string) (io.WriteCloser, error) {
 			key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, uuid.NewString())
-			fw, err := awsutils.NewS3V2FileWriter(ctx, s3.NewFromConfig(*cfg.StorerQuerierAWSConfig), cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
+			fw, err := awsutils.NewS3V2FileWriter(ctx, storerS3Client, cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
 				// ChecksumAlgorithm is required for putting objects when object lock is enabled.
 				poi.ChecksumAlgorithm = s3Types.ChecksumAlgorithmSha256
 			})
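
Sharing one tuned client between the publisher and storer S3 clients (and the parquet writer above) keeps them all drawing from the same warm connection pool. To observe whether the pool is actually being reused, net/http/httptrace reports per request whether the connection came from the idle pool; this harness is illustrative only, not part of the commit, and the URL is a placeholder:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptrace"
)

func main() {
	// Same tuning as the commit, with assumed values.
	t := http.DefaultTransport.(*http.Transport).Clone()
	t.MaxIdleConns = 2048
	t.MaxIdleConnsPerHost = 1024
	client := &http.Client{Transport: t}

	// GotConn fires once per request and reports whether the connection
	// came from the idle pool rather than a fresh dial.
	trace := &httptrace.ClientTrace{
		GotConn: func(info httptrace.GotConnInfo) {
			fmt.Printf("reused=%v wasIdle=%v\n", info.Reused, info.WasIdle)
		},
	}
	ctx := httptrace.WithClientTrace(context.Background(), trace)

	for i := 0; i < 3; i++ {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://example.com", nil)
		if err != nil {
			panic(err)
		}
		resp, err := client.Do(req)
		if err != nil {
			fmt.Println("request error:", err)
			continue
		}
		// Drain and close the body so the connection returns to the pool.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
}
```

After the first request, the remaining ones should print reused=true.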
@@ -429,9 +445,7 @@ func (s *sqsMessagesCollector) getEventsChan() <-chan eventAndAckID {
 // It runs until the context is canceled (via timeout) or maxItems is reached.
 // MaxItems is a soft limit; more items than MaxItems may be returned.
 func (s *sqsMessagesCollector) fromSQS(ctx context.Context) {
-	// Errors should be immediately process by error handling loop, so 10 size
-	// should be enough to not cause blocking.
-	errorsC := make(chan error, 10)
+	errorsC := make(chan error)
 	defer close(errorsC)
 
 	// errhandle loop for receiving single event errors.
@@ -566,7 +580,11 @@ func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context,
 		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 			return collectedEventsMetadata{}
 		}
-		errorsC <- trace.Wrap(err)
+		select {
+		case errorsC <- trace.Wrap(err):
+		case <-ctx.Done():
+			return collectedEventsMetadata{}
+		}
 
 		// We don't want to retry receiving messages immediately, to prevent huge
 		// CPU load if calls are constantly failing.
@@ -584,7 +602,10 @@
 		for _, msg := range sqsOut.Messages {
 			event, err := s.auditEventFromSQSorS3(ctx, msg)
 			if err != nil {
-				errorsC <- trace.Wrap(err)
+				select {
+				case errorsC <- trace.Wrap(err):
+				case <-ctx.Done():
+				}
 				continue
 			}
 			eventsC <- eventAndAckID{
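
The two select blocks above are the deadlock fix from the first commit bullet. Previously every error was pushed into errorsC with a plain send; the channel was buffered (make(chan error, 10)) on the assumption that the error-handling loop would drain it quickly, but if that loop stopped reading (for example after cancellation), a full buffer left producer goroutines blocked forever. Guarding each send with ctx.Done() removes that failure mode, which is also why the buffer can go away. A distilled, self-contained sketch of the pattern (names are illustrative, not Teleport's):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// produce sends errors to errorsC, but never blocks past cancellation:
// each send races against ctx.Done().
func produce(ctx context.Context, errorsC chan<- error) {
	for {
		err := errors.New("receive failed") // stand-in for an SQS/S3 error
		select {
		case errorsC <- err:
		case <-ctx.Done():
			return // nobody is reading anymore; bail out instead of blocking
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	errorsC := make(chan error) // unbuffered is safe now that sends are guarded
	go produce(ctx, errorsC)

	for {
		select {
		case err := <-errorsC:
			fmt.Println("handled:", err)
		case <-ctx.Done():
			return // the producer also observes ctx.Done() and exits
		}
	}
}
```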
6 changes: 6 additions & 0 deletions lib/events/s3sessions/s3handler.go
@@ -41,6 +41,7 @@ import (
 
 	"github.com/gravitational/teleport"
 	"github.com/gravitational/teleport/api/types"
+	"github.com/gravitational/teleport/lib/defaults"
 	"github.com/gravitational/teleport/lib/events"
 	s3metrics "github.com/gravitational/teleport/lib/observability/metrics/s3"
 	"github.com/gravitational/teleport/lib/session"
@@ -172,6 +173,11 @@ func (s *Config) CheckAndSetDefaults() error {
 	if s.Credentials != nil {
 		awsConfig.Credentials = s.Credentials
 	}
+	hc, err := defaults.HTTPClient()
+	if err != nil {
+		return trace.Wrap(err)
+	}
+	awsConfig.HTTPClient = hc
 
 	sess, err := awssession.NewSessionWithOptions(awssession.Options{
 		SharedConfigState: awssession.SharedConfigEnable,
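
s3sessions still builds its client through aws-sdk-go v1, where the knob is the HTTPClient field on aws.Config; defaults.HTTPClient() is Teleport's helper for constructing a client with the enlarged idle pool. A rough stand-alone equivalent against the plain v1 SDK, with assumed pool sizes standing in for the helper:

```go
package main

import (
	"net/http"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
)

// newSession wires a tuned *http.Client into the v1 SDK session so every
// service client derived from it shares the same connection pool.
func newSession() (*session.Session, error) {
	t := http.DefaultTransport.(*http.Transport).Clone()
	t.MaxIdleConns = 2048        // assumed to mirror lib/defaults
	t.MaxIdleConnsPerHost = 1024 // assumed to mirror lib/defaults

	cfg := aws.NewConfig().WithHTTPClient(&http.Client{Transport: t})

	return session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
		Config:            *cfg,
	})
}
```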
