Skip to content

Commit

Permalink
Merge branch 'master' into ADBDEV-5861
Browse files Browse the repository at this point in the history
  • Loading branch information
whitehawk committed Aug 4, 2024
2 parents 12215a8 + 54089ed commit db5968c
Show file tree
Hide file tree
Showing 118 changed files with 2,564 additions and 3,186 deletions.
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ depend :
go mod download

$(GINKGO) :
go install github.com/onsi/ginkgo/v2/ginkgo@v2.8.4
go install github.com/onsi/ginkgo/v2/ginkgo

$(GOIMPORTS) :
go install golang.org/x/tools/cmd/goimports@latest
Expand All @@ -63,7 +63,6 @@ unit : $(GINKGO)
ginkgo $(GINKGO_FLAGS) $(SUBDIRS_HAS_UNIT) 2>&1

unit_all_gpdb_versions : $(GINKGO)
TEST_GPDB_VERSION=4.3.999 ginkgo $(GINKGO_FLAGS) $(SUBDIRS_HAS_UNIT) 2>&1
TEST_GPDB_VERSION=5.999.0 ginkgo $(GINKGO_FLAGS) $(SUBDIRS_HAS_UNIT) 2>&1
TEST_GPDB_VERSION=6.999.0 ginkgo $(GINKGO_FLAGS) $(SUBDIRS_HAS_UNIT) 2>&1
TEST_GPDB_VERSION=7.999.0 ginkgo $(GINKGO_FLAGS) $(SUBDIRS_HAS_UNIT) 2>&1 # GPDB main
Expand Down Expand Up @@ -142,7 +141,7 @@ info-report:
test-s3-local: build install
${PWD}/plugins/generate_minio_config.sh
mkdir -p /tmp/minio/gpbackup-s3-test
docker run -d --name s3-minio -p 9000:9000 -p 9001:9001 -v /tmp/minio:/data/minio quay.io/minio/minio server /data/minio --console-address ":9001"
docker run -d --name s3-minio --memory="2g" -p 9000:9000 -p 9001:9001 -v /tmp/minio:/data/minio quay.io/minio/minio server /data/minio --console-address ":9001"
sleep 2 # Wait for minio server to start up
${PWD}/plugins/plugin_test.sh $(BIN_DIR)/gpbackup_s3_plugin /tmp/minio_config.yaml
docker stop s3-minio
Expand Down
87 changes: 52 additions & 35 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ func backupPredata(metadataFile *utils.FileWithByteCount, tables []Table, tableO
}

func backupData(tables []Table) {
if wasTerminated {
return
}
if len(tables) == 0 {
// No incremental data changes to backup
gplog.Info("No tables to backup")
Expand All @@ -311,7 +314,7 @@ func backupData(tables []Table) {
initialPipes := CreateInitialSegmentPipes(oidList, globalCluster, connectionPool, globalFPInfo)
// Do not pass through the --on-error-continue flag or the resizeClusterMap because neither apply to gpbackup
utils.StartGpbackupHelpers(globalCluster, globalFPInfo, "--backup-agent",
MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated, initialPipes, true, false, 0, 0)
MustGetFlagString(options.PLUGIN_CONFIG), compressStr, false, false, &wasTerminated, initialPipes, true, false, 0, 0, gplog.GetVerbosity())
}
gplog.Info("Writing data to file")
rowsCopiedMaps := BackupDataForAllTables(tables)
Expand Down Expand Up @@ -361,7 +364,12 @@ func backupStatistics(tables []Table) {
func DoTeardown() {
backupFailed := false
defer func() {
DoCleanup(backupFailed)
// If the backup was terminated, the signal handler will handle cleanup
if wasTerminated {
CleanupGroup.Wait()
} else {
DoCleanup(backupFailed)
}

errorCode := gplog.GetErrorCode()
if errorCode == 0 {
Expand Down Expand Up @@ -453,60 +461,69 @@ func DoTeardown() {
}

func DoCleanup(backupFailed bool) {
cleanupTimeout := 60 * time.Second

defer func() {
if err := recover(); err != nil {
gplog.Warn("Encountered error during cleanup: %v", err)
}
if connectionPool != nil {
connectionPool.Close()
}
gplog.Verbose("Cleanup complete")
gplog.Info("Cleanup complete")
CleanupGroup.Done()
}()

gplog.Verbose("Beginning cleanup")
gplog.Info("Beginning cleanup")
if connectionPool != nil {
cancelBlockedQueries(globalFPInfo.Timestamp)
}
if globalFPInfo.Timestamp != "" {
if MustGetFlagBool(options.SINGLE_DATA_FILE) {
// Copy sessions must be terminated before cleaning up gpbackup_helper processes to avoid a potential deadlock
// If the terminate query is sent via a connection with an active COPY command, and the COPY's pipe is cleaned up, the COPY query will hang.
// This results in the DoCleanup function passed to the signal handler never returning, blocking the os.Exit call
if wasTerminated {
// It is possible for the COPY command to become orphaned if an agent process is stopped
utils.TerminateHangingCopySessions(connectionPool, globalFPInfo, fmt.Sprintf("gpbackup_%s", globalFPInfo.Timestamp))
if globalFPInfo.Timestamp != "" && MustGetFlagBool(options.SINGLE_DATA_FILE) {
// Copy sessions must be terminated before cleaning up gpbackup_helper processes to avoid a potential deadlock
// If the terminate query is sent via a connection with an active COPY command, and the COPY's pipe is cleaned up, the COPY query will hang.
// This results in the DoCleanup function passed to the signal handler never returning, blocking the os.Exit call

// All COPY commands should end on their own for a successful restore, however we cleanup any hanging COPY sessions here as a precaution
utils.TerminateHangingCopySessions(globalFPInfo, fmt.Sprintf("gpbackup_%s", globalFPInfo.Timestamp), cleanupTimeout, 5*time.Second)

// Ensure we don't leave anything behind on the segments
utils.CleanUpSegmentHelperProcesses(globalCluster, globalFPInfo, "backup", cleanupTimeout)
utils.CleanUpHelperFilesOnAllHosts(globalCluster, globalFPInfo, cleanupTimeout)

// Check gpbackup_helper errors here if backup was terminated
if wasTerminated {
err := utils.CheckAgentErrorsOnSegments(globalCluster, globalFPInfo)
if err != nil {
gplog.Error(err.Error())
}
// We can have helper processes hanging around even without failures, so call this cleanup routine whether successful or not.
utils.CleanUpSegmentHelperProcesses(globalCluster, globalFPInfo, "backup")
utils.CleanUpHelperFilesOnAllHosts(globalCluster, globalFPInfo)
}
}

// The gpbackup_history entry is written to the DB with an "In Progress" status and a preliminary EndTime value
// very early on. If we get to cleanup and the backup succeeded, mark it as a success, otherwise mark it as a
// failure; in either case, update the end time to the actual value. Between our signal handler and recovering
// panics, there should be no way for gpbackup to exit that leaves the entry in the initial status.
// The gpbackup_history entry is written to the DB with an "In Progress" status and a preliminary EndTime value
// very early on. If we get to cleanup and the backup succeeded, mark it as a success, otherwise mark it as a
// failure; in either case, update the end time to the actual value. Between our signal handler and recovering
// panics, there should be no way for gpbackup to exit that leaves the entry in the initial status.

if !MustGetFlagBool(options.NO_HISTORY) {
var statusString string
if backupFailed {
statusString = history.BackupStatusFailed
} else {
statusString = history.BackupStatusSucceed
}
historyDBName := globalFPInfo.GetBackupHistoryDatabasePath()
historyDB, err := history.InitializeHistoryDatabase(historyDBName)
if !MustGetFlagBool(options.NO_HISTORY) {
var statusString string
if backupFailed {
statusString = history.BackupStatusFailed
} else {
statusString = history.BackupStatusSucceed
}
historyDBName := globalFPInfo.GetBackupHistoryDatabasePath()
historyDB, err := history.InitializeHistoryDatabase(historyDBName)
if err != nil {
gplog.Error(fmt.Sprintf("Unable to update history database. Error: %v", err))
} else {
_, err := historyDB.Exec(fmt.Sprintf("UPDATE backups SET status='%s', end_time='%s' WHERE timestamp='%s'", statusString, backupReport.BackupConfig.EndTime, globalFPInfo.Timestamp))
historyDB.Close()
if err != nil {
gplog.Error(fmt.Sprintf("Unable to update history database. Error: %v", err))
} else {
_, err := historyDB.Exec(fmt.Sprintf("UPDATE backups SET status='%s', end_time='%s' WHERE timestamp='%s'", statusString, backupReport.BackupConfig.EndTime, globalFPInfo.Timestamp))
historyDB.Close()
if err != nil {
gplog.Error(fmt.Sprintf("Unable to update history database. Error: %v", err))
}
}
}
}

err := backupLockFile.Unlock()
if err != nil && backupLockFile != "" {
gplog.Warn("Failed to remove lock file %s.", backupLockFile)
Expand Down Expand Up @@ -585,7 +602,7 @@ func CreateInitialSegmentPipes(oidList []string, c *cluster.Cluster, connectionP
maxPipes = len(oidList)
}
for i := 0; i < maxPipes; i++ {
utils.CreateSegmentPipeOnAllHosts(oidList[i], c, fpInfo)
utils.CreateSegmentPipeOnAllHostsForBackup(oidList[i], c, fpInfo)
}
return maxPipes
}
Expand Down
30 changes: 20 additions & 10 deletions backup/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func AddTableDataEntriesToTOC(tables []Table, rowsCopiedMaps []map[uint32]int64)
}
}
attributes := ConstructTableAttributesList(table.ColumnDefs)
globalTOC.AddCoordinatorDataEntry(table.Schema, table.Name, table.Oid, attributes, rowsCopied, table.PartitionLevelInfo.RootName, table.DistPolicy)
globalTOC.AddCoordinatorDataEntry(table.Schema, table.Name, table.Oid, attributes, rowsCopied, table.PartitionLevelInfo.RootName, table.DistPolicy.Policy, table.DistPolicy.DistByEnum)
}
}
}
Expand All @@ -63,6 +63,9 @@ type BackupProgressCounters struct {
}

func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
if wasTerminated {
return -1, nil
}
checkPipeExistsCommand := ""
customPipeThroughCommand := utils.GetPipeThroughProgram().OutputCommand
sendToDestinationCommand := ">"
Expand All @@ -74,7 +77,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
* of the data is backed up.
*/
checkPipeExistsCommand = fmt.Sprintf("(test -p \"%s\" || (echo \"Pipe not found %s\">&2; exit 1)) && ", destinationToWrite, destinationToWrite)
customPipeThroughCommand = "cat -"
customPipeThroughCommand = utils.DefaultPipeThroughProgram
} else if MustGetFlagString(options.PLUGIN_CONFIG) != "" {
sendToDestinationCommand = fmt.Sprintf("| %s backup_data %s", pluginConfig.ExecutablePath, pluginConfig.ConfigPath)
}
Expand All @@ -99,8 +102,16 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
}
}

workerInfo := ""
if gplog.GetVerbosity() >= gplog.LOGVERBOSE {
workerInfo = fmt.Sprintf("Worker %d: ", connNum)
}
query := fmt.Sprintf("COPY %s%s TO %s WITH CSV DELIMITER '%s' ON SEGMENT%s;", tableName, columnNames, copyCommand, tableDelim, ignoreExternalPartitions)
gplog.Verbose("Worker %d: %s", connNum, query)
if connectionPool.Version.AtLeast("7") {
utils.LogProgress(`%sExecuting "%s" on coordinator`, workerInfo, query)
} else {
utils.LogProgress(`%sExecuting "%s" on master`, workerInfo, query)
}
result, err := connectionPool.Exec(query, connNum)
if err != nil {
return 0, err
Expand All @@ -111,15 +122,14 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
}

func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
logMessage := fmt.Sprintf("Worker %d: Writing data for table %s to file", whichConn, table.FQN())
workerInfo := ""
if gplog.GetVerbosity() >= gplog.LOGVERBOSE {
workerInfo = fmt.Sprintf("Worker %d: ", whichConn)
}
logMessage := fmt.Sprintf("%sWriting data for table %s to file", workerInfo, table.FQN())
// Avoid race condition by incrementing counters in call to sprintf
tableCount := fmt.Sprintf(" (table %d of %d)", atomic.AddInt64(&counters.NumRegTables, 1), counters.TotalRegTables)
if gplog.GetVerbosity() > gplog.LOGINFO {
// No progress bar at this log level, so we note table count here
gplog.Verbose(logMessage + tableCount)
} else {
gplog.Verbose(logMessage)
}
utils.LogProgress(logMessage + tableCount)

destinationToWrite := ""
if MustGetFlagBool(options.SINGLE_DATA_FILE) {
Expand Down
33 changes: 1 addition & 32 deletions backup/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,6 @@ import (
* - Tables
* - Protocols
*/
func AddProtocolDependenciesForGPDB4(depMap DependencyMap, tables []Table, protocols []ExternalProtocol) {
protocolMap := make(map[string]UniqueID, len(protocols))
for _, p := range protocols {
protocolMap[p.Name] = p.GetUniqueID()
}
for _, table := range tables {
extTableDef := table.ExtTableDef
if extTableDef.Location.Valid && extTableDef.Location.String != "" {
protocolName := extTableDef.Location.String[0:strings.Index(extTableDef.Location.String, "://")]
if protocolEntry, ok := protocolMap[protocolName]; ok {
tableEntry := table.GetUniqueID()
if _, ok := depMap[tableEntry]; !ok {
depMap[tableEntry] = make(map[UniqueID]bool)
}
depMap[tableEntry][protocolEntry] = true
}
}
}
}

var (
PG_AGGREGATE_OID uint32 = 1255
Expand Down Expand Up @@ -286,19 +267,7 @@ LEFT JOIN pg_depend id1 ON (d.objid = id1.objid and d.classid = id1.classid and
LEFT JOIN pg_depend id2 ON (d.refobjid = id2.objid and d.refclassid = id2.classid and id2.deptype='i')
LEFT JOIN pg_depend id3 ON (id2.refobjid = id3.objid and id2.refclassid = id3.classid and id3.deptype='i')
WHERE d.classid != 0
AND d.deptype != 'i'
UNION
-- this converts function dependencies on array types to the underlying type
-- this is needed because pg_depend in 4.3.x doesn't have the info we need
SELECT
d.classid,
d.objid,
d.refclassid,
t.typelem AS refobjid
FROM pg_depend d
JOIN pg_type t ON d.refobjid = t.oid
WHERE d.classid = 'pg_proc'::regclass::oid
AND typelem != 0`
AND d.deptype != 'i'`

pgDependDeps := make([]SortableDependency, 0)
err := connectionPool.Select(&pgDependDeps, query)
Expand Down
2 changes: 1 addition & 1 deletion backup/dependencies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var _ = Describe("backup/dependencies tests", func() {
backup.Domain{Oid: 4, Schema: "public", Name: "domain", BaseType: "numeric"},
backup.Table{
Relation: backup.Relation{Oid: 5, Schema: "public", Name: "relation"},
TableDefinition: backup.TableDefinition{DistPolicy: "DISTRIBUTED RANDOMLY", ColumnDefs: []backup.ColumnDefinition{}},
TableDefinition: backup.TableDefinition{DistPolicy: backup.DistPolicy{Policy: "DISTRIBUTED RANDOMLY"}, ColumnDefs: []backup.ColumnDefinition{}},
},
backup.ExternalProtocol{Oid: 6, Name: "ext_protocol", Trusted: true, ReadFunction: 2, WriteFunction: 1, Validator: 0},
backup.RangeType{Oid: 7, Schema: "public", Name: "rangetype1"},
Expand Down
1 change: 1 addition & 0 deletions backup/global_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
Deferred
Complete
PG_LOCK_NOT_AVAILABLE = "55P03"
ENUM_TYPE_OID = 3500
)

/*
Expand Down
4 changes: 1 addition & 3 deletions backup/metadata_globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,7 @@ func PrintCreateRoleStatements(metadataFile *utils.FileWithByteCount, objToc *to

attrs = append(attrs, fmt.Sprintf("RESOURCE QUEUE %s", role.ResQueue))

if connectionPool.Version.AtLeast("5") {
attrs = append(attrs, fmt.Sprintf("RESOURCE GROUP %s", role.ResGroup))
}
attrs = append(attrs, fmt.Sprintf("RESOURCE GROUP %s", role.ResGroup))

if role.Createrexthttp {
attrs = append(attrs, "CREATEEXTTABLE (protocol='http')")
Expand Down
7 changes: 2 additions & 5 deletions backup/metadata_globals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,9 @@ GRANT TEMPORARY,CONNECT ON DATABASE testdb TO testrole;`,
}

getResourceGroupReplace := func() (string, string) {
resourceGroupReplace1, resourceGroupReplace2 := "", ""
if connectionPool.Version.AtLeast("5") {
resourceGroupReplace1 = ` RESOURCE GROUP default_group`
resourceGroupReplace2 = `RESOURCE GROUP "testGroup" `
}

resourceGroupReplace1 := ` RESOURCE GROUP default_group`
resourceGroupReplace2 := `RESOURCE GROUP "testGroup" `
return resourceGroupReplace1, resourceGroupReplace2
}

Expand Down
8 changes: 4 additions & 4 deletions backup/predata_acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ func PrintObjectMetadata(metadataFile *utils.FileWithByteCount, objToc *toc.TOC,
}

if owner := metadata.GetOwnerStatement(obj.FQN(), objectType); owner != "" {
if !(connectionPool.Version.Before("5") && entry.ObjectType == toc.OBJ_LANGUAGE) {
// Languages have implicit owners in 4.3, but do not support ALTER OWNER
statements = append(statements, strings.TrimSpace(owner))
}
statements = append(statements, strings.TrimSpace(owner))
}
if privileges := metadata.GetPrivilegesStatements(obj.FQN(), entry.ObjectType); privileges != "" {
statements = append(statements, strings.TrimSpace(privileges))
Expand Down Expand Up @@ -380,6 +377,9 @@ func createPrivilegeStrings(acl ACL, objectType string) (string, string) {
case toc.OBJ_TYPE:
hasAllPrivileges = acl.Usage
hasAllPrivilegesWithGrant = acl.UsageWithGrant
case toc.OBJ_AGGREGATE:
hasAllPrivileges = acl.Execute
hasAllPrivilegesWithGrant = acl.ExecuteWithGrant
}
if hasAllPrivileges {
privStr = "ALL"
Expand Down
5 changes: 3 additions & 2 deletions backup/predata_acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ REVOKE ALL ON FOREIGN SERVER foreignserver FROM PUBLIC;
REVOKE ALL ON FOREIGN SERVER foreignserver FROM testrole;
GRANT ALL ON FOREIGN SERVER foreignserver TO testrole;`)
})
It("prints FUNCTION for REVOKE and AGGREGATE for ALTER for an aggregate function", func() {
It("prints FUNCTION for GRANT/REVOKE and AGGREGATE for ALTER for an aggregate function", func() {
aggregate := backup.Aggregate{Schema: "public", Name: "testagg"}
aggregatePrivileges := testutils.DefaultACLForType("testrole", toc.OBJ_AGGREGATE)
aggregateMetadata := backup.ObjectMetadata{Privileges: []backup.ACL{aggregatePrivileges}, Owner: "testrole"}
Expand All @@ -160,7 +160,8 @@ ALTER AGGREGATE public.testagg(*) OWNER TO testrole;
REVOKE ALL ON FUNCTION public.testagg(*) FROM PUBLIC;
REVOKE ALL ON FUNCTION public.testagg(*) FROM testrole;`)
REVOKE ALL ON FUNCTION public.testagg(*) FROM testrole;
GRANT ALL ON FUNCTION public.testagg(*) TO testrole;`)
})
It("prints TABLE for a block of REVOKE and GRANT statements for a foreign table", func() {
table := backup.Table{Relation: backup.Relation{Schema: "public", Name: "foreigntablename"}}
Expand Down
2 changes: 1 addition & 1 deletion backup/predata_externals.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func PrintExternalTableCreateStatement(metadataFile *utils.FileWithByteCount, ob
metadataFile.MustPrintf(") ")
PrintExternalTableStatements(metadataFile, table.FQN(), extTableDef)
if extTableDef.Writable {
metadataFile.MustPrintf("\n%s", table.DistPolicy)
metadataFile.MustPrintf("\n%s", table.DistPolicy.Policy)
}
metadataFile.MustPrintf(";")
if objToc != nil {
Expand Down
Loading

0 comments on commit db5968c

Please sign in to comment.