diff --git a/internal/reaper/database.go b/internal/reaper/database.go index 7d8b2445b8..5bb60e5914 100644 --- a/internal/reaper/database.go +++ b/internal/reaper/database.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/cybertec-postgresql/pgwatch/v3/internal/db" "github.com/cybertec-postgresql/pgwatch/v3/internal/log" "github.com/cybertec-postgresql/pgwatch/v3/internal/metrics" "github.com/cybertec-postgresql/pgwatch/v3/internal/sinks" @@ -17,30 +16,16 @@ import ( ) func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) { - var conn db.PgxIface - var err error - var tx pgx.Tx if strings.TrimSpace(sql) == "" { return nil, errors.New("empty SQL") } - conn = md.Conn - if md.IsPostgresSource() { - // we don't want transaction for non-postgres sources, e.g. pgbouncer - if tx, err = conn.Begin(ctx); err != nil { - return nil, err - } - defer func() { _ = tx.Commit(ctx) }() - _, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'") - if err != nil { - return nil, err - } - conn = tx - } else { - // we want simple protocol for non-postgres connections, e.g. pgpool + // For non-postgres connections (e.g. pgbouncer, pgpool), use simple protocol + if !md.IsPostgresSource() { args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...) } - rows, err := conn.Query(ctx, sql, args...) + // lock_timeout is set at connection level via RuntimeParams, no need for transaction wrapper + rows, err := md.Conn.Query(ctx, sql, args...) if err == nil { return pgx.CollectRows(rows, metrics.RowToMeasurement) } diff --git a/internal/reaper/database_test.go b/internal/reaper/database_test.go index 0766cfe059..08cba0028d 100644 --- a/internal/reaper/database_test.go +++ b/internal/reaper/database_test.go @@ -28,12 +28,10 @@ func createTestSourceConn(t *testing.T) (*sources.SourceConn, pgxmock.PgxPoolIfa return md, mock } -// Helper function to set up transaction expectations for PostgreSQL sources -func expectTransaction(mock pgxmock.PgxPoolIface, queryRows *pgxmock.Rows) { - mock.ExpectBegin() - mock.ExpectExec("SET LOCAL lock_timeout").WillReturnResult(pgxmock.NewResult("SET", 0)) +// Helper function to set up query expectations for PostgreSQL sources +// (lock_timeout is now set at connection level, no transaction needed) +func expectQuery(mock pgxmock.PgxPoolIface, queryRows *pgxmock.Rows) { mock.ExpectQuery("SELECT").WillReturnRows(queryRows) - mock.ExpectCommit() } func TestTryCreateMetricsFetchingHelpers(t *testing.T) { @@ -95,7 +93,7 @@ func TestDetectSprocChanges(t *testing.T) { initialRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}). AddRow("func1", "123", "hash1", time.Now().UnixNano()). AddRow("func2", "456", "hash2", time.Now().UnixNano()) - expectTransaction(mock, initialRows) + expectQuery(mock, initialRows) result := reaper.DetectSprocChanges(ctx, md) assert.Equal(t, 0, result.Created) // First run should not count anything as created @@ -109,7 +107,7 @@ func TestDetectSprocChanges(t *testing.T) { modifiedRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}). AddRow("func1", "123", "new_hash", time.Now().UnixNano()). AddRow("func2", "456", "hash2", time.Now().UnixNano()) - expectTransaction(mock, modifiedRows) + expectQuery(mock, modifiedRows) result = reaper.DetectSprocChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -121,7 +119,7 @@ func TestDetectSprocChanges(t *testing.T) { AddRow("func1", "123", "new_hash", time.Now().UnixNano()). AddRow("func2", "456", "hash2", time.Now().UnixNano()). AddRow("func3", "789", "hash3", time.Now().UnixNano()) // new sproc - expectTransaction(mock, newSprocRows) + expectQuery(mock, newSprocRows) result = reaper.DetectSprocChanges(ctx, md) assert.Equal(t, 1, result.Created) // func3 was created @@ -140,7 +138,7 @@ func TestDetectSprocChanges(t *testing.T) { droppedSprocRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}). AddRow("func1", "123", "new_hash", time.Now().UnixNano()). AddRow("func3", "789", "hash3", time.Now().UnixNano()) // func2 dropped - expectTransaction(mock, droppedSprocRows) + expectQuery(mock, droppedSprocRows) result = reaper.DetectSprocChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -172,7 +170,7 @@ func TestDetectTableChanges(t *testing.T) { initialRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}). AddRow("table1", "123", "hash1", time.Now().UnixNano()). AddRow("table2", "456", "hash2", time.Now().UnixNano()) - expectTransaction(mock, initialRows) + expectQuery(mock, initialRows) result := reaper.DetectTableChanges(ctx, md) assert.Equal(t, 0, result.Created) // First run should not count anything as created @@ -184,7 +182,7 @@ func TestDetectTableChanges(t *testing.T) { modifiedRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}). AddRow("table1", "123", "new_hash", time.Now().UnixNano()). AddRow("table2", "456", "hash2", time.Now().UnixNano()) - expectTransaction(mock, modifiedRows) + expectQuery(mock, modifiedRows) result = reaper.DetectTableChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -196,7 +194,7 @@ func TestDetectTableChanges(t *testing.T) { AddRow("table1", "123", "new_hash", time.Now().UnixNano()). AddRow("table2", "456", "hash2", time.Now().UnixNano()). AddRow("table3", "789", "hash3", time.Now().UnixNano()) // new table - expectTransaction(mock, newTableRows) + expectQuery(mock, newTableRows) result = reaper.DetectTableChanges(ctx, md) assert.Equal(t, 1, result.Created) // table3 was created @@ -216,7 +214,7 @@ func TestDetectTableChanges(t *testing.T) { droppedTableRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}). AddRow("table1", "123", "new_hash", time.Now().UnixNano()). AddRow("table3", "789", "hash3", time.Now().UnixNano()) // table2 dropped - expectTransaction(mock, droppedTableRows) + expectQuery(mock, droppedTableRows) result = reaper.DetectTableChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -252,7 +250,7 @@ func TestDetectIndexChanges(t *testing.T) { initialRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}). AddRow("idx1", "table1", "hash1", "t", time.Now().UnixNano()). AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()) - expectTransaction(mock, initialRows) + expectQuery(mock, initialRows) result := reaper.DetectIndexChanges(ctx, md) assert.Equal(t, 0, result.Created) // First run should not count anything as created @@ -264,7 +262,7 @@ func TestDetectIndexChanges(t *testing.T) { modifiedRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}). AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()). // now invalid AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()) - expectTransaction(mock, modifiedRows) + expectQuery(mock, modifiedRows) result = reaper.DetectIndexChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -276,7 +274,7 @@ func TestDetectIndexChanges(t *testing.T) { AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()). AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()). AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano()) // new index - expectTransaction(mock, newIndexRows) + expectQuery(mock, newIndexRows) result = reaper.DetectIndexChanges(ctx, md) assert.Equal(t, 1, result.Created) // idx3 was created @@ -296,7 +294,7 @@ func TestDetectIndexChanges(t *testing.T) { droppedIndexRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}). AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()). AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano()) // idx2 dropped - expectTransaction(mock, droppedIndexRows) + expectQuery(mock, droppedIndexRows) result = reaper.DetectIndexChanges(ctx, md) assert.Equal(t, 0, result.Created) @@ -332,7 +330,7 @@ func TestDetectPrivilegeChanges(t *testing.T) { initialRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}). AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()). AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano()) - expectTransaction(mock, initialRows) + expectQuery(mock, initialRows) result := reaper.DetectPrivilegeChanges(ctx, md) assert.Equal(t, 0, result.Created) // First run should not count anything as created @@ -345,7 +343,7 @@ func TestDetectPrivilegeChanges(t *testing.T) { AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()). AddRow("table", "user1", "table1", "INSERT", time.Now().UnixNano()). // new privilege AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano()) - expectTransaction(mock, newPrivilegeRows) + expectQuery(mock, newPrivilegeRows) result = reaper.DetectPrivilegeChanges(ctx, md) assert.Equal(t, 1, result.Created) // new privilege was granted @@ -365,7 +363,7 @@ func TestDetectPrivilegeChanges(t *testing.T) { revokedPrivilegeRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}). AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()). AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano()) // user1 INSERT privilege revoked - expectTransaction(mock, revokedPrivilegeRows) + expectQuery(mock, revokedPrivilegeRows) result = reaper.DetectPrivilegeChanges(ctx, md) assert.Equal(t, 0, result.Created)