diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go
index 8d228d073..14e573404 100644
--- a/end_to_end/end_to_end_suite_test.go
+++ b/end_to_end/end_to_end_suite_test.go
@@ -2263,6 +2263,21 @@ LANGUAGE plpgsql NO SQL;`)
 			assertArtifactsCleaned(restoreConn, "20240502095933")
 			testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
 		})
+		It("Will not hang after error during restore with jobs", func() {
+			command := exec.Command("tar", "-xzf", "resources/2-segment-db-error.tar.gz", "-C", backupDir)
+			mustRunCommand(command)
+			gprestoreCmd := exec.Command(gprestorePath,
+				"--timestamp", "20240502095933",
+				"--redirect-db", "restoredb",
+				"--backup-dir", path.Join(backupDir, "2-segment-db-error"),
+				"--resize-cluster", "--jobs", "3")
+			output, err := gprestoreCmd.CombinedOutput()
+			Expect(err).To(HaveOccurred())
+			Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t1`))
+			Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2`))
+			assertArtifactsCleaned(restoreConn, "20240502095933")
+			testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
+		})
 	})
 	Describe("Restore indexes and constraints on exchanged partition tables", func() {
 		BeforeEach(func() {
diff --git a/restore/data.go b/restore/data.go
index 97c7afb56..03e2617cb 100644
--- a/restore/data.go
+++ b/restore/data.go
@@ -5,6 +5,7 @@ package restore
 */
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -25,7 +26,7 @@ var (
 	tableDelim = ","
 )
 
-func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) {
+func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) {
 	whichConn = connectionPool.ValidateConnNum(whichConn)
 	copyCommand := ""
 	readFromDestinationCommand := "cat"
@@ -61,7 +62,7 @@ func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttribute
 	} else {
 		gplog.Verbose(`Executing "%s" on master`, query)
 	}
-	result, err := connectionPool.Exec(query, whichConn)
+	result, err := connectionPool.ExecContext(queryContext, query, whichConn)
 	if err != nil {
 		errStr := fmt.Sprintf("Error loading data into table %s", tableName)
@@ -79,7 +80,7 @@ func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttribute
 	return numRows, err
 }
 
-func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int, origSize int, destSize int) error {
+func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int, origSize int, destSize int) error {
 	resizeCluster := MustGetFlagBool(options.RESIZE_CLUSTER)
 	destinationToRead := ""
 	if backupConfig.SingleDataFile || resizeCluster {
@@ -88,7 +89,7 @@ func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.Coordinator
 		destinationToRead = fpInfo.GetTableBackupFilePathForCopyCommand(entry.Oid, utils.GetPipeThroughProgram().Extension, backupConfig.SingleDataFile)
 	}
 	gplog.Debug("Reading from %s", destinationToRead)
-	numRowsRestored, err := CopyTableIn(connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn)
+	numRowsRestored, err := CopyTableIn(queryContext, connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn)
 	if err != nil {
 		return err
 	}
@@ -214,6 +215,8 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 	var numErrors int32
 	var mutex = &sync.Mutex{}
 	panicChan := make(chan error)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel() // Make sure it's called to release resources even if no errors
 
 	for i := 0; i < connectionPool.NumConns; i++ {
 		workerPool.Add(1)
@@ -227,8 +230,15 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 			setGUCsForConnection(gucStatements, whichConn)
 			for entry := range tasks {
+				// Check if any error occurred in any other goroutines:
+				select {
+				case <-ctx.Done():
+					return // Error somewhere, terminate
+				default: // Default is must to avoid blocking
+				}
 				if wasTerminated {
 					dataProgressBar.(*pb.ProgressBar).NotPrint = true
+					cancel()
 					return
 				}
 				tableName := utils.MakeFQN(entry.Schema, entry.Name)
@@ -241,7 +251,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 					err = TruncateTable(tableName, whichConn)
 				}
 				if err == nil {
-					err = restoreSingleTableData(&fpInfo, entry, tableName, whichConn, origSize, destSize)
+					err = restoreSingleTableData(ctx, &fpInfo, entry, tableName, whichConn, origSize, destSize)
 
 					if gplog.GetVerbosity() > gplog.LOGINFO {
 						// No progress bar at this log level, so we note table count here
@@ -256,6 +266,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 					atomic.AddInt32(&numErrors, 1)
 					if !MustGetFlagBool(options.ON_ERROR_CONTINUE) {
 						dataProgressBar.(*pb.ProgressBar).NotPrint = true
+						cancel()
 						return
 					} else if connectionPool.Version.AtLeast("6") && (backupConfig.SingleDataFile || MustGetFlagBool(options.RESIZE_CLUSTER)) {
 						// inform segment helpers to skip this entry
@@ -270,6 +281,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 				agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
 				if agentErr != nil {
 					gplog.Error(agentErr.Error())
+					cancel()
 					return
 				}
 			}
diff --git a/restore/data_test.go b/restore/data_test.go
index 2323d83ef..c39a9e5ae 100644
--- a/restore/data_test.go
+++ b/restore/data_test.go
@@ -1,6 +1,7 @@
 package restore_test
 
 import (
+	"context"
 	"fmt"
 	"regexp"
 	"sort"
@@ -33,7 +34,7 @@ var _ = Describe("restore/data tests", func() {
 			execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz | gzip -d -c' WITH CSV DELIMITER ',' ON SEGMENT")
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz"
-			_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
+			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)
 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -42,7 +43,7 @@ var _ = Describe("restore/data tests", func() {
 			execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst | zstd --decompress -c' WITH CSV DELIMITER ',' ON SEGMENT")
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst"
-			_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
+			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)
 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -50,7 +51,7 @@ var _ = Describe("restore/data tests", func() {
 			execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_3456 | cat -' WITH CSV DELIMITER ',' ON SEGMENT")
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-			_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
+			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)
 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -58,7 +59,7 @@ var _ = Describe("restore/data tests", func() {
 			execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat /backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456 | cat -' WITH CSV DELIMITER ',' ON SEGMENT")
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456"
-			_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, true, 0)
+			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, true, 0)
 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -71,7 +72,7 @@ var _ = Describe("restore/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.gz"
-			_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
+			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)
 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -84,7 +85,7 @@ var _ = Describe("restore/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.zst"
-			_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
+			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)
 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -96,7 +97,7 @@ var _ = Describe("restore/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
 
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_pipe_3456.gz"
-			_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
+			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)
 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -110,7 +111,7 @@ var _ = Describe("restore/data tests", func() {
 			}
			mock.ExpectExec(execStr).WillReturnError(pgErr)
 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-			_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
+			_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)
 			Expect(err).To(HaveOccurred())
 			Expect(err.Error()).To(Equal("Error loading data into table public.foo: " +
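
Note (illustrative, not part of the patch): the data.go changes above apply the standard context-cancellation pattern for a worker pool: a shared context is created with context.WithCancel, each worker does a non-blocking select on ctx.Done() before picking up the next table, the context is threaded down to ExecContext so in-flight COPY statements can honor cancellation, and a worker that hits a fatal error calls cancel() so the remaining workers drain out instead of hanging. A minimal standalone sketch of that pattern, using hypothetical tasks/doWork names rather than the gpbackup code, looks like this:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// doWork stands in for restoring a single table; it fails on one task
// to show how the other workers react.
func doWork(task int) error {
	if task == 3 {
		return errors.New("simulated restore error")
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // release resources even if no error occurs

	// Queue all tasks up front in a buffered channel, as a worker pool would.
	tasks := make(chan int, 10)
	for t := 0; t < 10; t++ {
		tasks <- t
	}
	close(tasks)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for task := range tasks {
				// Non-blocking check: if another worker already failed, stop early.
				select {
				case <-ctx.Done():
					return
				default:
				}
				if err := doWork(task); err != nil {
					fmt.Printf("worker %d: %v\n", worker, err)
					cancel() // signal the other workers to stop
					return
				}
			}
		}(i)
	}
	wg.Wait()
}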