Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move concurrent connection dial limit out of healthcheck. #16378

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/mysqlctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Flags:
--db_tls_min_version string Configures the minimal TLS version negotiated when SSL is enabled. Defaults to TLSv1.2. Options: TLSv1.0, TLSv1.1, TLSv1.2, TLSv1.3.
--dba_idle_timeout duration Idle timeout for dba connections (default 1m0s)
--dba_pool_size int Size of the connection pool for dba connections (default 20)
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Flags:
--file_backup_storage_root string Root directory for the file backup storage.
--gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups.
--gcs_backup_storage_root string Root prefix for all backup-related object names.
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
--grpc_enable_tracing Enable gRPC tracing.
Expand Down
1 change: 0 additions & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ Flags:
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--health_check_interval duration Interval between health checks (default 20s)
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration health check retry delay (default 2ms)
--healthcheck_timeout duration the health check timeout period (default 1m0s)
--heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the sidecar database's heartbeat table. The result is used to inform the serving state of the vttablet via healthchecks.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctlclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Usage of vtctlclient:
--config-type string Config file type (omit to infer config type from file extension).
--datadog-agent-host string host to send spans to. if empty, no tracing will be done
--datadog-agent-port string port to send spans to. if empty, no tracing will be done
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
--grpc_enable_tracing Enable gRPC tracing.
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Flags:
--file_backup_storage_root string Root directory for the file backup storage.
--gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups.
--gcs_backup_storage_root string Root prefix for all backup-related object names.
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down Expand Up @@ -85,7 +86,6 @@ Flags:
--grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
-h, --help help for vtctld
--jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done
--keep_logs duration keep logs for this long (using ctime) (zero to keep forever)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Flags:
--foreign_key_mode string This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow (default "allow")
--gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
--gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s)
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming
--grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.
--grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin.
Expand Down Expand Up @@ -96,7 +97,6 @@ Flags:
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration health check retry delay (default 2ms)
--healthcheck_timeout duration the health check timeout period (default 1m0s)
-h, --help help for vtgate
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgateclienttest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Flags:
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected. (default PRIMARY)
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Flags:
--config-type string Config file type (omit to infer config type from file extension).
--consul_auth_static_file string JSON File to read the topos/tokens from.
--emit_stats If set, emit stats to push-based monitoring and stats backends
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
--grpc_enable_tracing Enable gRPC tracing.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ Flags:
--gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups.
--gcs_backup_storage_root string Root prefix for all backup-related object names.
--gh-ost-path string override default gh-ost binary full path (default "gh-ost")
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttestserver.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Flags:
--external_topo_implementation string the topology implementation to use for vtcombo process
--extra_my_cnf string extra files to add to the config, separated by ':'
--foreign_key_mode string This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow (default "allow")
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
10 changes: 1 addition & 9 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/google/safehtml/template"
"github.com/google/safehtml/template/uncheckedconversions"
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
Expand Down Expand Up @@ -92,9 +91,6 @@ var (
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
refreshKnownTablets = true

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

Expand Down Expand Up @@ -177,7 +173,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -297,8 +292,6 @@ type HealthCheckImpl struct {
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
healthCheckDialSem *semaphore.Weighted
}

// NewHealthCheck creates a new HealthCheck object.
Expand Down Expand Up @@ -333,7 +326,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -844,7 +836,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
return thc.Connection(ctx, hc), nil
return thc.Connection(ctx), nil
}

// getAliasByCell should only be called while holding hc.mu
Expand Down
37 changes: 6 additions & 31 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package discovery
import (
"context"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
Expand All @@ -34,16 +33,12 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
var withDialerContextOnce sync.Once

// tabletHealthCheck maintains the health status of a tablet. A map of this
// structure is maintained in HealthCheck.
type tabletHealthCheck struct {
Expand Down Expand Up @@ -127,8 +122,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
}

// stream streams healthcheck responses to callback.
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(ctx, hc)
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(ctx)
if conn == nil {
// This signals the caller to retry
return nil
Expand All @@ -141,34 +136,14 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, c
return err
}

func (thc *tabletHealthCheck) Connection(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService {
func (thc *tabletHealthCheck) Connection(ctx context.Context) queryservice.QueryService {
thc.connMu.Lock()
defer thc.connMu.Unlock()
return thc.connectionLocked(ctx, hc)
}

func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
// the panic: 'runtime: program exceeds 10000-thread limit'.
if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer hc.healthCheckDialSem.Release(1)
var dialer net.Dialer
return dialer.DialContext(ctx, "tcp", addr)
}
return thc.connectionLocked(ctx)
}

func (thc *tabletHealthCheck) connectionLocked(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService {
func (thc *tabletHealthCheck) connectionLocked(ctx context.Context) queryservice.QueryService {
if thc.Conn == nil {
withDialerContextOnce.Do(func() {
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
})
})
conn, err := tabletconn.GetDialer()(ctx, thc.Tablet, grpcclient.FailFast(true))
if err != nil {
thc.LastError = err
Expand Down Expand Up @@ -297,7 +272,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
}()

// Read stream health responses.
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
// We received a message. Reset the back-off.
retryDelay = hc.retryDelay
// Don't block on send to avoid deadlocks.
Expand Down
Loading
Loading