diff --git a/cmd/keeper/cmd/keeper.go b/cmd/keeper/cmd/keeper.go index dd9b2aafa..73663db6f 100644 --- a/cmd/keeper/cmd/keeper.go +++ b/cmd/keeper/cmd/keeper.go @@ -98,6 +98,7 @@ type config struct { uid string dataDir string + walDir string debug bool pgListenAddress string pgAdvertiseAddress string @@ -126,6 +127,7 @@ func init() { CmdKeeper.PersistentFlags().StringVar(&cfg.uid, "id", "", "keeper uid (must be unique in the cluster and can contain only lower-case letters, numbers and the underscore character). If not provided a random uid will be generated.") CmdKeeper.PersistentFlags().StringVar(&cfg.uid, "uid", "", "keeper uid (must be unique in the cluster and can contain only lower-case letters, numbers and the underscore character). If not provided a random uid will be generated.") CmdKeeper.PersistentFlags().StringVar(&cfg.dataDir, "data-dir", "", "data directory") + CmdKeeper.PersistentFlags().StringVar(&cfg.walDir, "wal-dir", "", "wal directory") CmdKeeper.PersistentFlags().StringVar(&cfg.pgListenAddress, "pg-listen-address", "", "postgresql instance listening address, local address used for the postgres instance. For all network interface, you can set the value to '*'.") CmdKeeper.PersistentFlags().StringVar(&cfg.pgAdvertiseAddress, "pg-advertise-address", "", "postgresql instance address from outside. 
Use it to expose ip different than local ip with a NAT networking config") CmdKeeper.PersistentFlags().StringVar(&cfg.pgPort, "pg-port", "5432", "postgresql instance listening port") @@ -471,6 +473,7 @@ type PostgresKeeper struct { bootUUID string dataDir string + walDir string pgListenAddress string pgAdvertiseAddress string pgPort string @@ -522,6 +525,7 @@ func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) { bootUUID: common.UUID(), dataDir: dataDir, + walDir: cfg.walDir, pgListenAddress: cfg.pgListenAddress, pgAdvertiseAddress: cfg.pgAdvertiseAddress, @@ -823,7 +827,7 @@ func (p *PostgresKeeper) Start(ctx context.Context) { // TODO(sgotti) reconfigure the various configurations options // (RequestTimeout) after a changed cluster config - pgm := pg.NewManager(p.pgBinPath, p.dataDir, p.getLocalConnParams(), p.getLocalReplConnParams(), p.pgSUAuthMethod, p.pgSUUsername, p.pgSUPassword, p.pgReplAuthMethod, p.pgReplUsername, p.pgReplPassword, p.requestTimeout) + pgm := pg.NewManager(p.pgBinPath, p.dataDir, p.walDir, p.getLocalConnParams(), p.getLocalReplConnParams(), p.pgSUAuthMethod, p.pgSUUsername, p.pgSUPassword, p.pgReplAuthMethod, p.pgReplUsername, p.pgReplPassword, p.requestTimeout) p.pgm = pgm _ = p.pgm.StopIfStarted(true) @@ -916,7 +920,7 @@ func (p *PostgresKeeper) resync(db, masterDB, followedDB *cluster.DB, tryPgrewin replSlot = common.StolonName(db.UID) } - if err := pgm.RemoveAll(); err != nil { + if err := pgm.RemoveAllIfInitialized(); err != nil { return fmt.Errorf("failed to remove the postgres data dir: %v", err) } if slog.IsDebug() { @@ -1115,7 +1119,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { } // Clean up cluster db datadir - if err = pgm.RemoveAll(); err != nil { + if err = pgm.RemoveAllIfInitialized(); err != nil { log.Errorw("failed to remove the postgres data dir", zap.Error(err)) return } @@ -1174,7 +1178,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { 
log.Errorw("failed to stop pg instance", zap.Error(err)) return } - if err = pgm.RemoveAll(); err != nil { + if err = pgm.RemoveAllIfInitialized(); err != nil { log.Errorw("failed to remove the postgres data dir", zap.Error(err)) return } @@ -1236,7 +1240,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { log.Errorw("failed to stop pg instance", zap.Error(err)) return } - if err = pgm.RemoveAll(); err != nil { + if err = pgm.RemoveAllIfInitialized(); err != nil { log.Errorw("failed to remove the postgres data dir", zap.Error(err)) return } diff --git a/doc/commands/stolon-keeper.md b/doc/commands/stolon-keeper.md index a18311910..c46a47404 100644 --- a/doc/commands/stolon-keeper.md +++ b/doc/commands/stolon-keeper.md @@ -45,6 +45,7 @@ stolon-keeper [flags] --store-skip-tls-verify skip store certificate verification (insecure!!!) --store-timeout duration store request timeout (default 5s) --uid string keeper uid (must be unique in the cluster and can contain only lower-case letters, numbers and the underscore character). If not provided a random uid will be generated. + --wal-dir string wal directory ``` ###### Auto generated by spf13/cobra on 24-Feb-2021 diff --git a/doc/pitr.md b/doc/pitr.md index b974ebcfa..1be8782bf 100644 --- a/doc/pitr.md +++ b/doc/pitr.md @@ -39,7 +39,7 @@ Note: the `\"` is needed by json to put double quotes inside strings. We aren't When initializing a cluster in pitr init mode a random registered keeper will be choosed and it'll start restoring the database with these steps: * Remove the current data directory -* Call the `dataRestoreCommand` expanding every %d to the data directory full path. If it exits with a non zero exit code then stop here since something went wrong. +* Call the `dataRestoreCommand` expanding every %d to the data directory full path and every %w to the wal directory full path (if wal directory is provided to the keeper). 
If it exits with a non zero exit code then stop here since something went wrong. * Create a `recovery.conf` with the right parameters and with `restore_command` set to `restoreCommand`. * Start the postgres instance and wait for the archive recovery. diff --git a/internal/postgresql/postgresql.go b/internal/postgresql/postgresql.go index 00c14bcd7..b37516848 100644 --- a/internal/postgresql/postgresql.go +++ b/internal/postgresql/postgresql.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "io/fs" "io/ioutil" "os" "os/exec" @@ -66,6 +67,7 @@ type PGManager interface { type Manager struct { pgBinPath string dataDir string + walDir string parameters common.Parameters recoveryOptions *RecoveryOptions hba []string @@ -133,10 +135,11 @@ func SetLogger(l *zap.SugaredLogger) { log = l } -func NewManager(pgBinPath string, dataDir string, localConnParams, replConnParams ConnParams, suAuthMethod, suUsername, suPassword, replAuthMethod, replUsername, replPassword string, requestTimeout time.Duration) *Manager { +func NewManager(pgBinPath string, dataDir, walDir string, localConnParams, replConnParams ConnParams, suAuthMethod, suUsername, suPassword, replAuthMethod, replUsername, replPassword string, requestTimeout time.Duration) *Manager { return &Manager{ pgBinPath: pgBinPath, dataDir: filepath.Join(dataDir, "postgres"), + walDir: walDir, parameters: make(common.Parameters), recoveryOptions: NewRecoveryOptions(), curParameters: make(common.Parameters), @@ -222,6 +225,13 @@ func (p *Manager) Init(initConfig *InitConfig) error { } log.Debugw("execing cmd", "cmd", cmd) + // initdb supports configuring a separate wal directory via symlinks. Normally this + // parameter might be part of the initConfig, but it will also be required whenever we + // fall-back to a pg_basebackup during a re-sync, which is why it's a Manager field. 
+ if p.walDir != "" { + cmd.Args = append(cmd.Args, "--waldir", p.walDir) + } + if initConfig.Locale != "" { cmd.Args = append(cmd.Args, "--locale", initConfig.Locale) } @@ -240,7 +250,9 @@ func (p *Manager) Init(initConfig *InitConfig) error { } // remove the dataDir, so we don't end with an half initialized database if err != nil { - os.RemoveAll(p.dataDir) + if cleanupErr := p.RemoveAll(); cleanupErr != nil { + log.Errorf("failed to cleanup database: %v", cleanupErr) + } return err } return nil @@ -250,7 +262,7 @@ func (p *Manager) Restore(command string) error { var err error var cmd *exec.Cmd - command = expand(command, p.dataDir) + command = expandRecoveryCommand(command, p.dataDir, p.walDir) if err = os.MkdirAll(p.dataDir, 0700); err != nil { err = fmt.Errorf("cannot create data dir: %v", err) @@ -269,7 +281,9 @@ func (p *Manager) Restore(command string) error { // On every error remove the dataDir, so we don't end with an half initialized database out: if err != nil { - os.RemoveAll(p.dataDir) + if cleanupErr := p.RemoveAll(); cleanupErr != nil { + log.Errorf("failed to cleanup database: %v", cleanupErr) + } return err } return nil @@ -286,10 +300,84 @@ func (p *Manager) StartTmpMerged() error { return p.start("-c", fmt.Sprintf("config_file=%s", tmpPostgresConfPath)) } +func (p *Manager) moveWal() (err error) { + var curPath string + var desiredPath string + var tmpPath string + symlinkPath := filepath.Join(p.dataDir, "pg_wal") + if curPath, err = filepath.EvalSymlinks(symlinkPath); err != nil { + log.Errorf("could not evaluate symlink %s: %e", symlinkPath, err) + return err + } + if p.walDir == "" { + desiredPath = symlinkPath + tmpPath = filepath.Join(p.dataDir, "pg_wal_new") + } else { + desiredPath = p.walDir + tmpPath = p.walDir + } + if curPath == desiredPath { + return nil + } + if p.walDir == "" { + log.Infof("moving WAL from %s to %s first and then to %s", curPath, tmpPath, desiredPath) + } else { + log.Infof("moving WAL from %s to new location 
%s", curPath, desiredPath) + } + // We use tmpPath here first and (if needed) mv tmpPath to desiredPath when all is copied. + // This allows stolon-keeper to re-read symlink dest and continue should stolon-keeper be restarted while copying. + if err = moveDirRecursive(curPath, tmpPath); err != nil { + return err + } + + var symlinkStat fs.FileInfo + if symlinkStat, err = os.Lstat(symlinkPath); errors.Is(err, os.ErrNotExist) { + // File or folder already removed + } else if err != nil { + log.Errorf("could not get info on current pg_wal folder/symlink %s: %e", symlinkPath, err) + return err + } else if symlinkStat.Mode()&os.ModeSymlink != 0 { + if err = os.Remove(symlinkPath); err != nil { + log.Errorf("could not remove current pg_wal symlink %s: %e", symlinkPath, err) + return err + } + } else if symlinkStat.IsDir() { + if err = syscall.Rmdir(symlinkPath); err != nil { + log.Errorf("could not remove current folder %s: %e", symlinkPath, err) + return err + } + } else { + err = fmt.Errorf("location %s is no symlink and no dir, so please check and resolve by hand", symlinkPath) + log.Error(err) + return err + } + if p.walDir == "" { + // So we were moving WAL files back into PGDATA. Let's rename the tmpDir now holding all WAL files and use that + // as PGDATA/pg_wal + if err = os.Rename(tmpPath, desiredPath); err != nil { + log.Errorf("cannot move %s to %s: %e", tmpPath, desiredPath, err) + return err + } + } else { + log.Infof("symlinking %s to %s", symlinkPath, desiredPath) + if err = os.Symlink(desiredPath, symlinkPath); err != nil { + // We were copying WAL files from PGDATA (or another location) to a location outside of PGDATA and + // pointing the symlink in the right direction failed. 
+ log.Errorf("could not create symlink %s to %s: %e", symlinkPath, desiredPath, err) + return err + } + } + log.Infof("moving pg_wal from %s to %s is succesful", curPath, desiredPath) + return nil +} + func (p *Manager) Start() error { if err := p.writeConfs(false); err != nil { return err } + if err := p.moveWal(); err != nil { + return err + } return p.start() } @@ -967,6 +1055,9 @@ func (p *Manager) SyncFromFollowed(followedConnParams ConnParams, replSlot strin if replSlot != "" { args = append(args, "--slot", replSlot) } + if p.walDir != "" { + args = append(args, "--waldir", p.walDir) + } cmd := exec.Command(name, args...) cmd.Env = append(os.Environ(), fmt.Sprintf("PGPASSFILE=%s", pgpass.Name())) @@ -1000,7 +1091,7 @@ func (p *Manager) SyncFromFollowed(followedConnParams ConnParams, replSlot strin return nil } -func (p *Manager) RemoveAll() error { +func (p *Manager) RemoveAllIfInitialized() error { initialized, err := p.IsInitialized() if err != nil { return fmt.Errorf("failed to retrieve instance state: %v", err) @@ -1016,6 +1107,17 @@ func (p *Manager) RemoveAll() error { if started { return fmt.Errorf("cannot remove postregsql database. Instance is active") } + + return p.RemoveAll() +} + +// RemoveAll entirely cleans up the data directory, including any wal directory if that +// exists outside of the data directory. 
+func (p *Manager) RemoveAll() error { + if p.walDir != "" { + os.RemoveAll(p.walDir) + } + return os.RemoveAll(p.dataDir) } diff --git a/internal/postgresql/utils.go b/internal/postgresql/utils.go index 4ca50cd59..1d22ddbe2 100644 --- a/internal/postgresql/utils.go +++ b/internal/postgresql/utils.go @@ -18,10 +18,16 @@ import ( "bufio" "context" "database/sql" + "errors" "fmt" + "io" + "io/fs" + "io/ioutil" + "path/filepath" "regexp" "strconv" "strings" + "syscall" "github.com/sorintlab/stolon/internal/common" @@ -362,27 +368,20 @@ func fileExists(path string) (bool, error) { return true, nil } -func expand(s, dataDir string) string { - buf := make([]byte, 0, 2*len(s)) - // %d %% are all ASCII, so bytes are fine for this operation. - i := 0 - for j := 0; j < len(s); j++ { - if s[j] == '%' && j+1 < len(s) { - switch s[j+1] { - case 'd': - buf = append(buf, s[i:j]...) - buf = append(buf, []byte(dataDir)...) - j += 1 - i = j + 1 - case '%': - j += 1 - buf = append(buf, s[i:j]...) - i = j + 1 - default: - } +// expandRecoveryCommand substitutes the data and wal directories into a point-in-time +// recovery command string. Any %d become the data directory, any %w become the wal +// directory and any literal % characters are escaped by themselves (%% -> %). +func expandRecoveryCommand(cmd, dataDir, walDir string) string { + return regexp.MustCompile(`%[dw%]`).ReplaceAllStringFunc(cmd, func(match string) string { + switch match[1] { + case 'd': + return dataDir + case 'w': + return walDir } - } - return string(buf) + s[i:] + + return "%" + }) } func getConfigFilePGParameters(ctx context.Context, connParams ConnParams) (common.Parameters, error) { @@ -563,3 +562,78 @@ func WalFileNameNoTimeLine(name string) (string, error) { } return name[8:24], nil } + +func moveFile(sourcePath, destPath string) error { + // using os.Rename is faster when on same filesystem + if err := os.Rename(sourcePath, destPath); err == nil { + return nil + } + // Error. 
Let's try to write + inputFile, err := os.Open(sourcePath) + if err != nil { + return fmt.Errorf("Couldn't open source file: %s", err) + } + inFileStat, err := inputFile.Stat() + if err != nil { + return err + } + flag := os.O_WRONLY | os.O_CREATE | os.O_TRUNC + perm := inFileStat.Mode() & os.ModePerm + outputFile, err := os.OpenFile(destPath, flag, perm) + if err != nil { + return err + } + defer outputFile.Close() + _, err = io.Copy(outputFile, inputFile) + inputFile.Close() + if err != nil { + return fmt.Errorf("Writing to output file failed: %s", err) + } + // The copy was successful, so now delete the original file + err = os.Remove(sourcePath) + if err != nil { + return fmt.Errorf("Failed removing original file: %s", err) + } + return nil +} + +func moveDirRecursive(src string, dest string) error { + log.Infof("Moving %s to %s", src, dest) + if stat, err := os.Stat(src); err != nil { + log.Errorf("could not get stat of %s: %e", src, err) + return err + } else if stat.IsDir() { + // Make the dir if it doesn't exist + if _, err := os.Stat(dest); errors.Is(err, os.ErrNotExist) { + if err := os.MkdirAll(dest, stat.Mode()&os.ModePerm); err != nil { + return err + } + } else if err != nil { + log.Errorf("could not get stat of %s: %e", dest, err) + return err + } + // Copy all files and folders in this folder + var entries []fs.FileInfo + if entries, err = ioutil.ReadDir(src); err != nil { + log.Errorf("could not read contents of folder %s: %e", src, err) + return err + } else { + for _, entry := range entries { + srcEntry := filepath.Join(src, entry.Name()) + dstEntry := filepath.Join(dest, entry.Name()) + if err := moveDirRecursive(srcEntry, dstEntry); err != nil { + return err + } + } + } + // Remove this folder, which is now supposedly empty + if err := syscall.Rmdir(src); err != nil { + log.Errorf("could not remove folder %s: %e", src, err) + // If this is a mountpoint or you don't have enough permissions, you might not be able to. But that is fine. 
+ //return err + } + } else { + return moveFile(src, dest) + } + return nil +} diff --git a/internal/postgresql/utils_test.go b/internal/postgresql/utils_test.go index 0f7eedb31..8e2f3615d 100644 --- a/internal/postgresql/utils_test.go +++ b/internal/postgresql/utils_test.go @@ -89,7 +89,7 @@ func TestValidReplSlotName(t *testing.T) { } } -func TestExpand(t *testing.T) { +func TestExpandRecoveryCommand(t *testing.T) { tests := []struct { in string out string @@ -106,6 +106,10 @@ func TestExpand(t *testing.T) { in: "%d", out: "/datadir", }, + { + in: "%w", + out: "/waldir", + }, { in: "%%d", out: "%d", @@ -121,7 +125,7 @@ func TestExpand(t *testing.T) { } for i, tt := range tests { - out := expand(tt.in, "/datadir") + out := expandRecoveryCommand(tt.in, "/datadir", "/waldir") if out != tt.out { t.Errorf("#%d: wrong expanded string: got: %s, want: %s", i, out, tt.out) } diff --git a/internal/timer/timer_fallback.go b/internal/timer/timer_fallback.go index 510072618..b3d4edcc6 100644 --- a/internal/timer/timer_fallback.go +++ b/internal/timer/timer_fallback.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !linux // +build !linux package timer diff --git a/internal/timer/timer_linux.go b/internal/timer/timer_linux.go index 3f6e9c3f9..3d75d9835 100644 --- a/internal/timer/timer_linux.go +++ b/internal/timer/timer_linux.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+//go:build linux // +build linux package timer diff --git a/tests/integration/config_test.go b/tests/integration/config_test.go index a6631c838..e95e5f96f 100644 --- a/tests/integration/config_test.go +++ b/tests/integration/config_test.go @@ -845,3 +845,79 @@ func TestAdvertise(t *testing.T) { } } } + +func TestKeeperBootsWithWalDir(t *testing.T) { + t.Parallel() + + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + tstore, err := NewTestStore(t, dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tstore.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tstore.WaitUp(10 * time.Second); err != nil { + t.Fatalf("error waiting on store up: %v", err) + } + storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) + defer tstore.Stop() + + clusterName := uuid.NewV4().String() + + storePath := filepath.Join(common.StorePrefix, clusterName) + + sm := store.NewKVBackedStore(tstore.store, storePath) + automaticPgRestart := true + pgParameters := map[string]string{"max_connections": "100"} + + initialClusterSpec := &cluster.ClusterSpec{ + InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), + AutomaticPgRestart: &automaticPgRestart, + PGParameters: pgParameters, + } + + initialClusterSpecFile, err := writeClusterSpec(dir, initialClusterSpec) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + ts, err := NewTestSentinel(t, dir, clusterName, tstore.storeBackend, storeEndpoints, fmt.Sprintf("--initial-cluster-spec=%s", initialClusterSpecFile)) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := ts.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer ts.Stop() + + waldir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + tk, err := NewTestKeeper(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, 
tstore.storeBackend, storeEndpoints, "--wal-dir", waldir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tk.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer tk.Stop() + + if err := WaitClusterPhase(sm, cluster.ClusterPhaseNormal, 60*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tk.WaitDBUp(60 * time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // Test that the Keeper is accepting queries + if _, err := tk.Exec("select now()"); err != nil { + t.Fatalf("unexpected err: %v", err) + } +} diff --git a/tests/integration/ha_test.go b/tests/integration/ha_test.go index c8f72ae47..28b116c8b 100644 --- a/tests/integration/ha_test.go +++ b/tests/integration/ha_test.go @@ -2143,3 +2143,180 @@ func TestSyncStandbyNotInSync(t *testing.T) { func TestSyncStandbyNotInSync0(t *testing.T) { testSyncStandbyNotInSync(t, true) } + +func TestFailoverWithCustomWalDir(t *testing.T) { + dir, err := ioutil.TempDir("", "stolon") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + // Set up store + tstore := setupStore(t, dir) + storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) + + clusterName := uuid.NewV4().String() + + syncRep := true + usePgRewind := true + initialClusterSpec := &cluster.ClusterSpec{ + InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew), + SleepInterval: &cluster.Duration{Duration: 2 * time.Second}, + FailInterval: &cluster.Duration{Duration: 5 * time.Second}, + ConvergenceTimeout: &cluster.Duration{Duration: 30 * time.Second}, + MaxStandbyLag: cluster.Uint32P(50 * 1024), // limit lag to 50kiB + SynchronousReplication: &syncRep, + UsePgrewind: &usePgRewind, + PGParameters: defaultPGParameters, + } + initialClusterSpecFile, err := writeClusterSpec(dir, initialClusterSpec) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // Set up sentinel + sentinel, err := NewTestSentinel(t, dir, 
clusterName, tstore.storeBackend, storeEndpoints, fmt.Sprintf("--initial-cluster-spec=%s", initialClusterSpecFile)) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := sentinel.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // Set up first keeper + keeper1waldir, err := ioutil.TempDir("", "stolon") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + tk1, err := NewTestKeeper( + t, + dir, + clusterName, + pgSUUsername, + pgSUPassword, + pgReplUsername, + pgReplPassword, + tstore.storeBackend, + storeEndpoints, + "--wal-dir", + keeper1waldir, + ) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tk1.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // Set up second keeper + keeper2waldir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + tk2, err := NewTestKeeper( + t, + dir, + clusterName, + pgSUUsername, + pgSUPassword, + pgReplUsername, + pgReplPassword, + tstore.storeBackend, + storeEndpoints, + "--wal-dir", + keeper2waldir, + ) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tk2.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + storePath := filepath.Join(common.StorePrefix, clusterName) + store := store.NewKVBackedStore(tstore.store, storePath) + + // Wait for keepers to become ready + if err := WaitClusterPhase(store, cluster.ClusterPhaseNormal, 60*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tk1.WaitDBUp(60 * time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tk2.WaitDBUp(60 * time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + keepers := map[string]*TestKeeper{tk1.uid: tk1, tk2.uid: tk2} + sentinels := map[string]*TestSentinel{sentinel.uid: sentinel} + + // Set up proxy + proxy, err := NewTestProxy(t, dir, clusterName, pgSUUsername, pgSUPassword, 
pgReplUsername, pgReplPassword, tstore.storeBackend, storeEndpoints) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := proxy.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + defer shutdown(keepers, sentinels, proxy, tstore) + + master, standbys := waitMasterStandbysReady(t, store, keepers) + standby := standbys[0] + + fmt.Printf("master: %s\n", master.uid) + fmt.Printf("standby: %s\n", standby.uid) + + if err := WaitClusterDataSynchronousStandbys([]string{standby.uid}, store, 30*time.Second); err != nil { + t.Fatalf("expected synchronous standby on keeper %q in cluster data", standby.uid) + } + + if err := populate(t, master); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := write(t, master, 1, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // get the master XLogPos + xLogPos, err := GetXLogPos(master) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + // wait for the keepers to have reported their state + if err := WaitClusterSyncedXLogPos([]*TestKeeper{master, standby}, xLogPos, store, 20*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // the proxy should connect to the right master + if err := proxy.WaitRightMaster(master, 3*cluster.DefaultProxyCheckInterval); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // Stop the keeper process on master, should also stop the database + t.Logf("Stopping current master keeper: %s", master.uid) + master.Stop() + + // Wait for cluster data containing standby as master + if err := WaitClusterDataMaster(standby.uid, store, 30*time.Second); err != nil { + t.Fatalf("expected master %q in cluster view", standby.uid) + } + if err := standby.WaitDBRole(common.RoleMaster, nil, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + c, err := getLines(t, standby) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if c != 1 { + t.Fatalf("wrong number of lines, want: %d, got: %d", 1, c) 
+ } + + // the proxy should connect to the right master + if err := proxy.WaitRightMaster(standby, 3*cluster.DefaultProxyCheckInterval); err != nil { + t.Fatalf("unexpected err: %v", err) + } +}