diff --git a/internal/scripts/e2e_postgres.sh b/internal/scripts/e2e_postgres.sh index 8c94c70..dd7c282 100755 --- a/internal/scripts/e2e_postgres.sh +++ b/internal/scripts/e2e_postgres.sh @@ -59,6 +59,7 @@ start_pg_flo_worker() { --target-dbname "$TARGET_PG_DB" \ --target-user "$TARGET_PG_USER" \ --target-password "$TARGET_PG_PASSWORD" \ + --batch-size 5000 \ --target-sync-schema \ >"$pg_flo_WORKER_LOG" 2>&1 & pg_flo_WORKER_PID=$! diff --git a/internal/scripts/e2e_test_local.sh b/internal/scripts/e2e_test_local.sh index f3ccb82..f73c76d 100755 --- a/internal/scripts/e2e_test_local.sh +++ b/internal/scripts/e2e_test_local.sh @@ -33,8 +33,8 @@ make build setup_docker -log "Running e2e copy & stream tests..." -if CI=false ./internal/scripts/e2e_copy_and_stream.sh; then +log "Running e2e postgres tests..." +if CI=false ./internal/scripts/e2e_postgres.sh; then success "Original e2e tests completed successfully" else error "Original e2e tests failed" diff --git a/pkg/sinks/postgres.go b/pkg/sinks/postgres.go index 0184d4b..b906e3e 100644 --- a/pkg/sinks/postgres.go +++ b/pkg/sinks/postgres.go @@ -6,6 +6,7 @@ import ( "os" "os/exec" "strings" + "time" "github.com/jackc/pgx/v5" "github.com/rs/zerolog" @@ -24,6 +25,8 @@ func init() { type PostgresSink struct { conn *pgx.Conn disableForeignKeyChecks bool + connConfig *pgx.ConnConfig + retryConfig utils.RetryConfig } // NewPostgresSink creates a new PostgresSink instance @@ -33,14 +36,18 @@ func NewPostgresSink(targetHost string, targetPort int, targetDBName, targetUser return nil, fmt.Errorf("failed to parse connection config: %v", err) } - conn, err := pgx.ConnectConfig(context.Background(), connConfig) - if err != nil { - return nil, fmt.Errorf("failed to connect to target database: %v", err) - } - sink := &PostgresSink{ - conn: conn, + connConfig: connConfig, disableForeignKeyChecks: disableForeignKeyChecks, + retryConfig: utils.RetryConfig{ + MaxAttempts: 5, + InitialWait: 1 * time.Second, + MaxWait: 30 * time.Second, + }, + } + + if err := sink.connect(context.Background()); err != nil { + return nil, err } if syncSchema { @@ -52,6 +59,19 @@ func NewPostgresSink(targetHost string, targetPort int, targetDBName, targetUser return sink, nil } +// New method to handle connection +func (s *PostgresSink) connect(ctx context.Context) error { + return utils.WithRetry(ctx, s.retryConfig, func() error { + conn, err := pgx.ConnectConfig(ctx, s.connConfig) + if err != nil { + log.Error().Err(err).Msg("Failed to connect to database, will retry") + return err + } + s.conn = conn + return nil + }) +} + // syncSchema synchronizes the schema between source and target databases func (s *PostgresSink) syncSchema(sourceHost string, sourcePort int, sourceDBName, sourceUser, sourcePassword string) error { dumpCmd := exec.Command("pg_dump", "--schema-only") @@ -208,58 +228,75 @@ func (s *PostgresSink) enableForeignKeys(ctx context.Context) error { // WriteBatch writes a batch of CDC messages to the target database func (s *PostgresSink) WriteBatch(messages []*utils.CDCMessage) error { ctx := context.Background() - tx, err := s.conn.Begin(ctx) - if err != nil { - return fmt.Errorf("failed to begin transaction: %v", err) - } - defer func() { - if err := tx.Rollback(ctx); err != nil && err != pgx.ErrTxClosed { - log.Error().Err(err).Msg("failed to rollback transaction") + + // Check connection and reconnect if needed + if s.conn == nil || s.conn.IsClosed() { + if err := s.connect(ctx); err != nil { + return fmt.Errorf("failed to reconnect to database: %v", err) } - }() + } - if s.disableForeignKeyChecks { - if err := s.disableForeignKeys(ctx); err != nil { - return fmt.Errorf("failed to disable foreign key checks: %v", err) + return utils.WithRetry(ctx, s.retryConfig, func() error { + tx, err := s.conn.Begin(ctx) + if err != nil { + // Check if connection is closed and try to reconnect + if s.conn.IsClosed() { + if reconnectErr := s.connect(ctx); reconnectErr != nil { + return reconnectErr + } + return err // Retry the operation + } + return fmt.Errorf("failed to begin transaction: %v", err) } defer func() { - if err := s.enableForeignKeys(ctx); err != nil { - log.Error().Err(err).Msg("failed to re-enable foreign key checks") + if err := tx.Rollback(ctx); err != nil && err != pgx.ErrTxClosed { + log.Error().Err(err).Msg("failed to rollback transaction") } }() - } - - for _, message := range messages { - primaryKeyColumn := message.MappedPrimaryKeyColumn - if primaryKeyColumn != "" { - message.PrimaryKeyColumn = message.MappedPrimaryKeyColumn + if s.disableForeignKeyChecks { + if err := s.disableForeignKeys(ctx); err != nil { + return fmt.Errorf("failed to disable foreign key checks: %v", err) + } + defer func() { + if err := s.enableForeignKeys(ctx); err != nil { + log.Error().Err(err).Msg("failed to re-enable foreign key checks") + } + }() } - var err error - switch message.Type { - case "INSERT": - err = s.handleInsert(tx, message) - case "UPDATE": - err = s.handleUpdate(tx, message) - case "DELETE": - err = s.handleDelete(tx, message) - case "DDL": - err = s.handleDDL(tx, message) - default: - return fmt.Errorf("unknown event type: %s", message.Type) - } + for _, message := range messages { - if err != nil { - return fmt.Errorf("failed to handle %s: %v", message.Type, err) + primaryKeyColumn := message.MappedPrimaryKeyColumn + if primaryKeyColumn != "" { + message.PrimaryKeyColumn = message.MappedPrimaryKeyColumn + } + + var err error + switch message.Type { + case "INSERT": + err = s.handleInsert(tx, message) + case "UPDATE": + err = s.handleUpdate(tx, message) + case "DELETE": + err = s.handleDelete(tx, message) + case "DDL": + err = s.handleDDL(tx, message) + default: + return fmt.Errorf("unknown event type: %s", message.Type) + } + + if err != nil { + return fmt.Errorf("failed to handle %s: %v", message.Type, err) + } } - } - if err := tx.Commit(ctx); err != nil { - return fmt.Errorf("failed to commit transaction: %v", err) - } + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %v", err) + } - return nil + return nil + }) } // Close closes the database connection diff --git a/pkg/utils/retry.go b/pkg/utils/retry.go new file mode 100644 index 0000000..69b1fab --- /dev/null +++ b/pkg/utils/retry.go @@ -0,0 +1,38 @@ +package utils + +import ( + "context" + "time" +) + +type RetryConfig struct { + MaxAttempts int + InitialWait time.Duration + MaxWait time.Duration +} + +func WithRetry(ctx context.Context, cfg RetryConfig, operation func() error) error { + wait := cfg.InitialWait + for attempt := 1; attempt <= cfg.MaxAttempts; attempt++ { + err := operation() + if err == nil { + return nil + } + + if attempt == cfg.MaxAttempts { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(wait): + // Exponential backoff with max wait + wait *= 2 + if wait > cfg.MaxWait { + wait = cfg.MaxWait + } + } + } + return nil +}