diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 5aa2d6d6ce6..5776c824f7c 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -660,7 +660,17 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb. // GetUnresolvedTransactions is part of the tmclient.TabletManagerClient interface. func (client *Client) GetUnresolvedTransactions(ctx context.Context, tablet *topodatapb.Tablet) ([]*querypb.TransactionMetadata, error) { - panic("unimplemented") + c, closer, err := client.dialer.dial(ctx, tablet) + if err != nil { + return nil, err + } + defer closer.Close() + + response, err := c.GetUnresolvedTransactions(ctx, &tabletmanagerdatapb.GetUnresolvedTransactionsRequest{}) + if err != nil { + return nil, err + } + return response.Transactions, nil } // diff --git a/go/vt/vttablet/grpctmserver/server.go b/go/vt/vttablet/grpctmserver/server.go index 42cfd441eeb..3b1fc5ac842 100644 --- a/go/vt/vttablet/grpctmserver/server.go +++ b/go/vt/vttablet/grpctmserver/server.go @@ -275,6 +275,18 @@ func (s *server) ExecuteFetchAsApp(ctx context.Context, request *tabletmanagerda return response, nil } +func (s *server) GetUnresolvedTransactions(ctx context.Context, request *tabletmanagerdatapb.GetUnresolvedTransactionsRequest) (response *tabletmanagerdatapb.GetUnresolvedTransactionsResponse, err error) { + defer s.tm.HandleRPCPanic(ctx, "GetUnresolvedTransactions", request, response, false /*verbose*/, &err) + ctx = callinfo.GRPCCallInfo(ctx) + + transactions, err := s.tm.GetUnresolvedTransactions(ctx) + if err != nil { + return nil, vterrors.ToGRPC(err) + } + + return &tabletmanagerdatapb.GetUnresolvedTransactionsResponse{Transactions: transactions}, nil +} + // // Replication related methods // diff --git a/go/vt/vttablet/tabletmanager/rpc_agent.go b/go/vt/vttablet/tabletmanager/rpc_agent.go index 6dd21a21915..df202e5137a 100644 --- a/go/vt/vttablet/tabletmanager/rpc_agent.go +++ b/go/vt/vttablet/tabletmanager/rpc_agent.go @@ -83,6 +83,8 @@ type RPCTM interface { ExecuteFetchAsApp(ctx context.Context, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error) + GetUnresolvedTransactions(ctx context.Context) ([]*querypb.TransactionMetadata, error) + // Replication related methods PrimaryStatus(ctx context.Context) (*replicationdatapb.PrimaryStatus, error) diff --git a/go/vt/vttablet/tabletmanager/rpc_query.go b/go/vt/vttablet/tabletmanager/rpc_query.go index 303bcd4614d..4b8f3148122 100644 --- a/go/vt/vttablet/tabletmanager/rpc_query.go +++ b/go/vt/vttablet/tabletmanager/rpc_query.go @@ -275,6 +275,17 @@ func (tm *TabletManager) ExecuteFetchAsApp(ctx context.Context, req *tabletmanag return sqltypes.ResultToProto3(result), err } +// GetUnresolvedTransactions returns the unresolved distributed transactions list for the Metadata manager. +func (tm *TabletManager) GetUnresolvedTransactions(ctx context.Context) ([]*querypb.TransactionMetadata, error) { + if err := tm.waitForGrantsToHaveApplied(ctx); err != nil { + return nil, err + } + + tablet := tm.Tablet() + target := &querypb.Target{Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type} + return tm.QueryServiceControl.UnresolvedTransactions(ctx, target) +} + // ExecuteQuery submits a new online DDL request func (tm *TabletManager) ExecuteQuery(ctx context.Context, req *tabletmanagerdatapb.ExecuteQueryRequest) (*querypb.QueryResult, error) { if err := tm.waitForGrantsToHaveApplied(ctx); err != nil { diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index aae07fb96f6..9b799171c60 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -98,6 +98,8 @@ type Controller interface { // SetTwoPCAllowed sets whether TwoPC is allowed or not. SetTwoPCAllowed(bool) + + UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error) } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 1d9115392d2..d9b04b580dc 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -233,6 +233,11 @@ func (tqsc *Controller) RedoPreparedTransactions() {} func (tqsc *Controller) SetTwoPCAllowed(bool) { } +// UnresolvedTransactions is part of the tabletserver.Controller interface +func (tqsc *Controller) UnresolvedTransactions(context.Context, *querypb.Target) ([]*querypb.TransactionMetadata, error) { + return nil, nil +} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock() diff --git a/go/vt/vttablet/tmrpctest/test_tm_rpc.go b/go/vt/vttablet/tmrpctest/test_tm_rpc.go index 9ba01b13d5a..ec685e6f4e4 100644 --- a/go/vt/vttablet/tmrpctest/test_tm_rpc.go +++ b/go/vt/vttablet/tmrpctest/test_tm_rpc.go @@ -56,47 +56,47 @@ type fakeRPCTM struct { } func (fra *fakeRPCTM) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (fra *fakeRPCTM) DeleteVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.DeleteVReplicationWorkflowRequest) (*tabletmanagerdatapb.DeleteVReplicationWorkflowResponse, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (fra *fakeRPCTM) HasVReplicationWorkflows(ctx context.Context, req *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (fra *fakeRPCTM) ReadVReplicationWorkflows(ctx context.Context, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (fra *fakeRPCTM) ReadVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (fra *fakeRPCTM) UpdateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (fra *fakeRPCTM) UpdateVReplicationWorkflows(ctx context.Context, req *tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowsResponse, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (fra *fakeRPCTM) ResetSequences(ctx context.Context, tables []string) error { - //TODO implement me + // TODO implement me panic("implement me") } func (fra *fakeRPCTM) VDiff(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) { - //TODO implement me + // TODO implement me panic("implement me") } @@ -726,6 +726,10 @@ func (fra *fakeRPCTM) ExecuteFetchAsApp(ctx context.Context, req *tabletmanagerd return testExecuteFetchResult, nil } +func (fra *fakeRPCTM) GetUnresolvedTransactions(ctx context.Context) ([]*querypb.TransactionMetadata, error) { + panic("implement me") +} + func tmRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { // using pool qr, err := client.ExecuteFetchAsDba(ctx, tablet, true, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ @@ -1348,7 +1352,7 @@ func (fra *fakeRPCTM) CheckThrottler(ctx context.Context, req *tabletmanagerdata panic(fmt.Errorf("test-triggered panic")) } - //TODO implement me + // TODO implement me panic("implement me") } @@ -1357,7 +1361,7 @@ func (fra *fakeRPCTM) GetThrottlerStatus(ctx context.Context, req *tabletmanager panic(fmt.Errorf("test-triggered panic")) } - //TODO implement me + // TODO implement me panic("implement me") }