Skip to content

Commit 95eb0b8

Browse files
Add jobs.CreateTx to create a job inside a current transaction (#39)
1 parent e6219ed commit 95eb0b8

File tree

3 files changed

+40
-1
lines changed

3 files changed

+40
-1
lines changed

internal/testing/testing.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ func NewDB(t testing.TB, path string) *sql.DB {
5454
func NewQ(t testing.TB, opts goqite.NewOpts, path string) *goqite.Queue {
5555
t.Helper()
5656

57-
opts.DB = NewDB(t, path)
57+
if opts.DB == nil {
58+
opts.DB = NewDB(t, path)
59+
}
5860

5961
if opts.Name == "" {
6062
opts.Name = "test"

jobs/runner.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package jobs
1010
import (
1111
"bytes"
1212
"context"
13+
"database/sql"
1314
"encoding/gob"
1415
"errors"
1516
"fmt"
@@ -197,6 +198,7 @@ func (r *Runner) Register(name string, job Func) {
197198
r.jobs[name] = job
198199
}
199200

201+
// Create a message for the named job in the given queue.
200202
func Create(ctx context.Context, q *goqite.Queue, name string, m []byte) error {
201203
var buf bytes.Buffer
202204
if err := gob.NewEncoder(&buf).Encode(message{Name: name, Message: m}); err != nil {
@@ -205,6 +207,15 @@ func Create(ctx context.Context, q *goqite.Queue, name string, m []byte) error {
205207
return q.Send(ctx, goqite.Message{Body: buf.Bytes()})
206208
}
207209

210+
// CreateTx is like Create, but within an existing transaction.
211+
func CreateTx(ctx context.Context, tx *sql.Tx, q *goqite.Queue, name string, m []byte) error {
212+
var buf bytes.Buffer
213+
if err := gob.NewEncoder(&buf).Encode(message{Name: name, Message: m}); err != nil {
214+
return err
215+
}
216+
return q.SendTx(ctx, tx, goqite.Message{Body: buf.Bytes()})
217+
}
218+
208219
// logger matches the info level method from the slog.Logger.
209220
type logger interface {
210221
Info(msg string, args ...any)

jobs/runner_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
_ "github.com/mattn/go-sqlite3"
1313

1414
"github.com/maragudk/goqite"
15+
internalsql "github.com/maragudk/goqite/internal/sql"
1516
internaltesting "github.com/maragudk/goqite/internal/testing"
1617
"github.com/maragudk/goqite/jobs"
1718
)
@@ -121,6 +122,31 @@ func TestRunner_Start(t *testing.T) {
121122
})
122123
}
123124

125+
func TestCreateTx(t *testing.T) {
126+
t.Run("can create a job inside a transaction", func(t *testing.T) {
127+
db := internaltesting.NewDB(t, ":memory:")
128+
q := internaltesting.NewQ(t, goqite.NewOpts{DB: db}, ":memory:")
129+
r := jobs.NewRunner(jobs.NewRunnerOpts{Log: internaltesting.NewLogger(t), Queue: q})
130+
131+
var ran bool
132+
ctx, cancel := context.WithCancel(context.Background())
133+
r.Register("test", func(ctx context.Context, m []byte) error {
134+
ran = true
135+
is.Equal(t, "yo", string(m))
136+
cancel()
137+
return nil
138+
})
139+
140+
err := internalsql.InTx(db, func(tx *sql.Tx) error {
141+
return jobs.CreateTx(ctx, tx, q, "test", []byte("yo"))
142+
})
143+
is.NotError(t, err)
144+
145+
r.Start(ctx)
146+
is.True(t, ran)
147+
})
148+
}
149+
124150
func ExampleRunner_Start() {
125151
log := slog.Default()
126152

0 commit comments

Comments
 (0)