diff --git a/pkg/migration/change.go b/pkg/migration/change.go index 9c12bfbb..96a1b3cf 100644 --- a/pkg/migration/change.go +++ b/pkg/migration/change.go @@ -4,8 +4,8 @@ import ( "context" "fmt" - "github.com/block/spirit/pkg/check" "github.com/block/spirit/pkg/dbconn" + "github.com/block/spirit/pkg/migration/check" "github.com/block/spirit/pkg/statement" "github.com/block/spirit/pkg/table" ) diff --git a/pkg/check/check.go b/pkg/migration/check/check.go similarity index 100% rename from pkg/check/check.go rename to pkg/migration/check/check.go diff --git a/pkg/check/check_test.go b/pkg/migration/check/check_test.go similarity index 100% rename from pkg/check/check_test.go rename to pkg/migration/check/check_test.go diff --git a/pkg/check/configuration.go b/pkg/migration/check/configuration.go similarity index 100% rename from pkg/check/configuration.go rename to pkg/migration/check/configuration.go diff --git a/pkg/check/configuration_test.go b/pkg/migration/check/configuration_test.go similarity index 100% rename from pkg/check/configuration_test.go rename to pkg/migration/check/configuration_test.go diff --git a/pkg/check/dropadd.go b/pkg/migration/check/dropadd.go similarity index 100% rename from pkg/check/dropadd.go rename to pkg/migration/check/dropadd.go diff --git a/pkg/check/dropadd_test.go b/pkg/migration/check/dropadd_test.go similarity index 100% rename from pkg/check/dropadd_test.go rename to pkg/migration/check/dropadd_test.go diff --git a/pkg/check/foreignkey.go b/pkg/migration/check/foreignkey.go similarity index 100% rename from pkg/check/foreignkey.go rename to pkg/migration/check/foreignkey.go diff --git a/pkg/check/foreignkey_test.go b/pkg/migration/check/foreignkey_test.go similarity index 100% rename from pkg/check/foreignkey_test.go rename to pkg/migration/check/foreignkey_test.go diff --git a/pkg/check/illegalclause.go b/pkg/migration/check/illegalclause.go similarity index 100% rename from pkg/check/illegalclause.go rename to pkg/migration/check/illegalclause.go diff --git a/pkg/check/illegalclause_test.go b/pkg/migration/check/illegalclause_test.go similarity index 100% rename from pkg/check/illegalclause_test.go rename to pkg/migration/check/illegalclause_test.go diff --git a/pkg/check/primarykey.go b/pkg/migration/check/primarykey.go similarity index 100% rename from pkg/check/primarykey.go rename to pkg/migration/check/primarykey.go diff --git a/pkg/check/primarykey_test.go b/pkg/migration/check/primarykey_test.go similarity index 100% rename from pkg/check/primarykey_test.go rename to pkg/migration/check/primarykey_test.go diff --git a/pkg/check/privileges.go b/pkg/migration/check/privileges.go similarity index 100% rename from pkg/check/privileges.go rename to pkg/migration/check/privileges.go diff --git a/pkg/check/privileges_test.go b/pkg/migration/check/privileges_test.go similarity index 100% rename from pkg/check/privileges_test.go rename to pkg/migration/check/privileges_test.go diff --git a/pkg/check/rename.go b/pkg/migration/check/rename.go similarity index 100% rename from pkg/check/rename.go rename to pkg/migration/check/rename.go diff --git a/pkg/check/rename_test.go b/pkg/migration/check/rename_test.go similarity index 100% rename from pkg/check/rename_test.go rename to pkg/migration/check/rename_test.go diff --git a/pkg/check/replicahealth.go b/pkg/migration/check/replicahealth.go similarity index 100% rename from pkg/check/replicahealth.go rename to pkg/migration/check/replicahealth.go diff --git a/pkg/check/replicahealth_test.go 
b/pkg/migration/check/replicahealth_test.go similarity index 100% rename from pkg/check/replicahealth_test.go rename to pkg/migration/check/replicahealth_test.go diff --git a/pkg/check/replicaprivileges.go b/pkg/migration/check/replicaprivileges.go similarity index 100% rename from pkg/check/replicaprivileges.go rename to pkg/migration/check/replicaprivileges.go diff --git a/pkg/check/replicaprivileges_test.go b/pkg/migration/check/replicaprivileges_test.go similarity index 100% rename from pkg/check/replicaprivileges_test.go rename to pkg/migration/check/replicaprivileges_test.go diff --git a/pkg/check/settings.go b/pkg/migration/check/settings.go similarity index 100% rename from pkg/check/settings.go rename to pkg/migration/check/settings.go diff --git a/pkg/check/settings_test.go b/pkg/migration/check/settings_test.go similarity index 100% rename from pkg/check/settings_test.go rename to pkg/migration/check/settings_test.go diff --git a/pkg/check/tablename.go b/pkg/migration/check/tablename.go similarity index 100% rename from pkg/check/tablename.go rename to pkg/migration/check/tablename.go diff --git a/pkg/check/tablename_test.go b/pkg/migration/check/tablename_test.go similarity index 100% rename from pkg/check/tablename_test.go rename to pkg/migration/check/tablename_test.go diff --git a/pkg/check/triggers.go b/pkg/migration/check/triggers.go similarity index 100% rename from pkg/check/triggers.go rename to pkg/migration/check/triggers.go diff --git a/pkg/check/triggers_test.go b/pkg/migration/check/triggers_test.go similarity index 100% rename from pkg/check/triggers_test.go rename to pkg/migration/check/triggers_test.go diff --git a/pkg/check/version.go b/pkg/migration/check/version.go similarity index 100% rename from pkg/check/version.go rename to pkg/migration/check/version.go diff --git a/pkg/check/version_test.go b/pkg/migration/check/version_test.go similarity index 100% rename from pkg/check/version_test.go rename to pkg/migration/check/version_test.go diff --git a/pkg/check/visibility_change.go b/pkg/migration/check/visibility_change.go similarity index 100% rename from pkg/check/visibility_change.go rename to pkg/migration/check/visibility_change.go diff --git a/pkg/check/visibility_change_test.go b/pkg/migration/check/visibility_change_test.go similarity index 100% rename from pkg/check/visibility_change_test.go rename to pkg/migration/check/visibility_change_test.go diff --git a/pkg/migration/migration.go b/pkg/migration/migration.go index 0f826d27..0ee748d6 100644 --- a/pkg/migration/migration.go +++ b/pkg/migration/migration.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/block/spirit/pkg/check" + "github.com/block/spirit/pkg/migration/check" "github.com/block/spirit/pkg/statement" "github.com/block/spirit/pkg/table" "github.com/go-ini/ini" diff --git a/pkg/migration/migration_test.go b/pkg/migration/migration_test.go index 8e1410bf..63e3bc94 100644 --- a/pkg/migration/migration_test.go +++ b/pkg/migration/migration_test.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "strings" - "sync" "testing" "time" @@ -34,36 +33,6 @@ func mkIniFile(t *testing.T, content string) *os.File { return tmpFile } -var ( - isRBRTestRunnerCached bool - isRBRTestRunnerOnce sync.Once -) - -func isMinimalRBRTestRunner(t *testing.T) bool { - // Check if we are in the minimal RBR test runner. - // we use this to skip certain tests. 
- isRBRTestRunnerOnce.Do(func() { - cfg, err := mysql.ParseDSN(testutils.DSN()) - require.NoError(t, err) - db, err := sql.Open("mysql", cfg.FormatDSN()) - require.NoError(t, err) - defer db.Close() - var binlogRowImage, binlogRowValueOptions string - err = db.QueryRowContext(t.Context(), - `SELECT - @@global.binlog_row_image, - @@global.binlog_row_value_options`).Scan( - &binlogRowImage, - &binlogRowValueOptions, - ) - require.NoError(t, err) - if binlogRowImage != "FULL" || binlogRowValueOptions != "" { - isRBRTestRunnerCached = true - } - }) - return isRBRTestRunnerCached -} - func TestMain(m *testing.M) { status.CheckpointDumpInterval = 100 * time.Millisecond status.StatusInterval = 10 * time.Millisecond // the status will be accurate to 1ms @@ -278,7 +247,7 @@ func TestGeneratedColumns(t *testing.T) { testGeneratedColumns(t, false) }) t.Run("buffered", func(t *testing.T) { - if isMinimalRBRTestRunner(t) { + if testutils.IsMinimalRBRTestRunner(t) { t.Skip("Skipping buffered copy test because binlog_row_image is not FULL or binlog_row_value_options is not empty") } testGeneratedColumns(t, true) @@ -317,7 +286,7 @@ func TestStoredGeneratedColumns(t *testing.T) { testStoredGeneratedColumns(t, false) }) t.Run("buffered", func(t *testing.T) { - if isMinimalRBRTestRunner(t) { + if testutils.IsMinimalRBRTestRunner(t) { t.Skip("Skipping buffered copy test because binlog_row_image is not FULL or binlog_row_value_options is not empty") } testStoredGeneratedColumns(t, true) @@ -394,7 +363,7 @@ func TestBinaryChecksum(t *testing.T) { testBinaryChecksum(t, false) }) t.Run("buffered", func(t *testing.T) { - if isMinimalRBRTestRunner(t) { + if testutils.IsMinimalRBRTestRunner(t) { t.Skip("Skipping buffered copy test because binlog_row_image is not FULL or binlog_row_value_options is not empty") } testBinaryChecksum(t, true) @@ -448,7 +417,7 @@ func TestConvertCharset(t *testing.T) { testConvertCharset(t, false) }) t.Run("buffered", func(t *testing.T) { - if isMinimalRBRTestRunner(t) { + if testutils.IsMinimalRBRTestRunner(t) { t.Skip("Skipping buffered copy test because binlog_row_image is not FULL or binlog_row_value_options is not empty") } testConvertCharset(t, true) diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index 46918c02..a000d335 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -12,11 +12,11 @@ import ( "github.com/go-sql-driver/mysql" "github.com/block/spirit/pkg/applier" - "github.com/block/spirit/pkg/check" "github.com/block/spirit/pkg/checksum" "github.com/block/spirit/pkg/copier" "github.com/block/spirit/pkg/dbconn" "github.com/block/spirit/pkg/metrics" + "github.com/block/spirit/pkg/migration/check" "github.com/block/spirit/pkg/repl" "github.com/block/spirit/pkg/status" "github.com/block/spirit/pkg/table" diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go index cf3fba56..b7282857 100644 --- a/pkg/migration/runner_test.go +++ b/pkg/migration/runner_test.go @@ -11,9 +11,9 @@ import ( "testing" "time" - "github.com/block/spirit/pkg/check" "github.com/block/spirit/pkg/copier" "github.com/block/spirit/pkg/dbconn" + "github.com/block/spirit/pkg/migration/check" "github.com/block/spirit/pkg/status" "github.com/block/spirit/pkg/table" "github.com/block/spirit/pkg/testutils" diff --git a/pkg/move/check/check.go b/pkg/move/check/check.go new file mode 100644 index 00000000..18730f75 --- /dev/null +++ b/pkg/move/check/check.go @@ -0,0 +1,71 @@ +// Package check provides various configuration and health checks +// that can be run for 
move operations.
+package check
+
+import (
+	"context"
+	"database/sql"
+	"log/slog"
+	"sync"
+
+	"github.com/block/spirit/pkg/applier"
+	"github.com/block/spirit/pkg/table"
+	"github.com/go-sql-driver/mysql"
+)
+
+// ScopeFlag scopes a check
+type ScopeFlag uint8
+
+const (
+	ScopeNone      ScopeFlag = 0
+	ScopePreRun    ScopeFlag = 1 << 0
+	ScopePreflight ScopeFlag = 1 << 1
+	ScopePostSetup ScopeFlag = 1 << 2
+	ScopeResume    ScopeFlag = 1 << 3
+)
+
+// Resources contains the resources needed for move checks
+type Resources struct {
+	SourceDB       *sql.DB
+	SourceConfig   *mysql.Config
+	Targets        []applier.Target
+	SourceTables   []*table.TableInfo
+	CreateSentinel bool
+	// For PreRun checks (before DB connections established)
+	SourceDSN string
+	TargetDSN string
+}
+
+type check struct {
+	callback func(context.Context, Resources, *slog.Logger) error
+	scope    ScopeFlag
+}
+
+var (
+	checks map[string]check
+	lock   sync.Mutex
+)
+
+// registerCheck registers a check (callback func) and the scope (phase) in which it is expected to run
+func registerCheck(name string, callback func(context.Context, Resources, *slog.Logger) error, scope ScopeFlag) {
+	lock.Lock()
+	defer lock.Unlock()
+	if checks == nil {
+		checks = make(map[string]check)
+	}
+	checks[name] = check{callback: callback, scope: scope}
+}
+
+// RunChecks runs all checks that are registered for the given scope
+func RunChecks(ctx context.Context, r Resources, logger *slog.Logger, scope ScopeFlag) error {
+	for _, check := range checks {
+		if check.scope&scope == 0 {
+			continue
+		}
+		err := check.callback(ctx, r, logger)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/pkg/move/check/configuration.go b/pkg/move/check/configuration.go
new file mode 100644
index 00000000..6f8c965d
--- /dev/null
+++ b/pkg/move/check/configuration.go
@@ -0,0 +1,61 @@
+package check
+
+import (
+	"context"
+	"errors"
+	"log/slog"
+)
+
+func init() {
+	registerCheck("configuration", configurationCheck, ScopePreflight)
+}
+
+// configurationCheck verifies the MySQL configuration on the source database
+// is suitable for move operations. Move operations require:
+// - ROW binlog format for reading changes
+// - Binary logging enabled
+// - log_slave_updates enabled (unless we verify no replication)
+// - FULL binlog_row_image (for buffered copy which is always used in move)
+func configurationCheck(ctx context.Context, r Resources, logger *slog.Logger) error {
+	var binlogFormat, binlogRowImage, logBin, logSlaveUpdates, binlogRowValueOptions string
+	err := r.SourceDB.QueryRowContext(ctx,
+		`SELECT @@global.binlog_format,
+		@@global.binlog_row_image,
+		@@global.log_bin,
+		@@global.log_slave_updates,
+		@@global.binlog_row_value_options`).Scan(
+		&binlogFormat,
+		&binlogRowImage,
+		&logBin,
+		&logSlaveUpdates,
+		&binlogRowValueOptions,
+	)
+	if err != nil {
+		return err
+	}
+
+	if binlogFormat != "ROW" {
+		return errors.New("binlog_format must be ROW")
+	}
+
+	// Move always uses buffered copy, which requires FULL binlog_row_image
+	if binlogRowImage != "FULL" {
+		return errors.New("binlog_row_image must be FULL for move operations (buffered copy requires reading all columns from binlog)")
+	}
+
+	if binlogRowValueOptions != "" {
+		return errors.New("binlog_row_value_options must be empty for move operations (buffered copy requires reading all columns from binlog)")
+	}
+
+	if logBin != "1" {
+		return errors.New("log_bin must be enabled")
+	}
+
+	if logSlaveUpdates != "1" {
+		// This is a hard requirement unless we enhance this to confirm
+		// it's not receiving any updates via the replication stream.
+ return errors.New("log_slave_updates must be enabled") + } + + return nil +} diff --git a/pkg/move/check/configuration_test.go b/pkg/move/check/configuration_test.go new file mode 100644 index 00000000..22f2986a --- /dev/null +++ b/pkg/move/check/configuration_test.go @@ -0,0 +1,28 @@ +package check + +import ( + "context" + "database/sql" + "log/slog" + "testing" + + "github.com/block/spirit/pkg/testutils" + "github.com/stretchr/testify/assert" +) + +func TestConfigurationCheck(t *testing.T) { + if testutils.IsMinimalRBRTestRunner(t) { + t.Skip("Skipping test for minimal RBR test runner") + } + + db, err := sql.Open("mysql", testutils.DSN()) + assert.NoError(t, err) + defer db.Close() + + // Test with valid configuration + r := Resources{ + SourceDB: db, + } + err = configurationCheck(context.Background(), r, slog.Default()) + assert.NoError(t, err) +} diff --git a/pkg/move/check/resume_state.go b/pkg/move/check/resume_state.go new file mode 100644 index 00000000..7a1a1f8b --- /dev/null +++ b/pkg/move/check/resume_state.go @@ -0,0 +1,83 @@ +package check + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log/slog" + "slices" + + "github.com/block/spirit/pkg/table" +) + +func init() { + registerCheck("resume_state", resumeStateCheck, ScopeResume) +} + +// resumeStateCheck validates that the state is compatible with resuming from a checkpoint. +// This check runs at ScopeResume and verifies: +// 1. Checkpoint table exists on source +// 2. Target tables exist with matching schema to source +// 3. All source tables have corresponding target tables +// +// This check is the complement to target_state check: +// - target_state validates we can start a NEW copy (empty or matching empty tables) +// - resume_state validates we can RESUME from checkpoint (tables exist with data) +func resumeStateCheck(ctx context.Context, r Resources, logger *slog.Logger) error { + if len(r.SourceTables) == 0 { + return errors.New("no source tables available for resume validation") + } + + // Check 1: Verify checkpoint table exists on source + checkpointTableName := "_spirit_checkpoint" + var checkpointExists int + err := r.SourceDB.QueryRowContext(ctx, + "SELECT 1 FROM information_schema.TABLES WHERE table_schema = ? AND table_name = ?", + r.SourceConfig.DBName, checkpointTableName).Scan(&checkpointExists) + if err == sql.ErrNoRows { + return fmt.Errorf("checkpoint table '%s.%s' does not exist; cannot resume", r.SourceConfig.DBName, checkpointTableName) + } + if err != nil { + return fmt.Errorf("failed to check for checkpoint table: %w", err) + } + + logger.Info("checkpoint table exists, validating target tables for resume", + "checkpoint_table", fmt.Sprintf("%s.%s", r.SourceConfig.DBName, checkpointTableName)) + + // Check 2: Verify all source tables have corresponding target tables with matching schema + for _, sourceTable := range r.SourceTables { + for i, target := range r.Targets { + // Check if table exists on target + var tableExists int + err := target.DB.QueryRowContext(ctx, + "SELECT 1 FROM information_schema.TABLES WHERE table_schema = ? AND table_name = ?", + target.Config.DBName, sourceTable.TableName).Scan(&tableExists) + if err == sql.ErrNoRows { + return fmt.Errorf("table '%s' does not exist on target %d (%s); cannot resume. 
Target state is incomplete", + sourceTable.TableName, i, target.Config.DBName) + } + if err != nil { + return fmt.Errorf("failed to check for table '%s' on target %d: %w", sourceTable.TableName, i, err) + } + + // Verify schema matches + targetTable := table.NewTableInfo(target.DB, target.Config.DBName, sourceTable.TableName) + if err := targetTable.SetInfo(ctx); err != nil { + return fmt.Errorf("failed to get table info for target %d table '%s': %w", i, sourceTable.TableName, err) + } + + if !slices.Equal(sourceTable.Columns, targetTable.Columns) { + return fmt.Errorf("table '%s' schema mismatch between source and target %d (%s); cannot resume safely. Schema may have changed since checkpoint was created", + sourceTable.TableName, i, target.Config.DBName) + } + + logger.Debug("validated target table for resume", + "table", sourceTable.TableName, + "target", i, + "database", target.Config.DBName, + "columns", len(targetTable.Columns)) + } + } + return nil +} diff --git a/pkg/move/check/resume_state_test.go b/pkg/move/check/resume_state_test.go new file mode 100644 index 00000000..498f4fbe --- /dev/null +++ b/pkg/move/check/resume_state_test.go @@ -0,0 +1,252 @@ +package check + +import ( + "database/sql" + "log/slog" + "testing" + + "github.com/block/spirit/pkg/applier" + "github.com/block/spirit/pkg/table" + "github.com/block/spirit/pkg/testutils" + "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestResumeStateCheck(t *testing.T) { + if testutils.IsMinimalRBRTestRunner(t) { + t.Skip("Skipping test for minimal RBR test runner") + } + + // Setup source and target databases + testutils.RunSQL(t, `DROP DATABASE IF EXISTS resume_src`) + testutils.RunSQL(t, `CREATE DATABASE resume_src`) + testutils.RunSQL(t, `DROP DATABASE IF EXISTS resume_tgt`) + testutils.RunSQL(t, `CREATE DATABASE resume_tgt`) + + sourceDB, err := sql.Open("mysql", testutils.DSNForDatabase("resume_src")) + assert.NoError(t, err) + defer sourceDB.Close() + + targetDB, err := sql.Open("mysql", testutils.DSNForDatabase("resume_tgt")) + assert.NoError(t, err) + defer targetDB.Close() + + // Create a test table on source + _, err = sourceDB.ExecContext(t.Context(), "CREATE TABLE resume_src.test_table (id int not null primary key auto_increment, name VARCHAR(100))") + require.NoError(t, err) + + // Get source table info + sourceTable := table.NewTableInfo(sourceDB, "resume_src", "test_table") + err = sourceTable.SetInfo(t.Context()) + require.NoError(t, err) + + sourceConfig := &mysql.Config{ + DBName: "resume_src", + } + + targetConfig := &mysql.Config{ + DBName: "resume_tgt", + } + + // Test 1: No checkpoint table should fail + t.Run("no checkpoint table fails", func(t *testing.T) { + r := Resources{ + SourceDB: sourceDB, + SourceConfig: sourceConfig, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err := resumeStateCheck(t.Context(), r, slog.Default()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "checkpoint table") + assert.Contains(t, err.Error(), "does not exist") + }) + + // Create checkpoint table for remaining tests + _, err = sourceDB.ExecContext(t.Context(), `CREATE TABLE resume_src._spirit_checkpoint ( + id int NOT NULL AUTO_INCREMENT PRIMARY KEY, + copier_watermark TEXT, + checksum_watermark TEXT, + binlog_name VARCHAR(255), + binlog_pos INT, + statement TEXT + )`) + require.NoError(t, err) + defer func() { + _, _ = 
sourceDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS resume_src._spirit_checkpoint") + }() + // Test 2: Checkpoint exists but no target tables should fail + t.Run("checkpoint exists but no target tables fails", func(t *testing.T) { + r := Resources{ + SourceDB: sourceDB, + SourceConfig: sourceConfig, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err := resumeStateCheck(t.Context(), r, slog.Default()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "does not exist on target") + assert.Contains(t, err.Error(), "cannot resume") + }) + + // Create matching target table + _, err = targetDB.ExecContext(t.Context(), "CREATE TABLE resume_tgt.test_table (id int not null primary key auto_increment, name VARCHAR(100))") + require.NoError(t, err) + defer func() { + _, _ = targetDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS resume_tgt.test_table") + }() + + // Test 3: Checkpoint and matching target tables should pass + t.Run("checkpoint and matching target tables pass", func(t *testing.T) { + r := Resources{ + SourceDB: sourceDB, + SourceConfig: sourceConfig, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err := resumeStateCheck(t.Context(), r, slog.Default()) + assert.NoError(t, err) + }) + + // Test 4: Schema mismatch should fail + t.Run("schema mismatch fails", func(t *testing.T) { + // Drop and recreate target table with different schema + _, err = targetDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS resume_tgt.test_table") + require.NoError(t, err) + _, err = targetDB.ExecContext(t.Context(), "CREATE TABLE resume_tgt.test_table (id int not null primary key, different_col VARCHAR(100))") + require.NoError(t, err) + defer func() { + _, _ = targetDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS resume_tgt.test_table") + }() + r := Resources{ + SourceDB: sourceDB, + SourceConfig: sourceConfig, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err := resumeStateCheck(t.Context(), r, slog.Default()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "schema mismatch") + assert.Contains(t, err.Error(), "cannot resume safely") + }) + + // Test 5: No source tables should fail + t.Run("no source tables fails", func(t *testing.T) { + r := Resources{ + SourceDB: sourceDB, + SourceConfig: sourceConfig, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{}, // Empty + } + err := resumeStateCheck(t.Context(), r, slog.Default()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no source tables") + }) + + // Test 6: Multiple targets with matching schema should pass + t.Run("multiple targets with matching schema pass", func(t *testing.T) { + // Create second target database + testutils.RunSQL(t, `DROP DATABASE IF EXISTS resume_tgt2`) + testutils.RunSQL(t, `CREATE DATABASE resume_tgt2`) + + targetDB2, err := sql.Open("mysql", testutils.DSNForDatabase("resume_tgt2")) + require.NoError(t, err) + defer targetDB2.Close() + + // Recreate first target table with correct schema + _, err = targetDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS resume_tgt.test_table") + require.NoError(t, err) + _, err = targetDB.ExecContext(t.Context(), "CREATE TABLE resume_tgt.test_table (id int not null primary key auto_increment, name VARCHAR(100))") + require.NoError(t, 
err) + + // Create matching table on second target + _, err = targetDB2.ExecContext(t.Context(), "CREATE TABLE resume_tgt2.test_table (id int not null primary key auto_increment, name VARCHAR(100))") + require.NoError(t, err) + defer func() { + _, _ = targetDB2.ExecContext(t.Context(), "DROP TABLE IF EXISTS resume_tgt2.test_table") + }() + + targetConfig2 := &mysql.Config{ + DBName: "resume_tgt2", + } + + r := Resources{ + SourceDB: sourceDB, + SourceConfig: sourceConfig, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + { + DB: targetDB2, + Config: targetConfig2, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err = resumeStateCheck(t.Context(), r, slog.Default()) + assert.NoError(t, err) + }) + + // Test 7: Multiple targets with one missing table should fail + t.Run("multiple targets with one missing table fails", func(t *testing.T) { + // Create second target database without the table + testutils.RunSQL(t, `DROP DATABASE IF EXISTS resume_tgt3`) + testutils.RunSQL(t, `CREATE DATABASE resume_tgt3`) + + targetDB3, err := sql.Open("mysql", testutils.DSNForDatabase("resume_tgt3")) + require.NoError(t, err) + defer targetDB3.Close() + + targetConfig3 := &mysql.Config{ + DBName: "resume_tgt3", + } + + r := Resources{ + SourceDB: sourceDB, + SourceConfig: sourceConfig, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + { + DB: targetDB3, + Config: targetConfig3, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err = resumeStateCheck(t.Context(), r, slog.Default()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "does not exist on target 1") + assert.Contains(t, err.Error(), "cannot resume") + }) +} diff --git a/pkg/move/check/target_state.go b/pkg/move/check/target_state.go new file mode 100644 index 00000000..c9aac834 --- /dev/null +++ b/pkg/move/check/target_state.go @@ -0,0 +1,114 @@ +package check + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "slices" + + "github.com/block/spirit/pkg/applier" + "github.com/block/spirit/pkg/table" +) + +func init() { + registerCheck("target_state", targetStateCheck, ScopePostSetup) +} + +// targetStateCheck validates that target databases are ready for the move operation. +// If SourceTables is specified, it checks that those specific tables don't exist in the target, +// OR if they do exist, they must be empty (zero rows) and have matching schema to the source. +// If SourceTables is not specified, it checks that the target database is completely empty. +// +// This check runs at ScopePostSetup because it needs the source tables to be discovered first. 
+func targetStateCheck(ctx context.Context, r Resources, logger *slog.Logger) error { + if len(r.SourceTables) == 0 { + // No source tables to validate against, skip check + return nil + } + + // Build a map of source tables for quick lookup + sourceTableMap := make(map[string]*table.TableInfo) + for _, tbl := range r.SourceTables { + sourceTableMap[tbl.TableName] = tbl + } + + for i, target := range r.Targets { + rows, err := target.DB.QueryContext(ctx, "SHOW TABLES") + if err != nil { + return fmt.Errorf("failed to check target %d: %w", i, err) + } + defer rows.Close() + + var tableName string + var existingTables []string + for rows.Next() { + if err := rows.Scan(&tableName); err != nil { + return fmt.Errorf("failed to scan table name on target %d: %w", i, err) + } + + // Only validate tables that are in our source list + if _, isSourceTable := sourceTableMap[tableName]; isSourceTable { + existingTables = append(existingTables, tableName) + } + } + if err := rows.Err(); err != nil { + return err + } + + // If there are existing tables, validate they are empty and have matching schema + if len(existingTables) > 0 { + for _, tableName := range existingTables { + if err := validateExistingTargetTable(ctx, target, tableName, i, sourceTableMap[tableName], logger); err != nil { + return err + } + } + } + } + return nil +} + +// validateExistingTargetTable checks that an existing table in the target database +// is empty (zero rows) and has a schema that matches the source table. +// This allows move-tables to work with pre-created tables, which is necessary +// for declarative schema management workflows where tables must be tracked +// in the migrations directory before data is moved. +func validateExistingTargetTable(ctx context.Context, target applier.Target, tableName string, targetIndex int, sourceTable *table.TableInfo, logger *slog.Logger) error { + // Check 1: Table must have zero rows + // Use LIMIT 1 instead of COUNT(*) for performance - we only need to know if there's at least one row + var hasRows int + err := target.DB.QueryRowContext(ctx, fmt.Sprintf("SELECT 1 FROM `%s`.`%s` LIMIT 1", target.Config.DBName, tableName)).Scan(&hasRows) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to check if table '%s' on target %d is empty: %w", tableName, targetIndex, err) + } + if err == nil { + // Found at least one row - table is not empty + return fmt.Errorf("table '%s' already exists on target %d (%s) and is not empty; move-tables requires target tables to be empty to prevent data loss. Please drop the table or use a different target", + tableName, targetIndex, target.Config.DBName) + } + + // Check 2: Schema must match source exactly + if sourceTable == nil { + // Table exists on target but not in source - this is fine, we'll ignore it + return nil + } + + // Get target table info and compare columns + targetTable := table.NewTableInfo(target.DB, target.Config.DBName, tableName) + if err := targetTable.SetInfo(ctx); err != nil { + return fmt.Errorf("failed to get table info for target %d table '%s': %w", targetIndex, tableName, err) + } + + if !slices.Equal(sourceTable.Columns, targetTable.Columns) { + return fmt.Errorf("table '%s' exists on target %d (%s) but schema does not match source; column mismatch detected. 
Please ensure the table schema matches exactly or drop the table", + tableName, targetIndex, target.Config.DBName) + } + + logger.Info("validated existing target table", + "table", tableName, + "target", targetIndex, + "database", target.Config.DBName, + "columns", len(targetTable.Columns)) + + return nil +} diff --git a/pkg/move/check/target_state_test.go b/pkg/move/check/target_state_test.go new file mode 100644 index 00000000..83d139b4 --- /dev/null +++ b/pkg/move/check/target_state_test.go @@ -0,0 +1,137 @@ +package check + +import ( + "database/sql" + "log/slog" + "testing" + + "github.com/block/spirit/pkg/applier" + "github.com/block/spirit/pkg/table" + "github.com/block/spirit/pkg/testutils" + "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTargetStateCheck(t *testing.T) { + if testutils.IsMinimalRBRTestRunner(t) { + t.Skip("Skipping test for minimal RBR test runner") + } + + // Setup source and target databases + testutils.RunSQL(t, `DROP DATABASE IF EXISTS state_src`) + testutils.RunSQL(t, `CREATE DATABASE state_src`) + testutils.RunSQL(t, `DROP DATABASE IF EXISTS state_tgt`) + testutils.RunSQL(t, `CREATE DATABASE state_tgt`) + + sourceDB, err := sql.Open("mysql", testutils.DSNForDatabase("state_src")) + assert.NoError(t, err) + defer sourceDB.Close() + + targetDB, err := sql.Open("mysql", testutils.DSNForDatabase("state_tgt")) + assert.NoError(t, err) + defer targetDB.Close() + + // Create a test table on source + _, err = sourceDB.ExecContext(t.Context(), "CREATE TABLE state_src.test_table (id int not null primary key auto_increment, name VARCHAR(100))") + require.NoError(t, err) + + // Get source table info + sourceTable := table.NewTableInfo(sourceDB, "state_src", "test_table") + err = sourceTable.SetInfo(t.Context()) + require.NoError(t, err) + + targetConfig := &mysql.Config{ + DBName: "state_tgt", + } + + // Test 1: Empty target should pass + t.Run("empty target passes", func(t *testing.T) { + r := Resources{ + SourceDB: sourceDB, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err := targetStateCheck(t.Context(), r, slog.Default()) + assert.NoError(t, err) + }) + + // Test 2: Target with matching empty table should pass + t.Run("matching empty table passes", func(t *testing.T) { + _, err = targetDB.ExecContext(t.Context(), "CREATE TABLE state_tgt.test_table (id int not null primary key auto_increment, name VARCHAR(100))") + require.NoError(t, err) + defer func() { + _, _ = targetDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS state_tgt.test_table") + }() + + r := Resources{ + SourceDB: sourceDB, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err := targetStateCheck(t.Context(), r, slog.Default()) + assert.NoError(t, err) + }) + + // Test 3: Target with non-empty table should fail + t.Run("non-empty table fails", func(t *testing.T) { + _, err = targetDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS state_tgt.test_table") + require.NoError(t, err) + _, err = targetDB.ExecContext(t.Context(), "CREATE TABLE state_tgt.test_table (id int not null primary key auto_increment, name VARCHAR(100))") + require.NoError(t, err) + _, err = targetDB.ExecContext(t.Context(), "INSERT INTO state_tgt.test_table (name) VALUES ('test')") + require.NoError(t, err) + defer func() { + _, _ = targetDB.ExecContext(t.Context(), "DROP TABLE IF 
EXISTS state_tgt.test_table") + }() + + r := Resources{ + SourceDB: sourceDB, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err := targetStateCheck(t.Context(), r, slog.Default()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "is not empty") + }) + + // Test 4: Target with mismatched schema should fail + t.Run("mismatched schema fails", func(t *testing.T) { + _, err = targetDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS state_tgt.test_table") + require.NoError(t, err) + _, err = targetDB.ExecContext(t.Context(), "CREATE TABLE state_tgt.test_table (id INT PRIMARY KEY, different_col VARCHAR(100))") + require.NoError(t, err) + defer func() { + _, _ = targetDB.ExecContext(t.Context(), "DROP TABLE IF EXISTS state_tgt.test_table") + }() + + r := Resources{ + SourceDB: sourceDB, + Targets: []applier.Target{ + { + DB: targetDB, + Config: targetConfig, + }, + }, + SourceTables: []*table.TableInfo{sourceTable}, + } + err := targetStateCheck(t.Context(), r, slog.Default()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "schema does not match") + }) +} diff --git a/pkg/move/move_test.go b/pkg/move/move_test.go index 36187f36..2e9d5dc9 100644 --- a/pkg/move/move_test.go +++ b/pkg/move/move_test.go @@ -25,7 +25,9 @@ func TestMain(m *testing.M) { } func TestBasicMove(t *testing.T) { - settingsCheck(t) + if testutils.IsMinimalRBRTestRunner(t) { + t.Skip("Skipping test for minimal RBR test runner") + } cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) @@ -67,7 +69,9 @@ func TestResumeFromCheckpointE2E(t *testing.T) { } func testResumeFromCheckpointE2E(t *testing.T, deferSecondaryIndexes bool) { - settingsCheck(t) + if testutils.IsMinimalRBRTestRunner(t) { + t.Skip("Skipping test for minimal RBR test runner") + } cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) @@ -177,7 +181,9 @@ func testResumeFromCheckpointE2E(t *testing.T, deferSecondaryIndexes bool) { // TestEmptyDatabaseMove tests that a move operation succeeds when the source database has no tables. // This is a valid scenario for shard splits where an empty shard needs to be split. func TestEmptyDatabaseMove(t *testing.T) { - settingsCheck(t) + if testutils.IsMinimalRBRTestRunner(t) { + t.Skip("Skipping test for minimal RBR test runner") + } cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) @@ -227,28 +233,3 @@ func TestEmptyDatabaseMove(t *testing.T) { // Clean up runner.Close() } - -// settingsCheck checks that the database settings are appropriate for running moves. -// Move is not supported unless there is full binlog images, etc. but in CI -// we have some tests and features that do not require this. 
-func settingsCheck(t *testing.T) { - cfg, err := mysql.ParseDSN(testutils.DSN()) - assert.NoError(t, err) - - db, err := sql.Open("mysql", cfg.FormatDSN()) - assert.NoError(t, err) - defer db.Close() - - var binlogRowImage, binlogRowValueOptions string - err = db.QueryRowContext(t.Context(), - `SELECT - @@global.binlog_row_image, - @@global.binlog_row_value_options`).Scan( - &binlogRowImage, - &binlogRowValueOptions, - ) - assert.NoError(t, err) - if binlogRowImage != "FULL" || binlogRowValueOptions != "" { - t.Skip("Skipping test because binlog_row_image is not FULL or binlog_row_value_options is not empty") - } -} diff --git a/pkg/move/runner.go b/pkg/move/runner.go index 9f0f47e2..9f79eb15 100644 --- a/pkg/move/runner.go +++ b/pkg/move/runner.go @@ -14,6 +14,7 @@ import ( "github.com/block/spirit/pkg/copier" "github.com/block/spirit/pkg/dbconn" "github.com/block/spirit/pkg/metrics" + "github.com/block/spirit/pkg/move/check" "github.com/block/spirit/pkg/repl" "github.com/block/spirit/pkg/statement" "github.com/block/spirit/pkg/status" @@ -153,97 +154,6 @@ func (r *Runner) getTables(ctx context.Context, db *sql.DB) ([]*table.TableInfo, return tables, rows.Err() } -// checkTargetEmpty checks that the target database is ready for the move operation. -// If SourceTables is specified, it checks that those specific tables don't exist in the target, -// OR if they do exist, they must be empty (zero rows) and have matching schema to the source. -// If SourceTables is not specified, it checks that the target database is completely empty. -func (r *Runner) checkTargetEmpty(ctx context.Context) error { - for i, target := range r.targets { - rows, err := target.DB.QueryContext(ctx, "SHOW TABLES") - if err != nil { - return fmt.Errorf("failed to check target %d: %w", i, err) - } - defer rows.Close() - var tableName string - var existingTables []string - for rows.Next() { - if err := rows.Scan(&tableName); err != nil { - return fmt.Errorf("failed to scan table name on target %d: %w", i, err) - } - if r.sourceTableMap != nil && !r.sourceTableMap[tableName] { - // We are only copying specific tables, but this table - // found is not one of them, so we can ignore and continue. - continue - } - existingTables = append(existingTables, tableName) - } - if err := rows.Err(); err != nil { - return err - } - - // If there are existing tables, validate they are empty and have matching schema - if len(existingTables) > 0 { - for _, tableName := range existingTables { - if err := r.validateExistingTargetTable(ctx, target, tableName, i); err != nil { - return err - } - } - } - } - return nil -} - -// validateExistingTargetTable checks that an existing table in the target database -// is empty (zero rows) and has a schema that matches the source table. -// This allows move-tables to work with pre-created tables, which is necessary -// for declarative schema management workflows where tables must be tracked -// in the migrations directory before data is moved. 
-func (r *Runner) validateExistingTargetTable(ctx context.Context, target applier.Target, tableName string, targetIndex int) error { - // Check 1: Table must have zero rows - var rowCount int64 - err := target.DB.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`", target.Config.DBName, tableName)).Scan(&rowCount) - if err != nil { - return fmt.Errorf("failed to check row count for table '%s' on target %d: %w", tableName, targetIndex, err) - } - if rowCount > 0 { - return fmt.Errorf("table '%s' already exists on target %d (%s) and contains %d rows; move-tables requires target tables to be empty to prevent data loss. Please drop the table or use a different target", - tableName, targetIndex, target.Config.DBName, rowCount) - } - - // Check 2: Schema must match source exactly - // Find the source table info - var sourceTable *table.TableInfo - for _, src := range r.sourceTables { - if src.TableName == tableName { - sourceTable = src - break - } - } - if sourceTable == nil { - return nil // exists on target only is fine. - } - - // Get target table info and compare columns - targetTable := table.NewTableInfo(target.DB, target.Config.DBName, tableName) - if err := targetTable.SetInfo(ctx); err != nil { - return fmt.Errorf("failed to get table info for target %d table '%s': %w", targetIndex, tableName, err) - } - - if !slices.Equal(sourceTable.Columns, targetTable.Columns) { - return fmt.Errorf("table '%s' exists on target %d (%s) but schema does not match source; column mismatch detected. Please ensure the table schema matches exactly or drop the table", - tableName, targetIndex, target.Config.DBName) - } - - r.logger.Info("validated existing target table", - "table", tableName, - "target", targetIndex, - "database", target.Config.DBName, - "rows", rowCount, - "columns", len(targetTable.Columns)) - - return nil -} - // createTargetTables creates tables on all targets. // If DeferSecondaryIndexes is enabled, tables are created without secondary indexes. // Secondary indexes will be added later by restoreSecondaryIndexes() before cutover. @@ -394,6 +304,13 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error { func (r *Runner) setup(ctx context.Context) error { var err error + + // Run preflight checks on the source database + r.logger.Info("Running preflight checks") + if err := r.runChecks(ctx, check.ScopePreflight); err != nil { + return err + } + // Fetch a list of tables from the source. r.logger.Info("Fetching source table list") if r.sourceTables, err = r.getTables(ctx, r.source); err != nil { @@ -424,25 +341,22 @@ func (r *Runner) setup(ctx context.Context) error { DBConfig: r.dbConfig, }) - r.logger.Info("Checking target database state") - - err = r.checkTargetEmpty(ctx) - if err != nil { - // checkTargetEmpty returns an error if: - // 1. Tables exist that don't match our criteria (non-empty or schema mismatch) - // 2. There are tables we can't validate - // In either case, try to resume from checkpoint as a fallback. - // The checkpoint is on the source (not target) because reshards are 1:N and the - // source is always guaranteed to be singular. + // Run post-setup checks + if err = r.runChecks(ctx, check.ScopePostSetup); err != nil { + // The checks returned an error, which could just mean that tables exist on the target. + // So we can switch tactics and check if these artifacts pass the tests + // to resume from checkpoint instead. 
+ if resumeErr := r.runChecks(ctx, check.ScopeResume); resumeErr != nil { + return fmt.Errorf("target state is invalid for both new copy and resume: new_copy_error=%v, resume_error=%v", err, resumeErr) + } + // We pass the pre-check for resume, so attempt it if err := r.resumeFromCheckpoint(ctx); err != nil { - return fmt.Errorf("target database validation failed and could not resume from checkpoint: %v", err) + return fmt.Errorf("resume validation passed but checkpoint resume failed: %v", err) } - r.logger.Info("Resumed move from existing checkpoint") + r.logger.Info("Successfully resumed move from existing checkpoint") return nil } - - // Target is ready: it is either empty or has valid - // pre-existing empty tables with matching schema + // The post-setup checks returned no errors so we can proceed with new copy return r.newCopy(ctx) } @@ -755,6 +669,19 @@ func (r *Runner) SetLogger(logger *slog.Logger) { r.logger = logger } +// runChecks wraps around check.RunChecks and adds the context of this move operation +func (r *Runner) runChecks(ctx context.Context, scope check.ScopeFlag) error { + return check.RunChecks(ctx, check.Resources{ + SourceDB: r.source, + SourceConfig: r.sourceConfig, + Targets: r.targets, + SourceTables: r.sourceTables, + CreateSentinel: r.move.CreateSentinel, + SourceDSN: r.move.SourceDSN, + TargetDSN: r.move.TargetDSN, + }, r.logger, scope) +} + // restoreSecondaryIndexes restores any secondary indexes that were deferred during table creation. // This function is always called (regardless of DeferSecondaryIndexes flag) to handle // checkpoint resume scenarios. It uses statement.GetMissingSecondaryIndexes to compare source and target diff --git a/pkg/move/runner_test.go b/pkg/move/runner_test.go index 44be2948..6c762501 100644 --- a/pkg/move/runner_test.go +++ b/pkg/move/runner_test.go @@ -18,12 +18,14 @@ import ( // writes, exercising both deferred and non-deferred secondary indexes and // reproducing the "not all changes flushed" error seen during cutover. func TestMoveWithConcurrentWrites(t *testing.T) { + if testutils.IsMinimalRBRTestRunner(t) { + t.Skip("Skipping test for minimal RBR test runner") + } testMoveWithConcurrentWrites(t, false) testMoveWithConcurrentWrites(t, true) } func testMoveWithConcurrentWrites(t *testing.T, deferSecondaryIndexes bool) { - settingsCheck(t) cfg, err := mysql.ParseDSN(testutils.DSN()) assert.NoError(t, err) diff --git a/pkg/testutils/testing.go b/pkg/testutils/testing.go index 7e16af0d..6cc87d9b 100644 --- a/pkg/testutils/testing.go +++ b/pkg/testutils/testing.go @@ -8,9 +8,12 @@ import ( "fmt" "os" "strings" + "sync" "testing" + "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func DSN() string { @@ -88,3 +91,33 @@ func RunSQL(t *testing.T, stmt string) { _, err = db.ExecContext(t.Context(), stmt) assert.NoError(t, err) } + +var ( + isRBRTestRunnerCached bool + isRBRTestRunnerOnce sync.Once +) + +func IsMinimalRBRTestRunner(t *testing.T) bool { + // Check if we are in the minimal RBR test runner. + // we use this to skip certain tests. 
+ isRBRTestRunnerOnce.Do(func() { + cfg, err := mysql.ParseDSN(DSN()) + require.NoError(t, err) + db, err := sql.Open("mysql", cfg.FormatDSN()) + require.NoError(t, err) + defer db.Close() + var binlogRowImage, binlogRowValueOptions string + err = db.QueryRowContext(t.Context(), + `SELECT + @@global.binlog_row_image, + @@global.binlog_row_value_options`).Scan( + &binlogRowImage, + &binlogRowValueOptions, + ) + require.NoError(t, err) + if binlogRowImage != "FULL" || binlogRowValueOptions != "" { + isRBRTestRunnerCached = true + } + }) + return isRBRTestRunnerCached +}
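
// Example (not part of the patch): a minimal sketch of how an additional move
// check could plug into the registry introduced by pkg/move/check above. It
// assumes only the registerCheck/RunChecks API, the Resources struct, and the
// scope constants shown in this diff; the check name, the retention threshold,
// and the file itself (binlog_retention.go) are hypothetical illustrations,
// not existing code in the repository.
package check

import (
	"context"
	"errors"
	"log/slog"
)

func init() {
	// Register for the preflight phase, the same phase as configurationCheck.
	registerCheck("binlog_retention", binlogRetentionCheck, ScopePreflight)
}

// binlogRetentionCheck refuses to start a move if the source keeps less than
// one hour of binary logs, on the assumption that the replication stream
// needs them available while the copy catches up.
func binlogRetentionCheck(ctx context.Context, r Resources, logger *slog.Logger) error {
	var retentionSeconds int64
	if err := r.SourceDB.QueryRowContext(ctx,
		"SELECT @@global.binlog_expire_logs_seconds").Scan(&retentionSeconds); err != nil {
		return err
	}
	if retentionSeconds < 3600 {
		return errors.New("binlog_expire_logs_seconds is under one hour; move may not be able to catch up")
	}
	logger.Info("binlog retention check passed", "seconds", retentionSeconds)
	return nil
}

// Because pkg/move/runner.go routes all scopes through Runner.runChecks, a
// check registered this way is picked up automatically the next time
// r.runChecks(ctx, check.ScopePreflight) runs during setup; no call sites
// need to change.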