Merge pull request #33 from arenadata/1.29.1-sync
Sync 1.29.1 changes
Stolb27 authored Jul 27, 2023
2 parents f702d5f + 718cd8b commit 29c117b
Showing 66 changed files with 1,948 additions and 552 deletions.
92 changes: 60 additions & 32 deletions backup/backup.go
@@ -126,7 +126,7 @@ func DoBackup() {
gplog.Info("Gathering table state information")
metadataTables, dataTables := RetrieveAndProcessTables()
dataTables, numExtOrForeignTables := GetBackupDataSet(dataTables)
if len(dataTables) == 0 {
if len(dataTables) == 0 && !backupReport.MetadataOnly {
gplog.Warn("No tables in backup set contain data. Performing metadata-only backup instead.")
backupReport.MetadataOnly = true
}
@@ -141,26 +141,13 @@ func DoBackup() {
gplog.Info("Metadata will be written to %s", metadataFilename)
metadataFile := utils.NewFileWithByteCountFromFile(metadataFilename)

backupSessionGUC(metadataFile)
if !MustGetFlagBool(options.DATA_ONLY) {
isFullBackup := len(MustGetFlagStringArray(options.INCLUDE_RELATION)) == 0
if isFullBackup && !MustGetFlagBool(options.WITHOUT_GLOBALS) {
backupGlobals(metadataFile)
}

isFilteredBackup := !isFullBackup
backupPredata(metadataFile, metadataTables, isFilteredBackup)
backupPostdata(metadataFile)
}

/*
* We check this in the backup report rather than the flag because we
* perform a metadata only backup if the database contains no tables
* or only external tables
*/
backupSetTables := dataTables
if !backupReport.MetadataOnly {
backupSetTables := dataTables

targetBackupRestorePlan := make([]history.RestorePlanEntry, 0)
if targetBackupTimestamp != "" {
gplog.Info("Basing incremental backup off of backup with timestamp = %s", targetBackupTimestamp)
@@ -171,8 +158,37 @@ func DoBackup() {
}

backupReport.RestorePlan = PopulateRestorePlan(backupSetTables, targetBackupRestorePlan, dataTables)
}

// As soon as all necessary data is available, capture the backup into history database
if !MustGetFlagBool(options.NO_HISTORY) {
historyDBName := globalFPInfo.GetBackupHistoryDatabasePath()
historyDB, err := history.InitializeHistoryDatabase(historyDBName)
if err != nil {
gplog.FatalOnError(err)
} else {
err = history.StoreBackupHistory(historyDB, &backupReport.BackupConfig)
historyDB.Close()
gplog.FatalOnError(err)
}
}

backupSessionGUC(metadataFile)
if !MustGetFlagBool(options.DATA_ONLY) {
isFullBackup := len(MustGetFlagStringArray(options.INCLUDE_RELATION)) == 0
if isFullBackup && !MustGetFlagBool(options.WITHOUT_GLOBALS) {
backupGlobals(metadataFile)
}

isFilteredBackup := !isFullBackup
backupPredata(metadataFile, metadataTables, isFilteredBackup)
backupPostdata(metadataFile)
}

if !backupReport.MetadataOnly {
backupData(backupSetTables)
}

printDataBackupWarnings(numExtOrForeignTables)
if MustGetFlagBool(options.WITH_STATS) {
backupStatistics(metadataTables)
@@ -290,7 +306,7 @@ func backupData(tables []Table) {
MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated, initialPipes, true, false, 0, 0)
}
gplog.Info("Writing data to file")
rowsCopiedMaps := backupDataForAllTables(tables)
rowsCopiedMaps := BackupDataForAllTables(tables)
AddTableDataEntriesToTOC(tables, rowsCopiedMaps)
if MustGetFlagBool(options.SINGLE_DATA_FILE) && MustGetFlagString(options.PLUGIN_CONFIG) != "" {
pluginConfig.BackupSegmentTOCs(globalCluster, globalFPInfo)
@@ -382,7 +398,6 @@ func DoTeardown() {
if statErr != nil { // Even if this isn't os.IsNotExist, don't try to write a report file in case of further errors
return
}
historyDBName := globalFPInfo.GetBackupHistoryDatabasePath()
historyFileLegacyName := globalFPInfo.GetBackupHistoryFilePath()
reportFilename := globalFPInfo.GetBackupReportFilePath()
configFilename := globalFPInfo.GetConfigFilePath()
@@ -396,24 +411,12 @@ }
}

if backupReport != nil {
if !backupFailed {
if backupFailed {
backupReport.BackupConfig.Status = history.BackupStatusFailed
} else {
backupReport.BackupConfig.Status = history.BackupStatusSucceed
}
backupReport.ConstructBackupParamsString()
backupReport.BackupConfig.SegmentCount = len(globalCluster.ContentIDs) - 1

if !MustGetFlagBool(options.NO_HISTORY) {
historyDB, err := history.InitializeHistoryDatabase(historyDBName)
if err != nil {
gplog.Error(fmt.Sprintf("%v", err))
} else {
err = history.StoreBackupHistory(historyDB, &backupReport.BackupConfig)
historyDB.Close()
if err != nil {
gplog.Error(fmt.Sprintf("%v", err))
}
}
}

history.WriteConfigFile(&backupReport.BackupConfig, configFilename)
if backupReport.BackupConfig.EndTime == "" {
@@ -473,6 +476,31 @@ func DoCleanup(backupFailed bool) {
}
utils.CleanUpHelperFilesOnAllHosts(globalCluster, globalFPInfo)
}

// The gpbackup_history entry is written to the DB with an "In Progress" status very early
// on. If we get to cleanup and the backup succeeded, mark it as a success, otherwise mark
// it as a failure. Between our signal handler and recovering panics, there should be no
// way for gpbackup to exit that leaves the entry in the initial status.

if !MustGetFlagBool(options.NO_HISTORY) {
var statusString string
if backupFailed {
statusString = history.BackupStatusFailed
} else {
statusString = history.BackupStatusSucceed
}
historyDBName := globalFPInfo.GetBackupHistoryDatabasePath()
historyDB, err := history.InitializeHistoryDatabase(historyDBName)
if err != nil {
gplog.Error(fmt.Sprintf("Unable to update history database. Error: %v", err))
} else {
_, err := historyDB.Exec(fmt.Sprintf("UPDATE backups SET status='%s' WHERE timestamp='%s'", statusString, globalFPInfo.Timestamp))
historyDB.Close()
if err != nil {
gplog.Error(fmt.Sprintf("Unable to update history database. Error: %v", err))
}
}
}
}
err := backupLockFile.Unlock()
if err != nil && backupLockFile != "" {
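Note on the backup.go changes above: the history entry is now written while the backup is still in progress, and DoCleanup later flips it to a terminal status. The following is a minimal standalone sketch of that final update step, assuming a database/sql handle over the SQLite history file and the `backups` table shown in the diff; the `finalizeBackupStatus` helper, the driver import, and the status literals are illustrative stand-ins, not the gpbackup/history API.

```go
// Package sketch is an illustrative standalone example, not gpbackup code.
package sketch

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3" // assumed SQLite driver; gpbackup's history package manages its own connection
)

// finalizeBackupStatus mirrors the DoCleanup step added in this commit: the
// backups row was inserted earlier with an in-progress status, and once the
// outcome is known it is flipped to a terminal one. The status strings are
// placeholders for history.BackupStatusSucceed / history.BackupStatusFailed.
func finalizeBackupStatus(historyDBPath, timestamp string, backupFailed bool) error {
	status := "Success"
	if backupFailed {
		status = "Failure"
	}
	db, err := sql.Open("sqlite3", historyDBPath)
	if err != nil {
		return fmt.Errorf("unable to open history database: %w", err)
	}
	defer db.Close()
	// The committed code builds the UPDATE with fmt.Sprintf; a parameterized
	// statement is used here only to keep the sketch short and injection-safe.
	_, err = db.Exec("UPDATE backups SET status = ? WHERE timestamp = ?", status, timestamp)
	if err != nil {
		return fmt.Errorf("unable to update history database: %w", err)
	}
	return nil
}
```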
79 changes: 51 additions & 28 deletions backup/data.go
@@ -134,10 +134,10 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters
* If synchronized snapshot is not supported and worker is unable to acquire a lock, the
* worker must be terminated because the session no longer has a valid distributed snapshot
*
* FIXME: Simplify backupDataForAllTables by having one function for snapshot workflow and
* FIXME: Simplify BackupDataForAllTables by having one function for snapshot workflow and
* another without, then extract common portions into their own functions.
*/
func backupDataForAllTables(tables []Table) []map[uint32]int64 {
func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables))}
counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO)
counters.ProgressBar.Start()
@@ -149,8 +149,8 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
*/
tasks := make(chan Table, len(tables))
var oidMap sync.Map
var isErroredBackup atomic.Bool
var workerPool sync.WaitGroup
var copyErr error
// Record and track tables in a hashmap of oids and table states (preloaded with value Unknown).
// The tables are loaded into the tasks channel for the subsequent goroutines to work on.
for _, table := range tables {
@@ -165,46 +165,51 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
* 3) Processes tables only in the event that the other workers encounter locking issues.
* Worker 0 already has all locks on the tables so it will not run into locking issues.
*/
panicChan := make(chan error)
rowsCopiedMaps[0] = make(map[uint32]int64)
for connNum := 1; connNum < connectionPool.NumConns; connNum++ {
rowsCopiedMaps[connNum] = make(map[uint32]int64)
workerPool.Add(1)
go func(whichConn int) {
defer func() {
if panicErr := recover(); panicErr != nil {
panicChan <- fmt.Errorf("%v", panicErr)
}
}()
defer workerPool.Done()
/* If the --leaf-partition-data flag is not set, the parent and all leaf
* partition data are treated as a single table and will be assigned to a single worker.
* Large partition hierarchies result in a large number of locks being held until the
* transaction commits and the locks are released.
*/
for table := range tasks {
if wasTerminated || copyErr != nil {
if wasTerminated || isErroredBackup.Load() {
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
return
}
if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil {
err := SetSynchronizedSnapshot(connectionPool, whichConn, backupSnapshot)
if err != nil {
gplog.FatalOnError(err)
}
gplog.FatalOnError(err)
}
// If a random external SQL command had queued an AccessExclusiveLock acquisition request
// against this next table, the --job worker thread would deadlock on the COPY attempt.
// To prevent gpbackup from hanging, we attempt to acquire an AccessShareLock on the
// relation with the NOWAIT option before we run COPY. If the LOCK TABLE NOWAIT call
// fails, we catch the error and defer the table to the main worker thread, worker 0.
// Afterwards, we break early and terminate the worker since its transaction is now in an
// aborted state. We do not need to do this with the main worker thread because it has
// already acquired AccessShareLocks on all tables before the metadata dumping part.
// If a random external SQL command had queued an AccessExclusiveLock acquisition
// request against this next table, the --job worker thread would deadlock on the
// COPY attempt. To prevent gpbackup from hanging, we attempt to acquire an
// AccessShareLock on the relation with the NOWAIT option before we run COPY. If
// the LOCK TABLE NOWAIT call fails, we catch the error and defer the table to the
// main worker thread, worker 0. Afterwards, if we are in a local transaction
// instead of a distributed snapshot, we break early and terminate the worker since
// its transaction is now in an aborted state. We do not need to do this with the
// main worker thread because it has already acquired AccessShareLocks on all
// tables before the metadata dumping part.
err := LockTableNoWait(table, whichConn)
if err != nil {
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != PG_LOCK_NOT_AVAILABLE {
copyErr = err
oidMap.Store(table.Oid, Deferred)
isErroredBackup.Store(true)
err = connectionPool.Rollback(whichConn)
if err != nil {
gplog.Warn("Worker %d: %s", whichConn, err)
}
continue
gplog.Fatal(fmt.Errorf("Unexpectedly unable to take lock on table %s, %s", table.FQN(), pgErr.Error()), "")
}
if gplog.GetVerbosity() < gplog.LOGVERBOSE {
// Add a newline to interrupt the progress bar so that
@@ -229,8 +234,11 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
}
err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn)
if err != nil {
copyErr = err
break
// if copy isn't working, skip remaining backups, and let downstream panic
// handling deal with it
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
isErroredBackup.Store(true)
gplog.Fatal(err, "")
} else {
oidMap.Store(table.Oid, Complete)
}
@@ -243,21 +251,31 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
}
}(connNum)
}

// Special goroutine to handle deferred tables
// Handle all tables deferred by the deadlock detection. This can only be
// done with the main worker thread, worker 0, because it has
// AccessShareLocks on all the tables already.
deferredWorkerDone := make(chan bool)
go func() {
defer func() {
if panicErr := recover(); panicErr != nil {
panicChan <- fmt.Errorf("%v", panicErr)
}
}()
for _, table := range tables {
for {
if wasTerminated || isErroredBackup.Load() {
return
}
state, _ := oidMap.Load(table.Oid)
if state.(int) == Unknown {
time.Sleep(time.Millisecond * 50)
} else if state.(int) == Deferred {
err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0)
if err != nil {
copyErr = err
isErroredBackup.Store(true)
gplog.Fatal(err, "")
}
oidMap.Store(table.Oid, Complete)
break
@@ -270,14 +288,24 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
}
deferredWorkerDone <- true
}()

close(tasks)
workerPool.Wait()

// Allow panics to crash from the main process, invoking DoCleanup
select {
case err := <-panicChan:
gplog.Fatal(err, "")
default:
// no panic, nothing to do
}

// If not using synchronized snapshots,
// check if all workers were terminated due to lock issues.
if backupSnapshot == "" {
allWorkersTerminatedLogged := false
for _, table := range tables {
if wasTerminated || copyErr != nil {
if wasTerminated || isErroredBackup.Load() {
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
break
}
@@ -295,12 +323,7 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
// Main goroutine waits for deferred worker 0 by waiting on this channel
<-deferredWorkerDone
agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
if copyErr != nil && agentErr != nil {
gplog.Error(agentErr.Error())
gplog.Fatal(copyErr, "")
} else if copyErr != nil {
gplog.Fatal(copyErr, "")
} else if agentErr != nil {
if agentErr != nil {
gplog.Fatal(agentErr, "")
}

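Note on the data.go rework above: the shared copyErr variable is replaced by an atomic flag plus a panic channel, so a failing worker both stops the other workers from pulling new tables and has its panic re-raised on the main goroutine, where gplog.Fatal drives DoCleanup. Below is a rough, self-contained sketch of that pattern; the task and process names are generic stand-ins for the tables channel and BackupSingleTableData, and returning the error stands in for re-raising it with gplog.Fatal.

```go
// Illustrative standalone example of the worker pattern, not gpbackup code.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers sketches the new error handling in BackupDataForAllTables: each
// worker recovers its own panic onto a shared channel, an atomic flag stops
// the remaining workers from picking up new tasks, and the first captured
// panic is surfaced on the main goroutine after the pool drains.
func runWorkers(tasks []string, numWorkers int, process func(string) error) error {
	taskCh := make(chan string, len(tasks))
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh)

	panicChan := make(chan error, numWorkers) // buffered so a panicking worker never blocks
	var errored atomic.Bool
	var pool sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		pool.Add(1)
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicChan <- fmt.Errorf("%v", r)
				}
			}()
			defer pool.Done()
			for task := range taskCh {
				if errored.Load() {
					return // another worker already hit a fatal error
				}
				if err := process(task); err != nil {
					errored.Store(true)
					panic(err) // stands in for gplog.Fatal, which logs and then panics
				}
			}
		}()
	}
	pool.Wait()

	// Equivalent of the select on panicChan in data.go: surface the first
	// worker failure on the main goroutine so cleanup can run there.
	select {
	case err := <-panicChan:
		return err
	default:
		return nil
	}
}

func main() {
	err := runWorkers([]string{"t1", "t2", "t3"}, 2, func(table string) error {
		fmt.Println("backing up", table)
		return nil
	})
	if err != nil {
		fmt.Println("backup failed:", err)
	}
}
```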
14 changes: 13 additions & 1 deletion backup/global_variables.go
@@ -20,7 +20,7 @@ import (
*/

/*
Table backup state constants
Table backup state constants
*/
const (
Unknown int = iota
@@ -80,6 +80,18 @@ func SetFPInfo(fpInfo filepath.FilePathInfo) {
globalFPInfo = fpInfo
}

func GetFPInfo() filepath.FilePathInfo {
return globalFPInfo
}

func SetBackupSnapshot(snapshot string) {
backupSnapshot = snapshot
}

func GetBackupSnapshot() string {
return backupSnapshot
}

func SetPluginConfig(config *utils.PluginConfig) {
pluginConfig = config
}
5 changes: 3 additions & 2 deletions backup/incremental.go
@@ -70,13 +70,14 @@ func GetLatestMatchingBackupConfig(historyDBPath string, currentBackupConfig *hi
historyDB, _ := history.InitializeHistoryDatabase(historyDBPath)

whereClause := fmt.Sprintf(`backup_dir = '%s' AND database_name = '%s' AND leaf_partition_data = %v
AND plugin = '%s' AND single_data_file = %v AND compressed = %v AND date_deleted = ''`,
AND plugin = '%s' AND single_data_file = %v AND compressed = %v AND date_deleted = '' AND status = '%s'`,
MustGetFlagString(options.BACKUP_DIR),
currentBackupConfig.DatabaseName,
MustGetFlagBool(options.LEAF_PARTITION_DATA),
currentBackupConfig.Plugin,
MustGetFlagBool(options.SINGLE_DATA_FILE),
currentBackupConfig.Compressed)
currentBackupConfig.Compressed,
history.BackupStatusSucceed)

getBackupTimetampsQuery := fmt.Sprintf(`
SELECT timestamp
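Note on the incremental.go change above: the base-backup lookup now also filters on status, so a failed or still-in-progress history row can no longer be chosen as the parent of an incremental backup. The sketch below shows the effect, using the `backups` table and column names visible in the diff; the status literal and the ORDER BY / LIMIT shape are assumptions, since the full getBackupTimetampsQuery is not shown in this hunk.

```go
// Illustrative standalone example, not gpbackup code.
package sketch

import "database/sql"

// latestSucceededTimestamp shows the effect of the new status predicate: only
// rows from successful backups qualify as the base of an incremental backup.
func latestSucceededTimestamp(db *sql.DB, backupDir, dbName string) (string, error) {
	const query = `
        SELECT timestamp
        FROM backups
        WHERE backup_dir = ?
          AND database_name = ?
          AND date_deleted = ''
          AND status = ?
        ORDER BY timestamp DESC
        LIMIT 1`
	var ts string
	// "Success" is a placeholder for history.BackupStatusSucceed.
	err := db.QueryRow(query, backupDir, dbName, "Success").Scan(&ts)
	return ts, err
}
```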
3 changes: 3 additions & 0 deletions backup/incremental_test.go
@@ -96,6 +96,7 @@ var _ = Describe("backup/incremental tests", func() {
{
DatabaseName: "test1",
Timestamp: "timestamp3",
Status: history.BackupStatusSucceed,
ExcludeRelations: []string{},
ExcludeSchemas: []string{},
IncludeRelations: []string{},
@@ -104,6 +105,7 @@ {
{
DatabaseName: "test2",
Timestamp: "timestamp2",
Status: history.BackupStatusSucceed,
ExcludeRelations: []string{},
ExcludeSchemas: []string{},
IncludeRelations: []string{},
@@ -113,6 +115,7 @@ {
{
DatabaseName: "test1",
Timestamp: "timestamp1",
Status: history.BackupStatusSucceed,
ExcludeRelations: []string{},
ExcludeSchemas: []string{},
IncludeRelations: []string{},