From fa9969f77181f1f79863d964e2a4e016f3327a9a Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Mon, 1 Jul 2024 04:51:34 -0700 Subject: [PATCH] fixing issue with xtrabackup and long gtids Signed-off-by: Renan Rangel --- go/vt/mysqlctl/xtrabackupengine.go | 35 ++++++++++++++----------- go/vt/mysqlctl/xtrabackupengine_test.go | 35 ++++++++++++++++++++++--- 2 files changed, 50 insertions(+), 20 deletions(-) diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 3f8491fdfb6..e6d02eedc1d 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -71,6 +71,7 @@ const ( writerBufferSize = 2 * 1024 * 1024 /*2 MiB*/ xtrabackupBinaryName = "xtrabackup" xtrabackupEngineName = "xtrabackup" + xtrabackupInfoFile = "xtrabackup_info" xbstream = "xbstream" ) @@ -292,7 +293,6 @@ func (be *XtrabackupEngine) backupFiles( numStripes int, flavor string, ) (replicationPosition replication.Position, finalErr error) { - backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName) flagsToExec := []string{"--defaults-file=" + params.Cnf.Path, "--backup", @@ -300,6 +300,7 @@ func (be *XtrabackupEngine) backupFiles( "--slave-info", "--user=" + xtrabackupUser, "--target-dir=" + params.Cnf.TmpDir, + "--extra-lsndir=" + params.Cnf.TmpDir, } if xtrabackupStreamMode != "" { flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode) @@ -398,27 +399,14 @@ func (be *XtrabackupEngine) backupFiles( // the replication position. Note that if we don't read stderr as we go, the // xtrabackup process gets blocked when the write buffer fills up. stderrBuilder := &strings.Builder{} - posBuilder := &strings.Builder{} stderrDone := make(chan struct{}) go func() { defer close(stderrDone) scanner := bufio.NewScanner(backupErr) - capture := false for scanner.Scan() { line := scanner.Text() params.Logger.Infof("xtrabackup stderr: %s", line) - - // Wait until we see the first line of the binlog position. - // Then capture all subsequent lines. We need multiple lines since - // the value we're looking for has newlines in it. - if !capture { - if !strings.Contains(line, "MySQL binlog position") { - continue - } - capture = true - } - fmt.Fprintln(posBuilder, line) } if err := scanner.Err(); err != nil { params.Logger.Errorf("error reading from xtrabackup stderr: %v", err) @@ -462,8 +450,7 @@ func (be *XtrabackupEngine) backupFiles( return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput)) } - posOutput := posBuilder.String() - replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger) + replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(params.Cnf.TmpDir, flavor, params.Logger) if rerr != nil { return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position") } @@ -751,6 +738,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log return nil } +func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (replication.Position, error) { + f, err := os.Open(path.Join(directory, xtrabackupInfoFile)) + if err != nil { + return replication.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, + "couldn't open %q to read GTID position", path.Join(directory, xtrabackupInfoFile)) + } + defer f.Close() + + contents, err := io.ReadAll(f) + if err != nil { + return replication.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "couldn't read GTID position from %q", f.Name()) + } + + return findReplicationPosition(string(contents), flavor, logger) +} + var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`) func findReplicationPosition(input, flavor string, logger logutil.Logger) (replication.Position, error) { diff --git a/go/vt/mysqlctl/xtrabackupengine_test.go b/go/vt/mysqlctl/xtrabackupengine_test.go index f560833d278..1139c671ca5 100644 --- a/go/vt/mysqlctl/xtrabackupengine_test.go +++ b/go/vt/mysqlctl/xtrabackupengine_test.go @@ -20,6 +20,8 @@ import ( "bytes" "crypto/rand" "io" + "os" + "path" "testing" "github.com/stretchr/testify/assert" @@ -49,22 +51,47 @@ func TestFindReplicationPosition(t *testing.T) { assert.NoError(t, err) assert.Equal(t, want, pos.String()) } +func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) { + input := `tool_version = 8.0.35-30 + binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3, + 1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3, + 47b59de1-b368-11e9-b48b-624401d35560:1-152981, + 557def0a-b368-11e9-84ed-f6fffd91cc57:1-3, + 599ef589-ae55-11e9-9688-ca1f44501925:1-14857169, + b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262' + format = xbstream + ` + want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262" + + tmp, err := os.MkdirTemp(t.TempDir(), "test") + assert.NoError(t, err) + + f, err := os.Create(path.Join(tmp, xtrabackupInfoFile)) + assert.NoError(t, err) + _, err = f.WriteString(input) + assert.NoError(t, err) + assert.NoError(t, f.Close()) + + pos, err := findReplicationPositionFromXtrabackupInfo(tmp, "MySQL56", logutil.NewConsoleLogger()) + assert.NoError(t, err) + assert.Equal(t, want, pos.String()) +} -func TestFindReplicationPositionNoMatch(t *testing.T) { +func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) { // Make sure failure to find a match triggers an error. input := `nothing` - _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) + _, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger()) assert.Error(t, err) } -func TestFindReplicationPositionEmptyMatch(t *testing.T) { +func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) { // Make sure failure to find a match triggers an error. input := `GTID of the last change ' '` - _, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger()) + _, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger()) assert.Error(t, err) }