Skip to content

Commit

Permalink
reduce fluentd connection churn during high event-handler load (#48909)…
Browse files Browse the repository at this point in the history
… (#49037)
  • Loading branch information
fspmarshall authored Nov 15, 2024
1 parent 4b54c19 commit 9506918
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 38 deletions.
11 changes: 11 additions & 0 deletions integrations/event-handler/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type FluentdConfig struct {

// FluentdCA is a path to fluentd CA
FluentdCA string `help:"fluentd TLS CA file" type:"existingfile" env:"FDWRD_FLUENTD_CA"`

// FluentdMaxConnections caps the number of connections to fluentd. Defaults to a dynamic value
// calculated relative to app-level concurrency.
FluentdMaxConnections int `help:"Maximum number of connections to fluentd" env:"FDWRD_MAX_CONNECTIONS"`
}

// TeleportConfig is Teleport instance configuration
Expand Down Expand Up @@ -238,13 +242,19 @@ func (c *StartCmdConfig) Validate() error {
c.SkipSessionTypes = lib.SliceToAnonymousMap(c.SkipSessionTypesRaw)
c.SkipEventTypes = lib.SliceToAnonymousMap(c.SkipEventTypesRaw)

if c.FluentdMaxConnections < 1 {
// 2x concurrency is effectively uncapped.
c.FluentdMaxConnections = c.Concurrency * 2
}

return nil
}

// Dump dumps configuration values to the log
func (c *StartCmdConfig) Dump(ctx context.Context, log *slog.Logger) {
// Log configuration variables
log.InfoContext(ctx, "Using batch size", "batch", c.BatchSize)
log.InfoContext(ctx, "Using concurrency", "concurrency", c.Concurrency)
log.InfoContext(ctx, "Using type filter", "types", c.Types)
log.InfoContext(ctx, "Using type exclude filter", "skip_event_types", c.SkipEventTypes)
log.InfoContext(ctx, "Skipping session events of type", "types", c.SkipSessionTypes)
Expand All @@ -255,6 +265,7 @@ func (c *StartCmdConfig) Dump(ctx context.Context, log *slog.Logger) {
log.InfoContext(ctx, "Using Fluentd ca", "ca", c.FluentdCA)
log.InfoContext(ctx, "Using Fluentd cert", "cert", c.FluentdCert)
log.InfoContext(ctx, "Using Fluentd key", "key", c.FluentdKey)
log.InfoContext(ctx, "Using Fluentd max connections", "max_connections", c.FluentdMaxConnections)
log.InfoContext(ctx, "Using window size", "window_size", c.WindowSize)

if c.TeleportIdentityFile != "" {
Expand Down
33 changes: 18 additions & 15 deletions integrations/event-handler/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: false,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: filepath.Join(wd, "testdata", "fake-file"),
FluentdKey: filepath.Join(wd, "testdata", "fake-file"),
FluentdCA: filepath.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: filepath.Join(wd, "testdata", "fake-file"),
FluentdKey: filepath.Join(wd, "testdata", "fake-file"),
FluentdCA: filepath.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down Expand Up @@ -83,11 +84,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: true,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: filepath.Join(wd, "testdata", "fake-file"),
FluentdKey: filepath.Join(wd, "testdata", "fake-file"),
FluentdCA: filepath.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: filepath.Join(wd, "testdata", "fake-file"),
FluentdKey: filepath.Join(wd, "testdata", "fake-file"),
FluentdCA: filepath.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down Expand Up @@ -121,11 +123,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: true,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: filepath.Join(wd, "testdata", "fake-file"),
FluentdKey: filepath.Join(wd, "testdata", "fake-file"),
FluentdCA: filepath.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: filepath.Join(wd, "testdata", "fake-file"),
FluentdKey: filepath.Join(wd, "testdata", "fake-file"),
FluentdCA: filepath.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down
7 changes: 4 additions & 3 deletions integrations/event-handler/fake_fluentd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,10 @@ func (f *FakeFluentd) createServer() error {
// GetClientConfig returns FlientdConfig to connect to this fake fluentd server instance
func (f *FakeFluentd) GetClientConfig() FluentdConfig {
return FluentdConfig{
FluentdCA: f.caCertPath,
FluentdCert: f.clientCertPath,
FluentdKey: f.clientKeyPath,
FluentdCA: f.caCertPath,
FluentdCert: f.clientCertPath,
FluentdKey: f.clientKeyPath,
FluentdMaxConnections: 3,
}
}

Expand Down
35 changes: 27 additions & 8 deletions integrations/event-handler/fluentd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/gravitational/trace"
"golang.org/x/net/http2"

tlib "github.com/gravitational/teleport/integrations/lib"
)
Expand All @@ -41,6 +42,7 @@ type FluentdClient struct {
// client HTTP client to send requests
client *http.Client
log *slog.Logger
sem chan struct{}
}

// NewFluentdClient creates new FluentdClient
Expand All @@ -56,22 +58,34 @@ func NewFluentdClient(c *FluentdConfig, log *slog.Logger) (*FluentdClient, error
return nil, trace.BadParameter("both fluentd_cert and fluentd_key should be specified")
}

if c.FluentdMaxConnections <= 0 {
return nil, trace.BadParameter("fluentd_max_connections should be greater than 0")
}

ca, err := getCertPool(c)
if err != nil {
return nil, trace.Wrap(err)
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: ca,
Certificates: certs,
},
transport := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: ca,
Certificates: certs,
},
Timeout: httpTimeout,
MaxIdleConnsPerHost: c.FluentdMaxConnections,
IdleConnTimeout: httpTimeout,
}

if err := http2.ConfigureTransport(transport); err != nil {
return nil, trace.Wrap(err)
}

return &FluentdClient{client: client, log: log}, nil
client := &http.Client{
Transport: transport,
Timeout: httpTimeout,
}

return &FluentdClient{client: client, log: log, sem: make(chan struct{}, c.FluentdMaxConnections)}, nil
}

// getCertPool reads CA certificate and returns CA cert pool if passed
Expand All @@ -92,6 +106,11 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) {

// Send sends event to fluentd
func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error {
f.sem <- struct{}{}
defer func() {
<-f.sem
}()

f.log.DebugContext(ctx, "Sending event to Fluentd", "payload", string(b))

req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b))
Expand Down
8 changes: 4 additions & 4 deletions integrations/event-handler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/peterbourgon/diskv/v3 v3.0.1
github.com/sethvargo/go-limiter v1.0.0
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.30.0
golang.org/x/time v0.6.0
google.golang.org/protobuf v1.35.1
)
Expand Down Expand Up @@ -280,14 +281,13 @@ require (
go.opentelemetry.io/otel/trace v1.30.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/api v0.197.0 // indirect
google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
16 changes: 8 additions & 8 deletions integrations/event-handler/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1650,8 +1650,8 @@ golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -1777,8 +1777,8 @@ golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -1920,8 +1920,8 @@ golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -1938,8 +1938,8 @@ golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM=
golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down

0 comments on commit 9506918

Please sign in to comment.