From 92c3641478ed4f52769a5997e484669aaf0d0333 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 19 Mar 2024 14:49:32 +0530 Subject: [PATCH 01/12] test: add failing test for full status connection pooling Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/general/vtorc_test.go | 73 ++++++++++++++++++++ go/test/endtoend/vtorc/utils/utils.go | 2 +- 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index d79e2964f3e..38bc5f34df9 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -495,3 +495,76 @@ func TestDurabilityPolicySetLater(t *testing.T) { assert.NotNil(t, primary, "should have elected a primary") utils.CheckReplication(t, newCluster, primary, shard0.Vttablets, 10*time.Second) } + +func TestFullStatusConnectionPooling(t *testing.T) { + defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance) + defer cluster.PanicHandler(t) + utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 4, 0, []string{ + "--tablet_manager_grpc_concurrency=1", + }, cluster.VTOrcConfiguration{ + PreventCrossDataCenterPrimaryFailover: true, + }, 1, "") + keyspace := &clusterInfo.ClusterInstance.Keyspaces[0] + shard0 := &keyspace.Shards[0] + vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0] + + // find primary from topo + curPrimary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0) + assert.NotNil(t, curPrimary, "should have elected a primary") + vtOrcProcess := clusterInfo.ClusterInstance.VTOrcProcesses[0] + utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.ElectNewPrimaryRecoveryName, 1) + utils.WaitForSuccessfulPRSCount(t, vtOrcProcess, keyspace.Name, shard0.Name, 1) + + // Kill the current primary. 
+ _ = curPrimary.VttabletProcess.Kill() + + // Wait until VTOrc notices some problems + status, resp := utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response == "null" + }) + assert.Equal(t, 200, status) + assert.Contains(t, resp, "UnreachablePrimary") + + time.Sleep(1 * time.Minute) + + // Change the primaries ports and restart it. + curPrimary.VttabletProcess.Port = clusterInfo.ClusterInstance.GetAndReservePort() + curPrimary.VttabletProcess.GrpcPort = clusterInfo.ClusterInstance.GetAndReservePort() + err := curPrimary.VttabletProcess.Setup() + require.NoError(t, err) + + // See that VTOrc eventually reports no errors. + // Wait until there are no problems and the api endpoint returns null + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response != "null" + }) + assert.Equal(t, 200, status) + assert.Equal(t, "null", resp) + + // REPEATED + // Kill the current primary. + _ = curPrimary.VttabletProcess.Kill() + + // Wait until VTOrc notices some problems + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response == "null" + }) + assert.Equal(t, 200, status) + assert.Contains(t, resp, "UnreachablePrimary") + + time.Sleep(1 * time.Minute) + + // Change the primaries ports back to original and restart it. + curPrimary.VttabletProcess.Port = curPrimary.HTTPPort + curPrimary.VttabletProcess.GrpcPort = curPrimary.GrpcPort + err = curPrimary.VttabletProcess.Setup() + require.NoError(t, err) + + // See that VTOrc eventually reports no errors. 
+ // Wait until there are no problems and the api endpoint returns null + status, resp = utils.MakeAPICallRetry(t, vtorc, "/api/replication-analysis", func(_ int, response string) bool { + return response != "null" + }) + assert.Equal(t, 200, status) + assert.Equal(t, "null", resp) +} diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index dca2c7b1e26..00f75740338 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -733,7 +733,7 @@ func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status // The function provided takes in the status and response and returns if we should continue to retry or not func MakeAPICallRetry(t *testing.T, vtorc *cluster.VTOrcProcess, url string, retry func(int, string) bool) (status int, response string) { t.Helper() - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) for { select { case <-timeout: From 6b09b843b0e9648254143db20ada8144661a1450 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 19 Mar 2024 11:51:15 +0200 Subject: [PATCH 02/12] TabletManagetClient: dialPool invalidator Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 47 ++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 4c1a8f1e41f..171f0335c8f 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -25,6 +25,7 @@ import ( "github.com/spf13/pflag" "google.golang.org/grpc" + "google.golang.org/grpc/status" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/callerid" @@ -108,7 +109,7 @@ type dialer interface { } type poolDialer interface { - dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) + 
dialPool(ctx context.Context, tablet *topodatapb.Tablet) (c tabletmanagerservicepb.TabletManagerClient, invalidator func(error), err error) } // Client implements tmclient.TabletManagerClient. @@ -152,11 +153,11 @@ func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) ( return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil } -func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) { +func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, func(error) /* map invalidator */, error) { addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"])) opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name) if err != nil { - return nil, err + return nil, nil, err } client.mu.Lock() @@ -172,7 +173,7 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table for i := 0; i < cap(c); i++ { cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt) if err != nil { - return nil, err + return nil, nil, err } c <- &tmc{ cc: cc, @@ -185,7 +186,19 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table result := <-c c <- result - return result.client, nil + invalidator := func(err error) { + if err == nil { + return + } + if _, ok := status.FromError(err); !ok { + // Not a gRPC error + return + } + client.mu.Lock() + defer client.mu.Unlock() + delete(client.rpcClientMap, addr) + } + return result.client, invalidator, nil } // Close is part of the tmclient.TabletManagerClient interface. 
@@ -470,9 +483,10 @@ func (client *Client) ExecuteQuery(ctx context.Context, tablet *topodatapb.Table func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) { var c tabletmanagerservicepb.TabletManagerClient var err error + var invalidator func(error) if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -497,6 +511,9 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. DisableForeignKeyChecks: req.DisableForeignKeyChecks, }) if err != nil { + if invalidator != nil { + invalidator(err) + } return nil, err } return response.Result, nil @@ -526,9 +543,10 @@ func (client *Client) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *topoda func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error) { var c tabletmanagerservicepb.TabletManagerClient var err error + var invalidator func(error) if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -546,6 +564,9 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb. response, err := c.ExecuteFetchAsApp(ctx, req) if err != nil { + if invalidator != nil { + invalidator(err) + } return nil, err } return response.Result, nil @@ -576,8 +597,9 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb. 
func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.FullStatus, error) { var c tabletmanagerservicepb.TabletManagerClient var err error + var invalidator func(error) if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -594,6 +616,9 @@ func (client *Client) FullStatus(ctx context.Context, tablet *topodatapb.Tablet) response, err := c.FullStatus(ctx, &tabletmanagerdatapb.FullStatusRequest{}) if err != nil { + if invalidator != nil { + invalidator(err) + } return nil, err } return response.Status, nil @@ -1066,8 +1091,9 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) { var c tabletmanagerservicepb.TabletManagerClient var err error + var invalidator func(error) if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -1084,6 +1110,9 @@ func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tab response, err := c.CheckThrottler(ctx, req) if err != nil { + if invalidator != nil { + invalidator(err) + } return nil, err } return response, nil From bb78973482529fac3052e989b141c4119069f7ef Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 19 Mar 2024 12:57:35 +0200 Subject: [PATCH 03/12] empty commit to kick CI Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> From 395a07e8870d9c38b227de9b6b7db1c80869972b Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 19 Mar 2024 13:03:19 +0200 
Subject: [PATCH 04/12] fix ExecuteMultiFetchAsDba Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 2cc096ba2c4..05a8c57aa0a 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -523,9 +523,10 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. func (client *Client) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteMultiFetchAsDbaRequest) ([]*querypb.QueryResult, error) { var c tabletmanagerservicepb.TabletManagerClient var err error + var invalidator func(error) if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, err = poolDialer.dialPool(ctx, tablet) + c, invalidator, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -550,6 +551,9 @@ func (client *Client) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topoda DisableForeignKeyChecks: req.DisableForeignKeyChecks, }) if err != nil { + if invalidator != nil { + invalidator(err) + } return nil, err } return response.Results, err From 853517f26393eeddbaf03bf236a77f81a1bb31c9 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Tue, 19 Mar 2024 18:48:20 +0530 Subject: [PATCH 05/12] test: add some comments Signed-off-by: Manan Gupta --- go/test/endtoend/vtorc/general/vtorc_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index 38bc5f34df9..cfbb6a69a67 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -496,6 +496,8 @@ func TestDurabilityPolicySetLater(t *testing.T) { utils.CheckReplication(t, newCluster, primary, shard0.Vttablets, 
10*time.Second) } +// TestFullStatusConnectionPooling tests that full status RPC succeeds despite a vttablet restarting with a different +// address (simulated here by changing its ports) and then returning to its original one. This test has been added in response to a bug seen in production with a similar situation occurring. func TestFullStatusConnectionPooling(t *testing.T) { defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance) defer cluster.PanicHandler(t) @@ -525,6 +527,8 @@ func TestFullStatusConnectionPooling(t *testing.T) { assert.Equal(t, 200, status) assert.Contains(t, resp, "UnreachablePrimary") + // We have to wait for some time to ensure the gRPC connections from VTOrc to vttablet + // are broken and closed due to keep-alives. Without this wait, the gRPC connections stay open and the test passes trivially. time.Sleep(1 * time.Minute) // Change the primaries ports and restart it. @@ -552,8 +556,6 @@ func TestFullStatusConnectionPooling(t *testing.T) { assert.Equal(t, 200, status) assert.Contains(t, resp, "UnreachablePrimary") - time.Sleep(1 * time.Minute) - // Change the primaries ports back to original and restart it. 
curPrimary.VttabletProcess.Port = curPrimary.HTTPPort curPrimary.VttabletProcess.GrpcPort = curPrimary.GrpcPort From 3b6acfd62cfd1c5baf54bc194d4da98abf6764ae Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 20 Mar 2024 07:52:11 +0200 Subject: [PATCH 06/12] close all clients for address Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 05a8c57aa0a..96d5bbd9056 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -196,6 +196,10 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table } client.mu.Lock() defer client.mu.Unlock() + channels := client.rpcClientMap[addr] + for tm := range channels { + tm.cc.Close() + } delete(client.rpcClientMap, addr) } return result.client, invalidator, nil From 113d3f1e6bfd032854fdae50d13af54f06a4f523 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 20 Mar 2024 07:54:07 +0200 Subject: [PATCH 07/12] shorter form Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 96d5bbd9056..e4dbb0a9a5e 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -196,8 +196,7 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table } client.mu.Lock() defer client.mu.Unlock() - channels := client.rpcClientMap[addr] - for tm := range channels { + for tm := range client.rpcClientMap[addr] { tm.cc.Close() } delete(client.rpcClientMap, addr) From 2ff52e2aeeb1e748230d6e1ae499db63e41c873a Mon Sep 17 
00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 21 Mar 2024 11:51:30 +0200 Subject: [PATCH 08/12] Do not close connection if error is SQLErr Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index e4dbb0a9a5e..964d816792d 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/status" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/grpcclient" @@ -194,6 +195,10 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table // Not a gRPC error return } + sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError) + if isSQLErr && sqlErr != nil { + return + } client.mu.Lock() defer client.mu.Unlock() for tm := range client.rpcClientMap[addr] { From 62ce4edf9e0ac7043b0f04fafc7d4c2b329cc33e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:01:22 +0200 Subject: [PATCH 09/12] do not close connections on error Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 964d816792d..35130b44bfe 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -25,9 +25,7 @@ import ( "github.com/spf13/pflag" "google.golang.org/grpc" - "google.golang.org/grpc/status" - "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/callerid" 
"vitess.io/vitess/go/vt/grpcclient" @@ -191,19 +189,12 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table if err == nil { return } - if _, ok := status.FromError(err); !ok { - // Not a gRPC error - return - } - sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError) - if isSQLErr && sqlErr != nil { - return - } + // A bit aggressively, we close connections and delete the client from cache + // upon any error. This is specifically to solve situation where gRPC communication + // is broken, but at this time we don't have a good way to distinguish between + // gRPC errors and other, "normal" errors. client.mu.Lock() defer client.mu.Unlock() - for tm := range client.rpcClientMap[addr] { - tm.cc.Close() - } delete(client.rpcClientMap, addr) } return result.client, invalidator, nil From 80a8a1bf863525821b560073b4daa0b52f80736e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:02:14 +0200 Subject: [PATCH 10/12] only invalidating cache in FullStatus and CheckThrottler Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 35130b44bfe..d4030044039 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -482,10 +482,9 @@ func (client *Client) ExecuteQuery(ctx context.Context, tablet *topodatapb.Table func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) { var c tabletmanagerservicepb.TabletManagerClient var err error - var invalidator func(error) if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, invalidator, err = poolDialer.dialPool(ctx, 
tablet) + c, _, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -510,9 +509,6 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. DisableForeignKeyChecks: req.DisableForeignKeyChecks, }) if err != nil { - if invalidator != nil { - invalidator(err) - } return nil, err } return response.Result, nil @@ -522,10 +518,9 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. func (client *Client) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteMultiFetchAsDbaRequest) ([]*querypb.QueryResult, error) { var c tabletmanagerservicepb.TabletManagerClient var err error - var invalidator func(error) if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, invalidator, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -550,9 +545,6 @@ func (client *Client) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topoda DisableForeignKeyChecks: req.DisableForeignKeyChecks, }) if err != nil { - if invalidator != nil { - invalidator(err) - } return nil, err } return response.Results, err @@ -582,10 +574,9 @@ func (client *Client) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *topoda func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error) { var c tabletmanagerservicepb.TabletManagerClient var err error - var invalidator func(error) if usePool { if poolDialer, ok := client.dialer.(poolDialer); ok { - c, invalidator, err = poolDialer.dialPool(ctx, tablet) + c, _, err = poolDialer.dialPool(ctx, tablet) if err != nil { return nil, err } @@ -603,9 +594,6 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb. 
response, err := c.ExecuteFetchAsApp(ctx, req) if err != nil { - if invalidator != nil { - invalidator(err) - } return nil, err } return response.Result, nil From b34e2c2c7a4b95c9bcb37d8b7f85da33d45c4abf Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:03:49 +0200 Subject: [PATCH 11/12] do close when invalidating Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index d4030044039..ee3d5a62ab1 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -195,6 +195,9 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table // gRPC errors and other, "normal" errors. client.mu.Lock() defer client.mu.Unlock() + for tm := range client.rpcClientMap[addr] { + tm.cc.Close() + } delete(client.rpcClientMap, addr) } return result.client, invalidator, nil From 7da6c015d93c07900e1b620dbcd5c4863051b8d4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:42:36 +0200 Subject: [PATCH 12/12] only close the specific client Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/grpctmclient/client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index ee3d5a62ab1..4c795cc70a9 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -195,9 +195,7 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table // gRPC errors and other, "normal" errors. 
client.mu.Lock() defer client.mu.Unlock() - for tm := range client.rpcClientMap[addr] { - tm.cc.Close() - } + result.cc.Close() delete(client.rpcClientMap, addr) } return result.client, invalidator, nil