Skip to content

Commit

Permalink
Fix binlog backup not to be missing (#604)
Browse files Browse the repository at this point in the history
* Fix binlog backup not to be missing
  • Loading branch information
shunki-fujita authored Nov 29, 2023
1 parent 4b26fdb commit 6834a5e
Show file tree
Hide file tree
Showing 10 changed files with 540 additions and 116 deletions.
4 changes: 4 additions & 0 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,10 @@ type BackupStatus struct {
// SourceUUID is the `server_uuid` of the backup source instance.
SourceUUID string `json:"sourceUUID"`

// UUIDSet maps the index of each candidate instance for the backup source to its `server_uuid`.
// +optional
UUIDSet map[string]string `json:"uuidSet"`

// BinlogFilename is the binlog filename that the backup source instance was writing to
// at the backup.
BinlogFilename string `json:"binlogFilename"`
Expand Down
7 changes: 7 additions & 0 deletions api/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

144 changes: 83 additions & 61 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package backup

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
Expand Down Expand Up @@ -45,6 +47,7 @@ type BackupManager struct {
startTime time.Time
sourceIndex int
status bkop.ServerStatus
uuidSet map[string]string
gtidSet string
dumpSize int64
binlogSize int64
Expand Down Expand Up @@ -117,7 +120,13 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
orderedPods[index] = &pods.Items[i]
}

sourceIndex, err := bm.ChoosePod(ctx, orderedPods)
uuidSet, err := bm.GetUUIDSet(ctx, orderedPods)
if err != nil {
return fmt.Errorf("failed to get server_uuid set: %w", err)
}
bm.uuidSet = uuidSet

sourceIndex, doBackupBinlog, err := bm.ChoosePod(ctx, orderedPods)
if err != nil {
return fmt.Errorf("failed to choose source instance: %w", err)
}
Expand Down Expand Up @@ -146,8 +155,7 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
}

// dump and upload binlog for the second or later backups
lastBackup := &bm.cluster.Status.Backup
if !lastBackup.Time.IsZero() {
if doBackupBinlog {
if err := bm.backupBinlog(ctx, op); err != nil {
// since the full backup has succeeded, we should continue
ev := event.BackupNoBinlog.ToEvent(bm.clusterRef)
Expand All @@ -172,6 +180,7 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
sb.Elapsed = metav1.Duration{Duration: elapsed}
sb.SourceIndex = sourceIndex
sb.SourceUUID = bm.status.UUID
sb.UUIDSet = bm.uuidSet
sb.BinlogFilename = bm.status.CurrentBinlog
sb.GTIDSet = bm.gtidSet
sb.DumpSize = bm.dumpSize
Expand All @@ -194,89 +203,87 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
return nil
}

func (bm *BackupManager) ChoosePod(ctx context.Context, pods []*corev1.Pod) (int, error) {
func (bm *BackupManager) GetUUIDSet(ctx context.Context, pods []*corev1.Pod) (map[string]string, error) {
cluster := bm.cluster
// if this is the first time
if cluster.Status.Backup.Time.IsZero() {
if len(pods) == 1 {
return 0, nil
}
uuids := make(map[string]string, len(pods))
for i := range pods {
if podIsReady(pods[i]) {
op, err := newOperator(cluster.PodHostname(i),
constants.MySQLPort,
constants.BackupUser,
bm.mysqlPassword,
bm.threads)
if err != nil {
return nil, fmt.Errorf("failed to create operator: %w", err)
}
defer op.Close()

for i := range pods {
if i == int(cluster.Status.CurrentPrimaryIndex) {
if err := op.GetServerStatus(ctx, &bm.status); err != nil {
continue
}
if podIsReady(pods[i]) {
return i, nil
}
uuids[strconv.Itoa(i)] = bm.status.UUID
}
return int(cluster.Status.CurrentPrimaryIndex), nil
}

lastIndex := cluster.Status.Backup.SourceIndex
op, err := newOperator(cluster.PodHostname(lastIndex),
constants.MySQLPort,
constants.BackupUser,
bm.mysqlPassword,
bm.threads)
if err != nil {
return -1, err
}
defer op.Close()

st := &bkop.ServerStatus{}
if err := op.GetServerStatus(ctx, st); err != nil {
return -1, err
}
return uuids, nil
}

if st.UUID != cluster.Status.Backup.SourceUUID {
bm.log.Info("server_uuid of the last backup source has changed", "index", lastIndex)

// ChoosePod chooses a pod to take a backup from.
// It returns the index of the chosen pod and whether backupBinlog should be called.
func (bm *BackupManager) ChoosePod(ctx context.Context, pods []*corev1.Pod) (int, bool, error) {
currentPrimaryIndex := int(bm.cluster.Status.CurrentPrimaryIndex)
lastBackup := &bm.cluster.Status.Backup
// if this is the first time
if lastBackup.Time.IsZero() {
for i := range pods {
if i == lastIndex {
continue
}
if i == int(cluster.Status.CurrentPrimaryIndex) {
if i == currentPrimaryIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
return i, false, nil
}
}
return cluster.Status.CurrentPrimaryIndex, nil
if podIsReady(pods[currentPrimaryIndex]) {
return currentPrimaryIndex, false, nil
} else {
return 0, false, errors.New("no ready pod exists")
}
}

if !podIsReady(pods[lastIndex]) {
bm.log.Info("the last backup source is not ready", "index", lastIndex)
lastIndex := lastBackup.SourceIndex
choosableIndexes := getIdxsWithUnchangedUUID(bm.uuidSet, lastBackup.UUIDSet)

if len(choosableIndexes) == 0 {
bm.log.Info("the server_uuid of all pods has changed or some pods are not ready")
bm.warnings = append(bm.warnings, "skip binlog backups because some binlog files may be missing")
for i := range pods {
if i == lastIndex {
continue
}
if i == int(cluster.Status.CurrentPrimaryIndex) {
if i == currentPrimaryIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
return i, false, nil
}
}
return cluster.Status.CurrentPrimaryIndex, nil
if podIsReady(pods[currentPrimaryIndex]) {
return currentPrimaryIndex, false, nil
} else {
return 0, false, errors.New("no ready pod exists")
}
}

if lastIndex == int(cluster.Status.CurrentPrimaryIndex) {
bm.log.Info("the last backup source is not a replica", "index", lastIndex)
for i := range pods {
if i == lastIndex {
continue
}
if podIsReady(pods[i]) {
return i, nil
}
replicas := []int{}
for _, i := range choosableIndexes {
if i == currentPrimaryIndex {
continue
}
if i == lastIndex {
return i, true, nil
}
return cluster.Status.CurrentPrimaryIndex, nil
replicas = append(replicas, i)
}

return lastIndex, nil
if len(replicas) != 0 {
return replicas[0], true, nil
}
return currentPrimaryIndex, true, nil
}

func (bm *BackupManager) backupFull(ctx context.Context, op bkop.Operator) error {
Expand Down Expand Up @@ -361,7 +368,7 @@ func (bm *BackupManager) backupBinlog(ctx context.Context, op bkop.Operator) err
}

if err := op.DumpBinlog(ctx, binlogDir, binlogName, lastBackup.GTIDSet); err != nil {
return fmt.Errorf("failed to take a binlog backup: %w", err)
return fmt.Errorf("failed to exec mysqlbinlog command: %w", err)
}

usage, err := dirUsage(binlogDir)
Expand Down Expand Up @@ -470,3 +477,18 @@ func dirUsage(dir string) (int64, error) {

return usage, nil
}

// getIdxsWithUnchangedUUID returns, in ascending order, the integer form of
// every key that is present in both maps with an identical value.
//
// Keys are parsed with strconv.Atoi; a key that is not a valid integer is
// skipped rather than reported. The result is always a non-nil slice, so it
// is empty (not nil) when nothing matches.
func getIdxsWithUnchangedUUID(current, last map[string]string) []int {
	unchanged := make([]int, 0, len(current))
	for podKey, uuid := range current {
		previous, found := last[podKey]
		if !found || previous != uuid {
			continue
		}
		idx, err := strconv.Atoi(podKey)
		if err != nil {
			// A non-numeric key cannot be turned into an index; ignore it.
			continue
		}
		unchanged = append(unchanged, idx)
	}
	// Map iteration order is random, so sort for a deterministic result.
	sort.Ints(unchanged)
	return unchanged
}
Loading

0 comments on commit 6834a5e

Please sign in to comment.