Merge pull request #17 from arenadata/1.23.0-sync
ADBDEV-2272 1.23.0 sync
deart2k authored Dec 14, 2021
2 parents aee5be9 + 39cc1e8 commit a00e65e
Showing 22 changed files with 779 additions and 83 deletions.
27 changes: 22 additions & 5 deletions backup/backup.go
@@ -265,7 +265,6 @@ func backupData(tables []Table) {
gplog.Info("Data backup complete")
return
}

if MustGetFlagBool(options.SINGLE_DATA_FILE) {
gplog.Verbose("Initializing pipes and gpbackup_helper on segments for single data file backup")
utils.VerifyHelperVersionOnSegments(version, globalCluster)
@@ -274,22 +273,26 @@ func backupData(tables []Table) {
oidList = append(oidList, fmt.Sprintf("%d", table.Oid))
}
utils.WriteOidListToSegments(oidList, globalCluster, globalFPInfo)
utils.CreateFirstSegmentPipeOnAllHosts(oidList[0], globalCluster, globalFPInfo)
compressStr := fmt.Sprintf(" --compression-level %d --compression-type %s", MustGetFlagInt(options.COMPRESSION_LEVEL), MustGetFlagString(options.COMPRESSION_TYPE))
if MustGetFlagBool(options.NO_COMPRESSION) {
compressStr = " --compression-level 0"
}
initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool, globalFPInfo)
// Do not pass through the --on-error-continue flag because it does not apply to gpbackup
utils.StartGpbackupHelpers(globalCluster, globalFPInfo, "--backup-agent",
MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated)
MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated, initialPipes)
}
gplog.Info("Writing data to file")
rowsCopiedMaps := backupDataForAllTables(tables)
var rowsCopiedMaps []map[uint32]int64
if FlagChanged(options.COPY_QUEUE_SIZE) {
rowsCopiedMaps = backupDataForAllTablesCopyQueue(tables)
} else {
rowsCopiedMaps = backupDataForAllTables(tables)
}
AddTableDataEntriesToTOC(tables, rowsCopiedMaps)
if MustGetFlagBool(options.SINGLE_DATA_FILE) && MustGetFlagString(options.PLUGIN_CONFIG) != "" {
pluginConfig.BackupSegmentTOCs(globalCluster, globalFPInfo)
}

logCompletionMessage("Data backup")
}

@@ -505,3 +508,17 @@ func logCompletionMessage(msg string) {
gplog.Info("%s complete", msg)
}
}

func CreateInitialSegmentPipes(oidList []string, c *cluster.Cluster, connectionPool *dbconn.DBConn, fpInfo filepath.FilePathInfo) int {
// Create min(connections, tables) segment pipes on each host
var maxPipes int
if connectionPool.NumConns < len(oidList) {
maxPipes = connectionPool.NumConns
} else {
maxPipes = len(oidList)
}
for i := 0; i < maxPipes; i++ {
utils.CreateSegmentPipeOnAllHosts(oidList[i], c, fpInfo)
}
return maxPipes
}
155 changes: 149 additions & 6 deletions backup/data.go
@@ -5,10 +5,12 @@ package backup
*/

import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/greenplum-db/gp-common-go-libs/dbconn"
"github.com/greenplum-db/gp-common-go-libs/gplog"
@@ -85,13 +87,14 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
}

func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
atomic.AddInt64(&counters.NumRegTables, 1)
numTables := counters.NumRegTables //We save this so it won't be modified before we log it
logMessage := fmt.Sprintf("Worker %d: Writing data for table %s to file", whichConn, table.FQN())
// Avoid a race condition by incrementing the counter inside the Sprintf call
tableCount := fmt.Sprintf(" (table %d of %d)", atomic.AddInt64(&counters.NumRegTables, 1), counters.TotalRegTables)
if gplog.GetVerbosity() > gplog.LOGINFO {
// No progress bar at this log level, so we note table count here
gplog.Verbose("Writing data for table %s to file (table %d of %d)", table.FQN(), numTables, counters.TotalRegTables)
gplog.Verbose(logMessage + tableCount)
} else {
gplog.Verbose("Writing data for table %s to file", table.FQN())
gplog.Verbose(logMessage)
}

destinationToWrite := ""
@@ -109,6 +112,147 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters
return nil
}

// backupDataForAllTablesCopyQueue does not back up tables in parallel. This is
// specifically for --single-data-file. While tables are backed up one at a time,
// a significant amount of time can be saved by queuing up the next table to
// copy. Worker 0 does not have tables pre-assigned to it. The copy queue uses
// worker 0 as a special deferred worker in the event that the other workers
// encounter locking issues. Worker 0 already holds locks on all the tables, so
// it will not run into locking issues.
func backupDataForAllTablesCopyQueue(tables []Table) []map[uint32]int64 {
counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables))}
counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO)
counters.ProgressBar.Start()
rowsCopiedMaps := make([]map[uint32]int64, connectionPool.NumConns)
/*
* We break when an interrupt is received and rely on
* TerminateHangingCopySessions to kill any COPY statements
* in progress if they don't finish on their own.
*/
tasks := make(chan Table, len(tables))
var oidMap sync.Map
var workerPool sync.WaitGroup
var copyErr error
// Record and track tables in a hashmap of oids and table states (preloaded with value Unknown).
// The tables are loaded into the tasks channel for the subsequent goroutines to work on.
for _, table := range tables {
oidMap.Store(table.Oid, Unknown)
tasks <- table
}
// We incremented numConns by 1 to treat connNum 0 as a special worker
rowsCopiedMaps[0] = make(map[uint32]int64)
for connNum := 1; connNum < connectionPool.NumConns; connNum++ {
rowsCopiedMaps[connNum] = make(map[uint32]int64)
workerPool.Add(1)
go func(whichConn int) {
defer workerPool.Done()
for table := range tasks {
if wasTerminated || copyErr != nil {
counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
return
}

// If a random external SQL command had queued an AccessExclusiveLock acquisition request
// against this next table, the --jobs worker thread would deadlock on the COPY attempt.
// To prevent gpbackup from hanging, we attempt to acquire an AccessShareLock on the
// relation with the NOWAIT option before we run COPY. If the LOCK TABLE NOWAIT call
// fails, we catch the error and defer the table to the main worker thread, worker 0.
// Afterwards, we break early and terminate the worker since its transaction is now in an
// aborted state. We do not need to do this with the main worker thread because it has
// already acquired AccessShareLocks on all tables before the metadata dumping part.
err := LockTableNoWait(table, whichConn)
if err != nil {
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != PG_LOCK_NOT_AVAILABLE {
copyErr = err
continue
}

if gplog.GetVerbosity() < gplog.LOGVERBOSE {
// Add a newline to interrupt the progress bar so that
// the following WARN message is printed cleanly.
fmt.Printf("\n")
}
gplog.Warn("Worker %d could not acquire AccessShareLock for table %s. Terminating worker and deferring table to main worker thread.",
whichConn, table.FQN())

oidMap.Store(table.Oid, Deferred)
// Rollback transaction since it's in an aborted state
connectionPool.MustRollback(whichConn)

// Worker no longer has a valid distributed transaction snapshot
break
}

err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn)
if err != nil {
copyErr = err
}
oidMap.Store(table.Oid, Complete)
}
}(connNum)
}

// Special goroutine to handle deferred tables
// Handle all tables deferred by the deadlock detection. This can only be
// done with the main worker thread, worker 0, because it has
// AccessShareLocks on all the tables already.
deferredWorkerDone := make(chan bool)
go func() {
for _, table := range tables {
for {
state, _ := oidMap.Load(table.Oid)
if state.(int) == Unknown {
time.Sleep(time.Millisecond * 50)
} else if state.(int) == Deferred {
err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0)
if err != nil {
copyErr = err
}
oidMap.Store(table.Oid, Complete)
break
} else if state.(int) == Complete {
break
} else {
gplog.Fatal(errors.New("Encountered unknown table state"), "")
}
}
}
deferredWorkerDone <- true
}()

close(tasks)
workerPool.Wait()

// Check if all the workers were terminated. If so, defer all remaining tables to worker 0
allWorkersTerminatedLogged := false
for _, table := range tables {
state, _ := oidMap.Load(table.Oid)
if state == Unknown {
if !allWorkersTerminatedLogged {
gplog.Warn("All copy queue workers terminated due to lock issues. Falling back to single main worker.")
allWorkersTerminatedLogged = true
}
oidMap.Store(table.Oid, Deferred)
}
}
// Main goroutine waits for deferred worker 0 by waiting on this channel
<-deferredWorkerDone

agentErr := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)

if copyErr != nil && agentErr != nil {
gplog.Error(agentErr.Error())
gplog.Fatal(copyErr, "")
} else if copyErr != nil {
gplog.Fatal(copyErr, "")
} else if agentErr != nil {
gplog.Fatal(agentErr, "")
}

counters.ProgressBar.Finish()
return rowsCopiedMaps
}

func backupDataForAllTables(tables []Table) []map[uint32]int64 {
counters := BackupProgressCounters{NumRegTables: 0, TotalRegTables: int64(len(tables))}
counters.ProgressBar = utils.NewProgressBar(int(counters.TotalRegTables), "Tables backed up: ", utils.PB_INFO)
@@ -146,8 +290,7 @@ func backupDataForAllTables(tables []Table) []map[uint32]int64 {
if whichConn != 0 {
err := LockTableNoWait(table, whichConn)
if err != nil {
// Postgres Error Code 55P03 translates to LOCK_NOT_AVAILABLE
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != "55P03" {
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code != PG_LOCK_NOT_AVAILABLE {
copyErr = err
continue
}
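The copy-queue change above hinges on the Unknown/Deferred/Complete state machine shared between the copy workers and deferred worker 0. The following standalone sketch uses hypothetical names and no gpbackup APIs: workers that hit a simulated lock conflict mark the table Deferred, and a single deferred worker sweeps the table list in order and copies whatever was deferred.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Table states, mirroring the constants added to backup/global_variables.go.
const (
	Unknown int = iota
	Deferred
	Complete
)

// Standalone illustration of the copy-queue deferral pattern; not gpbackup code.
func main() {
	tables := []string{"t1", "t2", "t3", "t4", "t5"}
	locked := map[string]bool{"t2": true, "t4": true} // stand-in for tables whose LOCK ... NOWAIT fails

	var states sync.Map
	tasks := make(chan string, len(tables))
	for _, t := range tables {
		states.Store(t, Unknown)
		tasks <- t
	}
	close(tasks)

	// Copy workers: defer any table they cannot lock, copy the rest.
	var workerPool sync.WaitGroup
	for w := 1; w <= 2; w++ {
		workerPool.Add(1)
		go func(whichConn int) {
			defer workerPool.Done()
			for t := range tasks {
				if locked[t] {
					states.Store(t, Deferred)
					continue
				}
				fmt.Printf("worker %d copied %s\n", whichConn, t)
				states.Store(t, Complete)
			}
		}(w)
	}

	// Deferred worker 0: walk the tables in order, wait until each one has a
	// decision, and copy anything the other workers deferred.
	deferredWorkerDone := make(chan bool)
	go func() {
		for _, t := range tables {
			for {
				state, _ := states.Load(t)
				if state.(int) == Unknown {
					time.Sleep(10 * time.Millisecond)
				} else if state.(int) == Deferred {
					fmt.Printf("worker 0 copied deferred table %s\n", t)
					states.Store(t, Complete)
					break
				} else { // Complete
					break
				}
			}
		}
		deferredWorkerDone <- true
	}()

	workerPool.Wait()
	<-deferredWorkerDone
}

Unlike the real implementation, the sketch's workers keep consuming the queue after a simulated lock failure; in the diff above a worker terminates instead, because its transaction is aborted and it no longer holds a valid distributed snapshot.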
14 changes: 14 additions & 0 deletions backup/global_variables.go
@@ -19,6 +19,16 @@ import (
* used in testing.
*/

/*
Table backup state constants
*/
const (
Unknown int = iota
Deferred
Complete
PG_LOCK_NOT_AVAILABLE = "55P03"
)

/*
* Non-flag variables
*/
@@ -99,6 +109,10 @@ func SetQuotedRoleNames(quotedRoles map[string]string) {

// Util functions to enable ease of access to global flag values

func FlagChanged(flagName string) bool {
return cmdFlags.Changed(flagName)
}

func MustGetFlagString(flagName string) string {
return options.MustGetFlagString(cmdFlags, flagName)
}
7 changes: 7 additions & 0 deletions backup/validate.go
@@ -105,6 +105,9 @@ func validateFlagCombinations(flags *pflag.FlagSet) {
options.CheckExclusiveFlags(flags, options.NO_COMPRESSION, options.COMPRESSION_TYPE)
options.CheckExclusiveFlags(flags, options.NO_COMPRESSION, options.COMPRESSION_LEVEL)
options.CheckExclusiveFlags(flags, options.PLUGIN_CONFIG, options.BACKUP_DIR)
if FlagChanged(options.COPY_QUEUE_SIZE) && !MustGetFlagBool(options.SINGLE_DATA_FILE) {
gplog.Fatal(errors.Errorf("--copy-queue-size must be specified with --single-data-file"), "")
}
if MustGetFlagString(options.FROM_TIMESTAMP) != "" && !MustGetFlagBool(options.INCREMENTAL) {
gplog.Fatal(errors.Errorf("--from-timestamp must be specified with --incremental"), "")
}
@@ -124,6 +127,10 @@ func validateFlagValues() {
gplog.Fatal(errors.Errorf("Timestamp %s is invalid. Timestamps must be in the format YYYYMMDDHHMMSS.",
MustGetFlagString(options.FROM_TIMESTAMP)), "")
}
if FlagChanged(options.COPY_QUEUE_SIZE) && MustGetFlagInt(options.COPY_QUEUE_SIZE) < 2 {
gplog.Fatal(errors.Errorf("--copy-queue-size %d is invalid. Must be at least 2",
MustGetFlagInt(options.COPY_QUEUE_SIZE)), "")
}
}

func validateFromTimestamp(fromTimestamp string) {
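The two validation rules added above (--copy-queue-size requires --single-data-file and must be at least 2) can be exercised in isolation with spf13/pflag, which gpbackup's options package wraps. A minimal sketch with a hypothetical standalone flag set rather than gpbackup's cmdFlags:

package main

import (
	"fmt"
	"os"

	"github.com/spf13/pflag"
)

func main() {
	// Hypothetical standalone flag set; gpbackup registers these flags through its options package.
	flags := pflag.NewFlagSet("example", pflag.ExitOnError)
	copyQueueSize := flags.Int("copy-queue-size", 0, "number of COPY commands to queue per segment")
	singleDataFile := flags.Bool("single-data-file", false, "write one data file per segment")
	_ = flags.Parse(os.Args[1:])

	// Mirror of validateFlagCombinations: the flag is only meaningful with --single-data-file.
	if flags.Changed("copy-queue-size") && !*singleDataFile {
		fmt.Fprintln(os.Stderr, "--copy-queue-size must be specified with --single-data-file")
		os.Exit(1)
	}
	// Mirror of validateFlagValues: a queue of fewer than 2 tables defeats the purpose of queueing.
	if flags.Changed("copy-queue-size") && *copyQueueSize < 2 {
		fmt.Fprintf(os.Stderr, "--copy-queue-size %d is invalid. Must be at least 2\n", *copyQueueSize)
		os.Exit(1)
	}
	fmt.Println("flags ok")
}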
13 changes: 12 additions & 1 deletion backup/wrappers.go
@@ -39,7 +39,18 @@ func SetLoggerVerbosity() {

func initializeConnectionPool(timestamp string) {
connectionPool = dbconn.NewDBConnFromEnvironment(MustGetFlagString(options.DBNAME))
connectionPool.MustConnect(MustGetFlagInt(options.JOBS))
var numConns int
switch true {
case FlagChanged(options.COPY_QUEUE_SIZE):
// Connection 0 is reserved for the deferred worker, so initialize 1 additional connection.
numConns = MustGetFlagInt(options.COPY_QUEUE_SIZE) + 1
case FlagChanged(options.JOBS):
numConns = MustGetFlagInt(options.JOBS)
default:
numConns = 1
}
gplog.Verbose(fmt.Sprintf("Initializing %d worker connections", numConns))
connectionPool.MustConnect(numConns)
utils.ValidateGPDBVersionCompatibility(connectionPool)
InitializeMetadataParams(connectionPool)
for connNum := 0; connNum < connectionPool.NumConns; connNum++ {
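The connection-count selection in initializeConnectionPool reduces to a small pure function; the sketch below (a hypothetical helper, not part of gpbackup) makes the +1 for the reserved deferred-worker connection explicit:

package main

import "fmt"

// numWorkerConns mirrors the selection logic above: --copy-queue-size reserves
// connection 0 for the deferred worker, so one extra connection is opened;
// otherwise --jobs wins; otherwise a single connection is used.
// (Hypothetical helper for illustration only.)
func numWorkerConns(copyQueueSet bool, copyQueueSize int, jobsSet bool, jobs int) int {
	switch {
	case copyQueueSet:
		return copyQueueSize + 1
	case jobsSet:
		return jobs
	default:
		return 1
	}
}

func main() {
	fmt.Println(numWorkerConns(true, 4, false, 0))  // 5: four copy-queue workers plus the deferred worker
	fmt.Println(numWorkerConns(false, 0, true, 8))  // 8: one connection per --jobs worker
	fmt.Println(numWorkerConns(false, 0, false, 0)) // 1: default single connection
}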
33 changes: 33 additions & 0 deletions ci/scripts/scale-tests.bash
@@ -32,6 +32,39 @@ set -e
### Data scale tests ###
log_file=/tmp/gpbackup.log
echo "## Populating database for copy queue test ##"
createdb copyqueuedb
for j in {1..20000}
do
psql -d copyqueuedb -q -c "CREATE TABLE tbl_1k_\$j(i int) DISTRIBUTED BY (i);"
psql -d copyqueuedb -q -c "INSERT INTO tbl_1k_\$j SELECT generate_series(1,1000)"
done
echo "## Performing single-data-file, --no-compression, --copy-queue-size 2 backup for copy queue test ##"
time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 2 | tee "\$log_file"
timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}")
gpbackup_manager display-report \$timestamp
echo "## Performing single-data-file, --no-compression, --copy-queue-size 4 backup for copy queue test ##"
time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 4 | tee "\$log_file"
timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}")
gpbackup_manager display-report \$timestamp
echo "## Performing single-data-file, --no-compression, --copy-queue-size 8 backup for copy queue test ##"
time gpbackup --dbname copyqueuedb --backup-dir /data/gpdata/ --single-data-file --no-compression --copy-queue-size 8 | tee "\$log_file"
timestamp=\$(head -10 "\$log_file" | grep "Backup Timestamp " | grep -Eo "[[:digit:]]{14}")
gpbackup_manager display-report \$timestamp
echo "## Performing single-data-file, --no-compression, --copy-queue-size 2 restore for copy queue test ##"
time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore2 --copy-queue-size 2
echo "## Performing single-data-file, --no-compression, --copy-queue-size 4 restore for copy queue test ##"
time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore4 --copy-queue-size 4
echo "## Performing single-data-file, --no-compression, --copy-queue-size 8 restore for copy queue test ##"
time gprestore --timestamp "\$timestamp" --backup-dir /data/gpdata/ --create-db --redirect-db copyqueuerestore8 --copy-queue-size 8
echo "## Populating database for data scale test ##"
createdb datascaledb
for j in {1..5000}
