Skip to content

Commit

Permalink
feat(worker): Added worker pool health check probe
Browse files Browse the repository at this point in the history
  • Loading branch information
ekkinox committed Jul 17, 2024
1 parent 27e0de4 commit 4c9f729
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 14 deletions.
46 changes: 41 additions & 5 deletions worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [Logging](#logging)
* [Tracing](#tracing)
* [Metrics](#metrics)
* [Healthcheck](#healthcheck)
<!-- TOC -->

## Installation
Expand Down Expand Up @@ -135,10 +136,10 @@ import (
func main() {
// create the pool
pool, _ := worker.NewDefaultWorkerPoolFactory().Create(
worker.WithGlobalDeferredStartThreshold(1), // will defer all workers start of 1 second
worker.WithGlobalMaxExecutionsAttempts(2), // will run 2 times max failing workers
worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)), // registers the ClassicWorker, with a deferred start of 3 second
worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)), // registers the CancellableWorker, with 4 runs max
worker.WithGlobalDeferredStartThreshold(1), // will defer all workers start of 1 second
worker.WithGlobalMaxExecutionsAttempts(2), // will run 2 times max failing workers
worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)), // registers the ClassicWorker, with a deferred start of 3 second
worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)), // registers the CancellableWorker, with 4 runs max
)

// start the pool
Expand Down Expand Up @@ -297,4 +298,39 @@ func main() {
// start the pool
pool.Start(context.Background())
}
```
```

### Healthcheck

This module provides an [WorkerProbe](healthcheck/probe.go), compatible with
the [healthcheck module](https://github.com/ankorstore/yokai/tree/main/healthcheck):

```go
package main

import (
"context"

yokaihc "github.com/ankorstore/yokai/healthcheck"
"github.com/ankorstore/yokai/worker"
"github.com/ankorstore/yokai/worker/healthcheck"
)

func main() {
// create the pool
pool, _ := worker.NewDefaultWorkerPoolFactory().Create()

// create the checker with the worker probe
checker, _ := yokaihc.NewDefaultCheckerFactory().Create(
yokaihc.WithProbe(healthcheck.NewWorkerProbe(pool)),
)

// start the pool
pool.Start(context.Background())

// run the checker
res, _ := checker.Check(context.Background(), yokaihc.Readiness)
}
```

This probe is successful if all the executions status of the [WorkerPool](pool.go) are not in `error`.
7 changes: 4 additions & 3 deletions worker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ module github.com/ankorstore/yokai/worker
go 1.20

require (
github.com/ankorstore/yokai/generate v1.1.0
github.com/ankorstore/yokai/generate v1.2.0
github.com/ankorstore/yokai/healthcheck v1.1.0
github.com/ankorstore/yokai/log v1.2.0
github.com/ankorstore/yokai/trace v1.2.0
github.com/prometheus/client_golang v1.19.0
github.com/ankorstore/yokai/trace v1.3.0
github.com/prometheus/client_golang v1.19.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
Expand Down
14 changes: 8 additions & 6 deletions worker/go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
github.com/ankorstore/yokai/generate v1.1.0 h1:tu3S+uEYh+2qNo8Rf/WxWneDjh49YgDPzSnJfF8JkXA=
github.com/ankorstore/yokai/generate v1.1.0/go.mod h1:gqS/i20wnvCOhcXydYdiGcASzBaeuW7GK6YYg/kkuY4=
github.com/ankorstore/yokai/generate v1.2.0 h1:37siukjPGSS2kRnCnPhiuiF373+0tgwp0teXHnMsBhA=
github.com/ankorstore/yokai/generate v1.2.0/go.mod h1:gqS/i20wnvCOhcXydYdiGcASzBaeuW7GK6YYg/kkuY4=
github.com/ankorstore/yokai/healthcheck v1.1.0 h1:PXkEccym7iaVnQltpM5UFi0Xl0n+5rZDzlQju6HmGms=
github.com/ankorstore/yokai/healthcheck v1.1.0/go.mod h1:IiYgjRa4G3OLZMwAuacuryZZAfDHsBH8PQoK4PgRdZ4=
github.com/ankorstore/yokai/log v1.2.0 h1:jiuDiC0dtqIGIOsFQslUHYoFJ1qjI+rOMa6dI1LBf2Y=
github.com/ankorstore/yokai/log v1.2.0/go.mod h1:MVvUcms1AYGo0BT6l88B9KJdvtK6/qGKdgyKVXfbmyc=
github.com/ankorstore/yokai/trace v1.2.0 h1:Jnl++IGNpDYumsZJXP3qjhMdvyHbejiajQwIlU604w0=
github.com/ankorstore/yokai/trace v1.2.0/go.mod h1:m7EL2MRBilgCtrly5gA4F0jkGSXR2EbG6LsotbTJ4nA=
github.com/ankorstore/yokai/trace v1.3.0 h1:0ji32oymIcxTmH5h6GRWLo5ypwBbWrZkXRf9rWF9070=
github.com/ankorstore/yokai/trace v1.3.0/go.mod h1:m7EL2MRBilgCtrly5gA4F0jkGSXR2EbG6LsotbTJ4nA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand Down Expand Up @@ -37,8 +39,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
Expand Down
57 changes: 57 additions & 0 deletions worker/healthcheck/probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package healthcheck

import (
"context"
"fmt"
"strings"

"github.com/ankorstore/yokai/healthcheck"
"github.com/ankorstore/yokai/worker"
)

// DefaultProbeName is the name of the worker probe.
const DefaultProbeName = "worker"

// WorkerProbe is a probe compatible with the [healthcheck] module.
//
// [healthcheck]: https://github.com/ankorstore/yokai/tree/main/healthcheck
type WorkerProbe struct {
name string
pool *worker.WorkerPool
}

// NewWorkerProbe returns a new [WorkerProbe].
func NewWorkerProbe(pool *worker.WorkerPool) *WorkerProbe {
return &WorkerProbe{
name: DefaultProbeName,
pool: pool,
}
}

// Name returns the name of the [WorkerProbe].
func (p *WorkerProbe) Name() string {
return p.name
}

// SetName sets the name of the [WorkerProbe].
func (p *WorkerProbe) SetName(name string) *WorkerProbe {
p.name = name

return p
}

// Check returns a successful [healthcheck.CheckerProbeResult] if the worker pool executions are all in running status.
func (p *WorkerProbe) Check(ctx context.Context) *healthcheck.CheckerProbeResult {
success := true
messages := []string{}

for name, execution := range p.pool.Executions() {
if execution.Status() == worker.Unknown || execution.Status() == worker.Error {
success = false
}

messages = append(messages, fmt.Sprintf("%s: %s", name, execution.Status()))
}

return healthcheck.NewCheckerProbeResult(success, strings.Join(messages, ", "))
}
72 changes: 72 additions & 0 deletions worker/healthcheck/probe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package healthcheck_test

import (
"context"
"testing"
"time"

"github.com/ankorstore/yokai/worker"
"github.com/ankorstore/yokai/worker/healthcheck"
"github.com/ankorstore/yokai/worker/testdata/workers"
"github.com/stretchr/testify/assert"
)

func TestWorkerProbe(t *testing.T) {
t.Parallel()

t.Run("empty pool", func(t *testing.T) {
t.Parallel()

pool := worker.NewWorkerPool()

probe := healthcheck.NewWorkerProbe(pool)

res := probe.Check(context.Background())

assert.True(t, res.Success)
assert.Empty(t, res.Message)
})

t.Run("success pool", func(t *testing.T) {
t.Parallel()

pool, err := worker.NewDefaultWorkerPoolFactory().Create(
worker.WithWorker(workers.NewClassicWorker()),
)
assert.NoError(t, err)

probe := healthcheck.NewWorkerProbe(pool)

err = pool.Start(context.Background())
assert.NoError(t, err)

time.Sleep(15 * time.Millisecond)

res := probe.Check(context.Background())

assert.True(t, res.Success)
assert.Equal(t, "ClassicWorker: success", res.Message)
})

t.Run("error pool", func(t *testing.T) {
t.Parallel()

pool, err := worker.NewDefaultWorkerPoolFactory().Create(
worker.WithWorker(workers.NewClassicWorker()),
worker.WithWorker(workers.NewErrorWorker()),
)
assert.NoError(t, err)

probe := healthcheck.NewWorkerProbe(pool)

err = pool.Start(context.Background())
assert.NoError(t, err)

time.Sleep(15 * time.Millisecond)

res := probe.Check(context.Background())

assert.False(t, res.Success)
assert.Equal(t, "ClassicWorker: success, ErrorWorker: error", res.Message)
})
}

0 comments on commit 4c9f729

Please sign in to comment.