Skip to content

Commit 07858c2

Browse files
authored
Revert GRPC context changes (#15780)
Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
1 parent 8844aba commit 07858c2

File tree

33 files changed

+94
-71
lines changed

33 files changed

+94
-71
lines changed

examples/local/vstream_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func main() {
6767
Filter: "select * from customer",
6868
}},
6969
}
70-
conn, err := vtgateconn.Dial("localhost:15991")
70+
conn, err := vtgateconn.Dial(ctx, "localhost:15991")
7171
if err != nil {
7272
log.Fatal(err)
7373
}

go/test/endtoend/cluster/cluster_process.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,7 @@ func (cluster *LocalProcessCluster) ExecOnVTGate(ctx context.Context, addr strin
920920
return nil, err
921921
}
922922

923-
conn, err := vtgateconn.Dial(addr)
923+
conn, err := vtgateconn.Dial(ctx, addr)
924924
if err != nil {
925925
return nil, err
926926
}

go/test/endtoend/cluster/cluster_util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,13 +482,13 @@ func WaitForHealthyShard(vtctldclient *VtctldClientProcess, keyspace, shard stri
482482
}
483483

484484
// DialVTGate returns a VTGate grpc connection.
485-
func DialVTGate(name, addr, username, password string) (*vtgateconn.VTGateConn, error) {
485+
func DialVTGate(ctx context.Context, name, addr, username, password string) (*vtgateconn.VTGateConn, error) {
486486
clientCreds := &grpcclient.StaticAuthClientCreds{Username: username, Password: password}
487487
creds := grpc.WithPerRPCCredentials(clientCreds)
488488
dialerFunc := grpcvtgateconn.Dial(creds)
489489
dialerName := name
490490
vtgateconn.RegisterDialer(dialerName, dialerFunc)
491-
return vtgateconn.DialProtocol(dialerName, addr)
491+
return vtgateconn.DialProtocol(ctx, dialerName, addr)
492492
}
493493

494494
// PrintFiles prints the files that are asked for. If no file is specified, all the files are printed.

go/test/endtoend/messaging/message_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ func VtgateGrpcConn(ctx context.Context, cluster *cluster.LocalProcessCluster) (
573573
stream := new(VTGateStream)
574574
stream.ctx = ctx
575575
stream.host = fmt.Sprintf("%s:%d", cluster.Hostname, cluster.VtgateProcess.GrpcPort)
576-
conn, err := vtgateconn.Dial(stream.host)
576+
conn, err := vtgateconn.Dial(ctx, stream.host)
577577
// init components
578578
stream.respChan = make(chan *sqltypes.Result)
579579
stream.VTGateConn = conn

go/test/endtoend/recovery/unshardedrecovery/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func TestRecoveryImpl(t *testing.T) {
308308

309309
// Build vtgate grpc connection
310310
grpcAddress := fmt.Sprintf("%s:%d", localCluster.Hostname, localCluster.VtgateGrpcPort)
311-
vtgateConn, err := vtgateconn.Dial(grpcAddress)
311+
vtgateConn, err := vtgateconn.Dial(context.Background(), grpcAddress)
312312
assert.NoError(t, err)
313313
defer vtgateConn.Close()
314314
session := vtgateConn.Session("@replica", nil)

go/test/endtoend/tabletgateway/vtgate_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ func TestStreamingRPCStuck(t *testing.T) {
302302
}
303303

304304
// Connect to vtgate and run a streaming query.
305-
vtgateConn, err := cluster.DialVTGate(t.Name(), vtgateGrpcAddress, "test_user", "")
305+
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "test_user", "")
306306
require.NoError(t, err)
307307
stream, err := vtgateConn.Session("", &querypb.ExecuteOptions{}).StreamExecute(ctx, "select * from customer", map[string]*querypb.BindVariable{})
308308
require.NoError(t, err)

go/test/endtoend/vreplication/vreplication_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ func testVStreamCellFlag(t *testing.T) {
513513

514514
for _, tc := range vstreamTestCases {
515515
t.Run("VStreamCellsFlag/"+tc.cells, func(t *testing.T) {
516-
conn, err := vtgateconn.Dial(fmt.Sprintf("localhost:%d", vc.ClusterConfig.vtgateGrpcPort))
516+
conn, err := vtgateconn.Dial(ctx, fmt.Sprintf("localhost:%d", vc.ClusterConfig.vtgateGrpcPort))
517517
require.NoError(t, err)
518518
defer conn.Close()
519519

go/test/endtoend/vreplication/vschema_load_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func TestVSchemaChangesUnderLoad(t *testing.T) {
9494
Filter: "select * from customer",
9595
}},
9696
}
97-
conn, err := vtgateconn.Dial(net.JoinHostPort("localhost", strconv.Itoa(vc.ClusterConfig.vtgateGrpcPort)))
97+
conn, err := vtgateconn.Dial(ctx, net.JoinHostPort("localhost", strconv.Itoa(vc.ClusterConfig.vtgateGrpcPort)))
9898
require.NoError(t, err)
9999
defer conn.Close()
100100

go/test/endtoend/vreplication/vstream_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
5858
testVStreamFrom(t, vtgate, "product", 2)
5959
})
6060
ctx := context.Background()
61-
vstreamConn, err := vtgateconn.Dial(fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
61+
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
6262
if err != nil {
6363
log.Fatal(err)
6464
}
@@ -259,7 +259,7 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
259259
vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
260260

261261
ctx := context.Background()
262-
vstreamConn, err := vtgateconn.Dial(fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
262+
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
263263
if err != nil {
264264
log.Fatal(err)
265265
}
@@ -398,7 +398,7 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
398398
require.NoError(t, err)
399399

400400
ctx := context.Background()
401-
vstreamConn, err := vtgateconn.Dial(fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
401+
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
402402
if err != nil {
403403
log.Fatal(err)
404404
}
@@ -550,7 +550,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
550550
defer vtgateConn.Close()
551551
verifyClusterHealth(t, vc)
552552

553-
vstreamConn, err := vtgateconn.Dial(fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
553+
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
554554
require.NoError(t, err)
555555
defer vstreamConn.Close()
556556

go/test/endtoend/vtcombo/recreate/recreate_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestMain(m *testing.M) {
9595

9696
func TestDropAndRecreateWithSameShards(t *testing.T) {
9797
ctx := context.Background()
98-
conn, err := vtgateconn.Dial(grpcAddress)
98+
conn, err := vtgateconn.Dial(ctx, grpcAddress)
9999
require.Nil(t, err)
100100
defer conn.Close()
101101

go/test/endtoend/vtcombo/vttest_sample_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func TestStandalone(t *testing.T) {
131131
require.Contains(t, tmp[0], "vtcombo")
132132

133133
ctx := context.Background()
134-
conn, err := vtgateconn.Dial(grpcAddress)
134+
conn, err := vtgateconn.Dial(ctx, grpcAddress)
135135
require.NoError(t, err)
136136
defer conn.Close()
137137

go/test/endtoend/vtgate/foreignkey/fk_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func TestUpdateWithFK(t *testing.T) {
182182

183183
// TestVstreamForFKBinLog tests that dml queries with fks are written with child row first approach in the binary logs.
184184
func TestVstreamForFKBinLog(t *testing.T) {
185-
vtgateConn, err := cluster.DialVTGate(t.Name(), vtgateGrpcAddress, "fk_user", "")
185+
vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "")
186186
require.NoError(t, err)
187187
defer vtgateConn.Close()
188188

go/test/endtoend/vtgate/grpc_api/acl_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestEffectiveCallerIDWithAccess(t *testing.T) {
3232
ctx, cancel := context.WithCancel(context.Background())
3333
defer cancel()
3434

35-
vtgateConn, err := cluster.DialVTGate(t.Name(), vtgateGrpcAddress, "some_other_user", "test_password")
35+
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "some_other_user", "test_password")
3636
require.NoError(t, err)
3737
defer vtgateConn.Close()
3838

@@ -48,7 +48,7 @@ func TestEffectiveCallerIDWithNoAccess(t *testing.T) {
4848
ctx, cancel := context.WithCancel(context.Background())
4949
defer cancel()
5050

51-
vtgateConn, err := cluster.DialVTGate(t.Name(), vtgateGrpcAddress, "another_unrelated_user", "test_password")
51+
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "another_unrelated_user", "test_password")
5252
require.NoError(t, err)
5353
defer vtgateConn.Close()
5454

@@ -66,7 +66,7 @@ func TestAuthenticatedUserWithAccess(t *testing.T) {
6666
ctx, cancel := context.WithCancel(context.Background())
6767
defer cancel()
6868

69-
vtgateConn, err := cluster.DialVTGate(t.Name(), vtgateGrpcAddress, "user_with_access", "test_password")
69+
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password")
7070
require.NoError(t, err)
7171
defer vtgateConn.Close()
7272

@@ -81,7 +81,7 @@ func TestAuthenticatedUserNoAccess(t *testing.T) {
8181
ctx, cancel := context.WithCancel(context.Background())
8282
defer cancel()
8383

84-
vtgateConn, err := cluster.DialVTGate(t.Name(), vtgateGrpcAddress, "user_no_access", "test_password")
84+
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_no_access", "test_password")
8585
require.NoError(t, err)
8686
defer vtgateConn.Close()
8787

@@ -98,7 +98,7 @@ func TestUnauthenticatedUser(t *testing.T) {
9898
ctx, cancel := context.WithCancel(context.Background())
9999
defer cancel()
100100

101-
vtgateConn, err := cluster.DialVTGate(t.Name(), vtgateGrpcAddress, "", "")
101+
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "", "")
102102
require.NoError(t, err)
103103
defer vtgateConn.Close()
104104

go/test/endtoend/vtgate/grpc_api/execute_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestTransactionsWithGRPCAPI(t *testing.T) {
3838
ctx, cancel := context.WithCancel(context.Background())
3939
defer cancel()
4040

41-
vtgateConn, err := cluster.DialVTGate(t.Name(), vtgateGrpcAddress, "user_with_access", "test_password")
41+
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "user_with_access", "test_password")
4242
require.NoError(t, err)
4343
defer vtgateConn.Close()
4444

go/test/endtoend/vtgate/queries/reference/main_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestMain(m *testing.M) {
156156
go func() {
157157
ctx := context.Background()
158158
vtgateAddr := fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateProcess.GrpcPort)
159-
vtgateConn, err := vtgateconn.Dial(vtgateAddr)
159+
vtgateConn, err := vtgateconn.Dial(ctx, vtgateAddr)
160160
if err != nil {
161161
done <- false
162162
return
@@ -234,7 +234,7 @@ func TestMain(m *testing.M) {
234234

235235
ctx := context.Background()
236236
vtgateAddr := fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateProcess.GrpcPort)
237-
vtgateConn, err := vtgateconn.Dial(vtgateAddr)
237+
vtgateConn, err := vtgateconn.Dial(ctx, vtgateAddr)
238238
if err != nil {
239239
return 1
240240
}

go/vt/grpcclient/client.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ limitations under the License.
1919
package grpcclient
2020

2121
import (
22+
"context"
2223
"crypto/tls"
2324
"sync"
2425
"time"
@@ -96,6 +97,16 @@ func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([
9697
// failFast is a non-optional parameter because callers are required to specify
9798
// what that should be.
9899
func Dial(target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
100+
return DialContext(context.Background(), target, failFast, opts...)
101+
}
102+
103+
// DialContext creates a grpc connection to the given target. Setup steps are
104+
// covered by the context deadline, and, if WithBlock is specified in the dial
105+
// options, connection establishment steps are covered by the context as well.
106+
//
107+
// failFast is a non-optional parameter because callers are required to specify
108+
// what that should be.
109+
func DialContext(ctx context.Context, target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
99110
msgSize := grpccommon.MaxMessageSize()
100111
newopts := []grpc.DialOption{
101112
grpc.WithDefaultCallOptions(
@@ -138,7 +149,7 @@ func Dial(target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.Clie
138149

139150
newopts = append(newopts, interceptors()...)
140151

141-
return grpc.Dial(target, newopts...)
152+
return grpc.DialContext(ctx, target, newopts...)
142153
}
143154

144155
func interceptors() []grpc.DialOption {

go/vt/grpcoptionaltls/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestOptionalTLS(t *testing.T) {
9797
testFunc := func(t *testing.T, dialOpt grpc.DialOption) {
9898
ctx, cancel := context.WithTimeout(testCtx, 5*time.Second)
9999
defer cancel()
100-
conn, err := grpc.NewClient(addr, dialOpt)
100+
conn, err := grpc.DialContext(ctx, addr, dialOpt)
101101
if err != nil {
102102
t.Fatalf("failed to connect to the server %v", err)
103103
}

go/vt/vitessdriver/driver.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ func (d drv) newConnector(cfg Configuration) (driver.Connector, error) {
174174
}
175175

176176
// Connect implements the database/sql/driver.Connector interface.
177-
func (c *connector) Connect(_ context.Context) (driver.Conn, error) {
177+
func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
178178
conn := &conn{
179179
cfg: c.cfg,
180180
convert: c.convert,
181181
}
182182

183-
if err := conn.dial(); err != nil {
183+
if err := conn.dial(ctx); err != nil {
184184
return nil, err
185185
}
186186

@@ -267,9 +267,9 @@ type conn struct {
267267
session *vtgateconn.VTGateSession
268268
}
269269

270-
func (c *conn) dial() error {
270+
func (c *conn) dial(ctx context.Context) error {
271271
var err error
272-
c.conn, err = vtgateconn.DialProtocol(c.cfg.Protocol, c.cfg.Address)
272+
c.conn, err = vtgateconn.DialProtocol(ctx, c.cfg.Protocol, c.cfg.Address)
273273
if err != nil {
274274
return err
275275
}

go/vt/vtadmin/grpcserver/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func TestServer(t *testing.T) {
6464
}
6565
close(readyCh)
6666

67-
conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
67+
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
6868
assert.NoError(t, err)
6969

7070
defer conn.Close()

go/vt/vtgate/endtoend/vstream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
)
4242

4343
func initialize(ctx context.Context, t *testing.T) (*vtgateconn.VTGateConn, *mysql.Conn, *mysql.Conn, func()) {
44-
gconn, err := vtgateconn.Dial(grpcAddress)
44+
gconn, err := vtgateconn.Dial(ctx, grpcAddress)
4545
if err != nil {
4646
t.Fatal(err)
4747
}

go/vt/vtgate/fakerpcvtgateconn/conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func RegisterFakeVTGateConnDialer() (*FakeVTGateConn, string) {
6161
impl := &FakeVTGateConn{
6262
execMap: make(map[string]*queryResponse),
6363
}
64-
vtgateconn.RegisterDialer(protocol, func(address string) (vtgateconn.Impl, error) {
64+
vtgateconn.RegisterDialer(protocol, func(ctx context.Context, address string) (vtgateconn.Impl, error) {
6565
return impl, nil
6666
})
6767
return impl, protocol

go/vt/vtgate/grpcvtgateconn/conn.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,21 +72,21 @@ type vtgateConn struct {
7272
c vtgateservicepb.VitessClient
7373
}
7474

75-
func dial(addr string) (vtgateconn.Impl, error) {
76-
return Dial()(addr)
75+
func dial(ctx context.Context, addr string) (vtgateconn.Impl, error) {
76+
return Dial()(ctx, addr)
7777
}
7878

7979
// Dial produces a vtgateconn.DialerFunc with custom options.
8080
func Dial(opts ...grpc.DialOption) vtgateconn.DialerFunc {
81-
return func(address string) (vtgateconn.Impl, error) {
81+
return func(ctx context.Context, address string) (vtgateconn.Impl, error) {
8282
opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name)
8383
if err != nil {
8484
return nil, err
8585
}
8686

8787
opts = append(opts, opt)
8888

89-
cc, err := grpcclient.Dial(address, grpcclient.FailFast(false), opts...)
89+
cc, err := grpcclient.DialContext(ctx, address, grpcclient.FailFast(false), opts...)
9090
if err != nil {
9191
return nil, err
9292
}
@@ -99,6 +99,14 @@ func Dial(opts ...grpc.DialOption) vtgateconn.DialerFunc {
9999
}
100100
}
101101

102+
// DialWithOpts allows for custom dial options to be set on a vtgateConn.
103+
//
104+
// Deprecated: the context parameter cannot be used by the returned
105+
// vtgateconn.DialerFunc and thus has no effect. Use Dial instead.
106+
func DialWithOpts(_ context.Context, opts ...grpc.DialOption) vtgateconn.DialerFunc {
107+
return Dial(opts...)
108+
}
109+
102110
func (conn *vtgateConn) Execute(ctx context.Context, session *vtgatepb.Session, query string, bindVars map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error) {
103111
request := &vtgatepb.ExecuteRequest{
104112
CallerId: callerid.EffectiveCallerIDFromContext(ctx),

0 commit comments

Comments
 (0)