diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go
index 9263f350933..1481f679af3 100644
--- a/go/test/endtoend/transaction/twopc/twopc_test.go
+++ b/go/test/endtoend/transaction/twopc/twopc_test.go
@@ -1627,6 +1627,120 @@ func TestVindexes(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "Consistent Lookup Single Update",
+			initQueries: []string{
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
+			},
+			testQueries: []string{
+				"begin",
+				"update twopc_consistent_lookup set col = 9 where col_unique = 9",
+				"commit",
+			},
+			logExpected: map[string][]string{
+				"ks.twopc_consistent_lookup:40-80": {
+					"update:[INT64(6) INT64(9) INT64(9)]",
+				},
+				"ks.consistent_lookup:80-": {
+					"insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+					"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+				},
+			},
+		},
+		{
+			name: "Consistent Lookup-Unique Single Update",
+			initQueries: []string{
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
+			},
+			testQueries: []string{
+				"begin",
+				"update twopc_consistent_lookup set col_unique = 20 where col_unique = 9",
+				"commit",
+			},
+			logExpected: map[string][]string{
+				"ks.twopc_consistent_lookup:40-80": {
+					"update:[INT64(6) INT64(4) INT64(20)]",
+				},
+				"ks.consistent_lookup_unique:80-": {
+					"insert:[INT64(20) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+					"delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+				},
+			},
+		},
+		{
+			name: "Consistent Lookup And Consistent Lookup-Unique Single Delete",
+			initQueries: []string{
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
+			},
+			testQueries: []string{
+				"begin",
+				"delete from twopc_consistent_lookup where col_unique = 9",
+				"commit",
+			},
+			logExpected: map[string][]string{
+				"ks.twopc_consistent_lookup:40-80": {
+					"delete:[INT64(6) INT64(4) INT64(9)]",
+				},
+				"ks.consistent_lookup_unique:80-": {
+					"delete:[INT64(9) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+				},
+				"ks.consistent_lookup:80-": {
+					"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+				},
+			},
+		},
+		{
+			name: "Consistent Lookup And Consistent Lookup-Unique Mix",
+			initQueries: []string{
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(4, 4, 6)",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(6, 4, 9)",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(9, 4, 4)",
+			},
+			testQueries: []string{
+				"begin",
+				"insert into twopc_consistent_lookup(id, col, col_unique) values(20, 4, 22)",
+				"update twopc_consistent_lookup set col = 9 where col_unique = 9",
+				"delete from twopc_consistent_lookup where id = 9",
+				"commit",
+			},
+			logExpected: map[string][]string{
+				"ks.redo_statement:80-": {
+					"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"delete from twopc_consistent_lookup where id = 9 limit 10001 /* INT64 */\")]",
+					"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"delete from twopc_consistent_lookup where id = 9 limit 10001 /* INT64 */\")]",
+				},
+				"ks.redo_statement:40-80": {
+					"insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]",
+					"delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"update twopc_consistent_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]",
+				},
+				"ks.twopc_consistent_lookup:-40": {
+					"insert:[INT64(20) INT64(4) INT64(22)]",
+				},
+				"ks.twopc_consistent_lookup:40-80": {
+					"update:[INT64(6) INT64(9) INT64(9)]",
+				},
+				"ks.twopc_consistent_lookup:80-": {
+					"delete:[INT64(9) INT64(4) INT64(4)]",
+				},
+				"ks.consistent_lookup_unique:-40": {
+					"insert:[INT64(22) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+				},
+				"ks.consistent_lookup_unique:80-": {
+					"delete:[INT64(4) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+				},
+				"ks.consistent_lookup:80-": {
+					"insert:[INT64(4) INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+					"insert:[INT64(9) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+					"delete:[INT64(4) INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+					"delete:[INT64(4) INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]",
+				},
+			},
+		},
 	}
 
 	for _, tt := range testcases {
diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go
index 3ce138bc0e4..0461864d767 100644
--- a/go/vt/vtgate/tx_conn.go
+++ b/go/vt/vtgate/tx_conn.go
@@ -118,10 +118,31 @@ func (txc *TxConn) Commit(ctx context.Context, session *econtext.SafeSession) er
 	}
 
 	defer recordCommitTime(session, twopc, time.Now())
+
+	if err := txc.runSessions(ctx, session.PreSessions, session.GetLogger(), txc.commitShard); err != nil {
+		_ = txc.Release(ctx, session)
+		return err
+	}
+
 	if twopc {
-		return txc.commit2PC(ctx, session)
+		if err := txc.commit2PC(ctx, session); err != nil {
+			return err
+		}
+	} else {
+		if err := txc.commitNormal(ctx, session); err != nil {
+			return err
+		}
 	}
-	return txc.commitNormal(ctx, session)
+
+	if err := txc.runSessions(ctx, session.PostSessions, session.GetLogger(), txc.commitShard); err != nil {
+		// If last commit fails, there will be nothing to rollback.
+		session.RecordWarning(&querypb.QueryWarning{Message: fmt.Sprintf("post-operation transaction had an error: %v", err)})
+		// With reserved connection we should release them.
+		if session.InReservedConn() {
+			_ = txc.Release(ctx, session)
+		}
+	}
+	return nil
 }
 
 func recordCommitTime(session *econtext.SafeSession, twopc bool, startTime time.Time) {
@@ -165,11 +186,6 @@ func (txc *TxConn) commitShard(ctx context.Context, s *vtgatepb.Session_ShardSes
 }
 
 func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSession) error {
-	if err := txc.runSessions(ctx, session.PreSessions, session.GetLogger(), txc.commitShard); err != nil {
-		_ = txc.Release(ctx, session)
-		return err
-	}
-
 	// Retain backward compatibility on commit order for the normal session.
 	for i, shardSession := range session.ShardSessions {
 		if err := txc.commitShard(ctx, shardSession, session.GetLogger()); err != nil {
@@ -197,15 +213,6 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSessi
 			return err
 		}
 	}
-
-	if err := txc.runSessions(ctx, session.PostSessions, session.GetLogger(), txc.commitShard); err != nil {
-		// If last commit fails, there will be nothing to rollback.
-		session.RecordWarning(&querypb.QueryWarning{Message: fmt.Sprintf("post-operation transaction had an error: %v", err)})
-		// With reserved connection we should release them.
-		if session.InReservedConn() {
-			_ = txc.Release(ctx, session)
-		}
-	}
 	return nil
 }
 
@@ -216,11 +223,6 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
 		return txc.commitNormal(ctx, session)
 	}
 
-	if err := txc.checkValidCondition(session); err != nil {
-		_ = txc.Rollback(ctx, session)
-		return err
-	}
-
 	mmShard := session.ShardSessions[0]
 	rmShards := session.ShardSessions[1:]
 	dtid := dtids.New(mmShard)
@@ -301,13 +303,6 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession)
 	return nil
 }
 
-func (txc *TxConn) checkValidCondition(session *econtext.SafeSession) error {
-	if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 {
-		return vterrors.VT12001("atomic distributed transaction commit with consistent lookup vindex")
-	}
-	return nil
-}
-
 func (txc *TxConn) errActionAndLogWarn(
 	ctx context.Context,
 	session *econtext.SafeSession,