Skip to content

Commit f4debb9

Browse files
Limit concurrent creation of healthcheck gRPC connections (#15053)
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
1 parent 059e50d commit f4debb9

File tree

7 files changed

+54
-7
lines changed

7 files changed

+54
-7
lines changed

changelog/20.0/20.0.0/summary.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- [Delete with Subquery Support](#delete-subquery)
1313
- **[Flag changes](#flag-changes)**
1414
- [`pprof-http` default change](#pprof-http-default)
15+
- [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag)
1516
- **[Minor Changes](#minor-changes)**
1617
- **[New Stats](#new-stats)**
1718
- [VTTablet Query Cache Hits and Misses](#vttablet-query-cache-hits-and-misses)
@@ -72,6 +73,10 @@ The `--pprof-http` flag, which was introduced in v19 with a default of `true`, h
7273
This makes HTTP `pprof` endpoints now an *opt-in* feature, rather than opt-out.
7374
To continue enabling these endpoints, explicitly set `--pprof-http` when starting up Vitess components.
7475

76+
#### <a id="healthcheck-dial-concurrency-flag"/>New `--healthcheck-dial-concurrency` flag
77+
78+
The new `--healthcheck-dial-concurrency` flag defines the maximum number of healthcheck connections that can open concurrently. This limit is to avoid hitting Go runtime panics on deployments watching enough tablets [to hit the runtime's maximum thread limit of `10000`](https://pkg.go.dev/runtime/debug#SetMaxThreads) due to blocking network syscalls. This flag applies to `vtcombo`, `vtctld` and `vtgate` only and a value less than the runtime max thread limit _(`10000`)_ is recommended.
79+
7580
## <a id="minor-changes"/>Minor Changes
7681

7782
### <a id="new-stats"/>New Stats

go/flags/endtoend/vtcombo.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ Flags:
167167
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
168168
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
169169
--health_check_interval duration Interval between health checks (default 20s)
170+
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
170171
--healthcheck_retry_delay duration health check retry delay (default 2ms)
171172
--healthcheck_timeout duration the health check timeout period (default 1m0s)
172173
--heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the sidecar database's heartbeat table. The result is used to inform the serving state of the vttablet via healthchecks.

go/flags/endtoend/vtctld.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ Flags:
8484
--grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs)
8585
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
8686
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
87+
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
8788
-h, --help help for vtctld
8889
--jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done
8990
--keep_logs duration keep logs for this long (using ctime) (zero to keep forever)

go/flags/endtoend/vtgate.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ Flags:
9696
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
9797
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
9898
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
99+
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
99100
--healthcheck_retry_delay duration health check retry delay (default 2ms)
100101
--healthcheck_timeout duration the health check timeout period (default 1m0s)
101102
-h, --help help for vtgate

go/vt/discovery/healthcheck.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/google/safehtml/template"
4747
"github.com/google/safehtml/template/uncheckedconversions"
4848
"github.com/spf13/pflag"
49+
"golang.org/x/sync/semaphore"
4950

5051
"vitess.io/vitess/go/netutil"
5152
"vitess.io/vitess/go/stats"
@@ -87,6 +88,9 @@ var (
8788
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
8889
refreshKnownTablets = true
8990

91+
// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
92+
healthCheckDialConcurrency int64 = 1024
93+
9094
// How much to sleep between each check.
9195
waitAvailableTabletInterval = 100 * time.Millisecond
9296

@@ -168,6 +172,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
168172
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
169173
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
170174
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
175+
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
171176
ParseTabletURLTemplateFromFlag()
172177
}
173178

@@ -287,6 +292,8 @@ type HealthCheckImpl struct {
287292
subscribers map[chan *TabletHealth]struct{}
288293
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
289294
loadTabletsTrigger chan struct{}
295+
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
296+
healthCheckDialSem *semaphore.Weighted
290297
}
291298

292299
// NewHealthCheck creates a new HealthCheck object.
@@ -321,6 +328,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
321328
cell: localCell,
322329
retryDelay: retryDelay,
323330
healthCheckTimeout: healthCheckTimeout,
331+
healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency),
324332
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
325333
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
326334
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
@@ -828,7 +836,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target
828836
// TODO: test that throws this error
829837
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
830838
}
831-
return thc.Connection(), nil
839+
return thc.Connection(hc), nil
832840
}
833841

834842
// getAliasByCell should only be called while holding hc.mu

go/vt/discovery/tablet_health_check.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package discovery
1919
import (
2020
"context"
2121
"fmt"
22+
"net"
2223
"strings"
2324
"sync"
2425
"sync/atomic"
@@ -33,12 +34,16 @@ import (
3334
"vitess.io/vitess/go/vt/vttablet/queryservice"
3435
"vitess.io/vitess/go/vt/vttablet/tabletconn"
3536

37+
"google.golang.org/grpc"
3638
"google.golang.org/protobuf/proto"
3739

3840
"vitess.io/vitess/go/vt/proto/query"
3941
"vitess.io/vitess/go/vt/proto/topodata"
4042
)
4143

44+
// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
45+
var withDialerContextOnce sync.Once
46+
4247
// tabletHealthCheck maintains the health status of a tablet. A map of this
4348
// structure is maintained in HealthCheck.
4449
type tabletHealthCheck struct {
@@ -122,8 +127,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
122127
}
123128

124129
// stream streams healthcheck responses to callback.
125-
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
126-
conn := thc.Connection()
130+
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
131+
conn := thc.Connection(hc)
127132
if conn == nil {
128133
// This signals the caller to retry
129134
return nil
@@ -136,14 +141,34 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.S
136141
return err
137142
}
138143

139-
func (thc *tabletHealthCheck) Connection() queryservice.QueryService {
144+
func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService {
140145
thc.connMu.Lock()
141146
defer thc.connMu.Unlock()
142-
return thc.connectionLocked()
147+
return thc.connectionLocked(hc)
148+
}
149+
150+
func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
151+
return func(ctx context.Context, addr string) (net.Conn, error) {
152+
// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
153+
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
154+
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
155+
// the panic: 'runtime: program exceeds 10000-thread limit'.
156+
if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil {
157+
return nil, err
158+
}
159+
defer hc.healthCheckDialSem.Release(1)
160+
var dialer net.Dialer
161+
return dialer.DialContext(ctx, "tcp", addr)
162+
}
143163
}
144164

145-
func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService {
165+
func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService {
146166
if thc.Conn == nil {
167+
withDialerContextOnce.Do(func() {
168+
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
169+
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
170+
})
171+
})
147172
conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true))
148173
if err != nil {
149174
thc.LastError = err
@@ -272,7 +297,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
272297
}()
273298

274299
// Read stream health responses.
275-
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
300+
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
276301
// We received a message. Reset the back-off.
277302
retryDelay = hc.retryDelay
278303
// Don't block on send to avoid deadlocks.

go/vt/grpcclient/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package grpcclient
2121
import (
2222
"context"
2323
"crypto/tls"
24+
"sync"
2425
"time"
2526

2627
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -39,6 +40,7 @@ import (
3940
)
4041

4142
var (
43+
grpcDialOptionsMu sync.Mutex
4244
keepaliveTime = 10 * time.Second
4345
keepaliveTimeout = 10 * time.Second
4446
initialConnWindowSize int
@@ -86,6 +88,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error)
8688

8789
// RegisterGRPCDialOptions registers an implementation of AuthServer.
8890
func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) {
91+
grpcDialOptionsMu.Lock()
92+
defer grpcDialOptionsMu.Unlock()
8993
grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc)
9094
}
9195

@@ -134,12 +138,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ...
134138

135139
newopts = append(newopts, opts...)
136140
var err error
141+
grpcDialOptionsMu.Lock()
137142
for _, grpcDialOptionInitializer := range grpcDialOptions {
138143
newopts, err = grpcDialOptionInitializer(newopts)
139144
if err != nil {
140145
log.Fatalf("There was an error initializing client grpc.DialOption: %v", err)
141146
}
142147
}
148+
grpcDialOptionsMu.Unlock()
143149

144150
newopts = append(newopts, interceptors()...)
145151

0 commit comments

Comments
 (0)