From 83c58ad6c6989a6a9783fb1f440ab721c93c4898 Mon Sep 17 00:00:00 2001 From: Janis Saldabols Date: Wed, 11 Feb 2026 16:50:33 +0200 Subject: [PATCH 1/3] CROSSLINK-203 Add creation of crosslink_broker schema --- README.md | 5 +++++ broker/dbutil/dbutil.go | 36 ++++++++++++++++++++++++++++++++++- broker/ill_db/illrepo_test.go | 1 + 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 34d46982..b4b2b712 100644 --- a/README.md +++ b/README.md @@ -61,3 +61,8 @@ Charts use the `LoadBalancer` service type by default, you can change this durin ``` --set service.type=ClusterIP ``` + +For any installation, the application database user must have privileges to create schemas in the database +(for example, the `CREATE` privilege on the database or the ability to run `CREATE SCHEMA`). +Database migrations will create and update all required tables and other objects in the `crosslink_broker` schema, +which is selected via the PostgreSQL `search_path`. \ No newline at end of file diff --git a/broker/dbutil/dbutil.go b/broker/dbutil/dbutil.go index 09f90a0d..6f790907 100644 --- a/broker/dbutil/dbutil.go +++ b/broker/dbutil/dbutil.go @@ -2,7 +2,9 @@ package dbutil import ( "context" + "database/sql" "fmt" + "strings" "github.com/golang-migrate/migrate/v4" _ "github.com/golang-migrate/migrate/v4/database/postgres" @@ -10,8 +12,10 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) +const SchemaParam = "&search_path=crosslink_broker" + func GetConnectionString(typ, user, pass, host, port, db string) string { - return fmt.Sprintf("%s://%s:%s@%s:%s/%s?sslmode=disable", typ, user, pass, host, port, db) + return fmt.Sprintf("%s://%s:%s@%s:%s/%s?sslmode=disable"+SchemaParam, typ, user, pass, host, port, db) } func InitDbPool(connStr string) (*pgxpool.Pool, error) { @@ -21,6 +25,10 @@ func InitDbPool(connStr string) (*pgxpool.Pool, error) { func RunMigrateScripts(migrateDir, connStr string) (uint, uint, bool, error) { var versionFrom, versionTo uint var dirty bool + err := initDBSchema(connStr) + if err != nil { + return versionFrom, versionTo, dirty, fmt.Errorf("failed to initiate schema: %w", err) + } m, err := migrate.New(migrateDir, connStr) if err != nil { return versionFrom, versionTo, dirty, fmt.Errorf("failed to initiate migration: %w", err) @@ -43,3 +51,29 @@ func RunMigrateScripts(migrateDir, connStr string) (uint, uint, bool, error) { } return versionFrom, versionTo, dirty, nil } + +func initDBSchema(connStr string) error { + connStr = strings.Replace(connStr, SchemaParam, "", 1) + db, err := sql.Open("postgres", connStr) + if err != nil { + return fmt.Errorf("error opening database: : %w", err) + } + defer db.Close() + + script := "DO $$ " + + "BEGIN " + + " IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'crosslink_broker') THEN " + + " CREATE ROLE crosslink_broker PASSWORD 'tenant' NOSUPERUSER NOCREATEDB INHERIT LOGIN; " + + " END IF; " + + "END " + + "$$; " + + "CREATE SCHEMA IF NOT EXISTS crosslink_broker AUTHORIZATION crosslink_broker; " + + "SET search_path TO crosslink_broker;" + + _, err = db.Exec(script) + if err != nil { + return fmt.Errorf("error executing script: %w", err) + } + + return nil +} diff --git a/broker/ill_db/illrepo_test.go b/broker/ill_db/illrepo_test.go index 86c2de3a..8477a6d6 100644 --- a/broker/ill_db/illrepo_test.go +++ b/broker/ill_db/illrepo_test.go @@ -29,6 +29,7 @@ func createDirectoryAdapter(urls ...string) adapter.DirectoryLookupAdapter { func TestMain(m *testing.M) { ctx, pgc, connStr, err := test.StartPGContainer() + connStr = connStr + dbutil.SchemaParam test.Expect(err, "failed to start db container") pgIllRepo := new(PgIllRepo) pgIllRepo.Pool, err = dbutil.InitDbPool(connStr) From 9ef55ecbafdf1937ba2f403ee638c51515def64e Mon Sep 17 00:00:00 2001 From: Janis Saldabols Date: Wed, 11 Feb 2026 17:03:50 +0200 Subject: [PATCH 2/3] CROSSLINK-203 Make schema configurable --- broker/README.md | 1 + broker/dbutil/dbutil.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/broker/README.md b/broker/README.md index 2fb67463..c89796f7 100644 --- a/broker/README.md +++ b/broker/README.md @@ -75,6 +75,7 @@ Configuration is provided via environment variables: | `DB_HOST` | Database host | `localhost` | | `DB_DATABASE` | Database name | `crosslink` | | `DB_PORT` | Database port | `25432` | +| `DB_SCHEMA` | Database schema to use | `crosslink_broker` | | `LOG_LEVEL` | Log level: `ERROR`, `WARN`, `INFO`, `DEBUG` | `INFO` | | `ENABLE_JSON_LOG` | Should JSON log format be enabled | `false` | | `BROKER_MODE` | Default broker mode if not configured for a peer: `opaque` or `transparent` | `opaque` | diff --git a/broker/dbutil/dbutil.go b/broker/dbutil/dbutil.go index 6f790907..5ad58ad9 100644 --- a/broker/dbutil/dbutil.go +++ b/broker/dbutil/dbutil.go @@ -9,10 +9,12 @@ import ( "github.com/golang-migrate/migrate/v4" _ "github.com/golang-migrate/migrate/v4/database/postgres" _ "github.com/golang-migrate/migrate/v4/source/file" + "github.com/indexdata/go-utils/utils" "github.com/jackc/pgx/v5/pgxpool" ) -const SchemaParam = "&search_path=crosslink_broker" +var DB_SCHEMA = utils.GetEnv("DB_SCHEMA", "crosslink_broker") +var SchemaParam = "&search_path=" + DB_SCHEMA func GetConnectionString(typ, user, pass, host, port, db string) string { return fmt.Sprintf("%s://%s:%s@%s:%s/%s?sslmode=disable"+SchemaParam, typ, user, pass, host, port, db) From 22bbc87520d7a77c2154c4337b505e4f31779e91 Mon Sep 17 00:00:00 2001 From: Janis Saldabols Date: Wed, 11 Feb 2026 17:17:52 +0200 Subject: [PATCH 3/3] CROSSLINK-203 Make schema configurable --- broker/dbutil/dbutil.go | 42 +++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/broker/dbutil/dbutil.go b/broker/dbutil/dbutil.go index 5ad58ad9..3bd5cc6f 100644 --- a/broker/dbutil/dbutil.go +++ b/broker/dbutil/dbutil.go @@ -1,16 +1,19 @@ package dbutil import ( + "bytes" "context" "database/sql" "fmt" "strings" + "text/template" "github.com/golang-migrate/migrate/v4" _ "github.com/golang-migrate/migrate/v4/database/postgres" _ "github.com/golang-migrate/migrate/v4/source/file" "github.com/indexdata/go-utils/utils" "github.com/jackc/pgx/v5/pgxpool" + "github.com/lib/pq" ) var DB_SCHEMA = utils.GetEnv("DB_SCHEMA", "crosslink_broker") @@ -62,17 +65,36 @@ func initDBSchema(connStr string) error { } defer db.Close() - script := "DO $$ " + - "BEGIN " + - " IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'crosslink_broker') THEN " + - " CREATE ROLE crosslink_broker PASSWORD 'tenant' NOSUPERUSER NOCREATEDB INHERIT LOGIN; " + - " END IF; " + - "END " + - "$$; " + - "CREATE SCHEMA IF NOT EXISTS crosslink_broker AUTHORIZATION crosslink_broker; " + - "SET search_path TO crosslink_broker;" + const setupSQL = ` + DO $$ + BEGIN + IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = {{.Literal}}) THEN + CREATE ROLE {{.Identifier}} WITH PASSWORD 'tenant' LOGIN; + END IF; + END + $$; + CREATE SCHEMA IF NOT EXISTS {{.Identifier}} AUTHORIZATION {{.Identifier}}; + SET search_path TO {{.Identifier}};` - _, err = db.Exec(script) + tmpl, err := template.New("setup").Parse(setupSQL) + if err != nil { + return err + } + + var buf bytes.Buffer + data := struct { + Literal string + Identifier string + }{ + Literal: pq.QuoteLiteral(DB_SCHEMA), + Identifier: pq.QuoteIdentifier(DB_SCHEMA), + } + + if err = tmpl.Execute(&buf, data); err != nil { + return err + } + + _, err = db.Exec(buf.String()) if err != nil { return fmt.Errorf("error executing script: %w", err) }