Skip to content

Commit

Permalink
implement job queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Michelle Laurenti committed May 6, 2024
1 parent 78f4932 commit 9385b3d
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 2 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: test

on:
push:
branches: [ "main" ]

jobs:
test:
runs-on: ubuntu-latest
container:
image: ghcr.io/moveaxlab/oracle-db-job-queue-devcontainer:latest
services:
postgres:
image: kartoza/postgis:15
env:
POSTGRES_USER: postgres
POSTGRES_PASS: postgres
POSTGRES_DBNAME: test
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

oracle:
image: container-registry.oracle.com/database/free:latest
ports:
- 1521:1521
- 5500:5500
env:
ORACLE_PWD: password

steps:
- uses: actions/checkout@v4
- run: go test -v ./...
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Oracle DB Job Queue

This repo contains an example implementation of a transactional job queue
using Oracle DB.

14 changes: 12 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,23 @@ services:
devcontainer:
image: ghcr.io/moveaxlab/oracle-db-job-queue-devcontainer:latest
volumes:
- .:/workspaces/cotral-go:cached
- .:/workspaces/oracle-db-job-queue:cached
command: sleep infinity

database:
oracle:
image: container-registry.oracle.com/database/free:latest
ports:
- 1521:1521
- 5500:5500
environment:
ORACLE_PWD: password

postgres:
image: kartoza/postgis:15
ports:
- 5432:5432
hostname: database
environment:
POSTGRES_USER: postgres
POSTGRES_PASS: postgres
POSTGRES_DBNAME: test
23 changes: 23 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
module github.com/moveaxlab/oracle-db-job-queue

go 1.20

require (
github.com/godror/godror v0.42.2
github.com/moveaxlab/go-optional v1.0.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/godror/knownpb v0.1.1 // indirect
github.com/lib/pq v1.10.9
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20240318143956-a85f2c67cd81 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
31 changes: 31 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
github.com/UNO-SOFT/zlog v0.8.1 h1:TEFkGJHtUfTRgMkLZiAjLSHALjwSBdw6/zByMC5GJt4=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/godror/godror v0.42.2 h1:TmOV0fr4jJxwDD6vWtSieQFOZ75LnSbkJaudfuJOsVQ=
github.com/godror/godror v0.42.2/go.mod h1:82Uc/HdjsFVnzR5c9Yf6IkTBalK80jzm/U6xojbTo94=
github.com/godror/knownpb v0.1.1 h1:A4J7jdx7jWBhJm18NntafzSC//iZDHkDi1+juwQ5pTI=
github.com/godror/knownpb v0.1.1/go.mod h1:4nRFbQo1dDuwKnblRXDxrfCFYeT4hjg3GjMqef58eRE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/moveaxlab/go-optional v1.0.1 h1:98USw1u4e9Idl+tYX5R4iqqlcOFbcb2flOfUZrXFGZ4=
github.com/moveaxlab/go-optional v1.0.1/go.mod h1:/yi0bSNha6vvDiYRrgqeAyTU1RA5YUeK3FCgIZnn5q8=
github.com/oklog/ulid/v2 v2.0.2 h1:r4fFzBm+bv0wNKNh5eXTwU7i85y5x+uwkxCUTNVQqLc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/exp v0.0.0-20240318143956-a85f2c67cd81 h1:6R2FC06FonbXQ8pK11/PDFY6N6LWlf9KlzibaCapmqc=
golang.org/x/exp v0.0.0-20240318143956-a85f2c67cd81/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
24 changes: 24 additions & 0 deletions queue/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package queue

import (
"context"

"github.com/moveaxlab/go-optional"
)

type txKey string

type Email struct {
Recipient string
Subject string
Body string
}

type Queue interface {
Migrate()
Truncate()
Begin(context.Context) context.Context
Commit(context.Context)
Enqueue(context.Context, Email)
Dequeue(context.Context) optional.Optional[Email]
}
145 changes: 145 additions & 0 deletions queue/oracle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package queue

import (
"context"
"database/sql"
"log"

_ "github.com/godror/godror"
"github.com/moveaxlab/go-optional"
)

type oracleQueue struct {
db *sql.DB
}

const createTable = `
CREATE TABLE email_outbox (
id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
recipient VARCHAR2(255) NOT NULL,
subject VARCHAR2(255) NOT NULL,
body VARCHAR2(4000) NOT NULL
)
`

const createFunction = `
CREATE OR REPLACE FUNCTION next_email RETURN NUMBER IS
row_locked EXCEPTION;
PRAGMA EXCEPTION_INIT(row_locked, -54);
v_return NUMBER;
CURSOR c_id IS
SELECT id FROM email_outbox ORDER BY id;
BEGIN
FOR r_id IN c_id LOOP
BEGIN
SELECT id INTO v_return FROM email_outbox WHERE id = r_id.id FOR UPDATE NOWAIT;
EXIT;
EXCEPTION WHEN row_locked THEN
NULL;
END;
END LOOP;
RETURN v_return;
END;
`

func NewOracleQueue() Queue {
db, err := sql.Open("godror", `user="system" password="password" connectString="oracle:1521/free"`)
if err != nil {
log.Fatal(err)
}
return &oracleQueue{db: db}
}

const oracleTxKey txKey = "oracle_tx"

func (q *oracleQueue) Begin(ctx context.Context) context.Context {
tx, err := q.db.Begin()
if err != nil {
log.Fatalf("failed to start transaction: %v", err)
}
return context.WithValue(ctx, oracleTxKey, tx)
}

func (q *oracleQueue) Commit(ctx context.Context) {
tx, ok := ctx.Value(oracleTxKey).(*sql.Tx)
if !ok {
log.Fatal("cannot commit outside of a transaction")
}
err := tx.Commit()
if err != nil {
log.Fatalf("failed to commit transaction: %v", err)
}
}

func (q *oracleQueue) Migrate() {
var err error

row := q.db.QueryRow("SELECT COUNT(*) FROM user_tables WHERE table_name = 'EMAIL_OUTBOX'")
if row.Err() != nil {
log.Fatalf("failed to check if email outbox table exists: %v", row.Err())
}

var count int
err = row.Scan(&count)
if err != nil {
log.Fatalf("failed to scan table check: %v", err)
}

if count == 0 {
_, err = q.db.Exec(createTable)
if err != nil {
log.Fatalf("failed to create outbox table: %v", err)
}
}

_, err = q.db.Exec(createFunction)
if err != nil {
log.Fatalf("failed to create dequeue function: %v", err)
}
}

func (q *oracleQueue) Truncate() {
_, err := q.db.Exec("TRUNCATE TABLE email_outbox")
if err != nil {
log.Fatalf("failed to truncate outbox table: %v", err)
}
}

func (q *oracleQueue) Enqueue(ctx context.Context, email Email) {
_, err := q.db.Exec("INSERT INTO email_outbox (recipient, subject, body) VALUES (:1, :2, :3)", email.Recipient, email.Subject, email.Body)
if err != nil {
log.Fatalf("failed to insert email: %v", err)
}
}

func (q *oracleQueue) Dequeue(ctx context.Context) optional.Optional[Email] {
tx, ok := ctx.Value(oracleTxKey).(*sql.Tx)
if !ok {
log.Fatal("cannot dequeue outside of a transaction")
}

var id sql.NullInt64
_, err := tx.Exec("BEGIN :1 := NEXT_EMAIL(); END;", sql.Out{Dest: &id})
if err != nil {
log.Fatalf("failed to get next email id: %v", err)
}

ok = id.Valid
if !ok {
return optional.Empty[Email]()
}

var email Email

row := tx.QueryRow("SELECT recipient, subject, body FROM email_outbox WHERE id = :1", id.Int64)
if row.Err() != nil {
log.Fatalf("failed to retrieve row: %v", err)
}

err = row.Scan(&email.Recipient, &email.Subject, &email.Body)
if err != nil {
log.Fatalf("failed to scan row: %v", err)
}

return optional.Of(&email)
}
101 changes: 101 additions & 0 deletions queue/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package queue

import (
"context"
"database/sql"
"errors"
"log"

_ "github.com/lib/pq"
"github.com/moveaxlab/go-optional"
)

type postgresQueue struct {
db *sql.DB
}

const createPostgresTable = `
CREATE TABLE IF NOT EXISTS email_outbox (
id SERIAL PRIMARY KEY,
recipient TEXT NOT NULL,
subject TEXT NOT NULL,
body TEXT NOT NULL
)
`

func NewPostgresQueue() Queue {
db, err := sql.Open("postgres", "postgres://postgres:postgres@postgres:5432/test")
if err != nil {
log.Fatal(err)
}
return &postgresQueue{db: db}
}

func (q *postgresQueue) Truncate() {
_, err := q.db.Exec("TRUNCATE TABLE email_outbox")
if err != nil {
log.Fatalf("failed to truncate outbox table: %v", err)
}
}

const postgresTxKey txKey = "postgres_tx"

func (q *postgresQueue) Begin(ctx context.Context) context.Context {
tx, err := q.db.Begin()
if err != nil {
log.Fatalf("failed to start transaction: %v", err)
}
return context.WithValue(ctx, postgresTxKey, tx)
}

func (q *postgresQueue) Commit(ctx context.Context) {
tx, ok := ctx.Value(postgresTxKey).(*sql.Tx)
if !ok {
log.Fatal("cannot commit outside of a transaction")
}
err := tx.Commit()
if err != nil {
log.Fatalf("failed to commit transaction: %v", err)
}
}

func (q *postgresQueue) Migrate() {
_, err := q.db.Exec(createPostgresTable)
if err != nil {
log.Fatalf("failed to create tables: %v", err)
}
}

func (q *postgresQueue) Enqueue(ctx context.Context, email Email) {
_, err := q.db.Exec("INSERT INTO email_outbox (recipient, subject, body) VALUES ($1, $2, $3)", email.Recipient, email.Subject, email.Body)
if err != nil {
log.Fatalf("failed to insert email: %v", err)
}
}

func (q *postgresQueue) Dequeue(ctx context.Context) optional.Optional[Email] {
tx, ok := ctx.Value(postgresTxKey).(*sql.Tx)
if !ok {
log.Fatal("cannot dequeue outside of a transaction")
}

var err error

var email Email

row := tx.QueryRow("SELECT recipient, subject, body FROM email_outbox LIMIT 1 FOR UPDATE SKIP LOCKED")
if row.Err() != nil {
log.Fatalf("failed to retrieve row: %v", err)
}

err = row.Scan(&email.Recipient, &email.Subject, &email.Body)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return optional.Empty[Email]()
} else {
log.Fatalf("failed to scan row: %v", err)
}
}

return optional.Of(&email)
}
Loading

0 comments on commit 9385b3d

Please sign in to comment.