From 9385b3d4b13fa9c8bf2403d8fd035a48738c86a1 Mon Sep 17 00:00:00 2001 From: Michelle Laurenti Date: Mon, 6 May 2024 08:43:05 +0000 Subject: [PATCH] implement job queues --- .github/workflows/main.yml | 37 ++++++++++ README.md | 5 ++ docker-compose.yml | 14 +++- go.mod | 23 ++++++ go.sum | 31 ++++++++ queue/interface.go | 24 ++++++ queue/oracle.go | 145 +++++++++++++++++++++++++++++++++++++ queue/postgres.go | 101 ++++++++++++++++++++++++++ queue/queue_test.go | 110 ++++++++++++++++++++++++++++ 9 files changed, 488 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/main.yml create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 queue/interface.go create mode 100644 queue/oracle.go create mode 100644 queue/postgres.go create mode 100644 queue/queue_test.go diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..3d14e7d --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,37 @@ +name: test + +on: + push: + branches: [ "main" ] + +jobs: + test: + runs-on: ubuntu-latest + container: + image: ghcr.io/moveaxlab/oracle-db-job-queue-devcontainer:latest + services: + postgres: + image: kartoza/postgis:15 + env: + POSTGRES_USER: postgres + POSTGRES_PASS: postgres + POSTGRES_DBNAME: test + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + oracle: + image: container-registry.oracle.com/database/free:latest + ports: + - 1521:1521 + - 5500:5500 + env: + ORACLE_PWD: password + + steps: + - uses: actions/checkout@v4 + - run: go test -v ./... diff --git a/README.md b/README.md new file mode 100644 index 0000000..1d1bcbe --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# Oracle DB Job Queue + +This repo contains an example implementation of a transactional job queue +using Oracle DB. + diff --git a/docker-compose.yml b/docker-compose.yml index 5fc636d..d29abf0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,13 +4,23 @@ services: devcontainer: image: ghcr.io/moveaxlab/oracle-db-job-queue-devcontainer:latest volumes: - - .:/workspaces/cotral-go:cached + - .:/workspaces/oracle-db-job-queue:cached command: sleep infinity - database: + oracle: image: container-registry.oracle.com/database/free:latest ports: - 1521:1521 - 5500:5500 environment: ORACLE_PWD: password + + postgres: + image: kartoza/postgis:15 + ports: + - 5432:5432 + hostname: database + environment: + POSTGRES_USER: postgres + POSTGRES_PASS: postgres + POSTGRES_DBNAME: test diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e5908d8 --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module github.com/moveaxlab/oracle-db-job-queue + +go 1.20 + +require ( + github.com/godror/godror v0.42.2 + github.com/moveaxlab/go-optional v1.0.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +require ( + github.com/go-logfmt/logfmt v0.6.0 // indirect + github.com/godror/knownpb v0.1.1 // indirect + github.com/lib/pq v1.10.9 + github.com/stretchr/testify v1.9.0 + golang.org/x/exp v0.0.0-20240318143956-a85f2c67cd81 // indirect + google.golang.org/protobuf v1.33.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d3a41d3 --- /dev/null +++ b/go.sum @@ -0,0 +1,31 @@ +github.com/UNO-SOFT/zlog v0.8.1 h1:TEFkGJHtUfTRgMkLZiAjLSHALjwSBdw6/zByMC5GJt4= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= +github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/godror/godror v0.42.2 h1:TmOV0fr4jJxwDD6vWtSieQFOZ75LnSbkJaudfuJOsVQ= +github.com/godror/godror v0.42.2/go.mod h1:82Uc/HdjsFVnzR5c9Yf6IkTBalK80jzm/U6xojbTo94= +github.com/godror/knownpb v0.1.1 h1:A4J7jdx7jWBhJm18NntafzSC//iZDHkDi1+juwQ5pTI= +github.com/godror/knownpb v0.1.1/go.mod h1:4nRFbQo1dDuwKnblRXDxrfCFYeT4hjg3GjMqef58eRE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/moveaxlab/go-optional v1.0.1 h1:98USw1u4e9Idl+tYX5R4iqqlcOFbcb2flOfUZrXFGZ4= +github.com/moveaxlab/go-optional v1.0.1/go.mod h1:/yi0bSNha6vvDiYRrgqeAyTU1RA5YUeK3FCgIZnn5q8= +github.com/oklog/ulid/v2 v2.0.2 h1:r4fFzBm+bv0wNKNh5eXTwU7i85y5x+uwkxCUTNVQqLc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/exp v0.0.0-20240318143956-a85f2c67cd81 h1:6R2FC06FonbXQ8pK11/PDFY6N6LWlf9KlzibaCapmqc= +golang.org/x/exp v0.0.0-20240318143956-a85f2c67cd81/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/queue/interface.go b/queue/interface.go new file mode 100644 index 0000000..ef85b43 --- /dev/null +++ b/queue/interface.go @@ -0,0 +1,24 @@ +package queue + +import ( + "context" + + "github.com/moveaxlab/go-optional" +) + +type txKey string + +type Email struct { + Recipient string + Subject string + Body string +} + +type Queue interface { + Migrate() + Truncate() + Begin(context.Context) context.Context + Commit(context.Context) + Enqueue(context.Context, Email) + Dequeue(context.Context) optional.Optional[Email] +} diff --git a/queue/oracle.go b/queue/oracle.go new file mode 100644 index 0000000..4f1cb56 --- /dev/null +++ b/queue/oracle.go @@ -0,0 +1,145 @@ +package queue + +import ( + "context" + "database/sql" + "log" + + _ "github.com/godror/godror" + "github.com/moveaxlab/go-optional" +) + +type oracleQueue struct { + db *sql.DB +} + +const createTable = ` +CREATE TABLE email_outbox ( + id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY, + recipient VARCHAR2(255) NOT NULL, + subject VARCHAR2(255) NOT NULL, + body VARCHAR2(4000) NOT NULL +) +` + +const createFunction = ` +CREATE OR REPLACE FUNCTION next_email RETURN NUMBER IS + row_locked EXCEPTION; + PRAGMA EXCEPTION_INIT(row_locked, -54); + v_return NUMBER; + CURSOR c_id IS + SELECT id FROM email_outbox ORDER BY id; +BEGIN + FOR r_id IN c_id LOOP + BEGIN + SELECT id INTO v_return FROM email_outbox WHERE id = r_id.id FOR UPDATE NOWAIT; + EXIT; + EXCEPTION WHEN row_locked THEN + NULL; + END; + END LOOP; + RETURN v_return; +END; +` + +func NewOracleQueue() Queue { + db, err := sql.Open("godror", `user="system" password="password" connectString="oracle:1521/free"`) + if err != nil { + log.Fatal(err) + } + return &oracleQueue{db: db} +} + +const oracleTxKey txKey = "oracle_tx" + +func (q *oracleQueue) Begin(ctx context.Context) context.Context { + tx, err := q.db.Begin() + if err != nil { + log.Fatalf("failed to start transaction: %v", err) + } + return context.WithValue(ctx, oracleTxKey, tx) +} + +func (q *oracleQueue) Commit(ctx context.Context) { + tx, ok := ctx.Value(oracleTxKey).(*sql.Tx) + if !ok { + log.Fatal("cannot commit outside of a transaction") + } + err := tx.Commit() + if err != nil { + log.Fatalf("failed to commit transaction: %v", err) + } +} + +func (q *oracleQueue) Migrate() { + var err error + + row := q.db.QueryRow("SELECT COUNT(*) FROM user_tables WHERE table_name = 'EMAIL_OUTBOX'") + if row.Err() != nil { + log.Fatalf("failed to check if email outbox table exists: %v", row.Err()) + } + + var count int + err = row.Scan(&count) + if err != nil { + log.Fatalf("failed to scan table check: %v", err) + } + + if count == 0 { + _, err = q.db.Exec(createTable) + if err != nil { + log.Fatalf("failed to create outbox table: %v", err) + } + } + + _, err = q.db.Exec(createFunction) + if err != nil { + log.Fatalf("failed to create dequeue function: %v", err) + } +} + +func (q *oracleQueue) Truncate() { + _, err := q.db.Exec("TRUNCATE TABLE email_outbox") + if err != nil { + log.Fatalf("failed to truncate outbox table: %v", err) + } +} + +func (q *oracleQueue) Enqueue(ctx context.Context, email Email) { + _, err := q.db.Exec("INSERT INTO email_outbox (recipient, subject, body) VALUES (:1, :2, :3)", email.Recipient, email.Subject, email.Body) + if err != nil { + log.Fatalf("failed to insert email: %v", err) + } +} + +func (q *oracleQueue) Dequeue(ctx context.Context) optional.Optional[Email] { + tx, ok := ctx.Value(oracleTxKey).(*sql.Tx) + if !ok { + log.Fatal("cannot dequeue outside of a transaction") + } + + var id sql.NullInt64 + _, err := tx.Exec("BEGIN :1 := NEXT_EMAIL(); END;", sql.Out{Dest: &id}) + if err != nil { + log.Fatalf("failed to get next email id: %v", err) + } + + ok = id.Valid + if !ok { + return optional.Empty[Email]() + } + + var email Email + + row := tx.QueryRow("SELECT recipient, subject, body FROM email_outbox WHERE id = :1", id.Int64) + if row.Err() != nil { + log.Fatalf("failed to retrieve row: %v", err) + } + + err = row.Scan(&email.Recipient, &email.Subject, &email.Body) + if err != nil { + log.Fatalf("failed to scan row: %v", err) + } + + return optional.Of(&email) +} diff --git a/queue/postgres.go b/queue/postgres.go new file mode 100644 index 0000000..9f927b8 --- /dev/null +++ b/queue/postgres.go @@ -0,0 +1,101 @@ +package queue + +import ( + "context" + "database/sql" + "errors" + "log" + + _ "github.com/lib/pq" + "github.com/moveaxlab/go-optional" +) + +type postgresQueue struct { + db *sql.DB +} + +const createPostgresTable = ` +CREATE TABLE IF NOT EXISTS email_outbox ( + id SERIAL PRIMARY KEY, + recipient TEXT NOT NULL, + subject TEXT NOT NULL, + body TEXT NOT NULL +) +` + +func NewPostgresQueue() Queue { + db, err := sql.Open("postgres", "postgres://postgres:postgres@postgres:5432/test") + if err != nil { + log.Fatal(err) + } + return &postgresQueue{db: db} +} + +func (q *postgresQueue) Truncate() { + _, err := q.db.Exec("TRUNCATE TABLE email_outbox") + if err != nil { + log.Fatalf("failed to truncate outbox table: %v", err) + } +} + +const postgresTxKey txKey = "postgres_tx" + +func (q *postgresQueue) Begin(ctx context.Context) context.Context { + tx, err := q.db.Begin() + if err != nil { + log.Fatalf("failed to start transaction: %v", err) + } + return context.WithValue(ctx, postgresTxKey, tx) +} + +func (q *postgresQueue) Commit(ctx context.Context) { + tx, ok := ctx.Value(postgresTxKey).(*sql.Tx) + if !ok { + log.Fatal("cannot commit outside of a transaction") + } + err := tx.Commit() + if err != nil { + log.Fatalf("failed to commit transaction: %v", err) + } +} + +func (q *postgresQueue) Migrate() { + _, err := q.db.Exec(createPostgresTable) + if err != nil { + log.Fatalf("failed to create tables: %v", err) + } +} + +func (q *postgresQueue) Enqueue(ctx context.Context, email Email) { + _, err := q.db.Exec("INSERT INTO email_outbox (recipient, subject, body) VALUES ($1, $2, $3)", email.Recipient, email.Subject, email.Body) + if err != nil { + log.Fatalf("failed to insert email: %v", err) + } +} + +func (q *postgresQueue) Dequeue(ctx context.Context) optional.Optional[Email] { + tx, ok := ctx.Value(postgresTxKey).(*sql.Tx) + if !ok { + log.Fatal("cannot dequeue outside of a transaction") + } + + var err error + + var email Email + + row := tx.QueryRow("SELECT recipient, subject, body FROM email_outbox LIMIT 1 FOR UPDATE SKIP LOCKED") + if row.Err() != nil { + log.Fatalf("failed to retrieve row: %v", err) + } + + err = row.Scan(&email.Recipient, &email.Subject, &email.Body) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return optional.Empty[Email]() + } else { + log.Fatalf("failed to scan row: %v", err) + } + } + + return optional.Of(&email) +} diff --git a/queue/queue_test.go b/queue/queue_test.go new file mode 100644 index 0000000..75b9aa7 --- /dev/null +++ b/queue/queue_test.go @@ -0,0 +1,110 @@ +package queue + +import ( + "context" + "fmt" + "log" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +func TestOracleQueue(t *testing.T) { + s := new(BaseTestSuite) + s.constructor = NewOracleQueue + suite.Run(t, s) +} + +func TestPostgresQueue(t *testing.T) { + s := new(BaseTestSuite) + s.constructor = NewPostgresQueue + suite.Run(t, s) +} + +type BaseTestSuite struct { + suite.Suite + + constructor func() Queue + q Queue +} + +func (s *BaseTestSuite) SetupSuite() { + s.q = s.constructor() + s.q.Migrate() +} + +func (s *BaseTestSuite) SetupTest() { + s.q.Truncate() +} + +func (s *BaseTestSuite) TestNothingToDo() { + ctx := s.q.Begin(context.Background()) + defer s.q.Commit(ctx) + + maybeEmail := s.q.Dequeue(ctx) + s.False(maybeEmail.IsPresent()) +} + +func (s *BaseTestSuite) TestBenchmark() { + for i := 0; i < 1000; i++ { + s.q.Enqueue(context.Background(), Email{ + Recipient: fmt.Sprintf("test_%d", i), + Subject: "hello", + Body: "world", + }) + } + + txs := make([]context.Context, 0, 100) + + for i := 0; i < 100; i++ { + ctx := s.q.Begin(context.Background()) + txs = append(txs, ctx) + + start := time.Now() + maybeEmail := s.q.Dequeue(ctx) + log.Printf("dequeue #%d done in %v", i, time.Since(start)) + s.True(maybeEmail.IsPresent()) + } + + for _, tx := range txs { + s.q.Commit(tx) + } +} + +func (s *BaseTestSuite) TestWorkDivision() { + s.q.Enqueue(context.Background(), Email{ + Recipient: "test1", + Subject: "hello", + Body: "world", + }) + s.q.Enqueue(context.Background(), Email{ + Recipient: "test2", + Subject: "hello", + Body: "world", + }) + + tx1 := s.q.Begin(context.Background()) + defer s.q.Commit(tx1) + + email1 := s.q.Dequeue(tx1) + s.True(email1.IsPresent()) + s.Equal("test1", email1.Get().Recipient) + s.Equal("hello", email1.Get().Subject) + s.Equal("world", email1.Get().Body) + + tx2 := s.q.Begin(context.Background()) + defer s.q.Commit(tx2) + + email2 := s.q.Dequeue(tx2) + s.True(email2.IsPresent()) + s.Equal("test2", email2.Get().Recipient) + s.Equal("hello", email2.Get().Subject) + s.Equal("world", email2.Get().Body) + + tx3 := s.q.Begin(context.Background()) + defer s.q.Commit(tx3) + + email3 := s.q.Dequeue(tx3) + s.False(email3.IsPresent()) +}