Skip to content

Commit

Permalink
implement tablet manager side of retrieving unresolved transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Aug 29, 2024
1 parent cbb6144 commit 6ec68ca
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 12 deletions.
12 changes: 11 additions & 1 deletion go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,17 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb.

// GetUnresolvedTransactions is part of the tmclient.TabletManagerClient interface.
func (client *Client) GetUnresolvedTransactions(ctx context.Context, tablet *topodatapb.Tablet) ([]*querypb.TransactionMetadata, error) {
panic("unimplemented")
c, closer, err := client.dialer.dial(ctx, tablet)
if err != nil {
return nil, err
}
defer closer.Close()

response, err := c.GetUnresolvedTransactions(ctx, &tabletmanagerdatapb.GetUnresolvedTransactionsRequest{})
if err != nil {
return nil, err
}
return response.Transactions, nil
}

//
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vttablet/grpctmserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,18 @@ func (s *server) ExecuteFetchAsApp(ctx context.Context, request *tabletmanagerda
return response, nil
}

func (s *server) GetUnresolvedTransactions(ctx context.Context, request *tabletmanagerdatapb.GetUnresolvedTransactionsRequest) (response *tabletmanagerdatapb.GetUnresolvedTransactionsResponse, err error) {
defer s.tm.HandleRPCPanic(ctx, "GetUnresolvedTransactions", request, response, false /*verbose*/, &err)
ctx = callinfo.GRPCCallInfo(ctx)

transactions, err := s.tm.GetUnresolvedTransactions(ctx)
if err != nil {
return nil, vterrors.ToGRPC(err)
}

return &tabletmanagerdatapb.GetUnresolvedTransactionsResponse{Transactions: transactions}, nil
}

//
// Replication related methods
//
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type RPCTM interface {

ExecuteFetchAsApp(ctx context.Context, req *tabletmanagerdatapb.ExecuteFetchAsAppRequest) (*querypb.QueryResult, error)

GetUnresolvedTransactions(ctx context.Context) ([]*querypb.TransactionMetadata, error)

// Replication related methods
PrimaryStatus(ctx context.Context) (*replicationdatapb.PrimaryStatus, error)

Expand Down
11 changes: 11 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,17 @@ func (tm *TabletManager) ExecuteFetchAsApp(ctx context.Context, req *tabletmanag
return sqltypes.ResultToProto3(result), err
}

// GetUnresolvedTransactions returns the unresolved distributed transactions list for the Metadata manager.
func (tm *TabletManager) GetUnresolvedTransactions(ctx context.Context) ([]*querypb.TransactionMetadata, error) {
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}

tablet := tm.Tablet()
target := &querypb.Target{Keyspace: tablet.Keyspace, Shard: tablet.Shard, TabletType: tablet.Type}
return tm.QueryServiceControl.UnresolvedTransactions(ctx, target)
}

// ExecuteQuery submits a new online DDL request
func (tm *TabletManager) ExecuteQuery(ctx context.Context, req *tabletmanagerdatapb.ExecuteQueryRequest) (*querypb.QueryResult, error) {
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type Controller interface {

// SetTwoPCAllowed sets whether TwoPC is allowed or not.
SetTwoPCAllowed(bool)

UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error)
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ func (tqsc *Controller) RedoPreparedTransactions() {}
func (tqsc *Controller) SetTwoPCAllowed(bool) {
}

// UnresolvedTransactions is part of the tabletserver.Controller interface
func (tqsc *Controller) UnresolvedTransactions(context.Context, *querypb.Target) ([]*querypb.TransactionMetadata, error) {
return nil, nil
}

// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()
Expand Down
26 changes: 15 additions & 11 deletions go/vt/vttablet/tmrpctest/test_tm_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,47 +56,47 @@ type fakeRPCTM struct {
}

func (fra *fakeRPCTM) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) DeleteVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.DeleteVReplicationWorkflowRequest) (*tabletmanagerdatapb.DeleteVReplicationWorkflowResponse, error) {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) HasVReplicationWorkflows(ctx context.Context, req *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) ReadVReplicationWorkflows(ctx context.Context, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) ReadVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) UpdateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) UpdateVReplicationWorkflows(ctx context.Context, req *tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowsResponse, error) {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) ResetSequences(ctx context.Context, tables []string) error {
//TODO implement me
// TODO implement me
panic("implement me")
}

func (fra *fakeRPCTM) VDiff(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down Expand Up @@ -726,6 +726,10 @@ func (fra *fakeRPCTM) ExecuteFetchAsApp(ctx context.Context, req *tabletmanagerd
return testExecuteFetchResult, nil
}

func (fra *fakeRPCTM) GetUnresolvedTransactions(ctx context.Context) ([]*querypb.TransactionMetadata, error) {
panic("implement me")
}

func tmRPCTestExecuteFetch(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) {
// using pool
qr, err := client.ExecuteFetchAsDba(ctx, tablet, true, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Expand Down Expand Up @@ -1348,7 +1352,7 @@ func (fra *fakeRPCTM) CheckThrottler(ctx context.Context, req *tabletmanagerdata
panic(fmt.Errorf("test-triggered panic"))
}

//TODO implement me
// TODO implement me
panic("implement me")
}

Expand All @@ -1357,7 +1361,7 @@ func (fra *fakeRPCTM) GetThrottlerStatus(ctx context.Context, req *tabletmanager
panic(fmt.Errorf("test-triggered panic"))
}

//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down

0 comments on commit 6ec68ca

Please sign in to comment.