Skip to content

Commit

Permalink
Fix duplicated column definition in the gpbackup metadata
Browse files Browse the repository at this point in the history
Problem description:
If a column in a table had privileges granted for two (or more) users, gpbackup
created metadata file, where this column was duplicated.

Cause:
The SQL query, that is used to retrieve information about all table columns in
the function 'GetColumnDefinitions', had the 'unnest' function applied to the
array of column-level access privileges from the 'pg_catalog.pg_attribute'.
Because of the 'unnest', if there were several aclitems for a column, the result
of the query had several rows returned for this column. But gpbackup treated all
these rows as results for separate columns, causing duplication in the metadata.

Fix:
'unnest' is replaced with the 'array_to_string' function in the query. Thus, for
each column, there is only one row in the result of the query. Now, the field
'Privileges' in the 'ColumnDefinition' struct contains all column-level access
privileges, separated by a comma. And later, in 'getColumnACL', this string with
privileges is split into a slice of strings, each string containing separate
aclitem, which is then parsed as before.
  • Loading branch information
whitehawk committed May 8, 2024
1 parent b36c671 commit 9abd7b2
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 10 deletions.
9 changes: 6 additions & 3 deletions backup/predata_acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,12 @@ func getColumnACL(privileges sql.NullString, kind string) []ACL {
privilegesStr = privileges.String
}
columnMetadata := make([]ACL, 0)
acl := ParseACL(privilegesStr)
if acl != nil {
columnMetadata = append(columnMetadata, *acl)
privilegesSlice := strings.Split(privilegesStr, ",")
for _, privilege := range privilegesSlice {
acl := ParseACL(privilege)
if acl != nil {
columnMetadata = append(columnMetadata, *acl)
}
}
return columnMetadata
}
Expand Down
8 changes: 2 additions & 6 deletions backup/queries_table_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,7 @@ func GetColumnDefinitions(connectionPool *dbconn.DBConn) map[uint32][]ColumnDefi
CASE WHEN a.attstorage != t.typstorage THEN a.attstorage ELSE '' END AS storagetype,
coalesce('('||pg_catalog.pg_get_expr(ad.adbin, ad.adrelid)||')', '') AS defaultval,
coalesce(d.description, '') AS comment,
CASE
WHEN a.attacl IS NULL THEN NULL
WHEN array_upper(a.attacl, 1) = 0 THEN a.attacl[0]
ELSE unnest(a.attacl) END AS privileges,
array_to_string(a.attacl, ',') AS privileges,
CASE
WHEN a.attacl IS NULL THEN ''
WHEN array_upper(a.attacl, 1) = 0 THEN 'Empty'
Expand Down Expand Up @@ -353,7 +350,7 @@ func GetColumnDefinitions(connectionPool *dbconn.DBConn) map[uint32][]ColumnDefi
coalesce('('||pg_catalog.pg_get_expr(ad.adbin, ad.adrelid)||')', '') AS defaultval,
coalesce(d.description, '') AS comment,
a.attgenerated,
ljl_unnest AS privileges,
array_to_string(a.attacl, ',') AS privileges,
CASE
WHEN a.attacl IS NULL THEN ''
WHEN array_upper(a.attacl, 1) = 0 THEN 'Empty'
Expand All @@ -375,7 +372,6 @@ func GetColumnDefinitions(connectionPool *dbconn.DBConn) map[uint32][]ColumnDefi
LEFT JOIN pg_collation coll ON a.attcollation = coll.oid
LEFT JOIN pg_namespace cn ON coll.collnamespace = cn.oid
LEFT JOIN pg_seclabel sec ON sec.objoid = a.attrelid AND sec.classoid = 'pg_class'::regclass AND sec.objsubid = a.attnum
LEFT JOIN LATERAL unnest(a.attacl) ljl_unnest ON a.attacl IS NOT NULL AND array_length(a.attacl, 1) != 0
WHERE %s
AND c.reltype <> 0
AND a.attnum > 0::pg_catalog.int2
Expand Down
21 changes: 21 additions & 0 deletions end_to_end/end_to_end_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2007,6 +2007,27 @@ LANGUAGE plpgsql NO SQL;`)
_, err := gprestoreCmd.CombinedOutput()
Expect(err).ToNot(HaveOccurred())
})
It("checks access privileges for multiple users on a column", func() {
testutils.SkipIfBefore6(backupConn)
if useOldBackupVersion {
Skip("This test is not needed for old backup versions")
}
testhelper.AssertQueryRuns(backupConn, "CREATE TABLE public.t1 (a int) DISTRIBUTED BY (a);")
testhelper.AssertQueryRuns(backupConn, "CREATE USER user1;")
testhelper.AssertQueryRuns(backupConn, "CREATE USER user2;")
testhelper.AssertQueryRuns(backupConn, "GRANT SELECT (a) on public.t1 to user1;")
testhelper.AssertQueryRuns(backupConn, "GRANT SELECT (a) on public.t1 to user2;")
defer testhelper.AssertQueryRuns(backupConn, "DROP USER user1;")
defer testhelper.AssertQueryRuns(backupConn, "DROP USER user2;")
defer testhelper.AssertQueryRuns(backupConn, "DROP TABLE public.t1;")

timestamp := gpbackup(gpbackupPath, backupHelperPath, "--backup-dir", backupDir)

contents := string(getMetdataFileContents(backupDir, timestamp, "metadata.sql"))
Expect(contents).To(ContainSubstring("CREATE TABLE public.t1 (\n\ta integer\n) DISTRIBUTED BY (a);"))
Expect(contents).To(ContainSubstring("GRANT SELECT (a) ON TABLE public.t1 TO user1;"))
Expect(contents).To(ContainSubstring("GRANT SELECT (a) ON TABLE public.t1 TO user2;"))
})
})
})
Describe("Restore to a different-sized cluster", FlakeAttempts(5), func() {
Expand Down
6 changes: 5 additions & 1 deletion integration/predata_table_defs_queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,20 @@ PARTITION BY RANGE (year)
testhelper.AssertQueryRuns(connectionPool, "ALTER TABLE public.atttable ALTER COLUMN e SET STORAGE PLAIN")
oid := testutils.OidFromObjectName(connectionPool, "public", "atttable", backup.TYPE_RELATION)
privileges := sql.NullString{String: "", Valid: false}
privilegesMulti := sql.NullString{String: "", Valid: false}
if connectionPool.Version.AtLeast("6") {
testhelper.AssertQueryRuns(connectionPool, "GRANT SELECT (c, d) ON TABLE public.atttable TO testrole")
privileges = sql.NullString{String: "testrole=r/testrole", Valid: true}
testhelper.AssertQueryRuns(connectionPool, "GRANT SELECT (e) ON TABLE public.atttable TO testrole")
testhelper.AssertQueryRuns(connectionPool, "GRANT SELECT (e) ON TABLE public.atttable TO anothertestrole")
privilegesMulti = sql.NullString{String: "testrole=r/testrole,anothertestrole=r/testrole", Valid: true}
}
tableAtts := backup.GetColumnDefinitions(connectionPool)[oid]

columnA := backup.ColumnDefinition{Oid: 0, Num: 1, Name: "a", NotNull: false, HasDefault: false, Type: "double precision", Encoding: "", StatTarget: -1, StorageType: "", DefaultVal: "", Comment: "att comment"}
columnC := backup.ColumnDefinition{Oid: 0, Num: 3, Name: "c", NotNull: true, HasDefault: false, Type: "text", Encoding: "", StatTarget: -1, StorageType: "", DefaultVal: "", Comment: "", Privileges: privileges}
columnD := backup.ColumnDefinition{Oid: 0, Num: 4, Name: "d", NotNull: false, HasDefault: true, Type: "integer", Encoding: "", StatTarget: -1, StorageType: "", DefaultVal: "(5)", Comment: "", Privileges: privileges}
columnE := backup.ColumnDefinition{Oid: 0, Num: 5, Name: "e", NotNull: false, HasDefault: false, Type: "text", Encoding: "", StatTarget: -1, StorageType: "PLAIN", DefaultVal: "", Comment: ""}
columnE := backup.ColumnDefinition{Oid: 0, Num: 5, Name: "e", NotNull: false, HasDefault: false, Type: "text", Encoding: "", StatTarget: -1, StorageType: "PLAIN", DefaultVal: "", Comment: "", Privileges: privilegesMulti}

Expect(tableAtts).To(HaveLen(4))

Expand Down

0 comments on commit 9abd7b2

Please sign in to comment.