Skip to content

Commit

Permalink
VReplication: Properly ignore errors from trying to drop tables that …
Browse files Browse the repository at this point in the history
…don't exist (#16505)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Aug 8, 2024
1 parent cf8f5d1 commit 8f0d2d4
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 26 deletions.
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/materialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ const smMaterializeSchemaSource = `

const smMaterializeVSchemaSource = `
{
"sharded": true,
"tables": {
"mat": {
"column_vindexes": [
Expand Down Expand Up @@ -197,6 +196,8 @@ func testMaterialize(t *testing.T, useVtctldClient bool) {
_, err = ks2Primary.QueryTablet(customFunc, targetKs, true)
require.NoError(t, err)

testMaterializeWithNonExistentTable(t)

materialize(t, smMaterializeSpec2, useVtctldClient)
catchup(t, ks2Primary, "wf1", "Materialize")

Expand Down
12 changes: 12 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,18 @@ func materialize(t *testing.T, spec string, useVtctldClient bool) {
}
}

func testMaterializeWithNonExistentTable(t *testing.T) {
t.Run("vtctldclient materialize with nonexistent table", func(t *testing.T) {
tableSettings := `[{"target_table": "table_that_doesnt_exist", "create_ddl": "create table mat_val_counts (mat_val varbinary(10), cnt int unsigned, primary key (mat_val))", "source_expression": "select val, count(*) as cnt from mat group by val"}]`
output, err := vc.VtctldClient.ExecuteCommandWithOutput("materialize", "--workflow=tablenogood", "--target-keyspace=source",
"create", "--source-keyspace=source", "--table-settings", tableSettings)
require.NoError(t, err, "Materialize create failed, err: %v, output: %s", err, output)
waitForWorkflowState(t, vc, "source.tablenogood", binlogdatapb.VReplicationWorkflowState_Stopped.String())
output, err = vc.VtctldClient.ExecuteCommandWithOutput("materialize", "--workflow=tablenogood", "--target-keyspace=source", "cancel")
require.NoError(t, err, "Materialize cancel failed, err: %v, output: %s", err, output)
})
}

func materializeProduct(t *testing.T, useVtctldClient bool) {
t.Run("materializeProduct", func(t *testing.T) {
// Materializing from "product" keyspace to "customer" keyspace.
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type testKeyspace struct {
type queryResult struct {
query string
result *querypb.QueryResult
err error
}

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -389,7 +390,7 @@ func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatap
return nil, fmt.Errorf("tablet %v:\nunexpected query\n%s\nwant:\n%s", tablet, query, qrs[0].query)
}
tmc.vrQueries[int(tablet.Alias.Uid)] = qrs[1:]
return qrs[0].result, nil
return qrs[0].result, qrs[0].err
}

func (tmc *testTMClient) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) {
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sets"
"vitess.io/vitess/go/sqlescape"
Expand Down Expand Up @@ -2548,7 +2547,7 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) {
Query: []byte(sqlOptimizeTable),
MaxRows: uint64(100), // always produces 1+rows with notes and status
}); err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Num == sqlerror.ERNoSuchTable { // the table may not exist
if IsTableDidNotExistError(err) {
return
}
log.Warningf("Failed to optimize the copy_state table on %q: %v", tablet.Alias.String(), err)
Expand Down
61 changes: 52 additions & 9 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,34 @@ func TestWorkflowDelete(t *testing.T) {
defer cancel()

workflowName := "wf1"
tableName := "t1"
table1Name := "t1"
table2Name := "t1_2"
table3Name := "t1_3"
tableTemplate := "CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))"
sourceKeyspaceName := "sourceks"
targetKeyspaceName := "targetks"
schema := map[string]*tabletmanagerdatapb.SchemaDefinition{
"t1": {
table1Name: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: tableName,
Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName),
Name: table1Name,
Schema: fmt.Sprintf(tableTemplate, table1Name),
},
},
},
table2Name: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: table2Name,
Schema: fmt.Sprintf(tableTemplate, table2Name),
},
},
},
table3Name: {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: table3Name,
Schema: fmt.Sprintf(tableTemplate, table3Name),
},
},
},
Expand All @@ -239,7 +258,7 @@ func TestWorkflowDelete(t *testing.T) {
postFunc func(t *testing.T, env *testEnv)
}{
{
name: "basic",
name: "missing table",
sourceKeyspace: &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
Expand All @@ -261,7 +280,21 @@ func TestWorkflowDelete(t *testing.T) {
},
expectedTargetQueries: []*queryResult{
{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, tableName),
query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table1Name),
result: &querypb.QueryResult{},
},
{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table2Name),
result: &querypb.QueryResult{},
// We don't care that the cell and tablet info is off in the error message, only that
// it contains the expected SQL error we'd encounter when attempting to drop a table
// that doesn't exist. That will then cause this error to be non-fatal and the workflow
// delete work will continue.
err: fmt.Errorf("rpc error: code = Unknown desc = TabletManager.ExecuteFetchAsDba on cell-01: rpc error: code = Unknown desc = Unknown table 'vt_%s.%s' (errno 1051) (sqlstate 42S02) during query: drop table `vt_%s`.`%s`",
targetKeyspaceName, table2Name, targetKeyspaceName, table2Name),
},
{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table3Name),
result: &querypb.QueryResult{},
},
},
Expand All @@ -281,7 +314,7 @@ func TestWorkflowDelete(t *testing.T) {
},
},
{
name: "basic with existing denied table entries",
name: "missing denied table entries",
sourceKeyspace: &testKeyspace{
KeyspaceName: sourceKeyspaceName,
ShardNames: []string{"0"},
Expand All @@ -298,7 +331,9 @@ func TestWorkflowDelete(t *testing.T) {
defer targetUnlock(&err)
for _, shard := range env.targetKeyspace.ShardNames {
_, err := env.ts.UpdateShardFields(lockCtx, targetKeyspaceName, shard, func(si *topo.ShardInfo) error {
err := si.UpdateDeniedTables(lockCtx, topodatapb.TabletType_PRIMARY, nil, false, []string{tableName, "t2", "t3"})
// So t1_2 and t1_3 do not exist in the denied table list when we go
// to remove t1, t1_2, and t1_3.
err := si.UpdateDeniedTables(lockCtx, topodatapb.TabletType_PRIMARY, nil, false, []string{table1Name, "t2", "t3"})
return err
})
require.NoError(t, err)
Expand All @@ -317,7 +352,15 @@ func TestWorkflowDelete(t *testing.T) {
},
expectedTargetQueries: []*queryResult{
{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, tableName),
query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table1Name),
result: &querypb.QueryResult{},
},
{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table2Name),
result: &querypb.QueryResult{},
},
{
query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, table3Name),
result: &querypb.QueryResult{},
},
},
Expand Down
17 changes: 8 additions & 9 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand Down Expand Up @@ -548,12 +547,12 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T
DisableForeignKeyChecks: true,
})
if err != nil {
if mysqlErr, ok := err.(*sqlerror.SQLError); ok && mysqlErr.Num == sqlerror.ERNoSuchTable {
if IsTableDidNotExistError(err) {
ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(source.GetPrimary().GetAlias()), tableName)
return nil
} else {
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(source.GetPrimary().GetAlias()), tableName, err)
return err
}
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(source.GetPrimary().GetAlias()), tableName, err)
return err
}
ts.Logger().Infof("%s: Removed table %s.%s\n", topoproto.TabletAliasString(source.GetPrimary().GetAlias()), source.GetPrimary().DbName(), tableName)

Expand Down Expand Up @@ -1179,13 +1178,13 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
})
log.Infof("Removed target table with result: %+v", res)
if err != nil {
if mysqlErr, ok := err.(*sqlerror.SQLError); ok && mysqlErr.Num == sqlerror.ERNoSuchTable {
if IsTableDidNotExistError(err) {
// The table was already gone, so we can ignore the error.
ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName)
return nil
} else {
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err)
return err
}
ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err)
return err
}
ts.Logger().Infof("%s: Removed table %s.%s\n",
topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName)
Expand Down
18 changes: 14 additions & 4 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ import (
"strings"
"sync"

querypb "vitess.io/vitess/go/vt/proto/query"

"vitess.io/vitess/go/vt/vtgate/vindexes"

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sets"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
Expand All @@ -46,9 +43,11 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
Expand Down Expand Up @@ -949,3 +948,14 @@ func getTabletTypeSuffix(tabletType topodatapb.TabletType) string {
}
return ""
}

// IsTableDidNotExistError will convert the given error to an sqlerror.SQLError and if
// the error code is ERNoSuchTable or ERBadTable, it will return true. This is helpful
// when e.g. processing a gRPC error which will be a status.Error that needs to be
// converted to an sqlerror.SQLError before we can examine the error code.
func IsTableDidNotExistError(err error) bool {
if sqlErr, ok := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError); ok {
return sqlErr.Num == sqlerror.ERNoSuchTable || sqlErr.Num == sqlerror.ERBadTable
}
return false
}

0 comments on commit 8f0d2d4

Please sign in to comment.