Skip to content

Commit

Permalink
Merge branch 'master' into ADBDEV-5861
Browse files Browse the repository at this point in the history
  • Loading branch information
Stolb27 authored Aug 16, 2024
2 parents 61bbde9 + 8060b4e commit d359abf
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 25 deletions.
36 changes: 30 additions & 6 deletions backup/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package backup
*/

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -62,7 +63,7 @@ type BackupProgressCounters struct {
ProgressBar utils.ProgressBar
}

func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
if wasTerminated {
return -1, nil
}
Expand Down Expand Up @@ -112,7 +113,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
} else {
utils.LogProgress(`%sExecuting "%s" on master`, workerInfo, query)
}
result, err := connectionPool.Exec(query, connNum)
result, err := connectionPool.ExecContext(queryContext, query, connNum)
if err != nil {
return 0, err
}
Expand All @@ -121,7 +122,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
return numRows, nil
}

func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
func BackupSingleTableData(queryContext context.Context, table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
workerInfo := ""
if gplog.GetVerbosity() >= gplog.LOGVERBOSE {
workerInfo = fmt.Sprintf("Worker %d: ", whichConn)
Expand All @@ -137,7 +138,7 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters
} else {
destinationToWrite = globalFPInfo.GetTableBackupFilePathForCopyCommand(table.Oid, utils.GetPipeThroughProgram().Extension, false)
}
rowsCopied, err := CopyTableOut(connectionPool, table, destinationToWrite, whichConn)
rowsCopied, err := CopyTableOut(queryContext, connectionPool, table, destinationToWrite, whichConn)
if err != nil {
return err
}
Expand Down Expand Up @@ -181,6 +182,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
tasks <- table
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors

// Launch a checker that polls if the backup helper has ended with an error. It will cancel all pending
// COPY commands that could be hanging on pipes, that the backup helper didn't close before it died.
if MustGetFlagBool(options.SINGLE_DATA_FILE) {
utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
}

/*
* Worker 0 is a special database connection that
* 1) Exports the database snapshot if the feature is supported
Expand Down Expand Up @@ -212,8 +222,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
* transaction commits and the locks are released.
*/
for table := range tasks {
// Check if any error occurred in any other goroutines:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
if wasTerminated || isErroredBackup.Load() {
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
cancel()
return
}
if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil {
Expand Down Expand Up @@ -261,7 +278,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
break
}
}
err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn)
err = BackupSingleTableData(ctx, table, rowsCopiedMaps[whichConn], &counters, whichConn)
if err != nil {
// if copy isn't working, skip remaining backups, and let downstream panic
// handling deal with it
Expand Down Expand Up @@ -294,14 +311,21 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
}()
for _, table := range tables {
for {
// Check if any error occurred in any other goroutines:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
if wasTerminated || isErroredBackup.Load() {
cancel()
return
}
state, _ := oidMap.Load(table.Oid)
if state.(int) == Unknown {
time.Sleep(time.Millisecond * 50)
} else if state.(int) == Deferred {
err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0)
err := BackupSingleTableData(ctx, table, rowsCopiedMaps[0], &counters, 0)
if err != nil {
isErroredBackup.Store(true)
gplog.Fatal(err, "")
Expand Down
19 changes: 10 additions & 9 deletions backup/data_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backup_test

import (
"context"
"fmt"
"regexp"

Expand Down Expand Up @@ -79,7 +80,7 @@ var _ = Describe("backup/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456.gz"

_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -92,7 +93,7 @@ var _ = Describe("backup/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"
_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -102,7 +103,7 @@ var _ = Describe("backup/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456.zst"

_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -115,7 +116,7 @@ var _ = Describe("backup/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"
_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -125,7 +126,7 @@ var _ = Describe("backup/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"

_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -138,7 +139,7 @@ var _ = Describe("backup/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"
_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -148,7 +149,7 @@ var _ = Describe("backup/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"

_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

Expect(err).ShouldNot(HaveOccurred())
})
Expand Down Expand Up @@ -178,7 +179,7 @@ var _ = Describe("backup/data tests", func() {
backupFile := fmt.Sprintf("<SEG_DATA_DIR>/gpbackup_<SEGID>_20170101010101_pipe_(.*)_%d", testTable.Oid)
copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0)
err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0)

Expect(err).ShouldNot(HaveOccurred())
Expect(rowsCopiedMap[0]).To(Equal(int64(10)))
Expand All @@ -190,7 +191,7 @@ var _ = Describe("backup/data tests", func() {
backupFile := fmt.Sprintf("<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_%d", testTable.Oid)
copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0)
err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0)

Expect(err).ShouldNot(HaveOccurred())
Expect(rowsCopiedMap[0]).To(Equal(int64(10)))
Expand Down
13 changes: 13 additions & 0 deletions end_to_end/end_to_end_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,19 @@ LANGUAGE plpgsql NO SQL;`)
Expect(string(output)).To(MatchRegexp(`Error loading data into table public.d: COPY d, line 1: "\d,d": ERROR: null value in column "c" violates not-null constraint`))
Expect(string(output)).To(MatchRegexp(`Error loading data into table public.e: COPY e, line 1: "\d,e": ERROR: null value in column "c" violates not-null constraint`))
Expect(string(output)).To(ContainSubstring(`Encountered 3 error(s) during table data restore`))
homeDir := os.Getenv("HOME")
helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*"))
helperCommand := exec.Command("grep", "Error copying data", helperFiles[len(helperFiles)-1])
helperOutput, _ := helperCommand.CombinedOutput()
Expect(string(helperOutput)).To(ContainSubstring(`Segment 2: Oid 16397, Batch 0: Error encountered: Error copying data: write`))
Expect(string(helperOutput)).To(ContainSubstring(`Segment 0: Oid 16397, Batch 0: Error encountered: Error copying data: write`))
Expect(string(helperOutput)).To(ContainSubstring(`Segment 1: Oid 16397, Batch 0: Error encountered: Error copying data: write`))
Expect(string(helperOutput)).To(ContainSubstring(`Segment 2: Oid 16403, Batch 0: Error encountered: Error copying data: write`))
Expect(string(helperOutput)).To(ContainSubstring(`Segment 0: Oid 16403, Batch 0: Error encountered: Error copying data: write`))
Expect(string(helperOutput)).To(ContainSubstring(`Segment 1: Oid 16403, Batch 0: Error encountered: Error copying data: write`))
Expect(string(helperOutput)).To(ContainSubstring(`Segment 2: Oid 16409, Batch 0: Error encountered: Error copying data: write`))
Expect(string(helperOutput)).To(ContainSubstring(`Segment 0: Oid 16409, Batch 0: Error encountered: Error copying data: write`))
Expect(string(helperOutput)).To(ContainSubstring(`Segment 1: Oid 16409, Batch 0: Error encountered: Error copying data: write`))
assertArtifactsCleaned("20240710143553")
testhelper.AssertQueryRuns(restoreConn, "DROP TABLE a; DROP TABLE b; DROP TABLE c; DROP TABLE d; DROP TABLE e; DROP TABLE f; DROP TABLE g; DROP TABLE h;")
})
Expand Down
79 changes: 79 additions & 0 deletions end_to_end/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/exec"
path "path/filepath"
"time"

"github.com/greenplum-db/gp-common-go-libs/cluster"
"github.com/greenplum-db/gp-common-go-libs/dbconn"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/greenplum-db/gpbackup/testutils"
"github.com/greenplum-db/gpbackup/utils"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func copyPluginToAllHosts(conn *dbconn.DBConn, pluginPath string) {
Expand Down Expand Up @@ -322,6 +324,83 @@ var _ = Describe("End to End plugin tests", func() {
Skip("This test is only needed for the most recent backup versions")
}
})
It("Will not hang if gpbackup and gprestore runs with single-data-file and the helper goes down at its start", func(ctx SpecContext) {
copyPluginToAllHosts(backupConn, examplePluginExec)

testhelper.AssertQueryRuns(backupConn, "CREATE TABLE t0(a int);")
testhelper.AssertQueryRuns(backupConn, "INSERT INTO t0 SELECT i FROM generate_series(1, 10)i;")
defer testhelper.AssertQueryRuns(backupConn, "DROP TABLE t0;")

output := gpbackup(gpbackupPath, backupHelperPath,
"--single-data-file",
"--plugin-config", examplePluginTestConfig)
timestamp := getBackupTimestamp(string(output))

backupCluster.GenerateAndExecuteCommand(
"Instruct plugin to fail",
cluster.ON_HOSTS,
func(contentID int) string {
return fmt.Sprintf("touch /tmp/GPBACKUP_PLUGIN_DIE")
})

defer backupCluster.GenerateAndExecuteCommand(
"Unset plugin instruction",
cluster.ON_HOSTS,
func(contentID int) string {
return fmt.Sprintf("rm /tmp/GPBACKUP_PLUGIN_DIE")
})

gprestoreCmd := exec.Command(gprestorePath,
"--timestamp", timestamp,
"--redirect-db", "restoredb",
"--plugin-config", examplePluginTestConfig)

_, err := gprestoreCmd.CombinedOutput()
Expect(err).To(HaveOccurred())

assertArtifactsCleaned(timestamp)
}, SpecTimeout(time.Second*30))
It("Will not hang if gprestore runs with cluster resize and the helper goes down on one of the tables", func(ctx SpecContext) {
copyPluginToAllHosts(backupConn, examplePluginExec)

pluginBackupDirectory := `/tmp/plugin_dest`
os.Mkdir(pluginBackupDirectory, 0777)
command := exec.Command("tar", "-xzf", fmt.Sprintf("resources/%s.tar.gz", "9-segment-db-with-plugin"), "-C", pluginBackupDirectory)
mustRunCommand(command)

backupCluster.GenerateAndExecuteCommand(
"Instruct plugin to fail",
cluster.ON_HOSTS,
func(contentID int) string {
return fmt.Sprintf("touch /tmp/GPBACKUP_PLUGIN_DIE")
})

defer backupCluster.GenerateAndExecuteCommand(
"Unset plugin instruction",
cluster.ON_HOSTS,
func(contentID int) string {
return fmt.Sprintf("rm /tmp/GPBACKUP_PLUGIN_DIE")
})

timestamp := "20240812201233"

gprestoreCmd := exec.Command(gprestorePath,
"--resize-cluster",
"--timestamp", timestamp,
"--redirect-db", "restoredb",
"--plugin-config", examplePluginTestConfig)

// instruct plugin to die only before restoring the last table
gprestoreCmd.Env = os.Environ()
gprestoreCmd.Env = append(gprestoreCmd.Env, "GPBACKUP_PLUGIN_DIE_ON_OID=16392")

_, err := gprestoreCmd.CombinedOutput()
Expect(err).To(HaveOccurred())

assertArtifactsCleaned(timestamp)

os.RemoveAll(pluginBackupDirectory)
}, SpecTimeout(time.Second*30))
It("runs gpbackup and gprestore with plugin, single-data-file, and no-compression", func() {
copyPluginToAllHosts(backupConn, examplePluginExec)

Expand Down
Binary file not shown.
8 changes: 4 additions & 4 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,9 @@ func doRestoreAgent() error {

LoopEnd:
logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, currentPipe))
err = flushAndCloseRestoreWriter(currentPipe, tableOid)
if err != nil {
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, err))
errPipe := flushAndCloseRestoreWriter(currentPipe, tableOid)
if errPipe != nil {
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, errPipe))
}

logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum))
Expand All @@ -367,7 +367,7 @@ func doRestoreAgent() error {
}

logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, currentPipe))
errPipe := deletePipe(currentPipe)
errPipe = deletePipe(currentPipe)
if errPipe != nil {
logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, currentPipe, errPipe)
}
Expand Down
8 changes: 3 additions & 5 deletions integration/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,9 @@ options:
err = helperCmd.Start()
Expect(err).ToNot(HaveOccurred())

for _, i := range []int{1, 3} {
contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i))
// Empty output
Expect(contents).To(Equal([]byte{}))
}
contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, 1))
// Empty output
Expect(contents).To(Equal([]byte{}))

err = helperCmd.Wait()
Expect(err).To(HaveOccurred())
Expand Down
6 changes: 5 additions & 1 deletion plugins/example_plugin.bash
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ restore_data() {
timestamp_day_dir=${timestamp_dir%??????}
if [ -e "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR" ] ; then
echo 'Some plugin warning' >&2
elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" ] ; then
elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" -a "$GPBACKUP_PLUGIN_DIE_ON_OID" = "" ] ; then
exit 1
elif [[ -e "/tmp/GPBACKUP_PLUGIN_DIE" && "$filename" == *"$GPBACKUP_PLUGIN_DIE_ON_OID"* ]] ; then
# sleep a while for test purposes - to let gprestore start COPY commands
sleep 5
exit 1
fi
cat /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename
Expand Down
6 changes: 6 additions & 0 deletions restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors

// Launch a checker that polls if the restore helper has ended with an error. It will cancel all pending
// COPY commands that could be hanging on pipes, that the restore helper didn't close before it died.
if backupConfig.SingleDataFile || resizeCluster {
utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
}

for i := 0; i < connectionPool.NumConns; i++ {
workerPool.Add(1)
go func(whichConn int) {
Expand Down
16 changes: 16 additions & 0 deletions utils/agent_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,19 @@ func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster,
return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID), oid)
})
}

func StartHelperChecker(cl *cluster.Cluster, fpInfo filepath.FilePathInfo, cancel func()) {
go func() {
for {
time.Sleep(5 * time.Second)
remoteOutput := cl.GenerateAndExecuteCommand("Checking gpbackup_helper agent failure", cluster.ON_SEGMENTS, func(contentID int) string {
helperErrorFileName := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
return fmt.Sprintf("! ls %s", helperErrorFileName)
})
if remoteOutput.NumErrors != 0 {
gplog.Error("gpbackup_helper failed to start on some segments")
cancel()
}
}
}()
}

0 comments on commit d359abf

Please sign in to comment.