diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go
index ba0339aa365b7..46d0d31dfb583 100644
--- a/lib/events/athena/consumer.go
+++ b/lib/events/athena/consumer.go
@@ -24,12 +24,14 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net/http"
 	"slices"
 	"strconv"
 	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
+	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
@@ -44,6 +46,7 @@ import (
 	apievents "github.com/gravitational/teleport/api/types/events"
 	"github.com/gravitational/teleport/api/utils/retryutils"
 	"github.com/gravitational/teleport/lib/backend"
+	"github.com/gravitational/teleport/lib/defaults"
 	awsutils "github.com/gravitational/teleport/lib/utils/aws"
 )
 
@@ -111,13 +114,26 @@ type s3downloader interface {
 }
 
 func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) {
-	sqsClient := sqs.NewFromConfig(*cfg.PublisherConsumerAWSConfig)
+	// aggressively reuse connections to avoid choking up on TLS handshakes (the
+	// default value for MaxIdleConnsPerHost is 2)
+	sqsHTTPClient := awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) {
+		t.MaxIdleConns = defaults.HTTPMaxIdleConns
+		t.MaxIdleConnsPerHost = defaults.HTTPMaxIdleConnsPerHost
+	})
+	sqsClient := sqs.NewFromConfig(*cfg.PublisherConsumerAWSConfig, func(o *sqs.Options) { o.HTTPClient = sqsHTTPClient })
+
+	s3HTTPClient := awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) {
+		t.MaxIdleConns = defaults.HTTPMaxIdleConns
+		t.MaxIdleConnsPerHost = defaults.HTTPMaxIdleConnsPerHost
+	})
+	publisherS3Client := s3.NewFromConfig(*cfg.PublisherConsumerAWSConfig, func(o *s3.Options) { o.HTTPClient = s3HTTPClient })
+	storerS3Client := s3.NewFromConfig(*cfg.StorerQuerierAWSConfig, func(o *s3.Options) { o.HTTPClient = s3HTTPClient })
 
 	collectCfg := sqsCollectConfig{
 		sqsReceiver: sqsClient,
 		queueURL:    cfg.QueueURL,
 		// TODO(nklaassen): use s3 manager from teleport observability.
-		payloadDownloader: manager.NewDownloader(s3.NewFromConfig(*cfg.PublisherConsumerAWSConfig)),
+		payloadDownloader: manager.NewDownloader(publisherS3Client),
 		payloadBucket:     cfg.largeEventsBucket,
 		visibilityTimeout: int32(cfg.BatchMaxInterval.Seconds()),
 		batchMaxItems:     cfg.BatchMaxItems,
@@ -146,7 +162,7 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) {
 		queueURL: cfg.QueueURL,
 		perDateFileParquetWriter: func(ctx context.Context, date string) (io.WriteCloser, error) {
 			key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, uuid.NewString())
-			fw, err := awsutils.NewS3V2FileWriter(ctx, s3.NewFromConfig(*cfg.StorerQuerierAWSConfig), cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
+			fw, err := awsutils.NewS3V2FileWriter(ctx, storerS3Client, cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
 				// ChecksumAlgorithm is required for putting objects when object lock is enabled.
 				poi.ChecksumAlgorithm = s3Types.ChecksumAlgorithmSha256
 			})
@@ -429,9 +445,7 @@ func (s *sqsMessagesCollector) getEventsChan() <-chan eventAndAckID {
 // It runs until context is canceled (via timeout) or when maxItems is reached.
 // MaxItems is soft limit and can happen that it will return more items then MaxItems.
 func (s *sqsMessagesCollector) fromSQS(ctx context.Context) {
-	// Errors should be immediately process by error handling loop, so 10 size
-	// should be enough to not cause blocking.
-	errorsC := make(chan error, 10)
+	errorsC := make(chan error)
 	defer close(errorsC)
 
 	// errhandle loop for receiving single event errors.
@@ -566,7 +580,11 @@ func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context,
 		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 			return collectedEventsMetadata{}
 		}
-		errorsC <- trace.Wrap(err)
+		select {
+		case errorsC <- trace.Wrap(err):
+		case <-ctx.Done():
+			return collectedEventsMetadata{}
+		}
 
 		// We don't want to retry receiving message immediately to prevent huge load
 		// on CPU if calls are contantly failing.
@@ -584,7 +602,10 @@ func (s *sqsMessagesCollector) receiveMessagesAndSendOnChan(ctx context.Context,
 	for _, msg := range sqsOut.Messages {
 		event, err := s.auditEventFromSQSorS3(ctx, msg)
 		if err != nil {
-			errorsC <- trace.Wrap(err)
+			select {
+			case errorsC <- trace.Wrap(err):
+			case <-ctx.Done():
+			}
 			continue
 		}
 		eventsC <- eventAndAckID{
diff --git a/lib/events/s3sessions/s3handler.go b/lib/events/s3sessions/s3handler.go
index 089eb2a5e4691..a83d30b890de9 100644
--- a/lib/events/s3sessions/s3handler.go
+++ b/lib/events/s3sessions/s3handler.go
@@ -41,6 +41,7 @@ import (
 
 	"github.com/gravitational/teleport"
 	"github.com/gravitational/teleport/api/types"
+	"github.com/gravitational/teleport/lib/defaults"
 	"github.com/gravitational/teleport/lib/events"
 	s3metrics "github.com/gravitational/teleport/lib/observability/metrics/s3"
 	"github.com/gravitational/teleport/lib/session"
@@ -172,6 +173,11 @@ func (s *Config) CheckAndSetDefaults() error {
 	if s.Credentials != nil {
 		awsConfig.Credentials = s.Credentials
 	}
+	hc, err := defaults.HTTPClient()
+	if err != nil {
+		return trace.Wrap(err)
+	}
+	awsConfig.HTTPClient = hc
 
 	sess, err := awssession.NewSessionWithOptions(awssession.Options{
 		SharedConfigState: awssession.SharedConfigEnable,
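
The consumer.go hunks above all serve one change: every AWS client in the consumer now shares an HTTP transport whose idle-connection pool is large enough to survive bursty polling. Below is a minimal standalone sketch of that pattern, not part of the diff. The aws-sdk-go-v2 calls (awshttp.NewBuildableClient, WithTransportOptions, sqs.NewFromConfig with an options function) are the SDK's real API; the maxIdleConns/maxIdleConnsPerHost literals are illustrative placeholders standing in for Teleport's defaults.HTTPMaxIdleConns and defaults.HTTPMaxIdleConnsPerHost, whose values the diff does not show.

	package main

	import (
		"context"
		"log"
		"net/http"

		awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
		"github.com/aws/aws-sdk-go-v2/config"
		"github.com/aws/aws-sdk-go-v2/service/sqs"
	)

	func main() {
		// Placeholder limits for illustration; the diff uses Teleport's
		// defaults.HTTPMaxIdleConns and defaults.HTTPMaxIdleConnsPerHost.
		const maxIdleConns, maxIdleConnsPerHost = 2048, 512

		// Raise the idle-pool limits so bursts of polling reuse warm TLS
		// connections; net/http's DefaultMaxIdleConnsPerHost is only 2.
		httpClient := awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) {
			t.MaxIdleConns = maxIdleConns
			t.MaxIdleConnsPerHost = maxIdleConnsPerHost
		})

		awsCfg, err := config.LoadDefaultConfig(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		// Inject the tuned client into each service client, as the diff does
		// for sqsClient, publisherS3Client, and storerS3Client.
		sqsClient := sqs.NewFromConfig(awsCfg, func(o *sqs.Options) { o.HTTPClient = httpClient })
		_ = sqsClient
	}

Note that the diff builds one tuned client per pool it wants: SQS gets its own, while the publisher and storer S3 clients deliberately share s3HTTPClient so they draw from a single connection pool.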
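The fromSQS and receiveMessagesAndSendOnChan hunks replace the buffered errorsC with an unbuffered channel and guard each send with a select on ctx.Done(): a buffer of 10 only postpones blocking once the error-handling loop has stopped reading, whereas the select guarantees a producing goroutine can always bail out on cancellation. A small self-contained sketch of that send pattern follows; reportErr is a hypothetical helper name used here for illustration (the diff inlines the select).

	package main

	import (
		"context"
		"errors"
		"fmt"
		"time"
	)

	// reportErr forwards err to errC but abandons the send once ctx is
	// canceled, so a producer goroutine can never block forever on an
	// unbuffered channel after the error-handling loop stops reading.
	func reportErr(ctx context.Context, errC chan<- error, err error) {
		select {
		case errC <- err:
		case <-ctx.Done():
		}
	}

	func main() {
		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
		defer cancel()

		errC := make(chan error) // unbuffered, as in the diff

		done := make(chan struct{})
		go func() {
			defer close(done)
			// Nobody is draining errC here, so a bare `errC <- err` would
			// block forever; reportErr returns when the context expires.
			reportErr(ctx, errC, errors.New("receive failed"))
		}()

		<-done
		fmt.Println("producer exited, ctx err:", ctx.Err())
	}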