Skip to content

Commit 6de7171

Browse files
Add job runner timeout option (#40)
Otherwise, the underlying queue could have a much smaller timeout, and the runner wouldn't work properly. Also: - Remove the `done` channel, we can just use `jobCtx` directly. - Some more doc comments. - Add logging before job run. - Add a test for the extension.
1 parent 95eb0b8 commit 6de7171

File tree

2 files changed

+44
-14
lines changed

2 files changed

+44
-14
lines changed

jobs/runner.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
// on the underlying queue.
33
//
44
// It provides:
5-
// - Limit on how many jobs can be run simultaneously
6-
// - Automatic message timeout extension while the job is running
7-
// - Graceful shutdown
5+
// - Limit on how many jobs can be run simultaneously
6+
// - Automatic message timeout extension while the job is running
7+
// - Graceful shutdown
88
package jobs
99

1010
import (
@@ -22,7 +22,12 @@ import (
2222
"github.com/maragudk/goqite"
2323
)
2424

25+
// NewRunnerOpts are options for [NewRunner].
26+
// - [NewRunner.Extend] is by how much a job message timeout is extended each time while the job is running.
27+
// - [NewRunnerOpts.Limit] is for how many jobs can be run simultaneously.
28+
// - [NewRunner.PollInterval] is how often the runner polls the queue for new messages.
2529
type NewRunnerOpts struct {
30+
Extend time.Duration
2631
Limit int
2732
Log logger
2833
PollInterval time.Duration
@@ -42,7 +47,12 @@ func NewRunner(opts NewRunnerOpts) *Runner {
4247
opts.PollInterval = 100 * time.Millisecond
4348
}
4449

50+
if opts.Extend == 0 {
51+
opts.Extend = 5 * time.Second
52+
}
53+
4554
return &Runner{
55+
extend: opts.Extend,
4656
jobCountLimit: opts.Limit,
4757
jobs: make(map[string]Func),
4858
log: opts.Log,
@@ -52,6 +62,7 @@ func NewRunner(opts NewRunnerOpts) *Runner {
5262
}
5363

5464
type Runner struct {
65+
extend time.Duration
5566
jobCount int
5667
jobCountLimit int
5768
jobCountLock sync.RWMutex
@@ -153,25 +164,24 @@ func (r *Runner) receiveAndRun(ctx context.Context, wg *sync.WaitGroup) {
153164
defer cancel()
154165

155166
// Extend the job message while the job is running
156-
done := make(chan struct{}, 1)
157-
defer func() {
158-
done <- struct{}{}
159-
}()
160-
161167
go func() {
168+
// Start by sleeping so we don't extend immediately
169+
time.Sleep(r.extend - r.extend/5)
162170
for {
163171
select {
164-
case <-done:
172+
case <-jobCtx.Done():
165173
return
166174
default:
167-
if err := r.queue.Extend(jobCtx, m.ID, 5*time.Second); err != nil {
175+
r.log.Info("Extending message timeout", "name", jm.Name)
176+
if err := r.queue.Extend(jobCtx, m.ID, r.extend); err != nil {
168177
r.log.Info("Error extending message timeout", "error", err)
169178
}
170-
time.Sleep(3 * time.Second)
179+
time.Sleep(r.extend - r.extend/5)
171180
}
172181
}
173182
}()
174183

184+
r.log.Info("Running job", "name", jm.Name)
175185
before := time.Now()
176186
if err := job(jobCtx, jm.Message); err != nil {
177187
r.log.Info("Error running job", "name", jm.Name, "error", err)
@@ -183,7 +193,7 @@ func (r *Runner) receiveAndRun(ctx context.Context, wg *sync.WaitGroup) {
183193
deleteCtx, cancel := context.WithTimeout(context.Background(), time.Second)
184194
defer cancel()
185195
if err := r.queue.Delete(deleteCtx, m.ID); err != nil {
186-
r.log.Info("Error deleting job from queue", "error", err)
196+
r.log.Info("Error deleting job from queue, it will be retried", "error", err)
187197
}
188198
}()
189199
}

jobs/runner_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,26 @@ func TestRunner_Start(t *testing.T) {
120120

121121
r.Start(ctx)
122122
})
123+
124+
t.Run("extends a job's timeout if it takes longer than the default timeout", func(t *testing.T) {
125+
q, r := newRunner(t)
126+
127+
var runCount int
128+
ctx, cancel := context.WithCancel(context.Background())
129+
r.Register("test", func(ctx context.Context, m []byte) error {
130+
runCount++
131+
// This is more than the default timeout, so it should extend
132+
time.Sleep(150 * time.Millisecond)
133+
cancel()
134+
return nil
135+
})
136+
137+
err := jobs.Create(ctx, q, "test", []byte("yo"))
138+
is.NotError(t, err)
139+
140+
r.Start(ctx)
141+
is.Equal(t, 1, runCount)
142+
})
123143
}
124144

125145
func TestCreateTx(t *testing.T) {
@@ -200,7 +220,7 @@ func ExampleRunner_Start() {
200220
func newRunner(t *testing.T) (*goqite.Queue, *jobs.Runner) {
201221
t.Helper()
202222

203-
q := internaltesting.NewQ(t, goqite.NewOpts{}, ":memory:")
204-
r := jobs.NewRunner(jobs.NewRunnerOpts{Log: internaltesting.NewLogger(t), Queue: q})
223+
q := internaltesting.NewQ(t, goqite.NewOpts{Timeout: 100 * time.Millisecond}, ":memory:")
224+
r := jobs.NewRunner(jobs.NewRunnerOpts{Limit: 10, Log: internaltesting.NewLogger(t), Queue: q, Extend: 100 * time.Millisecond})
205225
return q, r
206226
}

0 commit comments

Comments
 (0)