Skip to content

Commit

Permalink
fixing issue with xtrabackup and long gtids
Browse files Browse the repository at this point in the history
Signed-off-by: Renan Rangel <rrangel@slack-corp.com>
  • Loading branch information
rvrangel committed Jul 16, 2024
1 parent 1c7632f commit fa9969f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 20 deletions.
35 changes: 19 additions & 16 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
writerBufferSize = 2 * 1024 * 1024 /*2 MiB*/
xtrabackupBinaryName = "xtrabackup"
xtrabackupEngineName = "xtrabackup"
xtrabackupInfoFile = "xtrabackup_info"
xbstream = "xbstream"
)

Expand Down Expand Up @@ -292,14 +293,14 @@ func (be *XtrabackupEngine) backupFiles(
numStripes int,
flavor string,
) (replicationPosition replication.Position, finalErr error) {

backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName)
flagsToExec := []string{"--defaults-file=" + params.Cnf.Path,
"--backup",
"--socket=" + params.Cnf.SocketFile,
"--slave-info",
"--user=" + xtrabackupUser,
"--target-dir=" + params.Cnf.TmpDir,
"--extra-lsndir=" + params.Cnf.TmpDir,
}
if xtrabackupStreamMode != "" {
flagsToExec = append(flagsToExec, "--stream="+xtrabackupStreamMode)
Expand Down Expand Up @@ -398,27 +399,14 @@ func (be *XtrabackupEngine) backupFiles(
// the replication position. Note that if we don't read stderr as we go, the
// xtrabackup process gets blocked when the write buffer fills up.
stderrBuilder := &strings.Builder{}
posBuilder := &strings.Builder{}
stderrDone := make(chan struct{})
go func() {
defer close(stderrDone)

scanner := bufio.NewScanner(backupErr)
capture := false
for scanner.Scan() {
line := scanner.Text()
params.Logger.Infof("xtrabackup stderr: %s", line)

// Wait until we see the first line of the binlog position.
// Then capture all subsequent lines. We need multiple lines since
// the value we're looking for has newlines in it.
if !capture {
if !strings.Contains(line, "MySQL binlog position") {
continue
}
capture = true
}
fmt.Fprintln(posBuilder, line)
}
if err := scanner.Err(); err != nil {
params.Logger.Errorf("error reading from xtrabackup stderr: %v", err)
Expand Down Expand Up @@ -462,8 +450,7 @@ func (be *XtrabackupEngine) backupFiles(
return replicationPosition, vterrors.Wrap(err, fmt.Sprintf("xtrabackup failed with error. Output=%s", sterrOutput))
}

posOutput := posBuilder.String()
replicationPosition, rerr := findReplicationPosition(posOutput, flavor, params.Logger)
replicationPosition, rerr := findReplicationPositionFromXtrabackupInfo(params.Cnf.TmpDir, flavor, params.Logger)
if rerr != nil {
return replicationPosition, vterrors.Wrap(rerr, "backup failed trying to find replication position")
}
Expand Down Expand Up @@ -751,6 +738,22 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log
return nil
}

func findReplicationPositionFromXtrabackupInfo(directory, flavor string, logger logutil.Logger) (replication.Position, error) {
f, err := os.Open(path.Join(directory, xtrabackupInfoFile))
if err != nil {
return replication.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT,
"couldn't open %q to read GTID position", path.Join(directory, xtrabackupInfoFile))
}
defer f.Close()

contents, err := io.ReadAll(f)
if err != nil {
return replication.Position{}, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "couldn't read GTID position from %q", f.Name())
}

return findReplicationPosition(string(contents), flavor, logger)
}

var xtrabackupReplicationPositionRegexp = regexp.MustCompile(`GTID of the last change '([^']*)'`)

func findReplicationPosition(input, flavor string, logger logutil.Logger) (replication.Position, error) {
Expand Down
35 changes: 31 additions & 4 deletions go/vt/mysqlctl/xtrabackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"bytes"
"crypto/rand"
"io"
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -49,22 +51,47 @@ func TestFindReplicationPosition(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, want, pos.String())
}
func TestFindReplicationPositionFromXtrabackupInfo(t *testing.T) {
input := `tool_version = 8.0.35-30
binlog_pos = filename 'vt-0476396352-bin.000005', position '310088991', GTID of the last change '145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,
1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,
47b59de1-b368-11e9-b48b-624401d35560:1-152981,
557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,
599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,
b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262'
format = xbstream
`
want := "145e508e-ae54-11e9-8ce6-46824dd1815e:1-3,1e51f8be-ae54-11e9-a7c6-4280a041109b:1-3,47b59de1-b368-11e9-b48b-624401d35560:1-152981,557def0a-b368-11e9-84ed-f6fffd91cc57:1-3,599ef589-ae55-11e9-9688-ca1f44501925:1-14857169,b9ce485d-b36b-11e9-9b17-2a6e0a6011f4:1-371262"

tmp, err := os.MkdirTemp(t.TempDir(), "test")
assert.NoError(t, err)

f, err := os.Create(path.Join(tmp, xtrabackupInfoFile))
assert.NoError(t, err)
_, err = f.WriteString(input)
assert.NoError(t, err)
assert.NoError(t, f.Close())

pos, err := findReplicationPositionFromXtrabackupInfo(tmp, "MySQL56", logutil.NewConsoleLogger())
assert.NoError(t, err)
assert.Equal(t, want, pos.String())
}

func TestFindReplicationPositionNoMatch(t *testing.T) {
func TestFindReplicationPositionNoMatchFromXtrabackupInfo(t *testing.T) {
// Make sure failure to find a match triggers an error.
input := `nothing`

_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
assert.Error(t, err)
}

func TestFindReplicationPositionEmptyMatch(t *testing.T) {
func TestFindReplicationPositionEmptyMatchFromXtrabackupInfo(t *testing.T) {
// Make sure failure to find a match triggers an error.
input := `GTID of the last change '
'`

_, err := findReplicationPosition(input, "MySQL56", logutil.NewConsoleLogger())
_, err := findReplicationPositionFromXtrabackupInfo(input, "MySQL56", logutil.NewConsoleLogger())
assert.Error(t, err)
}

Expand Down

0 comments on commit fa9969f

Please sign in to comment.