Commit 7102051
Add execution context into backup
Also add a checker goroutine to backup.

whitehawk committed Aug 7, 2024
1 parent 9384554 commit 7102051
Showing 4 changed files with 57 additions and 29 deletions.
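The change threads a cancellable context.Context from BackupDataForAllTables down through BackupSingleTableData and CopyTableOut into ExecContext, and a checker goroutine cancels that context once a gpbackup_helper agent has died, so COPY commands blocked on the helper's pipes are aborted instead of hanging forever. A minimal, self-contained sketch of that cancellation pattern (the failure signal, names, and timings below are illustrative stand-ins, not gpbackup's actual code):

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // release resources even on the happy path

    helperDied := make(chan struct{}) // stand-in for "helper wrote its error file"

    // Watchdog: poll for the failure signal and cancel the shared context,
    // mirroring the checker goroutine this commit adds.
    go func() {
        for {
            time.Sleep(100 * time.Millisecond)
            select {
            case <-helperDied:
                cancel() // aborts every operation bound to ctx
                return
            default: // non-blocking poll
            }
        }
    }()

    close(helperDied) // simulate the helper dying

    // Long-running operation bound to ctx (stand-in for COPY via ExecContext).
    select {
    case <-ctx.Done():
        fmt.Println("operation aborted:", ctx.Err())
    case <-time.After(5 * time.Second):
        fmt.Println("operation finished")
    }
}

Run standalone, the final select unblocks as soon as the watchdog cancels the context; that is the mechanism by which a COPY hung on a dead helper's pipe is released once the checker notices the helper's error file.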
36 changes: 30 additions & 6 deletions backup/data.go
@@ -5,6 +5,7 @@ package backup
 */

import (
+    "context"
    "errors"
    "fmt"
    "strings"
@@ -62,7 +63,7 @@ type BackupProgressCounters struct {
    ProgressBar utils.ProgressBar
}

-func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
+func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
    if wasTerminated {
        return -1, nil
    }
@@ -112,7 +113,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
    } else {
        utils.LogProgress(`%sExecuting "%s" on master`, workerInfo, query)
    }
-    result, err := connectionPool.Exec(query, connNum)
+    result, err := connectionPool.ExecContext(queryContext, query, connNum)
    if err != nil {
        return 0, err
    }
@@ -121,7 +122,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
    return numRows, nil
}

-func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
+func BackupSingleTableData(queryContext context.Context, table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
    workerInfo := ""
    if gplog.GetVerbosity() >= gplog.LOGVERBOSE {
        workerInfo = fmt.Sprintf("Worker %d: ", whichConn)
@@ -137,7 +138,7 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters
    } else {
        destinationToWrite = globalFPInfo.GetTableBackupFilePathForCopyCommand(table.Oid, utils.GetPipeThroughProgram().Extension, false)
    }
-    rowsCopied, err := CopyTableOut(connectionPool, table, destinationToWrite, whichConn)
+    rowsCopied, err := CopyTableOut(queryContext, connectionPool, table, destinationToWrite, whichConn)
    if err != nil {
        return err
    }
@@ -181,6 +182,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
        tasks <- table
    }

+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel() // Ensure cancel is called to release resources even if no error occurs
+
+    // Launch a checker that polls whether the backup helper has exited with an error. If so, it cancels all pending
+    // COPY commands that could be hanging on pipes the backup helper did not close before it died.
+    if MustGetFlagBool(options.SINGLE_DATA_FILE) {
+        utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
+    }
+
    /*
     * Worker 0 is a special database connection that
     * 1) Exports the database snapshot if the feature is supported
@@ -212,8 +222,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
     * transaction commits and the locks are released.
     */
    for table := range tasks {
+        // Check whether an error occurred in any other goroutine:
+        select {
+        case <-ctx.Done():
+            return // Error somewhere, terminate
+        default: // A default case is required so the check does not block
+        }
        if wasTerminated || isErroredBackup.Load() {
            counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
+            cancel()
            return
        }
        if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil {
@@ -261,7 +278,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
                break
            }
        }
-        err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn)
+        err = BackupSingleTableData(ctx, table, rowsCopiedMaps[whichConn], &counters, whichConn)
        if err != nil {
            // if copy isn't working, skip remaining backups, and let downstream panic
            // handling deal with it
@@ -294,14 +311,21 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
    }()
    for _, table := range tables {
        for {
+            // Check whether an error occurred in any other goroutine:
+            select {
+            case <-ctx.Done():
+                return // Error somewhere, terminate
+            default: // A default case is required so the check does not block
+            }
            if wasTerminated || isErroredBackup.Load() {
+                cancel()
                return
            }
            state, _ := oidMap.Load(table.Oid)
            if state.(int) == Unknown {
                time.Sleep(time.Millisecond * 50)
            } else if state.(int) == Deferred {
-                err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0)
+                err := BackupSingleTableData(ctx, table, rowsCopiedMaps[0], &counters, 0)
                if err != nil {
                    isErroredBackup.Store(true)
                    gplog.Fatal(err, "")
19 changes: 10 additions & 9 deletions backup/data_test.go
@@ -1,6 +1,7 @@
package backup_test

import (
+    "context"
    "fmt"
    "regexp"

@@ -79,7 +80,7 @@ var _ = Describe("backup/data tests", func() {
        mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
        filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456.gz"

-        _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+        _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

        Expect(err).ShouldNot(HaveOccurred())
    })
@@ -92,7 +93,7 @@ var _ = Describe("backup/data tests", func() {
        mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

        filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"
-        _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+        _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

        Expect(err).ShouldNot(HaveOccurred())
    })
@@ -102,7 +103,7 @@
        mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
        filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456.zst"

-        _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+        _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

        Expect(err).ShouldNot(HaveOccurred())
    })
@@ -115,7 +116,7 @@ var _ = Describe("backup/data tests", func() {
        mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

        filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"
-        _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+        _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

        Expect(err).ShouldNot(HaveOccurred())
    })
@@ -125,7 +126,7 @@ var _ = Describe("backup/data tests", func() {
        mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
        filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"

-        _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+        _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

        Expect(err).ShouldNot(HaveOccurred())
    })
@@ -138,7 +139,7 @@ var _ = Describe("backup/data tests", func() {
        mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

        filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"
-        _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+        _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

        Expect(err).ShouldNot(HaveOccurred())
    })
@@ -148,7 +149,7 @@ var _ = Describe("backup/data tests", func() {
        mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
        filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"

-        _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+        _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

        Expect(err).ShouldNot(HaveOccurred())
    })
@@ -178,7 +179,7 @@ var _ = Describe("backup/data tests", func() {
        backupFile := fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_20170101010101_pipe_(.*)_%d", testTable.Oid)
        copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
        mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
-        err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0)
+        err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0)

        Expect(err).ShouldNot(HaveOccurred())
        Expect(rowsCopiedMap[0]).To(Equal(int64(10)))
@@ -190,7 +191,7 @@ var _ = Describe("backup/data tests", func() {
        backupFile := fmt.Sprintf("<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_%d", testTable.Oid)
        copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
        mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
-        err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0)
+        err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0)

        Expect(err).ShouldNot(HaveOccurred())
        Expect(rowsCopiedMap[0]).To(Equal(int64(10)))
15 changes: 1 addition & 14 deletions restore/data.go
@@ -9,7 +9,6 @@ import (
    "fmt"
    "sync"
    "sync/atomic"
-    "time"

    "github.com/greenplum-db/gp-common-go-libs/cluster"
    "github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -262,19 +261,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
    // Launch a checker that polls whether the restore helper has exited with an error. If so, it cancels all pending
    // COPY commands that could be hanging on pipes the restore helper did not close before it died.
    if backupConfig.SingleDataFile || resizeCluster {
-        go func() {
-            for {
-                time.Sleep(5 * time.Second)
-                remoteOutput := globalCluster.GenerateAndExecuteCommand("Checking gpbackup_helper agent failure", cluster.ON_SEGMENTS, func(contentID int) string {
-                    helperErrorFileName := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
-                    return fmt.Sprintf("! ls %s", helperErrorFileName)
-                })
-                if remoteOutput.NumErrors != 0 {
-                    gplog.Error("gpbackup_helper failed to start on some segments")
-                    cancel()
-                }
-            }
-        }()
+        utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
    }

    for i := 0; i < connectionPool.NumConns; i++ {
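Note: the cancel passed to utils.StartHelperChecker is the CancelFunc the restore path already obtained from context.WithCancel earlier in restoreDataFromTimestamp (outside this hunk), so the extraction should be behavior-preserving, assuming globalFPInfo matches the fpInfo the inline closure previously captured.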
16 changes: 16 additions & 0 deletions utils/agent_remote.go
@@ -361,3 +361,19 @@ func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster,
        return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID), oid)
    })
}
+
+func StartHelperChecker(cl *cluster.Cluster, fpInfo filepath.FilePathInfo, cancel func()) {
+    go func() {
+        for {
+            time.Sleep(5 * time.Second)
+            remoteOutput := cl.GenerateAndExecuteCommand("Checking gpbackup_helper agent failure", cluster.ON_SEGMENTS, func(contentID int) string {
+                helperErrorFileName := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
+                return fmt.Sprintf("! ls %s", helperErrorFileName)
+            })
+            if remoteOutput.NumErrors != 0 {
+                gplog.Error("gpbackup_helper failed to start on some segments")
+                cancel()
+            }
+        }
+    }()
+}
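The remote probe leans on a small shell idiom: "ls <pipe>_error" exits zero only when the helper has written its error file, and the leading "!" inverts that status, so the generated command fails exactly on the segments where a helper died, and any non-zero remoteOutput.NumErrors triggers cancel(). A local Go equivalent of the same existence check (the path is illustrative; real paths come from fpInfo.GetSegmentPipeFilePath):

package main

import (
    "fmt"
    "os"
)

// helperFailed mirrors the remote "! ls <pipe>_error" probe locally:
// it reports true exactly when the helper's error file exists.
func helperFailed(pipePath string) bool {
    _, err := os.Stat(pipePath + "_error")
    return err == nil // file present => the helper signalled an error
}

func main() {
    fmt.Println(helperFailed("/tmp/gpbackup_0_20170101010101_pipe")) // illustrative path
}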
