Skip to content

Commit

Permalink
slack-vitess-r14.0.5: sideport vitessio#15053 to limit vtgate conns
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Feb 12, 2024
1 parent 79ce741 commit 9ca4b87
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 7 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ Usage of vtgate:
gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--grpc_use_effective_callerid
If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--healthcheck-dial-concurrency int
Maxiumum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration
health check retry delay (default 2ms)
--healthcheck_timeout duration
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ Usage of vttablet:
gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--health_check_interval duration
Interval between health checks (default 20s)
--healthcheck-dial-concurrency int
Maxiumum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--heartbeat_enable
If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.
--heartbeat_interval duration
Expand Down
8 changes: 7 additions & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -81,6 +82,8 @@ var (
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
// topoReadConcurrency tells us how many topo reads are allowed in parallel
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency = flag.Int("healthcheck-dial-concurrency", 1024, "Maxiumum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -260,6 +263,8 @@ type HealthCheckImpl struct {
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
// healthCheckDialSem is used to limit how many healthchecks initiate in parallel.
healthCheckDialSem *sync2.Semaphore
}

// NewHealthCheck creates a new HealthCheck object.
Expand Down Expand Up @@ -294,6 +299,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckDialSem: sync2.NewSemaphore(*healthCheckDialConcurrency, 0),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -780,7 +786,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
return thc.Connection(), nil
return thc.Connection(hc), nil
}

// getAliasByCell should only be called while holding hc.mu
Expand Down
30 changes: 24 additions & 6 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package discovery
import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"
Expand All @@ -34,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -123,8 +125,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
}

// stream streams healthcheck responses to callback.
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection()
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(hc)
if conn == nil {
// This signals the caller to retry
return nil
Expand All @@ -137,14 +139,30 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.S
return err
}

// Connection returns the result of connectionLocked, called while holding
// thc.connMu. The hc argument is forwarded unchanged to connectionLocked.
func (thc *tabletHealthCheck) Connection() queryservice.QueryService {
func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService {
thc.connMu.Lock()
defer thc.connMu.Unlock()
return thc.connectionLocked()
return thc.connectionLocked(hc)
}

func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService {
// healthCheckDialerFactory returns a dial function that opens TCP connections
// while holding a slot of hc.healthCheckDialSem.
//
// The returned function waits for a semaphore slot, dials addr over "tcp" with
// a zero-value net.Dialer, and releases the slot once the dial returns. If ctx
// is done before a slot becomes available, it returns a nil connection and
// ctx.Err() without dialing, so callers are never blocked past their deadline
// or cancellation just because the semaphore is saturated.
func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
	return func(ctx context.Context, addr string) (net.Conn, error) {
		// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
		// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
		// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
		// the panic: 'runtime: program exceeds 10000-thread limit'.
		//
		// Wait for the slot with the dial context rather than unconditionally:
		// a cancelled or expired dial must give up its place in the queue.
		if !hc.healthCheckDialSem.AcquireContext(ctx) {
			return nil, ctx.Err()
		}
		defer hc.healthCheckDialSem.Release()
		var dialer net.Dialer
		return dialer.DialContext(ctx, "tcp", addr)
	}
}

func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService {
if thc.Conn == nil {
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
})
conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true))
if err != nil {
thc.LastError = err
Expand Down Expand Up @@ -273,7 +291,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
}()

// Read stream health responses.
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
// We received a message. Reset the back-off.
retryDelay = hc.retryDelay
// Don't block on send to avoid deadlocks.
Expand Down
6 changes: 6 additions & 0 deletions go/vt/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"crypto/tls"
"flag"
"sync"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -39,6 +40,7 @@ import (
)

var (
grpcDialOptionsMu sync.Mutex
keepaliveTime = flag.Duration("grpc_keepalive_time", 10*time.Second, "After a duration of this time, if the client doesn't see any activity, it pings the server to see if the transport is still alive.")
keepaliveTimeout = flag.Duration("grpc_keepalive_timeout", 10*time.Second, "After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that the connection is closed.")
initialConnWindowSize = flag.Int("grpc_initial_conn_window_size", 0, "gRPC initial connection window size")
Expand All @@ -53,6 +55,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error)

// RegisterGRPCDialOptions appends grpcDialOptionsFunc to the package-level
// grpcDialOptions list. The append is performed while holding
// grpcDialOptionsMu.
func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) {
	grpcDialOptionsMu.Lock()
	grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc)
	grpcDialOptionsMu.Unlock()
}

Expand Down Expand Up @@ -101,12 +105,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ...

newopts = append(newopts, opts...)
var err error
grpcDialOptionsMu.Lock()
for _, grpcDialOptionInitializer := range grpcDialOptions {
newopts, err = grpcDialOptionInitializer(newopts)
if err != nil {
log.Fatalf("There was an error initializing client grpc.DialOption: %v", err)
}
}
grpcDialOptionsMu.Unlock()

newopts = append(newopts, interceptors()...)

Expand Down

0 comments on commit 9ca4b87

Please sign in to comment.