Sync 1.29.1 changes #33

Merged
merged 46 commits on Jul 27, 2023
Commits
9ef4afd
Under certain flags, place timestamp+pid lockfile in backup-dir
AJR-VMware Apr 19, 2023
78bfb34
Update datadomain Vault vars
AJR-VMware Apr 17, 2023
4f30f34
Add skip to an end to end test
AJR-VMware Apr 25, 2023
fcd0c6c
Update CI artifact names
AJR-VMware May 2, 2023
fada3b6
Add test for delete_replica
AJR-VMware May 3, 2023
8addcac
Reorganize writes to history file
AJR-VMware May 8, 2023
5b570d7
Handle panics from goroutines
AJR-VMware May 9, 2023
60430ba
Set gp_quicklz_fallback GUC for GPDB7+
AJR-VMware May 11, 2023
fc807b0
Fix cleanup calls in plugin_test.sh
AJR-VMware May 5, 2023
2dfcba5
Add a plugin cleanup job to our CI
AJR-VMware May 6, 2023
d97ffef
Fix hang caused by piped copy commands failing
AJR-VMware May 12, 2023
7346404
Add test coverage for BackupAllTableData
AJR-VMware May 12, 2023
9473841
Don't print "no data" warning for metadata-only backups
jmcatamney Apr 24, 2023
b290ade
Fix filtering of tables with inheritance
jmcatamney Apr 24, 2023
b28ae00
Update test coverage script
jmcatamney May 8, 2023
11ff552
Fix expected error message in test
AJR-VMware May 17, 2023
3b3ebcf
Make directories for taking local lockfiles
AJR-VMware May 17, 2023
d2bb78a
Update gpbackup CI to use gppkg_v2
AJR-VMware May 14, 2023
9becc93
Handle missing SegmentCount for non-resize restore
jmcatamney May 18, 2023
98c7d82
restore_helper: handle files with dots in the name
jmcatamney May 18, 2023
0ec0250
Improve BackupDataForAllTables
AJR-VMware May 18, 2023
5987f5f
Add missing get statement to regression pipeline
AJR-VMware May 19, 2023
e444cc5
Update layout of the gpbackup builds
AJR-VMware May 23, 2023
efb42ca
Fix regression pipeline usage of gppgkv2
AJR-VMware May 24, 2023
8bdcb48
Change "no-inherit" flag to "no-inherits"
jmcatamney May 25, 2023
f624469
Change how we install dummy_seclabel in local CI
AJR-VMware May 30, 2023
3952a05
Rework the data backup test to handle flakes
AJR-VMware May 30, 2023
aa96aba
Refactor ENXIO timeout for restore_helper agents
AJR-VMware Jun 1, 2023
f5f3f7e
Remove prodataaccess for GP7 and above
khuddlefish Jun 2, 2023
657173c
Allow gen_pipeline.py to support python2 or python3
khuddlefish Jun 2, 2023
81507af
Reduce CI flakiness
AJR-VMware Jun 5, 2023
c0e3e66
Move ginkgorecover call
AJR-VMware Jun 6, 2023
d5eb8bc
Update tests to support gp_toolkit extension
AJR-VMware Jun 6, 2023
4a7cfb4
Fix statistics tests for GP7
khuddlefish Jun 7, 2023
8439916
Modify partition filtering behavior
jmcatamney Jun 9, 2023
0896ede
Don't fail if unable to write a restore report
jmcatamney Jun 12, 2023
d1774c9
Prevent report test flakes due to test pollution
jmcatamney Jun 12, 2023
0990667
Improve end-to-end test cleanup
jmcatamney Jun 16, 2023
11de4b5
Update resource group parameters for GPDB 7
jmcatamney Jun 15, 2023
9c0ac44
Explicitly create gp_toolkit in end-to-end tests
jmcatamney Jun 16, 2023
ed05d39
Ignore public.legacy_enum in plugin tests
jmcatamney Jun 16, 2023
3f48845
Don't fail if unable to write an error file
jmcatamney Jul 5, 2023
61acef3
Fix extension tests for GPDB 7
jmcatamney Jul 10, 2023
4ea2f68
Revert "Implement --report-dir option for gprestore (#30)"
Stolb27 Jul 25, 2023
bf4485e
Merge branch '1.29.1-conflicts' into 1.29.1-sync
Stolb27 Jul 26, 2023
718cd8b
Implement --report-dir option for gprestore (#30)
Jul 18, 2023
92 changes: 60 additions & 32 deletions backup/backup.go
@@ -126,7 +126,7 @@ func DoBackup() {
gplog.Info("Gathering table state information")
metadataTables, dataTables := RetrieveAndProcessTables()
dataTables, numExtOrForeignTables := GetBackupDataSet(dataTables)
if len(dataTables) == 0 {
if len(dataTables) == 0 && !backupReport.MetadataOnly {
gplog.Warn("No tables in backup set contain data. Performing metadata-only backup instead.")
backupReport.MetadataOnly = true
}
@@ -141,26 +141,13 @@ func DoBackup() {
gplog.Info("Metadata will be written to %s", metadataFilename)
metadataFile := utils.NewFileWithByteCountFromFile(metadataFilename)

backupSessionGUC(metadataFile)
if !MustGetFlagBool(options.DATA_ONLY) {
isFullBackup := len(MustGetFlagStringArray(options.INCLUDE_RELATION)) == 0
if isFullBackup && !MustGetFlagBool(options.WITHOUT_GLOBALS) {
backupGlobals(metadataFile)
}

isFilteredBackup := !isFullBackup
backupPredata(metadataFile, metadataTables, isFilteredBackup)
backupPostdata(metadataFile)
}

/*
* We check this in the backup report rather than the flag because we
* perform a metadata only backup if the database contains no tables
* or only external tables
*/
backupSetTables := dataTables
if !backupReport.MetadataOnly {
backupSetTables := dataTables

targetBackupRestorePlan := make([]history.RestorePlanEntry, 0)
if targetBackupTimestamp != "" {
gplog.Info("Basing incremental backup off of backup with timestamp = %s", targetBackupTimestamp)
@@ -171,8 +158,37 @@ func DoBackup() {
}

backupReport.RestorePlan = PopulateRestorePlan(backupSetTables, targetBackupRestorePlan, dataTables)
}

// As soon as all necessary data is available, capture the backup into history database
if !MustGetFlagBool(options.NO_HISTORY) {
historyDBName := globalFPInfo.GetBackupHistoryDatabasePath()
historyDB, err := history.InitializeHistoryDatabase(historyDBName)
if err != nil {
gplog.FatalOnError(err)
} else {
err = history.StoreBackupHistory(historyDB, &backupReport.BackupConfig)
historyDB.Close()
gplog.FatalOnError(err)
}
}

backupSessionGUC(metadataFile)
if !MustGetFlagBool(options.DATA_ONLY) {
isFullBackup := len(MustGetFlagStringArray(options.INCLUDE_RELATION)) == 0
if isFullBackup && !MustGetFlagBool(options.WITHOUT_GLOBALS) {
backupGlobals(metadataFile)
}

isFilteredBackup := !isFullBackup
backupPredata(metadataFile, metadataTables, isFilteredBackup)
backupPostdata(metadataFile)
}

if !backupReport.MetadataOnly {
backupData(backupSetTables)
}

printDataBackupWarnings(numExtOrForeignTables)
if MustGetFlagBool(options.WITH_STATS) {
backupStatistics(metadataTables)
@@ -290,7 +306,7 @@ func backupData(tables []Table) {
MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated, initialPipes, true, false, 0, 0)
}
gplog.Info("Writing data to file")
rowsCopiedMaps := backupDataForAllTables(tables)
rowsCopiedMaps := BackupDataForAllTables(tables)
AddTableDataEntriesToTOC(tables, rowsCopiedMaps)
if MustGetFlagBool(options.SINGLE_DATA_FILE) && MustGetFlagString(options.PLUGIN_CONFIG) != "" {
pluginConfig.BackupSegmentTOCs(globalCluster, globalFPInfo)
@@ -382,7 +398,6 @@ func DoTeardown() {
if statErr != nil { // Even if this isn't os.IsNotExist, don't try to write a report file in case of further errors
return
}
historyDBName := globalFPInfo.GetBackupHistoryDatabasePath()
historyFileLegacyName := globalFPInfo.GetBackupHistoryFilePath()
reportFilename := globalFPInfo.GetBackupReportFilePath()
configFilename := globalFPInfo.GetConfigFilePath()
@@ -396,24 +411,12 @@ func DoTeardown() {
}

if backupReport != nil {
if !backupFailed {
if backupFailed {
backupReport.BackupConfig.Status = history.BackupStatusFailed
} else {
backupReport.BackupConfig.Status = history.BackupStatusSucceed
}
backupReport.ConstructBackupParamsString()
backupReport.BackupConfig.SegmentCount = len(globalCluster.ContentIDs) - 1

if !MustGetFlagBool(options.NO_HISTORY) {
historyDB, err := history.InitializeHistoryDatabase(historyDBName)
if err != nil {
gplog.Error(fmt.Sprintf("%v", err))
} else {
err = history.StoreBackupHistory(historyDB, &backupReport.BackupConfig)
historyDB.Close()
if err != nil {
gplog.Error(fmt.Sprintf("%v", err))
}
}
}

history.WriteConfigFile(&backupReport.BackupConfig, configFilename)
if backupReport.BackupConfig.EndTime == "" {
@@ -473,6 +476,31 @@ func DoCleanup(backupFailed bool) {
}
utils.CleanUpHelperFilesOnAllHosts(globalCluster, globalFPInfo)
}

// The gpbackup_history entry is written to the DB with an "In Progress" status very early
// on. If we get to cleanup and the backup succeeded, mark it as a success, otherwise mark
// it as a failure. Between our signal handler and recovering panics, there should be no
// way for gpbackup to exit that leaves the entry in the initial status.

if !MustGetFlagBool(options.NO_HISTORY) {
var statusString string
if backupFailed {
statusString = history.BackupStatusFailed
} else {
statusString = history.BackupStatusSucceed
}
historyDBName := globalFPInfo.GetBackupHistoryDatabasePath()
historyDB, err := history.InitializeHistoryDatabase(historyDBName)
if err != nil {
gplog.Error(fmt.Sprintf("Unable to update history database. Error: %v", err))
} else {
_, err := historyDB.Exec(fmt.Sprintf("UPDATE backups SET status='%s' WHERE timestamp='%s'", statusString, globalFPInfo.Timestamp))
historyDB.Close()
if err != nil {
gplog.Error(fmt.Sprintf("Unable to update history database. Error: %v", err))
}
}
}
}
err := backupLockFile.Unlock()
if err != nil && backupLockFile != "" {
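For context on the backup.go hunks above: the history entry is now inserted near the start of DoBackup with an initial "In Progress" status, and DoCleanup finishes it with an UPDATE to a terminal status. A minimal, self-contained sketch of that write-early/update-late pattern follows. It is not gpbackup's history package: the SQLite driver import, the status strings, and the schema setup are assumptions; only the backups table and its timestamp/status columns appear in the diff.

package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumed driver; the real code goes through history.InitializeHistoryDatabase
)

// recordInProgress inserts the history row as soon as the backup starts,
// so even a crash leaves a visible, non-final entry behind.
func recordInProgress(db *sql.DB, timestamp string) error {
	_, err := db.Exec(`INSERT INTO backups (timestamp, status) VALUES (?, ?)`, timestamp, "In Progress")
	return err
}

// finalizeStatus flips the same row to a terminal state during cleanup,
// mirroring the UPDATE issued from DoCleanup in the diff above.
func finalizeStatus(db *sql.DB, timestamp string, backupFailed bool) error {
	status := "Success" // assumed values; the real code uses history.BackupStatusSucceed / BackupStatusFailed
	if backupFailed {
		status = "Failure"
	}
	_, err := db.Exec(`UPDATE backups SET status = ? WHERE timestamp = ?`, status, timestamp)
	return err
}

func main() {
	db, err := sql.Open("sqlite3", "history_sketch.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS backups (timestamp TEXT PRIMARY KEY, status TEXT)`); err != nil {
		log.Fatal(err)
	}

	ts := "20230727120000"
	if err := recordInProgress(db, ts); err != nil {
		log.Fatal(err)
	}

	// ... run the actual backup work here; set backupFailed based on the outcome ...
	backupFailed := false

	if err := finalizeStatus(db, ts, backupFailed); err != nil {
		log.Fatal(err)
	}
}

Writing the row first and updating it later is also what lets GetLatestMatchingBackupConfig (see the incremental.go change below) exclude failed runs simply by filtering on status.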
79 changes: 51 additions & 28 deletions backup/data.go
@@ -134,10 +134,10 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters
* If synchronized snapshot is not supported and worker is unable to acquire a lock, the
* worker must be terminated because the session no longer has a valid distributed snapshot
*
* FIXME: Simplify backupDataForAllTables by having one function for snapshot workflow and
* FIXME: Simplify BackupDataForAllTables by having one function for snapshot workflow and
* another without, then extract common portions into their own functions.
*/
func backupDataForAllTables(tables []Table) []map[uint32]int64 {
func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables))}
counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO)
counters.ProgressBar.Start()
@@ -149,8 +149,8 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
*/
tasks := make(chan Table, len(tables))
var oidMap sync.Map
var isErroredBackup atomic.Bool
var workerPool sync.WaitGroup
var copyErr error
// Record and track tables in a hashmap of oids and table states (preloaded with value Unknown).
// The tables are loaded into the tasks channel for the subsequent goroutines to work on.
for _, table := range tables {
@@ -165,46 +165,51 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
* 3) Processes tables only in the event that the other workers encounter locking issues.
* Worker 0 already has all locks on the tables so it will not run into locking issues.
*/
panicChan := make(chan error)
rowsCopiedMaps[0] = make(map[uint32]int64)
for connNum := 1; connNum < connectionPool.NumConns; connNum++ {
rowsCopiedMaps[connNum] = make(map[uint32]int64)
workerPool.Add(1)
go func(whichConn int) {
defer func() {
if panicErr := recover(); panicErr != nil {
panicChan <- fmt.Errorf("%v", panicErr)
}
}()
defer workerPool.Done()
/* If the --leaf-partition-data flag is not set, the parent and all leaf
* partition data are treated as a single table and will be assigned to a single worker.
* Large partition hierarchies result in a large number of locks being held until the
* transaction commits and the locks are released.
*/
for table := range tasks {
if wasTerminated || copyErr != nil {
if wasTerminated || isErroredBackup.Load() {
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
return
}
if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil {
err := SetSynchronizedSnapshot(connectionPool, whichConn, backupSnapshot)
if err != nil {
gplog.FatalOnError(err)
}
gplog.FatalOnError(err)
}
// If a random external SQL command had queued an AccessExclusiveLock acquisition request
// against this next table, the --job worker thread would deadlock on the COPY attempt.
// To prevent gpbackup from hanging, we attempt to acquire an AccessShareLock on the
// relation with the NOWAIT option before we run COPY. If the LOCK TABLE NOWAIT call
// fails, we catch the error and defer the table to the main worker thread, worker 0.
// Afterwards, we break early and terminate the worker since its transaction is now in an
// aborted state. We do not need to do this with the main worker thread because it has
// already acquired AccessShareLocks on all tables before the metadata dumping part.
// If a random external SQL command had queued an AccessExclusiveLock acquisition
// request against this next table, the --job worker thread would deadlock on the
// COPY attempt. To prevent gpbackup from hanging, we attempt to acquire an
// AccessShareLock on the relation with the NOWAIT option before we run COPY. If
// the LOCK TABLE NOWAIT call fails, we catch the error and defer the table to the
// main worker thread, worker 0. Afterwards, if we are in a local transaction
// instead of a distributed snapshot, we break early and terminate the worker since
// its transaction is now in an aborted state. We do not need to do this with the
// main worker thread because it has already acquired AccessShareLocks on all
// tables before the metadata dumping part.
err := LockTableNoWait(table, whichConn)
if err != nil {
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != PG_LOCK_NOT_AVAILABLE {
copyErr = err
oidMap.Store(table.Oid, Deferred)
isErroredBackup.Store(true)
err = connectionPool.Rollback(whichConn)
if err != nil {
gplog.Warn("Worker %d: %s", whichConn, err)
}
continue
gplog.Fatal(fmt.Errorf("Unexpectedly unable to take lock on table %s, %s", table.FQN(), pgErr.Error()), "")
}
if gplog.GetVerbosity() < gplog.LOGVERBOSE {
// Add a newline to interrupt the progress bar so that
@@ -229,8 +234,11 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
}
err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn)
if err != nil {
copyErr = err
break
// if copy isn't working, skip remaining backups, and let downstream panic
// handling deal with it
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
isErroredBackup.Store(true)
gplog.Fatal(err, "")
} else {
oidMap.Store(table.Oid, Complete)
}
@@ -243,21 +251,31 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
}
}(connNum)
}

// Special goroutine to handle deferred tables
// Handle all tables deferred by the deadlock detection. This can only be
// done with the main worker thread, worker 0, because it has
// AccessShareLocks on all the tables already.
deferredWorkerDone := make(chan bool)
go func() {
defer func() {
if panicErr := recover(); panicErr != nil {
panicChan <- fmt.Errorf("%v", panicErr)
}
}()
for _, table := range tables {
for {
if wasTerminated || isErroredBackup.Load() {
return
}
state, _ := oidMap.Load(table.Oid)
if state.(int) == Unknown {
time.Sleep(time.Millisecond * 50)
} else if state.(int) == Deferred {
err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0)
if err != nil {
copyErr = err
isErroredBackup.Store(true)
gplog.Fatal(err, "")
}
oidMap.Store(table.Oid, Complete)
break
@@ -270,14 +288,24 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
}
deferredWorkerDone <- true
}()

close(tasks)
workerPool.Wait()

// Allow panics to crash from the main process, invoking DoCleanup
select {
case err := <-panicChan:
gplog.Fatal(err, "")
default:
// no panic, nothing to do
}

// If not using synchronized snapshots,
// check if all workers were terminated due to lock issues.
if backupSnapshot == "" {
allWorkersTerminatedLogged := false
for _, table := range tables {
if wasTerminated || copyErr != nil {
if wasTerminated || isErroredBackup.Load() {
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
break
}
@@ -295,12 +323,7 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
// Main goroutine waits for deferred worker 0 by waiting on this channel
<-deferredWorkerDone
agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
if copyErr != nil && agentErr != nil {
gplog.Error(agentErr.Error())
gplog.Fatal(copyErr, "")
} else if copyErr != nil {
gplog.Fatal(copyErr, "")
} else if agentErr != nil {
if agentErr != nil {
gplog.Fatal(agentErr, "")
}

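The data.go rework above replaces the shared copyErr variable with an atomic error flag and forwards worker panics over a channel so the main goroutine can re-raise them. A standalone sketch of that panic-forwarding shape follows; it is illustrative only — the task names are invented and the channel is buffered here to keep the sketch self-contained, whereas the diff uses an unbuffered channel read after workerPool.Wait().

package main

import (
	"fmt"
	"sync"
)

// runWorkers shows the pattern from BackupDataForAllTables: each worker
// recovers its own panic and sends it to panicChan, and the main goroutine
// checks the channel once all workers are done instead of crashing inside
// a goroutine that has no recover.
func runWorkers(tasks []string) error {
	panicChan := make(chan error, len(tasks)) // buffered so a panicking worker never blocks on send
	var workerPool sync.WaitGroup

	for _, t := range tasks {
		workerPool.Add(1)
		go func(task string) {
			defer workerPool.Done()
			defer func() {
				if panicErr := recover(); panicErr != nil {
					panicChan <- fmt.Errorf("%v", panicErr)
				}
			}()
			if task == "bad" {
				panic("COPY failed for " + task) // stand-in for a failure deep inside the worker
			}
		}(t)
	}

	workerPool.Wait()

	// Non-blocking drain, like the select with a default branch in the diff.
	select {
	case err := <-panicChan:
		return err // the real code calls gplog.Fatal(err, "") here
	default:
		return nil
	}
}

func main() {
	if err := runWorkers([]string{"table_a", "bad", "table_c"}); err != nil {
		fmt.Println("worker panicked:", err)
	}
}

Because the panic surfaces in the main goroutine, the existing teardown path (lock release, history status update in DoCleanup) still runs instead of the process dying inside a worker.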
14 changes: 13 additions & 1 deletion backup/global_variables.go
@@ -20,7 +20,7 @@ import (
*/

/*
Table backup state constants
Table backup state constants
*/
const (
Unknown int = iota
@@ -80,6 +80,18 @@ func SetFPInfo(fpInfo filepath.FilePathInfo) {
globalFPInfo = fpInfo
}

func GetFPInfo() filepath.FilePathInfo {
return globalFPInfo
}

func SetBackupSnapshot(snapshot string) {
backupSnapshot = snapshot
}

func GetBackupSnapshot() string {
return backupSnapshot
}

func SetPluginConfig(config *utils.PluginConfig) {
pluginConfig = config
}
5 changes: 3 additions & 2 deletions backup/incremental.go
@@ -70,13 +70,14 @@ func GetLatestMatchingBackupConfig(historyDBPath string, currentBackupConfig *hi
historyDB, _ := history.InitializeHistoryDatabase(historyDBPath)

whereClause := fmt.Sprintf(`backup_dir = '%s' AND database_name = '%s' AND leaf_partition_data = %v
AND plugin = '%s' AND single_data_file = %v AND compressed = %v AND date_deleted = ''`,
AND plugin = '%s' AND single_data_file = %v AND compressed = %v AND date_deleted = '' AND status = '%s'`,
MustGetFlagString(options.BACKUP_DIR),
currentBackupConfig.DatabaseName,
MustGetFlagBool(options.LEAF_PARTITION_DATA),
currentBackupConfig.Plugin,
MustGetFlagBool(options.SINGLE_DATA_FILE),
currentBackupConfig.Compressed)
currentBackupConfig.Compressed,
history.BackupStatusSucceed)

getBackupTimetampsQuery := fmt.Sprintf(`
SELECT timestamp
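The incremental.go change above adds a status predicate so that only successfully completed backups can be chosen as the base of an incremental. Roughly, the generated lookup now looks like the sketch below; the literal values are invented and the status string depends on history.BackupStatusSucceed, but the column names match the WHERE clause in the diff.

package main

import "fmt"

func main() {
	// Illustrative inputs; the real code reads these from flags and the current backup config.
	const backupStatusSucceed = "Success" // assumed value of history.BackupStatusSucceed

	whereClause := fmt.Sprintf(`backup_dir = '%s' AND database_name = '%s' AND leaf_partition_data = %v
        AND plugin = '%s' AND single_data_file = %v AND compressed = %v AND date_deleted = '' AND status = '%s'`,
		"/data/backups", "testdb", true, "", false, true, backupStatusSucceed)

	// A failed backup row keeps its failed status after the DoCleanup change,
	// so this lookup simply never returns it as a candidate timestamp.
	// (Query shape is a sketch; the real ordering lives in getBackupTimetampsQuery.)
	query := fmt.Sprintf("SELECT timestamp FROM backups WHERE %s ORDER BY timestamp DESC", whereClause)
	fmt.Println(query)
}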
3 changes: 3 additions & 0 deletions backup/incremental_test.go
@@ -96,6 +96,7 @@ var _ = Describe("backup/incremental tests", func() {
{
DatabaseName: "test1",
Timestamp: "timestamp3",
Status: history.BackupStatusSucceed,
ExcludeRelations: []string{},
ExcludeSchemas: []string{},
IncludeRelations: []string{},
@@ -104,6 +105,7 @@
{
DatabaseName: "test2",
Timestamp: "timestamp2",
Status: history.BackupStatusSucceed,
ExcludeRelations: []string{},
ExcludeSchemas: []string{},
IncludeRelations: []string{},
@@ -113,6 +115,7 @@
{
DatabaseName: "test1",
Timestamp: "timestamp1",
Status: history.BackupStatusSucceed,
ExcludeRelations: []string{},
ExcludeSchemas: []string{},
IncludeRelations: []string{},