From ed03b776e4a951fb49daeb3583b827a5a601c69e Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 4 Dec 2025 02:40:08 +0000 Subject: [PATCH 1/3] [*] set lock_timeout via connection RuntimeParams instead of per-query transaction Reduces metric queries from 4 roundtrips (BEGIN/SET/query/COMMIT) to 1 by setting lock_timeout at connection level via pgx RuntimeParams. - Add --conn-lock-timeout option (default: 100ms, env: PW_CONN_LOCK_TIMEOUT) - Set lock_timeout in RuntimeParams for PostgreSQL sources on connect - Simplify QueryMeasurements by removing transaction wrapper - Set to "0" to disable lock_timeout Ref: https://brandur.org/fragments/postgres-parameters --- internal/reaper/database.go | 23 ++++------------------- internal/sources/cmdopts.go | 1 + internal/sources/conn.go | 5 +++++ 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/internal/reaper/database.go b/internal/reaper/database.go index 7d8b2445b8..5bb60e5914 100644 --- a/internal/reaper/database.go +++ b/internal/reaper/database.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/cybertec-postgresql/pgwatch/v3/internal/db" "github.com/cybertec-postgresql/pgwatch/v3/internal/log" "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics" "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks" @@ -17,30 +16,16 @@ import ( ) func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) { - var conn db.PgxIface - var err error - var tx pgx.Tx if strings.TrimSpace(sql) == "" { return nil, errors.New("empty SQL") } - conn = md.Conn - if md.IsPostgresSource() { - // we don't want transaction for non-postgres sources, e.g. pgbouncer - if tx, err = conn.Begin(ctx); err != nil { - return nil, err - } - defer func() { _ = tx.Commit(ctx) }() - _, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'") - if err != nil { - return nil, err - } - conn = tx - } else { - // we want simple protocol for non-postgres connections, e.g. pgpool + // For non-postgres connections (e.g. pgbouncer, pgpool), use simple protocol + if !md.IsPostgresSource() { args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...) } - rows, err := conn.Query(ctx, sql, args...) + // lock_timeout is set at connection level via RuntimeParams, no need for transaction wrapper + rows, err := md.Conn.Query(ctx, sql, args...) if err == nil { return pgx.CollectRows(rows, metrics.RowToMeasurement) } diff --git a/internal/sources/cmdopts.go b/internal/sources/cmdopts.go index accc82d0c7..3acc6d6e9f 100644 --- a/internal/sources/cmdopts.go +++ b/internal/sources/cmdopts.go @@ -9,4 +9,5 @@ type CmdOpts struct { MaxParallelConnectionsPerDb int `long:"max-parallel-connections-per-db" mapstructure:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"4"` TryCreateListedExtsIfMissing string `long:"try-create-listed-exts-if-missing" mapstructure:"try-create-listed-exts-if-missing" description:"Try creating the listed extensions (comma sep.) on first connect for all monitored DBs when missing. Main usage - pg_stat_statements" env:"PW_TRY_CREATE_LISTED_EXTS_IF_MISSING" default:""` CreateHelpers bool `long:"create-helpers" mapstructure:"create-helpers" description:"Create helper database objects from metric definitions" env:"PW_CREATE_HELPERS"` + ConnLockTimeout string `long:"conn-lock-timeout" mapstructure:"conn-lock-timeout" description:"PostgreSQL lock_timeout for metric query connections. Set to 0 to disable" env:"PW_CONN_LOCK_TIMEOUT" default:"100ms"` } diff --git a/internal/sources/conn.go b/internal/sources/conn.go index 2a47529c73..0abaa45042 100644 --- a/internal/sources/conn.go +++ b/internal/sources/conn.go @@ -89,6 +89,11 @@ func (md *SourceConn) Connect(ctx context.Context, opts CmdOpts) (err error) { if opts.MaxParallelConnectionsPerDb > 0 { md.ConnConfig.MaxConns = int32(opts.MaxParallelConnectionsPerDb) } + // Set lock_timeout at connection level for PostgreSQL sources to avoid + // wrapping every query in a transaction with SET LOCAL lock_timeout + if md.IsPostgresSource() && opts.ConnLockTimeout != "" && opts.ConnLockTimeout != "0" { + md.ConnConfig.ConnConfig.RuntimeParams["lock_timeout"] = opts.ConnLockTimeout + } md.Conn, err = NewConnWithConfig(ctx, md.ConnConfig) if err != nil { return err From b41b64e905cbe425f48d4bcb19021edcc0fbf04a Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 4 Dec 2025 23:01:22 +0000 Subject: [PATCH 2/3] [*] update tests to match new connection-level lock_timeout Tests no longer expect transaction wrapping (BEGIN/SET/COMMIT) since lock_timeout is now set via RuntimeParams at connection time. --- internal/reaper/database_test.go | 38 +++++++++++++++----------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/internal/reaper/database_test.go b/internal/reaper/database_test.go index 0766cfe059..08cba0028d 100644 --- a/internal/reaper/database_test.go +++ b/internal/reaper/database_test.go @@ -28,12 +28,10 @@ func createTestSourceConn(t *testing.T) (*sources.SourceConn, pgxmock.PgxPoolIfa return md, mock } -// Helper function to set up transaction expectations for PostgreSQL sources -func expectTransaction(mock pgxmock.PgxPoolIface, queryRows *pgxmock.Rows) { - mock.ExpectBegin() - mock.ExpectExec("SET LOCAL lock_timeout").WillReturnResult(pgxmock.NewResult("SET", 0)) +// Helper function to set up query expectations for PostgreSQL sources +// (lock_timeout is now set at connection level, no transaction needed) +func expectQuery(mock pgxmock.PgxPoolIface, queryRows *pgxmock.Rows) { mock.ExpectQuery("SELECT").WillReturnRows(queryRows) - mock.ExpectCommit() } func TestTryCreateMetricsFetchingHelpers(t *testing.T) { @@ -95,7 +93,7 @@ func TestDetectSprocChanges(t *testing.T) { initialRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}). AddRow("func1", "123", "hash1", time.Now().UnixNano()). AddRow("func2", "456", "hash2", time.Now().UnixNano()) - expectTransaction(mock, initialRows) + expectQuery(mock, initialRows) result := reaper.DetectSprocChanges(ctx, md) assert.Equal(t, 0, result.Created) // First run should not count anything as created @@ -109,7 +107,7 @@ func TestDetectSprocChanges(t *testing.T) { modifiedRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}). AddRow("func1", "123", "new_hash", time.Now().UnixNano()). AddRow("func2", "456", "hash2", time.Now().UnixNano()) - expectTransaction(mock, modifiedRows) + expectQuery(mock, modifiedRows) result = reaper.DetectSprocChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -121,7 +119,7 @@ func TestDetectSprocChanges(t *testing.T) { AddRow("func1", "123", "new_hash", time.Now().UnixNano()). AddRow("func2", "456", "hash2", time.Now().UnixNano()). AddRow("func3", "789", "hash3", time.Now().UnixNano()) // new sproc - expectTransaction(mock, newSprocRows) + expectQuery(mock, newSprocRows) result = reaper.DetectSprocChanges(ctx, md) assert.Equal(t, 1, result.Created) // func3 was created @@ -140,7 +138,7 @@ func TestDetectSprocChanges(t *testing.T) { droppedSprocRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}). AddRow("func1", "123", "new_hash", time.Now().UnixNano()). AddRow("func3", "789", "hash3", time.Now().UnixNano()) // func2 dropped - expectTransaction(mock, droppedSprocRows) + expectQuery(mock, droppedSprocRows) result = reaper.DetectSprocChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -172,7 +170,7 @@ func TestDetectTableChanges(t *testing.T) { initialRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}). AddRow("table1", "123", "hash1", time.Now().UnixNano()). AddRow("table2", "456", "hash2", time.Now().UnixNano()) - expectTransaction(mock, initialRows) + expectQuery(mock, initialRows) result := reaper.DetectTableChanges(ctx, md) assert.Equal(t, 0, result.Created) // First run should not count anything as created @@ -184,7 +182,7 @@ func TestDetectTableChanges(t *testing.T) { modifiedRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}). AddRow("table1", "123", "new_hash", time.Now().UnixNano()). AddRow("table2", "456", "hash2", time.Now().UnixNano()) - expectTransaction(mock, modifiedRows) + expectQuery(mock, modifiedRows) result = reaper.DetectTableChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -196,7 +194,7 @@ func TestDetectTableChanges(t *testing.T) { AddRow("table1", "123", "new_hash", time.Now().UnixNano()). AddRow("table2", "456", "hash2", time.Now().UnixNano()). AddRow("table3", "789", "hash3", time.Now().UnixNano()) // new table - expectTransaction(mock, newTableRows) + expectQuery(mock, newTableRows) result = reaper.DetectTableChanges(ctx, md) assert.Equal(t, 1, result.Created) // table3 was created @@ -216,7 +214,7 @@ func TestDetectTableChanges(t *testing.T) { droppedTableRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}). AddRow("table1", "123", "new_hash", time.Now().UnixNano()). AddRow("table3", "789", "hash3", time.Now().UnixNano()) // table2 dropped - expectTransaction(mock, droppedTableRows) + expectQuery(mock, droppedTableRows) result = reaper.DetectTableChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -252,7 +250,7 @@ func TestDetectIndexChanges(t *testing.T) { initialRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}). AddRow("idx1", "table1", "hash1", "t", time.Now().UnixNano()). AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()) - expectTransaction(mock, initialRows) + expectQuery(mock, initialRows) result := reaper.DetectIndexChanges(ctx, md) assert.Equal(t, 0, result.Created) // First run should not count anything as created @@ -264,7 +262,7 @@ func TestDetectIndexChanges(t *testing.T) { modifiedRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}). AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()). // now invalid AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()) - expectTransaction(mock, modifiedRows) + expectQuery(mock, modifiedRows) result = reaper.DetectIndexChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -276,7 +274,7 @@ func TestDetectIndexChanges(t *testing.T) { AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()). AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()). AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano()) // new index - expectTransaction(mock, newIndexRows) + expectQuery(mock, newIndexRows) result = reaper.DetectIndexChanges(ctx, md) assert.Equal(t, 1, result.Created) // idx3 was created @@ -296,7 +294,7 @@ func TestDetectIndexChanges(t *testing.T) { droppedIndexRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}). AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()). AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano()) // idx2 dropped - expectTransaction(mock, droppedIndexRows) + expectQuery(mock, droppedIndexRows) result = reaper.DetectIndexChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -332,7 +330,7 @@ func TestDetectPrivilegeChanges(t *testing.T) { initialRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}). AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()). AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano()) - expectTransaction(mock, initialRows) + expectQuery(mock, initialRows) result := reaper.DetectPrivilegeChanges(ctx, md) assert.Equal(t, 0, result.Created) // First run should not count anything as created @@ -345,7 +343,7 @@ func TestDetectPrivilegeChanges(t *testing.T) { AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()). AddRow("table", "user1", "table1", "INSERT", time.Now().UnixNano()). // new privilege AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano()) - expectTransaction(mock, newPrivilegeRows) + expectQuery(mock, newPrivilegeRows) result = reaper.DetectPrivilegeChanges(ctx, md) assert.Equal(t, 1, result.Created) // new privilege was granted @@ -365,7 +363,7 @@ func TestDetectPrivilegeChanges(t *testing.T) { revokedPrivilegeRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}). AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()). AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano()) // user1 INSERT privilege revoked - expectTransaction(mock, revokedPrivilegeRows) + expectQuery(mock, revokedPrivilegeRows) result = reaper.DetectPrivilegeChanges(ctx, md) assert.Equal(t, 0, result.Created) From b97d7f38f8ced501e758de5de45eb7f850ec533b Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Mon, 8 Dec 2025 14:33:29 +0100 Subject: [PATCH 3/3] remove proposed `ConnLockTimeout` option User may control connection options through connection string, e.g. `--source=postgresql://foo:baz@bar/db?lock_timeout=100` --- internal/sources/cmdopts.go | 1 - internal/sources/conn.go | 5 ----- 2 files changed, 6 deletions(-) diff --git a/internal/sources/cmdopts.go b/internal/sources/cmdopts.go index 3acc6d6e9f..accc82d0c7 100644 --- a/internal/sources/cmdopts.go +++ b/internal/sources/cmdopts.go @@ -9,5 +9,4 @@ type CmdOpts struct { MaxParallelConnectionsPerDb int `long:"max-parallel-connections-per-db" mapstructure:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"4"` TryCreateListedExtsIfMissing string `long:"try-create-listed-exts-if-missing" mapstructure:"try-create-listed-exts-if-missing" description:"Try creating the listed extensions (comma sep.) on first connect for all monitored DBs when missing. Main usage - pg_stat_statements" env:"PW_TRY_CREATE_LISTED_EXTS_IF_MISSING" default:""` CreateHelpers bool `long:"create-helpers" mapstructure:"create-helpers" description:"Create helper database objects from metric definitions" env:"PW_CREATE_HELPERS"` - ConnLockTimeout string `long:"conn-lock-timeout" mapstructure:"conn-lock-timeout" description:"PostgreSQL lock_timeout for metric query connections. Set to 0 to disable" env:"PW_CONN_LOCK_TIMEOUT" default:"100ms"` } diff --git a/internal/sources/conn.go b/internal/sources/conn.go index 0abaa45042..2a47529c73 100644 --- a/internal/sources/conn.go +++ b/internal/sources/conn.go @@ -89,11 +89,6 @@ func (md *SourceConn) Connect(ctx context.Context, opts CmdOpts) (err error) { if opts.MaxParallelConnectionsPerDb > 0 { md.ConnConfig.MaxConns = int32(opts.MaxParallelConnectionsPerDb) } - // Set lock_timeout at connection level for PostgreSQL sources to avoid - // wrapping every query in a transaction with SET LOCAL lock_timeout - if md.IsPostgresSource() && opts.ConnLockTimeout != "" && opts.ConnLockTimeout != "0" { - md.ConnConfig.ConnConfig.RuntimeParams["lock_timeout"] = opts.ConnLockTimeout - } md.Conn, err = NewConnWithConfig(ctx, md.ConnConfig) if err != nil { return err