Skip to content

Commit

Permalink
airbyte: run PostgreSQL queries with pgxpool & scany
Browse files Browse the repository at this point in the history
Signed-off-by: VirtualTam <virtualtam@flibidi.net>
  • Loading branch information
virtualtam committed Oct 25, 2023
1 parent 4c77f80 commit a2fc577
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 59 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](https://semver.org/).

## [v2.2.0](https://github.com/botify-labs/airbyte_exporter/releases/tag/v2.2.0) - UNRELEASED
## [v2.2.0](https://github.com/botify-labs/airbyte_exporter/releases/tag/v2.2.0) - 2023-10-25

### Changed

- Require Go 1.21
- Bump base Docker image to Debian 12 Bookworm
- Update direct and transitive dependencies
- Rework the Repository to run PostgreSQL queries using the concurrency-safe `pgxpool` package
and scanning helpers from `scany`


## [v2.1.0](https://github.com/botify-labs/airbyte_exporter/releases/tag/v2.1.0) - 2023-06-19
Expand Down
22 changes: 17 additions & 5 deletions cmd/airbyte_exporter/exporter_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
package main

import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"

"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/jmoiron/sqlx"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -103,24 +104,35 @@ func NewExporterCommand() *cobra.Command {
)

// Database connection pool
db, err := sqlx.Connect(databaseDriver, databaseURI)
pgxPool, err := pgxpool.New(context.Background(), databaseURI)
if err != nil {
log.Error().
Err(err).
Str("database_driver", databaseDriver).
Str("database_addr", databaseAddr).
Str("database_name", databaseName).
Msg("failed to connect to database")
Msg("database: failed to create connection pool")
return err
}

if err := pgxPool.Ping(context.Background()); err != nil {
log.Error().
Err(err).
Str("database_driver", databaseDriver).
Str("database_addr", databaseAddr).
Str("database_name", databaseName).
Msg("database: failed to ping")
return err
}

log.Info().
Str("database_driver", databaseDriver).
Str("database_addr", databaseAddr).
Str("database_name", databaseName).
Msg("successfully connected to database")
Msg("database: successfully created connection pool")

// Airbyte Exporter services
airbyteRepository := airbyte.NewRepository(db)
airbyteRepository := airbyte.NewRepository(pgxPool)
airbyteService := airbyte.NewService(airbyteRepository)

httpServer := newServer(airbyteService, listenAddr)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/botify-labs/airbyte_exporter/v2
go 1.21

require (
github.com/georgysavva/scany/v2 v2.0.0
github.com/jackc/pgx/v5 v5.4.3
github.com/jmoiron/sqlx v1.3.5
github.com/justinas/alice v1.2.0
github.com/prometheus/client_golang v1.17.0
github.com/rs/zerolog v1.31.0
Expand All @@ -20,6 +20,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand All @@ -41,6 +42,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
22 changes: 14 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cockroachdb/cockroach-go/v2 v2.2.0 h1:/5znzg5n373N/3ESjHF5SMLxiW4RKB05Ql//KWfeTFs=
github.com/cockroachdb/cockroach-go/v2 v2.2.0/go.mod h1:u3MiKYGupPPjkn3ozknpMUpxPaNLTFWAya419/zv6eI=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -66,12 +68,14 @@ github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0X
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/georgysavva/scany/v2 v2.0.0 h1:RGXqxDv4row7/FYoK8MRXAZXqoWF/NM+NP0q50k3DKU=
github.com/georgysavva/scany/v2 v2.0.0/go.mod h1:sigOdh+0qb/+aOs3TVhehVT10p8qJL7K/Zhyz8vWo38=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -144,8 +148,8 @@ github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/justinas/alice v1.2.0 h1:+MHSA/vccVCF4Uq37S42jwlkvI2Xzl7zTPCN5BnZNVo=
Expand All @@ -159,8 +163,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E=
github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
Expand All @@ -169,14 +173,13 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -217,6 +220,7 @@ github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI=
github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down Expand Up @@ -337,6 +341,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
65 changes: 21 additions & 44 deletions internal/airbyte/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,37 @@

package airbyte

import "github.com/jmoiron/sqlx"
import (
"context"

"github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgx/v5/pgxpool"
)

// Repository provides an abstraction layer to perform SQL queries against the
// Airbyte PostgreSQL database.
type Repository struct {
db *sqlx.DB
pool *pgxpool.Pool
}

// NewRepository initializes and returns an Airbyte Repository.
func NewRepository(db *sqlx.DB) *Repository {
func NewRepository(pool *pgxpool.Pool) *Repository {
return &Repository{
db: db,
pool: pool,
}
}

// actorCountQuery provides a helper to run a SQL query that returns rows to be marshaled
// as a slice of ActorCount.
func (r *Repository) actorCountQuery(query string) ([]ActorCount, error) {
rows, err := r.db.Queryx(query)
rows, err := r.pool.Query(context.Background(), query)
if err != nil {
return []ActorCount{}, err
}

var actorCounts []ActorCount

for rows.Next() {
var actorCount ActorCount

if err := rows.StructScan(&actorCount); err != nil {
return []ActorCount{}, err
}

actorCounts = append(actorCounts, actorCount)
if err := pgxscan.ScanAll(&actorCounts, rows); err != nil {
return []ActorCount{}, err
}

return actorCounts, nil
Expand All @@ -45,21 +43,14 @@ func (r *Repository) actorCountQuery(query string) ([]ActorCount, error) {
// connectionCountQuery provides a helper to run a SQL query that returns rows to be marshaled
// as a slice of ConnectionCount.
func (r *Repository) connectionCountQuery(query string) ([]ConnectionCount, error) {
rows, err := r.db.Queryx(query)
rows, err := r.pool.Query(context.Background(), query)
if err != nil {
return []ConnectionCount{}, err
}

var connectionCounts []ConnectionCount

for rows.Next() {
var connectionCount ConnectionCount

if err := rows.StructScan(&connectionCount); err != nil {
return []ConnectionCount{}, err
}

connectionCounts = append(connectionCounts, connectionCount)
if err := pgxscan.ScanAll(&connectionCounts, rows); err != nil {
return []ConnectionCount{}, err
}

return connectionCounts, nil
Expand All @@ -68,21 +59,14 @@ func (r *Repository) connectionCountQuery(query string) ([]ConnectionCount, erro
// connectionSyncAgeQuery provides a helper to run a SQL query that returns rows to be marshaled
// as a slice of ConnectionSyncAge.
func (r *Repository) connectionSyncAgeQuery(query string) ([]ConnectionSyncAge, error) {
rows, err := r.db.Queryx(query)
rows, err := r.pool.Query(context.Background(), query)
if err != nil {
return []ConnectionSyncAge{}, err
}

var connectionSyncAges []ConnectionSyncAge

for rows.Next() {
var connectionSyncAge ConnectionSyncAge

if err := rows.StructScan(&connectionSyncAge); err != nil {
return []ConnectionSyncAge{}, err
}

connectionSyncAges = append(connectionSyncAges, connectionSyncAge)
if err := pgxscan.ScanAll(&connectionSyncAges, rows); err != nil {
return []ConnectionSyncAge{}, err
}

return connectionSyncAges, nil
Expand All @@ -91,21 +75,14 @@ func (r *Repository) connectionSyncAgeQuery(query string) ([]ConnectionSyncAge,
// jobCountQuery provides a helper to run a SQL query that returns rows to be marshaled
// as a slice of JobCount.
func (r *Repository) jobCountQuery(query string) ([]JobCount, error) {
rows, err := r.db.Queryx(query)
rows, err := r.pool.Query(context.Background(), query)
if err != nil {
return []JobCount{}, err
}

var jobCounts []JobCount

for rows.Next() {
var jobCount JobCount

if err := rows.StructScan(&jobCount); err != nil {
return []JobCount{}, err
}

jobCounts = append(jobCounts, jobCount)
if err := pgxscan.ScanAll(&jobCounts, rows); err != nil {
return []JobCount{}, err
}

return jobCounts, nil
Expand Down

0 comments on commit a2fc577

Please sign in to comment.