Wait with a timeout until the pipe is created before restoring from it (#84)
During a resize-restore or a single-data-file restore, the gprestore helper
creates pipes for restoring the current table and the next one. During restore,
two errors may occur on the coordinator: for example, the current table or its
schema does not exist, and the next table or its schema does not exist either.
In that case, the main gprestore process does not run the COPY commands, so
nothing reads from the corresponding pipes. If the on-error-continue option is
also used, the main gprestore process creates skip files on the segments and
begins restoring the third table. Meanwhile, the gprestore helper may hang
waiting for someone to start reading from the pipe, waiting for a skip file to
appear, deleting a pipe, or performing some other lengthy operation. The main
gprestore process can then unsuccessfully try to restore any number of tables
until the corresponding pipe is created. So there is a race between the read
loop (which can create skip files) and the write loop (which does its own work
without processing skip files) before moving on to a new loop. This situation
appears to have been introduced with the addition of the on-error-continue
option: without it, any error was fatal and the process stopped. With this
option, and with no synchronization between the reader and the writer, it is
easy to end up with no channel to read from.

This patch adds a timeout-bounded wait for the pipe to be created before restoring from it.
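The guard the patch prepends to the read command can be sketched in isolation as follows. This is a minimal sketch: the pipe path is a placeholder and the real read command (a COPY ... FROM PROGRAM running cat on the pipe) is replaced by a flag, while the `timeout 300` wrapper and the `while [ ! -p ... ]` polling loop mirror the patched command.

```shell
# Minimal sketch of the guard prepended to readFromDestinationCommand.
# PIPE is a placeholder path; 300 seconds matches the patch's timeout.
PIPE=/tmp/gpbackup_demo_pipe
rm -f "$PIPE"
mkfifo "$PIPE"   # in gprestore, the helper process creates this pipe

# Poll until the pipe exists, giving up after the timeout; only then is it
# safe to run the real read command (here replaced by setting a flag).
if (timeout 300 bash -c "while [ ! -p \"$PIPE\" ]; do sleep 1; done" \
    || (echo "Pipe not found $PIPE" >&2; exit 1)); then
  READY=yes    # pipe exists: reading from it cannot block forever on open
else
  READY=no     # pipe never appeared: skip instead of hanging indefinitely
fi
rm -f "$PIPE"
```

Because the demo creates the FIFO before the guard runs, the loop exits on its first check; in gprestore the loop is what bridges the gap until the helper actually creates the pipe, and the timeout bounds how long a failed table can stall the restore.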

(cherry picked from commit f590f1f)
RekGRpth committed Jul 26, 2024
1 parent 8ca61dc commit 4e78f4f
Showing 2 changed files with 2 additions and 1 deletion.
1 change: 1 addition & 0 deletions restore/data.go
@@ -39,6 +39,7 @@ func CopyTableIn(queryContext context.Context, connectionPool *dbconn.DBConn, ta
if singleDataFile || resizeCluster {
//helper.go handles compression, so we don't want to set it here
customPipeThroughCommand = utils.DefaultPipeThroughProgram
+		readFromDestinationCommand = fmt.Sprintf("(timeout 300 bash -c \"while [ ! -p \"%s\" ]; do sleep 1; done\" || (echo \"Pipe not found %s\">&2; exit 1)) && %s", destinationToRead, destinationToRead, readFromDestinationCommand)
} else if MustGetFlagString(options.PLUGIN_CONFIG) != "" {
readFromDestinationCommand = fmt.Sprintf("%s restore_data %s", pluginConfig.ExecutablePath, pluginConfig.ConfigPath)
}
2 changes: 1 addition & 1 deletion restore/data_test.go
@@ -56,7 +56,7 @@ var _ = Describe("restore/data tests", func() {
Expect(err).ShouldNot(HaveOccurred())
})
It("will restore a table from a single data file", func() {
-		execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM 'cat <SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456' WITH CSV DELIMITER ',' ON SEGMENT")
+		execStr := regexp.QuoteMeta("COPY public.foo(i,j) FROM PROGRAM '(timeout 300 bash -c \"while [ ! -p \"<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456\" ]; do sleep 1; done\" || (echo \"Pipe not found <SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456\">&2; exit 1)) && cat <SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456' WITH CSV DELIMITER ',' ON SEGMENT")
mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))
filename := "<SEG_DATA_DIR>/backups/20170101/20170101010101/gpbackup_<SEGID>_20170101010101_pipe_3456"
_, err := restore.CopyTableIn(context.Background(), connectionPool, "public.foo", "(i,j)", filename, true, 0)
