Skip to content

Commit caf76b0

Browse files
authored
[Release 17.0]: Online DDL: timeouts for all gRPC calls (#14182) (#14190)
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
1 parent 063dc77 commit caf76b0

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

go/vt/vttablet/onlineddl/executor.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ const (
125125
readyToCompleteHint = "ready_to_complete"
126126
databasePoolSize = 3
127127
qrBufferExtraTimeout = 5 * time.Second
128+
grpcTimeout = 30 * time.Second
128129
vreplicationTestSuiteWaitSeconds = 5
129130
)
130131

@@ -737,9 +738,6 @@ func (e *Executor) primaryPosition(ctx context.Context) (pos mysql.Position, err
737738

738739
// terminateVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
739740
func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error {
740-
tmClient := e.tabletManagerClient()
741-
defer tmClient.Close()
742-
743741
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
744742
if err != nil {
745743
return err
@@ -918,11 +916,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
918916

919917
e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, timeout, bufferQueries)
920918
if !bufferQueries {
919+
grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout)
920+
defer cancel()
921921
// called after new table is in place.
922922
// unbuffer existing queries:
923923
bufferingContextCancel()
924924
// force re-read of tables
925-
if err := tmClient.RefreshState(ctx, tablet.Tablet); err != nil {
925+
if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil {
926926
return err
927927
}
928928
}
@@ -3754,7 +3754,10 @@ func (e *Executor) vreplicationExec(ctx context.Context, tablet *topodatapb.Tabl
37543754
tmClient := e.tabletManagerClient()
37553755
defer tmClient.Close()
37563756

3757-
return tmClient.VReplicationExec(ctx, tablet, query)
3757+
grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout)
3758+
defer cancel()
3759+
3760+
return tmClient.VReplicationExec(grpcCtx, tablet, query)
37583761
}
37593762

37603763
// reloadSchema issues a ReloadSchema on this tablet
@@ -3766,7 +3769,11 @@ func (e *Executor) reloadSchema(ctx context.Context) error {
37663769
if err != nil {
37673770
return err
37683771
}
3769-
return tmClient.ReloadSchema(ctx, tablet.Tablet, "")
3772+
3773+
grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout)
3774+
defer cancel()
3775+
3776+
return tmClient.ReloadSchema(grpcCtx, tablet.Tablet, "")
37703777
}
37713778

37723779
// deleteVReplicationEntry cleans up a _vt.vreplication entry; this function is called as part of

0 commit comments

Comments
 (0)