From a5316d74617f5bf75341bed234688555df5cab68 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sun, 11 Feb 2024 01:50:10 +0100 Subject: [PATCH] Limit concurrent creation of healthcheck gRPC connections Signed-off-by: Tim Vaillancourt --- go/flags/endtoend/vtcombo.txt | 1 + go/flags/endtoend/vtctld.txt | 1 + go/flags/endtoend/vtgate.txt | 1 + go/vt/discovery/healthcheck.go | 10 +++++++- go/vt/discovery/tablet_health_check.go | 32 +++++++++++++++++++++----- go/vt/grpcclient/client.go | 6 +++++ 6 files changed, 44 insertions(+), 7 deletions(-) diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 1712f2181f1..dff912b9e6a 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -167,6 +167,7 @@ Flags: --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. --health_check_interval duration Interval between health checks (default 20s) + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) --heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the sidecar database's heartbeat table. The result is used to inform the serving state of the vttablet via healthchecks. 
diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index 32102ed8e4f..fdeae1c5776 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -84,6 +84,7 @@ Flags: --grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs) --grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s) --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) -h, --help help for vtctld --jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done --keep_logs duration keep logs for this long (using ctime) (zero to keep forever) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 7585c71be6f..d9da4311b31 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -96,6 +96,7 @@ Flags: --grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s) --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. 
This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) -h, --help help for vtgate diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 5d6a5e32662..6e35f367f4c 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -46,6 +46,7 @@ import ( "github.com/google/safehtml/template" "github.com/google/safehtml/template/uncheckedconversions" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" @@ -87,6 +88,9 @@ var ( // refreshKnownTablets tells us whether to process all tablets or only new tablets. refreshKnownTablets = true + // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. + healthCheckDialConcurrency int64 = 1024 + // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond @@ -168,6 +172,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") + fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. 
This should be less than the golang max thread limit of 10000.") ParseTabletURLTemplateFromFlag() } @@ -287,6 +292,8 @@ type HealthCheckImpl struct { subscribers map[chan *TabletHealth]struct{} // loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted loadTabletsTrigger chan struct{} + // healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once. + healthCheckDialSem *semaphore.Weighted } // NewHealthCheck creates a new HealthCheck object. @@ -321,6 +328,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cell: localCell, retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, + healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency), healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), @@ -850,7 +858,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target // TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } - return thc.Connection(), nil + return thc.Connection(hc), nil } // getAliasByCell should only be called while holding hc.mu diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index 24496155e74..28c42e5f49e 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -19,6 +19,7 @@ package discovery import ( "context" "fmt" + "net" "strings" "sync" "sync/atomic" @@ -33,6 +34,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" + "google.golang.org/grpc" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/proto/query" @@ -122,8 +124,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason 
string) { } // stream streams healthcheck responses to callback. -func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error { - conn := thc.Connection() +func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error { + conn := thc.Connection(hc) if conn == nil { // This signals the caller to retry return nil @@ -136,14 +138,32 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.S return err } -func (thc *tabletHealthCheck) Connection() queryservice.QueryService { +func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService { thc.connMu.Lock() defer thc.connMu.Unlock() - return thc.connectionLocked() + return thc.connectionLocked(hc) } -func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService { +func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) { + return func(ctx context.Context, addr string) (net.Conn, error) { + // Limit the number of healthcheck connections opened in parallel to avoid high OS-thread + // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, + // etc). Without this limit it is possible for vtgates watching >10k tablets to hit + // the panic: 'runtime: program exceeds 10000-thread limit'. 
+ if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer hc.healthCheckDialSem.Release(1) + var dialer net.Dialer + return dialer.DialContext(ctx, "tcp", addr) + } +} + +func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService { if thc.Conn == nil { + grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { + return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil + }) conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true)) if err != nil { thc.LastError = err @@ -272,7 +292,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { }() // Read stream health responses. - err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error { + err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay // Don't block on send to avoid deadlocks. diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go index b2ef0d4fb28..7524298514e 100644 --- a/go/vt/grpcclient/client.go +++ b/go/vt/grpcclient/client.go @@ -21,6 +21,7 @@ package grpcclient import ( "context" "crypto/tls" + "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -39,6 +40,7 @@ import ( ) var ( + grpcDialOptionsMu sync.Mutex keepaliveTime = 10 * time.Second keepaliveTimeout = 10 * time.Second initialConnWindowSize int @@ -86,6 +88,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error) // RegisterGRPCDialOptions registers an implementation of AuthServer. func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) { + grpcDialOptionsMu.Lock() + defer grpcDialOptionsMu.Unlock() grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc) } @@ -134,12 +138,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ... 
newopts = append(newopts, opts...) var err error + grpcDialOptionsMu.Lock() for _, grpcDialOptionInitializer := range grpcDialOptions { newopts, err = grpcDialOptionInitializer(newopts) if err != nil { log.Fatalf("There was an error initializing client grpc.DialOption: %v", err) } } + grpcDialOptionsMu.Unlock() newopts = append(newopts, interceptors()...)