Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 4 additions & 19 deletions internal/reaper/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"
"time"

"github.com/cybertec-postgresql/pgwatch/v3/internal/db"
"github.com/cybertec-postgresql/pgwatch/v3/internal/log"
"github.com/cybertec-postgresql/pgwatch/v3/internal/metrics"
"github.com/cybertec-postgresql/pgwatch/v3/internal/sinks"
Expand All @@ -17,30 +16,16 @@ import (
)

func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) {
var conn db.PgxIface
var err error
var tx pgx.Tx
if strings.TrimSpace(sql) == "" {
return nil, errors.New("empty SQL")
}

conn = md.Conn
if md.IsPostgresSource() {
// we don't want transaction for non-postgres sources, e.g. pgbouncer
if tx, err = conn.Begin(ctx); err != nil {
return nil, err
}
defer func() { _ = tx.Commit(ctx) }()
_, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
if err != nil {
return nil, err
}
conn = tx
} else {
// we want simple protocol for non-postgres connections, e.g. pgpool
// For non-postgres connections (e.g. pgbouncer, pgpool), use simple protocol
if !md.IsPostgresSource() {
args = append([]any{pgx.QueryExecModeSimpleProtocol}, args...)
}
rows, err := conn.Query(ctx, sql, args...)
// lock_timeout is set at connection level via RuntimeParams, no need for transaction wrapper
rows, err := md.Conn.Query(ctx, sql, args...)
if err == nil {
return pgx.CollectRows(rows, metrics.RowToMeasurement)
}
Expand Down
38 changes: 18 additions & 20 deletions internal/reaper/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ func createTestSourceConn(t *testing.T) (*sources.SourceConn, pgxmock.PgxPoolIfa
return md, mock
}

// Helper function to set up transaction expectations for PostgreSQL sources
func expectTransaction(mock pgxmock.PgxPoolIface, queryRows *pgxmock.Rows) {
mock.ExpectBegin()
mock.ExpectExec("SET LOCAL lock_timeout").WillReturnResult(pgxmock.NewResult("SET", 0))
// Helper function to set up query expectations for PostgreSQL sources
// (lock_timeout is now set at connection level, no transaction needed)
func expectQuery(mock pgxmock.PgxPoolIface, queryRows *pgxmock.Rows) {
mock.ExpectQuery("SELECT").WillReturnRows(queryRows)
mock.ExpectCommit()
}

func TestTryCreateMetricsFetchingHelpers(t *testing.T) {
Expand Down Expand Up @@ -95,7 +93,7 @@ func TestDetectSprocChanges(t *testing.T) {
initialRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
AddRow("func1", "123", "hash1", time.Now().UnixNano()).
AddRow("func2", "456", "hash2", time.Now().UnixNano())
expectTransaction(mock, initialRows)
expectQuery(mock, initialRows)

result := reaper.DetectSprocChanges(ctx, md)
assert.Equal(t, 0, result.Created) // First run should not count anything as created
Expand All @@ -109,7 +107,7 @@ func TestDetectSprocChanges(t *testing.T) {
modifiedRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
AddRow("func2", "456", "hash2", time.Now().UnixNano())
expectTransaction(mock, modifiedRows)
expectQuery(mock, modifiedRows)

result = reaper.DetectSprocChanges(ctx, md)
assert.Equal(t, 0, result.Created)
Expand All @@ -121,7 +119,7 @@ func TestDetectSprocChanges(t *testing.T) {
AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
AddRow("func2", "456", "hash2", time.Now().UnixNano()).
AddRow("func3", "789", "hash3", time.Now().UnixNano()) // new sproc
expectTransaction(mock, newSprocRows)
expectQuery(mock, newSprocRows)

result = reaper.DetectSprocChanges(ctx, md)
assert.Equal(t, 1, result.Created) // func3 was created
Expand All @@ -140,7 +138,7 @@ func TestDetectSprocChanges(t *testing.T) {
droppedSprocRows := pgxmock.NewRows([]string{"tag_sproc", "tag_oid", "md5", "epoch_ns"}).
AddRow("func1", "123", "new_hash", time.Now().UnixNano()).
AddRow("func3", "789", "hash3", time.Now().UnixNano()) // func2 dropped
expectTransaction(mock, droppedSprocRows)
expectQuery(mock, droppedSprocRows)

result = reaper.DetectSprocChanges(ctx, md)
assert.Equal(t, 0, result.Created)
Expand Down Expand Up @@ -172,7 +170,7 @@ func TestDetectTableChanges(t *testing.T) {
initialRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
AddRow("table1", "123", "hash1", time.Now().UnixNano()).
AddRow("table2", "456", "hash2", time.Now().UnixNano())
expectTransaction(mock, initialRows)
expectQuery(mock, initialRows)

result := reaper.DetectTableChanges(ctx, md)
assert.Equal(t, 0, result.Created) // First run should not count anything as created
Expand All @@ -184,7 +182,7 @@ func TestDetectTableChanges(t *testing.T) {
modifiedRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
AddRow("table2", "456", "hash2", time.Now().UnixNano())
expectTransaction(mock, modifiedRows)
expectQuery(mock, modifiedRows)

result = reaper.DetectTableChanges(ctx, md)
assert.Equal(t, 0, result.Created)
Expand All @@ -196,7 +194,7 @@ func TestDetectTableChanges(t *testing.T) {
AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
AddRow("table2", "456", "hash2", time.Now().UnixNano()).
AddRow("table3", "789", "hash3", time.Now().UnixNano()) // new table
expectTransaction(mock, newTableRows)
expectQuery(mock, newTableRows)

result = reaper.DetectTableChanges(ctx, md)
assert.Equal(t, 1, result.Created) // table3 was created
Expand All @@ -216,7 +214,7 @@ func TestDetectTableChanges(t *testing.T) {
droppedTableRows := pgxmock.NewRows([]string{"tag_table", "tag_oid", "md5", "epoch_ns"}).
AddRow("table1", "123", "new_hash", time.Now().UnixNano()).
AddRow("table3", "789", "hash3", time.Now().UnixNano()) // table2 dropped
expectTransaction(mock, droppedTableRows)
expectQuery(mock, droppedTableRows)

result = reaper.DetectTableChanges(ctx, md)
assert.Equal(t, 0, result.Created)
Expand Down Expand Up @@ -252,7 +250,7 @@ func TestDetectIndexChanges(t *testing.T) {
initialRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
AddRow("idx1", "table1", "hash1", "t", time.Now().UnixNano()).
AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano())
expectTransaction(mock, initialRows)
expectQuery(mock, initialRows)

result := reaper.DetectIndexChanges(ctx, md)
assert.Equal(t, 0, result.Created) // First run should not count anything as created
Expand All @@ -264,7 +262,7 @@ func TestDetectIndexChanges(t *testing.T) {
modifiedRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()). // now invalid
AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano())
expectTransaction(mock, modifiedRows)
expectQuery(mock, modifiedRows)

result = reaper.DetectIndexChanges(ctx, md)
assert.Equal(t, 0, result.Created)
Expand All @@ -276,7 +274,7 @@ func TestDetectIndexChanges(t *testing.T) {
AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()).
AddRow("idx2", "table1", "hash2", "t", time.Now().UnixNano()).
AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano()) // new index
expectTransaction(mock, newIndexRows)
expectQuery(mock, newIndexRows)

result = reaper.DetectIndexChanges(ctx, md)
assert.Equal(t, 1, result.Created) // idx3 was created
Expand All @@ -296,7 +294,7 @@ func TestDetectIndexChanges(t *testing.T) {
droppedIndexRows := pgxmock.NewRows([]string{"tag_index", "table", "md5", "is_valid", "epoch_ns"}).
AddRow("idx1", "table1", "hash1", "f", time.Now().UnixNano()).
AddRow("idx3", "table2", "hash3", "t", time.Now().UnixNano()) // idx2 dropped
expectTransaction(mock, droppedIndexRows)
expectQuery(mock, droppedIndexRows)

result = reaper.DetectIndexChanges(ctx, md)
assert.Equal(t, 0, result.Created)
Expand Down Expand Up @@ -332,7 +330,7 @@ func TestDetectPrivilegeChanges(t *testing.T) {
initialRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}).
AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano())
expectTransaction(mock, initialRows)
expectQuery(mock, initialRows)

result := reaper.DetectPrivilegeChanges(ctx, md)
assert.Equal(t, 0, result.Created) // First run should not count anything as created
Expand All @@ -345,7 +343,7 @@ func TestDetectPrivilegeChanges(t *testing.T) {
AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
AddRow("table", "user1", "table1", "INSERT", time.Now().UnixNano()). // new privilege
AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano())
expectTransaction(mock, newPrivilegeRows)
expectQuery(mock, newPrivilegeRows)

result = reaper.DetectPrivilegeChanges(ctx, md)
assert.Equal(t, 1, result.Created) // new privilege was granted
Expand All @@ -365,7 +363,7 @@ func TestDetectPrivilegeChanges(t *testing.T) {
revokedPrivilegeRows := pgxmock.NewRows([]string{"object_type", "tag_role", "tag_object", "privilege_type", "epoch_ns"}).
AddRow("table", "user1", "table1", "SELECT", time.Now().UnixNano()).
AddRow("table", "user2", "table2", "INSERT", time.Now().UnixNano()) // user1 INSERT privilege revoked
expectTransaction(mock, revokedPrivilegeRows)
expectQuery(mock, revokedPrivilegeRows)

result = reaper.DetectPrivilegeChanges(ctx, md)
assert.Equal(t, 0, result.Created)
Expand Down
Loading