feat: add consistent lookup vindex tests and make them work
Signed-off-by: Manan Gupta <manan@planetscale.com>
GuptaManan100 committed Dec 15, 2024
1 parent dd6ef5b commit 2b4d1f6
Showing 2 changed files with 137 additions and 28 deletions.
114 changes: 114 additions & 0 deletions go/test/endtoend/transaction/twopc/twopc_test.go
@@ -1627,6 +1627,120 @@ func TestVindexes(t *testing.T) {
},
},
},
{
name: "Consistent Lookup Single Update",
initQueries: []string{
"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
},
testQueries: []string{
"begin",
"update twopc_consistent_lookup set col = 9 where col_unique = 9",
"commit",
},
logExpected: map[string][]string{
"ks.twopc_consistent_lookup:40-80": {
"update:[INT64(6) INT64(9) INT64(9)]",
},
"ks.consistent_lookup:80-": {
"insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
{
name: "Consistent Lookup-Unique Single Update",
initQueries: []string{
"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
},
testQueries: []string{
"begin",
"update twopc_consistent_lookup set col_unique = 20 where col_unique = 9",
"commit",
},
logExpected: map[string][]string{
"ks.twopc_consistent_lookup:40-80": {
"update:[INT64(6) INT64(4) INT64(20)]",
},
"ks.consistent_lookup_unique:80-": {
"insert:[INT64(20) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
{
name: "Consistent Lookup And Consistent Lookup-Unique Single Delete",
initQueries: []string{
"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
},
testQueries: []string{
"begin",
"delete from twopc_consistent_lookup where col_unique = 9",
"commit",
},
logExpected: map[string][]string{
"ks.twopc_consistent_lookup:40-80": {
"delete:[INT64(6) INT64(4) INT64(9)]",
},
"ks.consistent_lookup_unique:80-": {
"delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.consistent_lookup:80-": {
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
{
name: "Consistent Lookup And Consistent Lookup-Unique Mix",
initQueries: []string{
"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
},
testQueries: []string{
"begin",
"insert into twopc_consistent_lookup(id, col, col_unique) values(20, 4, 22)",
"update twopc_consistent_lookup set col = 9 where col_unique = 9",
"delete from twopc_consistent_lookup where id = 9",
"commit",
},
logExpected: map[string][]string{
"ks.redo_statement:80-": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"delete from twopc_consistent_lookup where id = 9 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"delete from twopc_consistent_lookup where id = 9 limit 10001 /* INT64 */\")]",
},
"ks.redo_statement:40-80": {
"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]",
"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]",
},
"ks.twopc_consistent_lookup:-40": {
"insert:[INT64(20) INT64(4) INT64(22)]",
},
"ks.twopc_consistent_lookup:40-80": {
"update:[INT64(6) INT64(9) INT64(9)]",
},
"ks.twopc_consistent_lookup:80-": {
"delete:[INT64(9) INT64(4) INT64(4)]",
},
"ks.consistent_lookup_unique:-40": {
"insert:[INT64(22) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.consistent_lookup_unique:80-": {
"delete:[INT64(4) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
"ks.consistent_lookup:80-": {
"insert:[INT64(4) INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
"delete:[INT64(4) INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
},
},
},
}

for _, tt := range testcases {
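The new cases above extend the table-driven TestVindexes test: each entry seeds rows with initQueries, runs the transaction in testQueries, and then checks the change events captured per "keyspace.table:shard-range" stream against logExpected. As a reading aid, here is a minimal, self-contained sketch of that shape. Only the field names (name, initQueries, testQueries, logExpected) come from the diff; the struct name, helper signatures, and the event collection are illustrative assumptions, not the real harness.

// Hypothetical sketch of the table-driven layout the entries above plug into.
// Field names mirror the literals in the diff; everything else is assumed.
package twopc

import (
	"reflect"
	"testing"
)

type vindexTestCase struct {
	name        string              // descriptive case name
	initQueries []string            // rows seeded before the transaction under test
	testQueries []string            // the transaction whose side effects are checked
	logExpected map[string][]string // expected events keyed by "<keyspace>.<table>:<shard range>"
}

// runVindexCase seeds the data, runs the transaction, then compares the events
// observed on each shard stream against the case's expectations.
// exec and collectEvents stand in for the real query runner and event collector.
func runVindexCase(t *testing.T, tt vindexTestCase, exec func(query string), collectEvents func() map[string][]string) {
	t.Run(tt.name, func(t *testing.T) {
		for _, q := range tt.initQueries {
			exec(q)
		}
		for _, q := range tt.testQueries {
			exec(q)
		}
		observed := collectEvents()
		for stream, want := range tt.logExpected {
			if !reflect.DeepEqual(want, observed[stream]) {
				t.Errorf("events for %s:\nwant %v\ngot  %v", stream, want, observed[stream])
			}
		}
	})
}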
51 changes: 23 additions & 28 deletions go/vt/vtgate/tx_conn.go
@@ -118,10 +118,31 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er
}

defer recordCommitTime(session, twopc, time.Now())

if err := txc.runSessions(ctx, session.PreSessions, session.GetLogger(), txc.commitShard); err != nil {
_ = txc.Release(ctx, session)
return err
}

if twopc {
return txc.commit2PC(ctx, session)
if err := txc.commit2PC(ctx, session); err != nil {
return err
}
} else {
if err := txc.commitNormal(ctx, session); err != nil {
return err
}
}
return txc.commitNormal(ctx, session)

if err := txc.runSessions(ctx, session.PostSessions, session.GetLogger(), txc.commitShard); err != nil {
// If last commit fails, there will be nothing to rollback.
session.RecordWarning(&querypb.QueryWarning{Message: fmt.Sprintf("post-operation transaction had an error: %v", err)})
// With reserved connection we should release them.
if session.InReservedConn() {
_ = txc.Release(ctx, session)
}
}
return nil
}

func recordCommitTime(session *econtext.SafeSession, twopc bool, startTime time.Time) {
@@ -165,11 +186,6 @@ func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSes
}

func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSession) error {
if err := txc.runSessions(ctx, session.PreSessions, session.GetLogger(), txc.commitShard); err != nil {
_ = txc.Release(ctx, session)
return err
}

// Retain backward compatibility on commit order for the normal session.
for i, shardSession := range session.ShardSessions {
if err := txc.commitShard(ctx, shardSession, session.GetLogger()); err != nil {
@@ -197,15 +213,6 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSessi
return err
}
}

if err := txc.runSessions(ctx, session.PostSessions, session.GetLogger(), txc.commitShard); err != nil {
// If last commit fails, there will be nothing to rollback.
session.RecordWarning(&querypb.QueryWarning{Message: fmt.Sprintf("post-operation transaction had an error: %v", err)})
// With reserved connection we should release them.
if session.InReservedConn() {
_ = txc.Release(ctx, session)
}
}
return nil
}

@@ -216,11 +223,6 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
return txc.commitNormal(ctx, session)
}

if err := txc.checkValidCondition(session); err != nil {
_ = txc.Rollback(ctx, session)
return err
}

mmShard := session.ShardSessions[0]
rmShards := session.ShardSessions[1:]
dtid := dtids.New(mmShard)
@@ -301,13 +303,6 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
return nil
}

func (txc *TxConn) checkValidCondition(session *econtext.SafeSession) error {
if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 {
return vterrors.VT12001("atomic distributed transaction commit with consistent lookup vindex")
}
return nil
}

func (txc *TxConn) errActionAndLogWarn(
ctx context.Context,
session *econtext.SafeSession,
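Read as a whole, the tx_conn.go changes lift the pre-session and post-session commits out of commitNormal and into Commit itself, so they now wrap both the normal and the atomic (2PC) commit paths, and they delete the checkValidCondition guard that used to reject atomic commits touching consistent lookup vindexes (VT12001). Below is a simplified, self-contained paraphrase of the resulting ordering; the function parameters stand in for the real vtgate helpers, and only the ordering and error handling are taken from the diff.

// Simplified paraphrase of the commit ordering after this change. Parameter
// names are stand-ins, not the real vtgate API.
package txflow

import (
	"context"
	"log"
)

func commitFlow(
	ctx context.Context,
	twopc bool,
	commitPreSessions func(context.Context) error,  // lookup-vindex "pre" transactions
	commitMain2PC func(context.Context) error,      // atomic distributed commit
	commitMainNormal func(context.Context) error,   // best-effort ordered commit
	commitPostSessions func(context.Context) error, // lookup-vindex "post" transactions
	release func(context.Context),                  // releases connections/transactions
) error {
	// 1. Pre-sessions commit first for both paths; if this fails, the main
	//    transaction has not committed yet, so everything is released.
	if err := commitPreSessions(ctx); err != nil {
		release(ctx)
		return err
	}

	// 2. The main shard sessions commit atomically (2PC) or in the normal order.
	if twopc {
		if err := commitMain2PC(ctx); err != nil {
			return err
		}
	} else {
		if err := commitMainNormal(ctx); err != nil {
			return err
		}
	}

	// 3. Post-sessions commit last. A failure here is only reported as a
	//    warning, since the main commit already succeeded and cannot be rolled
	//    back. (The real code additionally releases reserved connections here.)
	if err := commitPostSessions(ctx); err != nil {
		log.Printf("post-operation transaction had an error: %v", err)
	}
	return nil
}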
