diff --git a/worker/README.md b/worker/README.md index 8c6ba0ca..207f2386 100644 --- a/worker/README.md +++ b/worker/README.md @@ -16,6 +16,7 @@ * [Logging](#logging) * [Tracing](#tracing) * [Metrics](#metrics) + * [Healthcheck](#healthcheck) ## Installation @@ -135,10 +136,10 @@ import ( func main() { // create the pool pool, _ := worker.NewDefaultWorkerPoolFactory().Create( - worker.WithGlobalDeferredStartThreshold(1), // will defer all workers start of 1 second - worker.WithGlobalMaxExecutionsAttempts(2), // will run 2 times max failing workers - worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)), // registers the ClassicWorker, with a deferred start of 3 second - worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)), // registers the CancellableWorker, with 4 runs max + worker.WithGlobalDeferredStartThreshold(1), // will defer all workers start of 1 second + worker.WithGlobalMaxExecutionsAttempts(2), // will run 2 times max failing workers + worker.WithWorker(workers.NewClassicWorker(), worker.WithDeferredStartThreshold(3)), // registers the ClassicWorker, with a deferred start of 3 second + worker.WithWorker(workers.NewCancellableWorker(), worker.WithMaxExecutionsAttempts(4)), // registers the CancellableWorker, with 4 runs max ) // start the pool @@ -297,4 +298,39 @@ func main() { // start the pool pool.Start(context.Background()) } -``` \ No newline at end of file +``` + +### Healthcheck + +This module provides an [WorkerProbe](healthcheck/probe.go), compatible with +the [healthcheck module](https://github.com/ankorstore/yokai/tree/main/healthcheck): + +```go +package main + +import ( + "context" + + yokaihc "github.com/ankorstore/yokai/healthcheck" + "github.com/ankorstore/yokai/worker" + "github.com/ankorstore/yokai/worker/healthcheck" +) + +func main() { + // create the pool + pool, _ := worker.NewDefaultWorkerPoolFactory().Create() + + // create the checker with the worker probe + checker, _ := yokaihc.NewDefaultCheckerFactory().Create( + yokaihc.WithProbe(healthcheck.NewWorkerProbe(pool)), + ) + + // start the pool + pool.Start(context.Background()) + + // run the checker + res, _ := checker.Check(context.Background(), yokaihc.Readiness) +} +``` + +This probe is successful if all the executions status of the [WorkerPool](pool.go) are not in `error`. diff --git a/worker/go.mod b/worker/go.mod index ea05f7f6..f7bbf9eb 100644 --- a/worker/go.mod +++ b/worker/go.mod @@ -3,10 +3,11 @@ module github.com/ankorstore/yokai/worker go 1.20 require ( - github.com/ankorstore/yokai/generate v1.1.0 + github.com/ankorstore/yokai/generate v1.2.0 + github.com/ankorstore/yokai/healthcheck v1.1.0 github.com/ankorstore/yokai/log v1.2.0 - github.com/ankorstore/yokai/trace v1.2.0 - github.com/prometheus/client_golang v1.19.0 + github.com/ankorstore/yokai/trace v1.3.0 + github.com/prometheus/client_golang v1.19.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/sdk v1.24.0 diff --git a/worker/go.sum b/worker/go.sum index 5a495009..795c82f0 100644 --- a/worker/go.sum +++ b/worker/go.sum @@ -1,9 +1,11 @@ -github.com/ankorstore/yokai/generate v1.1.0 h1:tu3S+uEYh+2qNo8Rf/WxWneDjh49YgDPzSnJfF8JkXA= -github.com/ankorstore/yokai/generate v1.1.0/go.mod h1:gqS/i20wnvCOhcXydYdiGcASzBaeuW7GK6YYg/kkuY4= +github.com/ankorstore/yokai/generate v1.2.0 h1:37siukjPGSS2kRnCnPhiuiF373+0tgwp0teXHnMsBhA= +github.com/ankorstore/yokai/generate v1.2.0/go.mod h1:gqS/i20wnvCOhcXydYdiGcASzBaeuW7GK6YYg/kkuY4= +github.com/ankorstore/yokai/healthcheck v1.1.0 h1:PXkEccym7iaVnQltpM5UFi0Xl0n+5rZDzlQju6HmGms= +github.com/ankorstore/yokai/healthcheck v1.1.0/go.mod h1:IiYgjRa4G3OLZMwAuacuryZZAfDHsBH8PQoK4PgRdZ4= github.com/ankorstore/yokai/log v1.2.0 h1:jiuDiC0dtqIGIOsFQslUHYoFJ1qjI+rOMa6dI1LBf2Y= github.com/ankorstore/yokai/log v1.2.0/go.mod h1:MVvUcms1AYGo0BT6l88B9KJdvtK6/qGKdgyKVXfbmyc= -github.com/ankorstore/yokai/trace v1.2.0 h1:Jnl++IGNpDYumsZJXP3qjhMdvyHbejiajQwIlU604w0= -github.com/ankorstore/yokai/trace v1.2.0/go.mod h1:m7EL2MRBilgCtrly5gA4F0jkGSXR2EbG6LsotbTJ4nA= +github.com/ankorstore/yokai/trace v1.3.0 h1:0ji32oymIcxTmH5h6GRWLo5ypwBbWrZkXRf9rWF9070= +github.com/ankorstore/yokai/trace v1.3.0/go.mod h1:m7EL2MRBilgCtrly5gA4F0jkGSXR2EbG6LsotbTJ4nA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -37,8 +39,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= -github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= diff --git a/worker/healthcheck/probe.go b/worker/healthcheck/probe.go new file mode 100644 index 00000000..56d309a4 --- /dev/null +++ b/worker/healthcheck/probe.go @@ -0,0 +1,57 @@ +package healthcheck + +import ( + "context" + "fmt" + "strings" + + "github.com/ankorstore/yokai/healthcheck" + "github.com/ankorstore/yokai/worker" +) + +// DefaultProbeName is the name of the worker probe. +const DefaultProbeName = "worker" + +// WorkerProbe is a probe compatible with the [healthcheck] module. +// +// [healthcheck]: https://github.com/ankorstore/yokai/tree/main/healthcheck +type WorkerProbe struct { + name string + pool *worker.WorkerPool +} + +// NewWorkerProbe returns a new [WorkerProbe]. +func NewWorkerProbe(pool *worker.WorkerPool) *WorkerProbe { + return &WorkerProbe{ + name: DefaultProbeName, + pool: pool, + } +} + +// Name returns the name of the [WorkerProbe]. +func (p *WorkerProbe) Name() string { + return p.name +} + +// SetName sets the name of the [WorkerProbe]. +func (p *WorkerProbe) SetName(name string) *WorkerProbe { + p.name = name + + return p +} + +// Check returns a successful [healthcheck.CheckerProbeResult] if the worker pool executions are all in running status. +func (p *WorkerProbe) Check(ctx context.Context) *healthcheck.CheckerProbeResult { + success := true + messages := []string{} + + for name, execution := range p.pool.Executions() { + if execution.Status() == worker.Unknown || execution.Status() == worker.Error { + success = false + } + + messages = append(messages, fmt.Sprintf("%s: %s", name, execution.Status())) + } + + return healthcheck.NewCheckerProbeResult(success, strings.Join(messages, ", ")) +} diff --git a/worker/healthcheck/probe_test.go b/worker/healthcheck/probe_test.go new file mode 100644 index 00000000..d751da20 --- /dev/null +++ b/worker/healthcheck/probe_test.go @@ -0,0 +1,72 @@ +package healthcheck_test + +import ( + "context" + "testing" + "time" + + "github.com/ankorstore/yokai/worker" + "github.com/ankorstore/yokai/worker/healthcheck" + "github.com/ankorstore/yokai/worker/testdata/workers" + "github.com/stretchr/testify/assert" +) + +func TestWorkerProbe(t *testing.T) { + t.Parallel() + + t.Run("empty pool", func(t *testing.T) { + t.Parallel() + + pool := worker.NewWorkerPool() + + probe := healthcheck.NewWorkerProbe(pool) + + res := probe.Check(context.Background()) + + assert.True(t, res.Success) + assert.Empty(t, res.Message) + }) + + t.Run("success pool", func(t *testing.T) { + t.Parallel() + + pool, err := worker.NewDefaultWorkerPoolFactory().Create( + worker.WithWorker(workers.NewClassicWorker()), + ) + assert.NoError(t, err) + + probe := healthcheck.NewWorkerProbe(pool) + + err = pool.Start(context.Background()) + assert.NoError(t, err) + + time.Sleep(15 * time.Millisecond) + + res := probe.Check(context.Background()) + + assert.True(t, res.Success) + assert.Equal(t, "ClassicWorker: success", res.Message) + }) + + t.Run("error pool", func(t *testing.T) { + t.Parallel() + + pool, err := worker.NewDefaultWorkerPoolFactory().Create( + worker.WithWorker(workers.NewClassicWorker()), + worker.WithWorker(workers.NewErrorWorker()), + ) + assert.NoError(t, err) + + probe := healthcheck.NewWorkerProbe(pool) + + err = pool.Start(context.Background()) + assert.NoError(t, err) + + time.Sleep(15 * time.Millisecond) + + res := probe.Check(context.Background()) + + assert.False(t, res.Success) + assert.Equal(t, "ClassicWorker: success, ErrorWorker: error", res.Message) + }) +}