Skip to content

Commit

Permalink
Fix gprestore handle copy error with jobs > 1 (#78)
Browse files Browse the repository at this point in the history
Fix gprestore's handling of copy errors when run with jobs > 1.

When using the jobs > 1 option, if a copy error occurred, the corresponding
goroutine was terminated, but the other goroutines continued to wait for their
COPY commands to finish, and the restore froze.

This patch adds an execution context shared by the goroutines, which allows
in-flight requests to be canceled when an error occurs.
  • Loading branch information
RekGRpth authored May 6, 2024
1 parent 8c25531 commit 76cd98a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
15 changes: 15 additions & 0 deletions end_to_end/end_to_end_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2263,6 +2263,21 @@ LANGUAGE plpgsql NO SQL;`)
assertArtifactsCleaned(restoreConn, "20240502095933")
testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
})
It("Will not hang after error during restore with jobs", func() {
command := exec.Command("tar", "-xzf", "resources/2-segment-db-error.tar.gz", "-C", backupDir)
mustRunCommand(command)
gprestoreCmd := exec.Command(gprestorePath,
"--timestamp", "20240502095933",
"--redirect-db", "restoredb",
"--backup-dir", path.Join(backupDir, "2-segment-db-error"),
"--resize-cluster", "--jobs", "3")
output, err := gprestoreCmd.CombinedOutput()
Expect(err).To(HaveOccurred())
Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t1`))
Expect(string(output)).To(ContainSubstring(`Error loading data into table public.t2`))
assertArtifactsCleaned(restoreConn, "20240502095933")
testhelper.AssertQueryRuns(restoreConn, "DROP TABLE t0; DROP TABLE t1; DROP TABLE t2; DROP TABLE t3; DROP TABLE t4;")
})
})
Describe("Restore indexes and constraints on exchanged partition tables", func() {
BeforeEach(func() {
Expand Down
22 changes: 17 additions & 5 deletions restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package restore
*/

import (
"context"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -25,7 +26,7 @@ var (
tableDelim = ","
)

func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) {
func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, tableName string, tableAttributes string, destinationToRead string, singleDataFile bool, whichConn int) (int64, error) {
whichConn = connectionPool.ValidateConnNum(whichConn)
copyCommand := ""
readFromDestinationCommand := "cat"
Expand Down Expand Up @@ -61,7 +62,7 @@ func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttribute
} else {
gplog.Verbose(`Executing "%s" on master`, query)
}
result, err := connectionPool.Exec(query, whichConn)
result, err := connectionPool.ExecContext(queryContext, query, whichConn)
if err != nil {
errStr := fmt.Sprintf("Error loading data into table %s", tableName)

Expand All @@ -79,7 +80,7 @@ func CopyTableIn(connectionPool *dbconn.DBConn, tableName string, tableAttribute
return numRows, err
}

func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int, origSize int, destSize int) error {
func restoreSingleTableData(queryContext context.Context, fpInfo *filepath.FilePathInfo, entry toc.CoordinatorDataEntry, tableName string, whichConn int, origSize int, destSize int) error {
resizeCluster := MustGetFlagBool(options.RESIZE_CLUSTER)
destinationToRead := ""
if backupConfig.SingleDataFile || resizeCluster {
Expand All @@ -88,7 +89,7 @@ func restoreSingleTableData(fpInfo *filepath.FilePathInfo, entry toc.Coordinator
destinationToRead = fpInfo.GetTableBackupFilePathForCopyCommand(entry.Oid, utils.GetPipeThroughProgram().Extension, backupConfig.SingleDataFile)
}
gplog.Debug("Reading from %s", destinationToRead)
numRowsRestored, err := CopyTableIn(connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn)
numRowsRestored, err := CopyTableIn(queryContext, connectionPool, tableName, entry.AttributeString, destinationToRead, backupConfig.SingleDataFile, whichConn)
if err != nil {
return err
}
Expand Down Expand Up @@ -214,6 +215,8 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
var numErrors int32
var mutex = &sync.Mutex{}
panicChan := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Make sure it's called to release resources even if no errors

for i := 0; i < connectionPool.NumConns; i++ {
workerPool.Add(1)
Expand All @@ -227,8 +230,15 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co

setGUCsForConnection(gucStatements, whichConn)
for entry := range tasks {
// Check if any error occurred in any other goroutines:
select {
case <-ctx.Done():
return // Error somewhere, terminate
default: // Default is must to avoid blocking
}
if wasTerminated {
dataProgressBar.(*pb.ProgressBar).NotPrint = true
cancel()
return
}
tableName := utils.MakeFQN(entry.Schema, entry.Name)
Expand All @@ -241,7 +251,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
err = TruncateTable(tableName, whichConn)
}
if err == nil {
err = restoreSingleTableData(&fpInfo, entry, tableName, whichConn, origSize, destSize)
err = restoreSingleTableData(ctx, &fpInfo, entry, tableName, whichConn, origSize, destSize)

if gplog.GetVerbosity() > gplog.LOGINFO {
// No progress bar at this log level, so we note table count here
Expand All @@ -256,6 +266,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
atomic.AddInt32(&numErrors, 1)
if !MustGetFlagBool(options.ON_ERROR_CONTINUE) {
dataProgressBar.(*pb.ProgressBar).NotPrint = true
cancel()
return
} else if connectionPool.Version.AtLeast("6") && (backupConfig.SingleDataFile || MustGetFlagBool(options.RESIZE_CLUSTER)) {
// inform segment helpers to skip this entry
Expand All @@ -270,6 +281,7 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
if agentErr != nil {
gplog.Error(agentErr.Error())
cancel()
return
}
}
Expand Down
17 changes: 9 additions & 8 deletions restore/data_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package restore_test

import (
"context"
"fmt"
"regexp"
"sort"
Expand Down Expand Up @@ -33,7 +34,7 @@ var _ = Describe("restore/data tests", func() {
execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat <SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456.gz | gzip -d -c' WITH CSV DELIMITER ',' ON SEGMENT")
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456.gz"
_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -42,23 +43,23 @@ var _ = Describe("restore/data tests", func() {
execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat <SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456.zst | zstd --decompress -c' WITH CSV DELIMITER ',' ON SEGMENT")
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456.zst"
_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)

Expect(err).ShouldNot(HaveOccurred())
})
It("will restore a table from its own file without compression", func() {
execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat <SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456 | cat -' WITH CSV DELIMITER ',' ON SEGMENT")
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"
_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)

Expect(err).ShouldNot(HaveOccurred())
})
It("will restore a table from a single data file", func() {
execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat <SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456 | cat -' WITH CSV DELIMITER ',' ON SEGMENT")
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456"
_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, true, 0)
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, true, 0)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -71,7 +72,7 @@ var _ = Describe("restore/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456.gz"
_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -84,7 +85,7 @@ var _ = Describe("restore/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456.zst"
_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -96,7 +97,7 @@ var _ = Describe("restore/data tests", func() {
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456.gz"
_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)

Expect(err).ShouldNot(HaveOccurred())
})
Expand All @@ -110,7 +111,7 @@ var _ = Describe("restore/data tests", func() {
}
mock.ExpectExec(execStr).WillReturnError(pgErr)
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_3456"
_, err := restore.CopyTableIn(connectionPool, "public.foo", "(i,j)", filename, false, 0)
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, false, 0)

Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("Error loading data into table public.foo: " +
Expand Down

0 comments on commit 76cd98a

Please sign in to comment.