Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject TwoPC calls if semi-sync is not enabled #16608

Merged
18 changes: 18 additions & 0 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,24 @@ func TestReadingUnresolvedTransactions(t *testing.T) {
}
}

// TestSemiSyncRequiredWithTwoPC tests that semi-sync is required when using two-phase commit.
func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none")
require.NoError(t, err, out)
defer clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")

// After changing the durability policy for the given keyspace to none, we try to PRS.
// This call should fail.
shard := clusterInstance.Keyspaces[0].Shards[2]
newPrimary := shard.Vttablets[1]
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"PlannedReparentShard",
fmt.Sprintf("%s/%s", keyspaceName, shard.Name),
"--new-primary", newPrimary.Alias)
require.Error(t, err)
require.Contains(t, output, "two-pc is enabled, but semi-sync is not")
}

// TestDisruptions tests that atomic transactions persevere through various disruptions.
func TestDisruptions(t *testing.T) {
testcases := []struct {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var (
VT09023 = errorWithoutState("VT09023", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a keyspace id", "Unable to determine the shard for the given row.")
VT09024 = errorWithoutState("VT09024", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a unique keyspace id: %v", "Unable to determine the shard for the given row.")
VT09025 = errorWithoutState("VT09025", vtrpcpb.Code_FAILED_PRECONDITION, "atomic transaction error: %v", "Error in atomic transactions")
VT09026 = errorWithoutState("VT09026", vtrpcpb.Code_FAILED_PRECONDITION, "two-pc is enabled, but semi-sync is not", "Two-PC requires semi-sync but it is not enabled")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.")
Expand Down
29 changes: 20 additions & 9 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"strings"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/protoutil"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -348,6 +347,17 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string
}
defer tm.unlock()

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

// Check if two-pc is enabled but semi-sync is not.
// If so, we return an error, because atomic transactions require semi-sync for correctness.
if tm.QueryServiceControl.TwoPCEnabled() && semiSyncAction != SemiSyncActionSet {
return "", vterrors.VT09026()
}

// Setting super_read_only `OFF` so that we can run the DDL commands
if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERUnknownSystemVariable {
Expand All @@ -369,11 +379,6 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string
return "", err
}

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

// Set the server read-write, from now on we can accept real
// client writes. Note that if semi-sync replication is enabled,
// we'll still need some replicas to be able to commit transactions.
Expand Down Expand Up @@ -910,12 +915,18 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str
}
defer tm.unlock()

pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv())
semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
if err != nil {
return "", err
}

semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync)
// Check if two-pc is enabled but semi-sync is not.
// If so, we return an error, because atomic transactions require semi-sync for correctness.
if tm.QueryServiceControl.TwoPCEnabled() && semiSyncAction != SemiSyncActionSet {
return "", vterrors.VT09026()
}

pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv())
if err != nil {
return "", err
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type Controller interface {
// CheckThrottler
CheckThrottler(ctx context.Context, appName string, flags *throttle.CheckFlags) *throttle.CheckResult
GetThrottlerStatus(ctx context.Context) *throttle.ThrottlerStatus

// TwoPCEnabled returns if two pc is enabled or not.
TwoPCEnabled() bool
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,11 @@ func (tsv *TabletServer) GetThrottlerStatus(ctx context.Context) *throttle.Throt
return r
}

// TwoPCEnabled returns whether TwoPC is enabled or not.
func (tsv *TabletServer) TwoPCEnabled() bool {
return tsv.config.TwoPCEnable
}

// HandlePanic is part of the queryservice.QueryService interface
func (tsv *TabletServer) HandlePanic(err *error) {
if x := recover(); x != nil {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott
return nil
}

// TwoPCEnabled returns whether TwoPC is enabled or not.
func (tqsc *Controller) TwoPCEnabled() bool {
return false
}

// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()
Expand Down
Loading