Skip to content

Commit

Permalink
ddl allowed outside of transaction
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Sep 5, 2024
1 parent 5ced946 commit f4521c7
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 69 deletions.
58 changes: 24 additions & 34 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/pools/smartconnpool"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/tableacl"
Expand All @@ -45,10 +47,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
eschema "vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
)

// QueryExecutor is used for executing a query request.
Expand Down Expand Up @@ -192,8 +191,10 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return qr, nil
case p.PlanOtherRead, p.PlanOtherAdmin, p.PlanFlush, p.PlanSavepoint, p.PlanRelease, p.PlanSRollback:
return qre.execOther()
case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanDDL, p.PlanLoad:
case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanLoad:
return qre.execAutocommit(qre.txConnExec)
case p.PlanDDL:
return qre.execDDL(nil)
case p.PlanUpdateLimit, p.PlanDeleteLimit:
return qre.execAsTransaction(qre.txConnExec)
case p.PlanCallProc:
Expand Down Expand Up @@ -538,7 +539,7 @@ func (qre *QueryExecutor) checkAccess(authorized *tableacl.ACLResult, tableName
return nil
}

func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, error) {
func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (result *sqltypes.Result, err error) {
// Let's see if this is a normal DDL statement or an Online DDL statement.
// An Online DDL statement is identified by /*vt+ .. */ comment with expected directives, like uuid etc.
if onlineDDL, err := schema.OnlineDDLFromCommentedStatement(qre.plan.FullStmt); err == nil {
Expand All @@ -549,6 +550,21 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e
}
}

if conn == nil {
conn, err = qre.tsv.te.txPool.createConn(qre.ctx, qre.options, qre.setting)
if err != nil {
return nil, err
}
defer conn.Release(tx.ConnRelease)
}

// A DDL statement should commit the current transaction in the VTGate.
// The change was made in PR: https://github.com/vitessio/vitess/pull/14110 in v18.
// DDL statement received by vttablet will be outside of a transaction.
if conn.txProps != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "DDL statement executed inside a transaction")
}

isTemporaryTable := false
if ddlStmt, ok := qre.plan.FullStmt.(sqlparser.DDLStatement); ok {
isTemporaryTable = ddlStmt.IsTemporary()
Expand Down Expand Up @@ -580,19 +596,7 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e
return nil, err
}
}
result, err := qre.execStatefulConn(conn, sql, true)
if err != nil {
return nil, err
}
// Only perform this operation when the connection has transaction open.
// TODO: This actually does not retain the old transaction. We should see how to provide correct behaviour to client.
if conn.txProps != nil {
err = qre.BeginAgain(qre.ctx, conn)
if err != nil {
return nil, err
}
}
return result, nil
return qre.execStatefulConn(conn, sql, true)
}

func (qre *QueryExecutor) execLoad(conn *StatefulConnection) (*sqltypes.Result, error) {
Expand All @@ -603,20 +607,6 @@ func (qre *QueryExecutor) execLoad(conn *StatefulConnection) (*sqltypes.Result,
return result, nil
}

// BeginAgain commits the existing transaction and begins a new one
func (*QueryExecutor) BeginAgain(ctx context.Context, dc *StatefulConnection) error {
if dc.IsClosed() || dc.TxProperties().Autocommit {
return nil
}
if _, err := dc.Exec(ctx, "commit", 1, false); err != nil {
return err
}
if _, err := dc.Exec(ctx, "begin", 1, false); err != nil {
return err
}
return nil
}

func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
env := evalengine.NewExpressionEnv(qre.ctx, qre.bindVars, evalengine.NewEmptyVCursor(qre.tsv.Environment(), time.Local))
result, err := env.Evaluate(qre.plan.NextCount)
Expand Down
81 changes: 46 additions & 35 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func TestQueryExecutorPlans(t *testing.T) {
// If empty, then we should expect the same as logWant.
inTxWant string
// errorWant is the error we expect to get, if any, and should be nil if no error should be returned
errorWant error
errorWant string
onlyInTxErr bool
// TxThrottler allows the test case to override the transaction throttler
txThrottler txthrottler.TxThrottler
}{{
Expand Down Expand Up @@ -196,9 +197,11 @@ func TestQueryExecutorPlans(t *testing.T) {
query: "alter table test_table add column zipcode int",
result: dmlResult,
}},
resultWant: dmlResult,
planWant: "DDL",
logWant: "alter table test_table add column zipcode int",
resultWant: dmlResult,
planWant: "DDL",
logWant: "alter table test_table add column zipcode int",
onlyInTxErr: true,
errorWant: "DDL statement executed inside a transaction",
}, {
input: "savepoint a",
dbResponses: []dbResponse{{
Expand All @@ -215,20 +218,24 @@ func TestQueryExecutorPlans(t *testing.T) {
query: "alter table `user` add key a (id)",
result: emptyResult,
}},
resultWant: emptyResult,
planWant: "DDL",
logWant: "alter table `user` add key a (id)",
inTxWant: "alter table `user` add key a (id)",
resultWant: emptyResult,
planWant: "DDL",
logWant: "alter table `user` add key a (id)",
inTxWant: "alter table `user` add key a (id)",
onlyInTxErr: true,
errorWant: "DDL statement executed inside a transaction",
}, {
input: "create index a on user(id1 + id2)",
dbResponses: []dbResponse{{
query: "create index a on user(id1 + id2)",
result: emptyResult,
}},
resultWant: emptyResult,
planWant: "DDL",
logWant: "create index a on user(id1 + id2)",
inTxWant: "create index a on user(id1 + id2)",
resultWant: emptyResult,
planWant: "DDL",
logWant: "create index a on user(id1 + id2)",
inTxWant: "create index a on user(id1 + id2)",
onlyInTxErr: true,
errorWant: "DDL statement executed inside a transaction",
}, {
input: "ROLLBACK work to SAVEPOINT a",
dbResponses: []dbResponse{{
Expand Down Expand Up @@ -282,7 +289,7 @@ func TestQueryExecutorPlans(t *testing.T) {
query: "update test_table set a = 1 limit 10001",
result: dmlResult,
}},
errorWant: errTxThrottled,
errorWant: "Transaction throttled",
txThrottler: &mockTxThrottler{true},
}, {
input: "update test_table set a=1",
Expand All @@ -291,7 +298,7 @@ func TestQueryExecutorPlans(t *testing.T) {
query: "update test_table set a = 1 limit 10001",
result: dmlResult,
}},
errorWant: errTxThrottled,
errorWant: "Transaction throttled",
txThrottler: &mockTxThrottler{true},
},
}
Expand All @@ -315,39 +322,43 @@ func TestQueryExecutorPlans(t *testing.T) {
// Test outside a transaction.
qre := newTestQueryExecutor(ctx, tsv, tcase.input, 0)
got, err := qre.Execute()
if tcase.errorWant == nil {
if tcase.errorWant != "" && !tcase.onlyInTxErr {
assert.EqualError(t, err, tcase.errorWant)
} else {
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
} else {
assert.True(t, vterrors.Equals(err, tcase.errorWant))
}
// Wait for the existing query to be processed by the cache
time.Sleep(100 * time.Millisecond)

// Test inside a transaction.
target := tsv.sm.Target()
state, err := tsv.Begin(ctx, target, nil)
if tcase.errorWant == nil {
require.NoError(t, err)
require.NotNil(t, state.TabletAlias, "alias should not be nil")
assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, target, state.TransactionID)

qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
got, err = qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
want := tcase.logWant
if tcase.inTxWant != "" {
want = tcase.inTxWant
}
assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
} else {
assert.True(t, vterrors.Equals(err, tcase.errorWant))
if tcase.errorWant != "" && !tcase.onlyInTxErr {
require.EqualError(t, err, tcase.errorWant)
return
}
require.NoError(t, err)
require.NotNil(t, state.TabletAlias, "alias should not be nil")
assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, target, state.TransactionID)

qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
got, err = qre.Execute()
if tcase.onlyInTxErr {
require.EqualError(t, err, tcase.errorWant)
return
}
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
want := tcase.logWant
if tcase.inTxWant != "" {
want = tcase.inTxWant
}
assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
})
}
}
Expand Down

0 comments on commit f4521c7

Please sign in to comment.