Merged
2 changes: 1 addition & 1 deletion pkg/migration/change.go
@@ -4,8 +4,8 @@ import (
"context"
"fmt"

"github.com/block/spirit/pkg/check"
"github.com/block/spirit/pkg/dbconn"
"github.com/block/spirit/pkg/migration/check"
"github.com/block/spirit/pkg/statement"
"github.com/block/spirit/pkg/table"
)
23 files renamed without changes.
2 changes: 1 addition & 1 deletion pkg/migration/migration.go
@@ -8,7 +8,7 @@ import (
"strings"
"time"

"github.com/block/spirit/pkg/check"
"github.com/block/spirit/pkg/migration/check"
"github.com/block/spirit/pkg/statement"
"github.com/block/spirit/pkg/table"
"github.com/go-ini/ini"
39 changes: 4 additions & 35 deletions pkg/migration/migration_test.go
@@ -5,7 +5,6 @@ import (
"fmt"
"os"
"strings"
"sync"
"testing"
"time"

@@ -34,36 +33,6 @@ func mkIniFile(t *testing.T, content string) *os.File {
return tmpFile
}

var (
isRBRTestRunnerCached bool
isRBRTestRunnerOnce sync.Once
)

func isMinimalRBRTestRunner(t *testing.T) bool {
// Check if we are in the minimal RBR test runner.
// we use this to skip certain tests.
isRBRTestRunnerOnce.Do(func() {
cfg, err := mysql.ParseDSN(testutils.DSN())
require.NoError(t, err)
db, err := sql.Open("mysql", cfg.FormatDSN())
require.NoError(t, err)
defer db.Close()
var binlogRowImage, binlogRowValueOptions string
err = db.QueryRowContext(t.Context(),
`SELECT
@@global.binlog_row_image,
@@global.binlog_row_value_options`).Scan(
&binlogRowImage,
&binlogRowValueOptions,
)
require.NoError(t, err)
if binlogRowImage != "FULL" || binlogRowValueOptions != "" {
isRBRTestRunnerCached = true
}
})
return isRBRTestRunnerCached
}

func TestMain(m *testing.M) {
status.CheckpointDumpInterval = 100 * time.Millisecond
status.StatusInterval = 10 * time.Millisecond // the status will be accurate to 1ms
@@ -278,7 +247,7 @@ func TestGeneratedColumns(t *testing.T) {
testGeneratedColumns(t, false)
})
t.Run("buffered", func(t *testing.T) {
if isMinimalRBRTestRunner(t) {
if testutils.IsMinimalRBRTestRunner(t) {
t.Skip("Skipping buffered copy test because binlog_row_image is not FULL or binlog_row_value_options is not empty")
}
testGeneratedColumns(t, true)
@@ -317,7 +286,7 @@ func TestStoredGeneratedColumns(t *testing.T) {
testStoredGeneratedColumns(t, false)
})
t.Run("buffered", func(t *testing.T) {
if isMinimalRBRTestRunner(t) {
if testutils.IsMinimalRBRTestRunner(t) {
t.Skip("Skipping buffered copy test because binlog_row_image is not FULL or binlog_row_value_options is not empty")
}
testStoredGeneratedColumns(t, true)
@@ -394,7 +363,7 @@ func TestBinaryChecksum(t *testing.T) {
testBinaryChecksum(t, false)
})
t.Run("buffered", func(t *testing.T) {
if isMinimalRBRTestRunner(t) {
if testutils.IsMinimalRBRTestRunner(t) {
t.Skip("Skipping buffered copy test because binlog_row_image is not FULL or binlog_row_value_options is not empty")
}
testBinaryChecksum(t, true)
@@ -448,7 +417,7 @@ func TestConvertCharset(t *testing.T) {
testConvertCharset(t, false)
})
t.Run("buffered", func(t *testing.T) {
if isMinimalRBRTestRunner(t) {
if testutils.IsMinimalRBRTestRunner(t) {
t.Skip("Skipping buffered copy test because binlog_row_image is not FULL or binlog_row_value_options is not empty")
}
testConvertCharset(t, true)
2 changes: 1 addition & 1 deletion pkg/migration/runner.go
@@ -12,11 +12,11 @@ import (
"github.com/go-sql-driver/mysql"

"github.com/block/spirit/pkg/applier"
"github.com/block/spirit/pkg/check"
"github.com/block/spirit/pkg/checksum"
"github.com/block/spirit/pkg/copier"
"github.com/block/spirit/pkg/dbconn"
"github.com/block/spirit/pkg/metrics"
"github.com/block/spirit/pkg/migration/check"
"github.com/block/spirit/pkg/repl"
"github.com/block/spirit/pkg/status"
"github.com/block/spirit/pkg/table"
2 changes: 1 addition & 1 deletion pkg/migration/runner_test.go
@@ -11,9 +11,9 @@ import (
"testing"
"time"

"github.com/block/spirit/pkg/check"
"github.com/block/spirit/pkg/copier"
"github.com/block/spirit/pkg/dbconn"
"github.com/block/spirit/pkg/migration/check"
"github.com/block/spirit/pkg/status"
"github.com/block/spirit/pkg/table"
"github.com/block/spirit/pkg/testutils"
71 changes: 71 additions & 0 deletions pkg/move/check/check.go
@@ -0,0 +1,71 @@
// Package check provides various configuration and health checks
// that can be run for move operations.
package check

import (
"context"
"database/sql"
"log/slog"
"sync"

"github.com/block/spirit/pkg/applier"
"github.com/block/spirit/pkg/table"
"github.com/go-sql-driver/mysql"
)

// ScopeFlag scopes a check
type ScopeFlag uint8

const (
	// Scope values are bit flags so that RunChecks can match a check's
	// registered scope with a bitwise AND.
	ScopeNone      ScopeFlag = 0
	ScopePreRun    ScopeFlag = 1 << (iota - 1)
	ScopePreflight
	ScopePostSetup
	ScopeResume
)

// Resources contains the resources needed for move checks
type Resources struct {
SourceDB *sql.DB
SourceConfig *mysql.Config
Targets []applier.Target
SourceTables []*table.TableInfo
CreateSentinel bool
// For PreRun checks (before DB connections established)
SourceDSN string
TargetDSN string
}

type check struct {
callback func(context.Context, Resources, *slog.Logger) error
scope ScopeFlag
}

var (
checks map[string]check
lock sync.Mutex
)

// registerCheck registers a check (callback func) and the scope (i.e. the time) at which it is expected to run
func registerCheck(name string, callback func(context.Context, Resources, *slog.Logger) error, scope ScopeFlag) {
lock.Lock()
defer lock.Unlock()
if checks == nil {
checks = make(map[string]check)
}
checks[name] = check{callback: callback, scope: scope}
}

// RunChecks runs all checks that are registered for the given scope
func RunChecks(ctx context.Context, r Resources, logger *slog.Logger, scope ScopeFlag) error {
for _, check := range checks {
if check.scope&scope == 0 {
continue
}
err := check.callback(ctx, r, logger)
if err != nil {
return err
}
}
return nil
}
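
For orientation, a minimal caller-side sketch (not part of this PR) of how the registry is meant to be used: the package is imported for its init()-registered checks, and a runner invokes RunChecks with the scope for the current phase. The runPreflightChecks helper, its package, and its wiring are illustrative assumptions.

// Sketch only: a hypothetical move-runner helper exercising the check registry.
package move

import (
	"context"
	"database/sql"
	"log/slog"

	"github.com/block/spirit/pkg/move/check"
	"github.com/go-sql-driver/mysql"
)

func runPreflightChecks(ctx context.Context, sourceDB *sql.DB, sourceCfg *mysql.Config, logger *slog.Logger) error {
	r := check.Resources{
		SourceDB:     sourceDB,
		SourceConfig: sourceCfg,
	}
	// Importing pkg/move/check populates the registry via init();
	// RunChecks then runs only the checks registered for ScopePreflight.
	return check.RunChecks(ctx, r, logger, check.ScopePreflight)
}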
61 changes: 61 additions & 0 deletions pkg/move/check/configuration.go
@@ -0,0 +1,61 @@
package check

import (
"context"
"errors"
"log/slog"
)

func init() {
registerCheck("configuration", configurationCheck, ScopePreflight)
}

// configurationCheck verifies the MySQL configuration on the source database
// is suitable for move operations. Move operations require:
// - ROW binlog format for reading changes
// - Binary logging enabled
// - log_slave_updates enabled (unless we verify no replication)
// - FULL binlog_row_image (for buffered copy which is always used in move)
func configurationCheck(ctx context.Context, r Resources, logger *slog.Logger) error {
var binlogFormat, binlogRowImage, logBin, logSlaveUpdates, binlogRowValueOptions string
err := r.SourceDB.QueryRowContext(ctx,
`SELECT @@global.binlog_format,
@@global.binlog_row_image,
@@global.log_bin,
@@global.log_slave_updates,
@@global.binlog_row_value_options`).Scan(
&binlogFormat,
&binlogRowImage,
&logBin,
&logSlaveUpdates,
&binlogRowValueOptions,
)
if err != nil {
return err
}

if binlogFormat != "ROW" {
return errors.New("binlog_format must be ROW")
}

// Move always uses buffered copy, which requires FULL binlog_row_image
if binlogRowImage != "FULL" {
return errors.New("binlog_row_image must be FULL for move operations (buffered copy requires reading all columns from binlog)")
}

if binlogRowValueOptions != "" {
return errors.New("binlog_row_value_options must be empty for move operations (buffered copy requires reading all columns from binlog)")
}

if logBin != "1" {
return errors.New("log_bin must be enabled")
}

if logSlaveUpdates != "1" {
// This is a hard requirement unless we enhance this to confirm
// it's not receiving any updates via the replication stream.
return errors.New("log_slave_updates must be enabled")
}

return nil
}
28 changes: 28 additions & 0 deletions pkg/move/check/configuration_test.go
@@ -0,0 +1,28 @@
package check

import (
"context"
"database/sql"
"log/slog"
"testing"

"github.com/block/spirit/pkg/testutils"
"github.com/stretchr/testify/assert"
)

func TestConfigurationCheck(t *testing.T) {
if testutils.IsMinimalRBRTestRunner(t) {
t.Skip("Skipping test for minimal RBR test runner")
}

db, err := sql.Open("mysql", testutils.DSN())
assert.NoError(t, err)
defer db.Close()

// Test with valid configuration
r := Resources{
SourceDB: db,
}
err = configurationCheck(context.Background(), r, slog.Default())
assert.NoError(t, err)
}
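
The failure paths (for example binlog_format=STATEMENT) are hard to exercise against the shared test server. One option, sketched below, would be to stub the query with github.com/DATA-DOG/go-sqlmock; this test is not part of the PR and the sqlmock dependency is an assumption.

// Sketch only: a possible negative test using sqlmock (assumed dependency).
func TestConfigurationCheckRejectsStatementFormat(t *testing.T) {
	db, mock, err := sqlmock.New() // import sqlmock "github.com/DATA-DOG/go-sqlmock"
	assert.NoError(t, err)
	defer db.Close()

	// Return a server configuration that the check must reject.
	mock.ExpectQuery("SELECT").WillReturnRows(sqlmock.NewRows(
		[]string{"binlog_format", "binlog_row_image", "log_bin", "log_slave_updates", "binlog_row_value_options"}).
		AddRow("STATEMENT", "FULL", "1", "1", ""))

	err = configurationCheck(context.Background(), Resources{SourceDB: db}, slog.Default())
	assert.ErrorContains(t, err, "binlog_format must be ROW")
}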
83 changes: 83 additions & 0 deletions pkg/move/check/resume_state.go
@@ -0,0 +1,83 @@
package check

import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"slices"

"github.com/block/spirit/pkg/table"
)

func init() {
registerCheck("resume_state", resumeStateCheck, ScopeResume)
}

// resumeStateCheck validates that the state is compatible with resuming from a checkpoint.
// This check runs at ScopeResume and verifies:
// 1. Checkpoint table exists on source
// 2. Target tables exist with matching schema to source
// 3. All source tables have corresponding target tables
//
// This check is the complement of the target_state check:
// - target_state validates we can start a NEW copy (empty or matching empty tables)
// - resume_state validates we can RESUME from checkpoint (tables exist with data)
func resumeStateCheck(ctx context.Context, r Resources, logger *slog.Logger) error {
if len(r.SourceTables) == 0 {
return errors.New("no source tables available for resume validation")
}

// Check 1: Verify checkpoint table exists on source
checkpointTableName := "_spirit_checkpoint"
var checkpointExists int
err := r.SourceDB.QueryRowContext(ctx,
"SELECT 1 FROM information_schema.TABLES WHERE table_schema = ? AND table_name = ?",
r.SourceConfig.DBName, checkpointTableName).Scan(&checkpointExists)
if err == sql.ErrNoRows {
Collaborator: Does anything in the checkpoint table itself need to be checked at this stage?

Collaborator (Author): I could, but restoring the checkpoint can also return an error in pkg/move/runner.go:resumeFromCheckpoint(). My preference is not to, because if I change the checkpoint format (it happens) the code becomes non-DRY: the check should only check, and is not expected to load or mutate structures. If I move checkpoints to a separate package, that would be a different argument; they could call the same helper.

return fmt.Errorf("checkpoint table '%s.%s' does not exist; cannot resume", r.SourceConfig.DBName, checkpointTableName)
}
if err != nil {
return fmt.Errorf("failed to check for checkpoint table: %w", err)
}

logger.Info("checkpoint table exists, validating target tables for resume",
"checkpoint_table", fmt.Sprintf("%s.%s", r.SourceConfig.DBName, checkpointTableName))

// Check 2: Verify all source tables have corresponding target tables with matching schema
for _, sourceTable := range r.SourceTables {
for i, target := range r.Targets {
// Check if table exists on target
var tableExists int
err := target.DB.QueryRowContext(ctx,
"SELECT 1 FROM information_schema.TABLES WHERE table_schema = ? AND table_name = ?",
target.Config.DBName, sourceTable.TableName).Scan(&tableExists)
if err == sql.ErrNoRows {
return fmt.Errorf("table '%s' does not exist on target %d (%s); cannot resume. Target state is incomplete",
sourceTable.TableName, i, target.Config.DBName)
}
if err != nil {
return fmt.Errorf("failed to check for table '%s' on target %d: %w", sourceTable.TableName, i, err)
}

// Verify schema matches
targetTable := table.NewTableInfo(target.DB, target.Config.DBName, sourceTable.TableName)
if err := targetTable.SetInfo(ctx); err != nil {
return fmt.Errorf("failed to get table info for target %d table '%s': %w", i, sourceTable.TableName, err)
}

if !slices.Equal(sourceTable.Columns, targetTable.Columns) {
return fmt.Errorf("table '%s' schema mismatch between source and target %d (%s); cannot resume safely. Schema may have changed since checkpoint was created",
sourceTable.TableName, i, target.Config.DBName)
}

logger.Debug("validated target table for resume",
"table", sourceTable.TableName,
"target", i,
"database", target.Config.DBName,
"columns", len(targetTable.Columns))
}
}
return nil
}
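
As a rough illustration of the division of labour described in the doc comment above (not code from this PR), a runner might run the resume-scoped checks first and fall back to a fresh copy when they fail; the surrounding variable names are assumptions.

// Sketch only: choosing between resume and a fresh copy based on this check.
r := check.Resources{
	SourceDB:     sourceDB,
	SourceConfig: sourceCfg,
	SourceTables: sourceTables,
	Targets:      targets,
}
if err := check.RunChecks(ctx, r, logger, check.ScopeResume); err != nil {
	logger.Warn("cannot resume from checkpoint, starting a fresh copy", "error", err)
	// Fall through to the new-copy path, which is validated by the
	// target_state check at ScopePreflight instead.
}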