diff --git a/backup/data.go b/backup/data.go
index e3d4c37d4..c06debb7a 100644
--- a/backup/data.go
+++ b/backup/data.go
@@ -5,6 +5,7 @@ package backup
  */
 
 import (
+    "context"
     "errors"
     "fmt"
     "strings"
@@ -62,7 +63,7 @@ type BackupProgressCounters struct {
     ProgressBar utils.ProgressBar
 }
 
-func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
+func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
     if wasTerminated {
         return -1, nil
     }
@@ -112,7 +113,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
     } else {
         utils.LogProgress(`%sExecuting "%s" on master`, workerInfo, query)
     }
-    result, err := connectionPool.Exec(query, connNum)
+    result, err := connectionPool.ExecContext(queryContext, query, connNum)
     if err != nil {
         return 0, err
     }
@@ -121,7 +122,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
     return numRows, nil
 }
 
-func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
+func BackupSingleTableData(queryContext context.Context, table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
     workerInfo := ""
     if gplog.GetVerbosity() >= gplog.LOGVERBOSE {
         workerInfo = fmt.Sprintf("Worker %d: ", whichConn)
@@ -137,7 +138,7 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters
     } else {
         destinationToWrite = globalFPInfo.GetTableBackupFilePathForCopyCommand(table.Oid, utils.GetPipeThroughProgram().Extension, false)
     }
-    rowsCopied, err := CopyTableOut(connectionPool, table, destinationToWrite, whichConn)
+    rowsCopied, err := CopyTableOut(queryContext, connectionPool, table, destinationToWrite, whichConn)
     if err != nil {
         return err
     }
@@ -181,6 +182,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
         tasks <- table
     }
 
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel() // Make sure it's called to release resources even if no errors
+
+    // Launch a checker that polls whether the backup helper has exited with an error. It cancels any pending
+    // COPY commands that may be hanging on pipes the backup helper failed to close before it died.
+    if MustGetFlagBool(options.SINGLE_DATA_FILE) {
+        utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
+    }
+
     /*
      * Worker 0 is a special database connection that
      * 1) Exports the database snapshot if the feature is supported
@@ -212,8 +222,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
              * transaction commits and the locks are released.
             */
            for table := range tasks {
+               // Check whether an error occurred in any of the other goroutines:
+               select {
+               case <-ctx.Done():
+                   return // Error somewhere, terminate
+               default: // A default case is required to avoid blocking
+               }
                if wasTerminated || isErroredBackup.Load() {
                    counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
+                   cancel()
                    return
                }
                if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil {
@@ -261,7 +278,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
                        break
                    }
                }
-               err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn)
+               err = BackupSingleTableData(ctx, table, rowsCopiedMaps[whichConn], &counters, whichConn)
                if err != nil {
                    // if copy isn't working, skip remaining backups, and let downstream panic
                    // handling deal with it
@@ -294,14 +311,21 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
     }()
     for _, table := range tables {
         for {
+           // Check whether an error occurred in any of the other goroutines:
+           select {
+           case <-ctx.Done():
+               return // Error somewhere, terminate
+           default: // A default case is required to avoid blocking
+           }
            if wasTerminated || isErroredBackup.Load() {
+               cancel()
                return
            }
            state, _ := oidMap.Load(table.Oid)
            if state.(int) == Unknown {
                time.Sleep(time.Millisecond * 50)
            } else if state.(int) == Deferred {
-               err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0)
+               err := BackupSingleTableData(ctx, table, rowsCopiedMaps[0], &counters, 0)
                if err != nil {
                    isErroredBackup.Store(true)
                    gplog.Fatal(err, "")
diff --git a/backup/data_test.go b/backup/data_test.go
index 2743d1328..3fd8fe038 100644
--- a/backup/data_test.go
+++ b/backup/data_test.go
@@ -1,6 +1,7 @@
 package backup_test
 
 import (
+    "context"
     "fmt"
     "regexp"
 
@@ -79,7 +80,7 @@ var _ = Describe("backup/data tests", func() {
            mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 
            filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz"
-           _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+           _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)
 
            Expect(err).ShouldNot(HaveOccurred())
        })
@@ -92,7 +93,7 @@ var _ = Describe("backup/data tests", func() {
            mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 
            filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-           _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+           _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)
 
            Expect(err).ShouldNot(HaveOccurred())
        })
@@ -102,7 +103,7 @@ var _ = Describe("backup/data tests", func() {
            mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 
            filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst"
-           _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+           _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)
 
            Expect(err).ShouldNot(HaveOccurred())
        })
@@ -115,7 +116,7 @@ var _ = Describe("backup/data tests", func() {
            mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 
            filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-           _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+           _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)
 
            Expect(err).ShouldNot(HaveOccurred())
        })
@@ -125,7 +126,7 @@ var _ = Describe("backup/data tests", func() {
tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -138,7 +139,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -148,7 +149,7 @@ var _ = Describe("backup/data tests", func() { mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0)) filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456" - _, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum) + _, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum) Expect(err).ShouldNot(HaveOccurred()) }) @@ -178,7 +179,7 @@ var _ = Describe("backup/data tests", func() { backupFile := fmt.Sprintf("/gpbackup__20170101010101_pipe_(.*)_%d", testTable.Oid) copyCmd := fmt.Sprintf(copyFmtStr, backupFile) mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10)) - err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0) + err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0) Expect(err).ShouldNot(HaveOccurred()) Expect(rowsCopiedMap[0]).To(Equal(int64(10))) @@ -190,7 +191,7 @@ var _ = Describe("backup/data tests", func() { backupFile := fmt.Sprintf("/backups/20170101/20170101010101/gpbackup__20170101010101_%d", testTable.Oid) copyCmd := fmt.Sprintf(copyFmtStr, backupFile) mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10)) - err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0) + err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0) Expect(err).ShouldNot(HaveOccurred()) Expect(rowsCopiedMap[0]).To(Equal(int64(10))) diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 65bc813f6..a8385df4e 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2537,6 +2537,19 @@ LANGUAGE plpgsql NO SQL;`) Expect(string(output)).To(MatchRegexp(`Error loading data into table public.d: COPY d, line 1: "\d,d": ERROR: null value in column "c" violates not-null constraint`)) Expect(string(output)).To(MatchRegexp(`Error loading data into table public.e: COPY e, line 1: "\d,e": ERROR: null value in column "c" violates not-null constraint`)) Expect(string(output)).To(ContainSubstring(`Encountered 3 error(s) during table data restore`)) + homeDir := os.Getenv("HOME") + helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + helperCommand := exec.Command("grep", "Error copying data", helperFiles[len(helperFiles)-1]) + helperOutput, _ := helperCommand.CombinedOutput() + Expect(string(helperOutput)).To(ContainSubstring(`Segment 2: Oid 16397, Batch 0: Error encountered: Error copying data: write`)) + Expect(string(helperOutput)).To(ContainSubstring(`Segment 0: Oid 16397, Batch 0: Error encountered: Error copying 
+           Expect(string(helperOutput)).To(ContainSubstring(`Segment 1: Oid 16397, Batch 0: Error encountered: Error copying data: write`))
+           Expect(string(helperOutput)).To(ContainSubstring(`Segment 2: Oid 16403, Batch 0: Error encountered: Error copying data: write`))
+           Expect(string(helperOutput)).To(ContainSubstring(`Segment 0: Oid 16403, Batch 0: Error encountered: Error copying data: write`))
+           Expect(string(helperOutput)).To(ContainSubstring(`Segment 1: Oid 16403, Batch 0: Error encountered: Error copying data: write`))
+           Expect(string(helperOutput)).To(ContainSubstring(`Segment 2: Oid 16409, Batch 0: Error encountered: Error copying data: write`))
+           Expect(string(helperOutput)).To(ContainSubstring(`Segment 0: Oid 16409, Batch 0: Error encountered: Error copying data: write`))
+           Expect(string(helperOutput)).To(ContainSubstring(`Segment 1: Oid 16409, Batch 0: Error encountered: Error copying data: write`))
            assertArtifactsCleaned("20240710143553")
            testhelper.AssertQueryRuns(restoreConn, "DROP TABLE a; DROP TABLE b; DROP TABLE c; DROP TABLE d; DROP TABLE e; DROP TABLE f; DROP TABLE g; DROP TABLE h;")
        })
diff --git a/end_to_end/plugin_test.go b/end_to_end/plugin_test.go
index 8fd8b5492..5e04483c5 100644
--- a/end_to_end/plugin_test.go
+++ b/end_to_end/plugin_test.go
@@ -5,6 +5,7 @@ import (
     "os"
     "os/exec"
     path "path/filepath"
+    "time"
 
     "github.com/greenplum-db/gp-common-go-libs/cluster"
     "github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -14,6 +15,7 @@ import (
     "github.com/greenplum-db/gpbackup/testutils"
     "github.com/greenplum-db/gpbackup/utils"
     . "github.com/onsi/ginkgo/v2"
+    . "github.com/onsi/gomega"
 )
 
 func copyPluginToAllHosts(conn *dbconn.DBConn, pluginPath string) {
@@ -322,6 +324,83 @@ var _ = Describe("End to End plugin tests", func() {
            Skip("This test is only needed for the most recent backup versions")
        }
    })
+   It("Will not hang if gpbackup and gprestore run with single-data-file and the helper goes down at its start", func(ctx SpecContext) {
+       copyPluginToAllHosts(backupConn, examplePluginExec)
+
+       testhelper.AssertQueryRuns(backupConn, "CREATE TABLE t0(a int);")
+       testhelper.AssertQueryRuns(backupConn, "INSERT INTO t0 SELECT i FROM generate_series(1, 10)i;")
+       defer testhelper.AssertQueryRuns(backupConn, "DROP TABLE t0;")
+
+       output := gpbackup(gpbackupPath, backupHelperPath,
+           "--single-data-file",
+           "--plugin-config", examplePluginTestConfig)
+       timestamp := getBackupTimestamp(string(output))
+
+       backupCluster.GenerateAndExecuteCommand(
+           "Instruct plugin to fail",
+           cluster.ON_HOSTS,
+           func(contentID int) string {
+               return fmt.Sprintf("touch /tmp/GPBACKUP_PLUGIN_DIE")
+           })
+
+       defer backupCluster.GenerateAndExecuteCommand(
+           "Unset plugin instruction",
+           cluster.ON_HOSTS,
+           func(contentID int) string {
+               return fmt.Sprintf("rm /tmp/GPBACKUP_PLUGIN_DIE")
+           })
+
+       gprestoreCmd := exec.Command(gprestorePath,
+           "--timestamp", timestamp,
+           "--redirect-db", "restoredb",
+           "--plugin-config", examplePluginTestConfig)
+
+       _, err := gprestoreCmd.CombinedOutput()
+       Expect(err).To(HaveOccurred())
+
+       assertArtifactsCleaned(timestamp)
+   }, SpecTimeout(time.Second*30))
+   It("Will not hang if gprestore runs with cluster resize and the helper goes down on one of the tables", func(ctx SpecContext) {
+       copyPluginToAllHosts(backupConn, examplePluginExec)
+
+       pluginBackupDirectory := `/tmp/plugin_dest`
+       os.Mkdir(pluginBackupDirectory, 0777)
+       command := exec.Command("tar", "-xzf", fmt.Sprintf("resources/%s.tar.gz", "9-segment-db-with-plugin"), "-C", pluginBackupDirectory)
+       mustRunCommand(command)
+
+       backupCluster.GenerateAndExecuteCommand(
+           "Instruct plugin to fail",
+           cluster.ON_HOSTS,
+           func(contentID int) string {
+               return fmt.Sprintf("touch /tmp/GPBACKUP_PLUGIN_DIE")
+           })
+
+       defer backupCluster.GenerateAndExecuteCommand(
+           "Unset plugin instruction",
+           cluster.ON_HOSTS,
+           func(contentID int) string {
+               return fmt.Sprintf("rm /tmp/GPBACKUP_PLUGIN_DIE")
+           })
+
+       timestamp := "20240812201233"
+
+       gprestoreCmd := exec.Command(gprestorePath,
+           "--resize-cluster",
+           "--timestamp", timestamp,
+           "--redirect-db", "restoredb",
+           "--plugin-config", examplePluginTestConfig)
+
+       // instruct plugin to die only before restoring the last table
+       gprestoreCmd.Env = os.Environ()
+       gprestoreCmd.Env = append(gprestoreCmd.Env, "GPBACKUP_PLUGIN_DIE_ON_OID=16392")
+
+       _, err := gprestoreCmd.CombinedOutput()
+       Expect(err).To(HaveOccurred())
+
+       assertArtifactsCleaned(timestamp)
+
+       os.RemoveAll(pluginBackupDirectory)
+   }, SpecTimeout(time.Second*30))
    It("runs gpbackup and gprestore with plugin, single-data-file, and no-compression", func() {
        copyPluginToAllHosts(backupConn, examplePluginExec)
diff --git a/end_to_end/resources/9-segment-db-with-plugin.tar.gz b/end_to_end/resources/9-segment-db-with-plugin.tar.gz
new file mode 100644
index 000000000..9ecebe887
Binary files /dev/null and b/end_to_end/resources/9-segment-db-with-plugin.tar.gz differ
diff --git a/helper/restore_helper.go b/helper/restore_helper.go
index c188b6494..d49029bb7 100644
--- a/helper/restore_helper.go
+++ b/helper/restore_helper.go
@@ -348,9 +348,9 @@ func doRestoreAgent() error {
 
    LoopEnd:
        logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, currentPipe))
-       err = flushAndCloseRestoreWriter(currentPipe, tableOid)
-       if err != nil {
-           logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, err))
+       errPipe := flushAndCloseRestoreWriter(currentPipe, tableOid)
+       if errPipe != nil {
+           logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, errPipe))
        }
 
        logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum))
@@ -367,7 +367,7 @@ func doRestoreAgent() error {
        }
 
        logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, currentPipe))
-       errPipe := deletePipe(currentPipe)
+       errPipe = deletePipe(currentPipe)
        if errPipe != nil {
            logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, currentPipe, errPipe)
        }
diff --git a/integration/helper_test.go b/integration/helper_test.go
index 75914f3de..e518ca93b 100644
--- a/integration/helper_test.go
+++ b/integration/helper_test.go
@@ -372,11 +372,9 @@ options:
            err = helperCmd.Start()
            Expect(err).ToNot(HaveOccurred())
 
-           for _, i := range []int{1, 3} {
-               contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i))
-               // Empty output
-               Expect(contents).To(Equal([]byte{}))
-           }
+           contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, 1))
+           // Empty output
+           Expect(contents).To(Equal([]byte{}))
 
            err = helperCmd.Wait()
            Expect(err).To(HaveOccurred())
diff --git a/plugins/example_plugin.bash b/plugins/example_plugin.bash
index b2e193d3f..302d5d6c8 100755
--- a/plugins/example_plugin.bash
+++ b/plugins/example_plugin.bash
@@ -79,7 +79,11 @@ restore_data() {
    timestamp_day_dir=${timestamp_dir%??????}
    if [ -e "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR" ] ; then
        echo 'Some plugin warning' >&2
-   elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" ] ; then
+   elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" -a "$GPBACKUP_PLUGIN_DIE_ON_OID" = "" ] ; then
"$GPBACKUP_PLUGIN_DIE_ON_OID" = "" ] ; then + exit 1 + elif [[ -e "/tmp/GPBACKUP_PLUGIN_DIE" && "$filename" == *"$GPBACKUP_PLUGIN_DIE_ON_OID"* ]] ; then + # sleep a while for test purposes - to let gprestore start COPY commands + sleep 5 exit 1 fi cat /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename diff --git a/restore/data.go b/restore/data.go index 3c436dd66..2396d7360 100644 --- a/restore/data.go +++ b/restore/data.go @@ -258,6 +258,12 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Make sure it's called to release resources even if no errors + // Launch a checker that polls if the restore helper has ended with an error. It will cancel all pending + // COPY commands that could be hanging on pipes, that the restore helper didn't close before it died. + if backupConfig.SingleDataFile || resizeCluster { + utils.StartHelperChecker(globalCluster, globalFPInfo, cancel) + } + for i := 0; i < connectionPool.NumConns; i++ { workerPool.Add(1) go func(whichConn int) { diff --git a/utils/agent_remote.go b/utils/agent_remote.go index af43f42d4..3ee401afd 100644 --- a/utils/agent_remote.go +++ b/utils/agent_remote.go @@ -361,3 +361,19 @@ func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster, return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID), oid) }) } + +func StartHelperChecker(cl *cluster.Cluster, fpInfo filepath.FilePathInfo, cancel func()) { + go func() { + for { + time.Sleep(5 * time.Second) + remoteOutput := cl.GenerateAndExecuteCommand("Checking gpbackup_helper agent failure", cluster.ON_SEGMENTS, func(contentID int) string { + helperErrorFileName := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID)) + return fmt.Sprintf("! ls %s", helperErrorFileName) + }) + if remoteOutput.NumErrors != 0 { + gplog.Error("gpbackup_helper failed to start on some segments") + cancel() + } + } + }() +}