From 71893f9579392e287ddf6780efa4883d5a32211d Mon Sep 17 00:00:00 2001 From: Pavel Smejkal Date: Thu, 4 Jul 2024 17:59:08 +0200 Subject: [PATCH] feat: using async like contract --- adapter.go | 30 ---- looper.go | 8 +- orchestration.go | 329 +++++++++++++++++++++++------------------- orchestration_test.go | 88 ++++++++--- plumber.go | 12 +- plumber_test.go | 10 +- runner.go | 138 ++++++++++++++++++ runner_test.go | 42 ++++++ signal.go | 48 ++++++ 9 files changed, 490 insertions(+), 215 deletions(-) delete mode 100644 adapter.go create mode 100644 runner.go create mode 100644 runner_test.go create mode 100644 signal.go diff --git a/adapter.go b/adapter.go deleted file mode 100644 index e7e8523..0000000 --- a/adapter.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2024 Outreach Corporation. All Rights Reserved. - -// Description: This file contains a compatibility layer with https://github.com/getoutreach/gobox/blob/main/pkg/async/async.go -package plumber - -import ( - "context" - "io" -) - -// AsyncRunner provides a compatibility adapter with async.Runner interface -func AsyncRunner(runner interface { - Run(ctx context.Context) error -}) RunnerCloser { - type Closer interface { - Close(ctx context.Context) error - } - return GracefulRunner(func(ctx context.Context, ready ReadyFunc) error { - go ready() - return runner.Run(ctx) - }, func(ctx context.Context) error { - switch r := runner.(type) { - case Closer: - return r.Close(ctx) - case io.Closer: - return r.Close() - } - return nil - }) -} diff --git a/looper.go b/looper.go index ab81c9f..22c7f40 100644 --- a/looper.go +++ b/looper.go @@ -18,19 +18,19 @@ import "context" // .... // } type BaseLooper struct { - runner RunnerCloser + runner Runner } // Run executes runners workload. Pipelines are starting Run method in separated goroutine. // Runner must report its readiness using given callback -func (l *BaseLooper) Run(ctx context.Context, ready ReadyFunc) error { - return l.runner.Run(ctx, ready) +func (l *BaseLooper) Run(ctx context.Context) error { + return l.runner.Run(ctx) } // Close method triggers graceful shutdown on the task. It should block till task is properly closed. // When Close timeout is exceeded then given context is canceled. func (l *BaseLooper) Close(ctx context.Context) error { - return l.runner.Close(ctx) + return RunnerClose(ctx, l.runner) } func NewBaseLooper(looper func(ctx context.Context, loop *Loop) error) *BaseLooper { diff --git a/orchestration.go b/orchestration.go index 2a47090..98ab4a3 100644 --- a/orchestration.go +++ b/orchestration.go @@ -6,6 +6,7 @@ package plumber import ( "context" "errors" + "fmt" "sync" "sync/atomic" @@ -28,16 +29,16 @@ func (f DoneFunc) Success() { f(nil) } -// RunnerCloser describes a runnable task -type RunnerCloser interface { - // Run executes runners workload. Pipelines are starting Run method in separated goroutine. - // Runner must report its readiness using given callback - Run(ctx context.Context, ready ReadyFunc) error +// // RunnerCloser describes a runnable task +// type RunnerCloser interface { +// // Run executes runners workload. Pipelines are starting Run method in separated goroutine. +// // Runner must report its readiness using given callback +// Run(ctx context.Context, ready ReadyFunc) error - // Close method triggers graceful shutdown on the task. It should block till task is properly closed. - // When Close timeout is exceeded then given context is canceled. - Close(ctx context.Context) error -} +// // Close method triggers graceful shutdown on the task. It should block till task is properly closed. +// // When Close timeout is exceeded then given context is canceled. +// Close(ctx context.Context) error +// } // ErrorCh is a channel of errors type ErrorCh chan error @@ -79,49 +80,17 @@ func (ec ErrorCh) Wait(ctx context.Context) []error { // CallbackFunc a callback function type for graceful runner type CallbackFunc func(context.Context) error -// gracefulRunner struct -type gracefulRunner struct { - run func(ctx context.Context, ready ReadyFunc) error - close func(ctx context.Context) error -} - -// GracefulRunner provides easy way to construct simple graceful runner providing run and close functions -func GracefulRunner( - runFunc func(ctx context.Context, ready ReadyFunc) error, - closeFunc func(ctx context.Context) error, -) RunnerCloser { - return &gracefulRunner{ - run: runFunc, - close: closeFunc, - } -} - -// Run executes internal run callback -// It partially implement RunnerCloser interface -func (r *gracefulRunner) Run(ctx context.Context, ready ReadyFunc) error { - return r.run(ctx, ready) -} - -// Close executes internal close callback -// It partially implement RunnerCloser interface -func (r *gracefulRunner) Close(ctx context.Context) error { - return r.close(ctx) -} - // Loop is a looper controlling struct type Loop struct { closeCh chan DoneFunc ready ReadyFunc } -// Run invokes given callback in the detached goroutine. The error returned from the callback will be returned when Close method is invoked -func (l *Loop) Run(run func(ready ReadyFunc) error) error { - return run(l.ready) -} - // Ready reports runner's readiness func (l *Loop) Ready() { - l.ready() + if l.ready != nil { + l.ready() + } } // Closing returns a channel that's closed when cancellation is requested @@ -156,7 +125,7 @@ func (l *Loop) Closing() <-chan DoneFunc { // }) // }) // }) -func Looper(run func(ctx context.Context, loop *Loop) error) RunnerCloser { +func Looper(run func(ctx context.Context, loop *Loop) error) Runner { var ( runOnce sync.Once closeOnce sync.Once @@ -165,11 +134,16 @@ func Looper(run func(ctx context.Context, loop *Loop) error) RunnerCloser { closeCh: make(chan DoneFunc, 1), } ) - return &gracefulRunner{ - run: func(ctx context.Context, ready ReadyFunc) error { + + signal := NewSignal() + + return NewRunner( + func(ctx context.Context) error { var err error runOnce.Do(func() { - l.ready = ready + l.ready = func() { + signal.Notify() + } defer closeOnce.Do(func() { close(l.closeCh) }) @@ -178,7 +152,7 @@ func Looper(run func(ctx context.Context, loop *Loop) error) RunnerCloser { }) return err }, - close: func(ctx context.Context) error { + WithClose(func(ctx context.Context) error { var ( errCh = make(chan error, 1) canceled DoneFunc = func(err error) { @@ -199,105 +173,113 @@ func Looper(run func(ctx context.Context, loop *Loop) error) RunnerCloser { case err := <-errCh: return err } - }, - } -} - -// Runner creates a RunnerCloser based on supplied run function with noop Clone method -func Runner(run func(ctx context.Context, ready ReadyFunc) error) RunnerCloser { - return &gracefulRunner{ - run: run, - close: func(ctx context.Context) error { - return nil - }, - } + }), + WithReady(signal), + ) } -// SimpleRunner creates a RunnerCloser based on supplied run function with noop Clone method, ready state is reported automatically -func SimpleRunner(run func(ctx context.Context) error) RunnerCloser { - return &gracefulRunner{ - run: func(ctx context.Context, ready ReadyFunc) error { - ready() - return run(ctx) - }, - close: func(ctx context.Context) error { - return nil - }, - } +// ReadyRunner creates a Runner based on supplied run function with callback to signal ready state +func ReadyRunner(run func(ctx context.Context, ready ReadyFunc) error) Runner { + signal := NewSignal() + return NewRunner(func(ctx context.Context) error { + return run(ctx, func() { + signal.Notify() + }) + }, WithReady(signal)) } // Closer creates a RunnerCloser based on supplied close function with noop Run method -func Closer(closeFunc CallbackFunc) RunnerCloser { - return &gracefulRunner{ - close: closeFunc, - run: func(ctx context.Context, ready ReadyFunc) error { - ready() - <-ctx.Done() - return nil - }, - } +func Closer(closeFunc CallbackFunc) Runner { + return NewRunner(func(ctx context.Context) error { + <-ctx.Done() + return nil + }, WithClose(closeFunc)) } // ParallelPipeline is a parallel runner closer orchestrator // The runners are started and closed in concurrent fashion. // The Run or Close are invoked independently type ParallelPipeline struct { - runners []RunnerCloser + runners []Runner wg sync.WaitGroup options *PipelineOptions closing atomic.Bool + + closed chan struct{} + closeOnce sync.Once + signal *Signal + errSignal *Signal } // Parallel creates a concurrent RunnerCloser executor. // When started it will execute runners Run and Close methods in parallel. // Run and Close will block till all runner's corresponding methods are returned. -func Parallel(runners ...RunnerCloser) *ParallelPipeline { +func Parallel(runners ...Runner) *ParallelPipeline { return &ParallelPipeline{ - runners: runners, - options: &PipelineOptions{}, + runners: runners, + options: &PipelineOptions{}, + signal: NewSignal(), + errSignal: NewSignal(), + closed: make(chan struct{}), } } +func (r *ParallelPipeline) Errored() <-chan struct{} { + return r.errSignal.C() +} + +func (r *ParallelPipeline) Ready() <-chan struct{} { + return r.signal.C() +} + // Run executes Run method on internal runners in parallel. // It partially implement RunnerCloser interface. // The it returns when all runner's Run methods are returned. -func (r *ParallelPipeline) Run(ctx context.Context, ready ReadyFunc) error { +func (r *ParallelPipeline) Run(ctx context.Context) error { var ( readyCh = make(chan struct{}, len(r.runners)) errs = make(ErrorCh, len(r.runners)) ) + r.wg.Add(len(r.runners)) + go func() { var counter = 0 - var isReady bool for { select { case <-readyCh: counter++ + case <-r.closed: + break case <-ctx.Done(): break } if counter == len(r.runners) { - isReady = true + r.signal.Notify() break } } - if isReady { - ready() - } r.wg.Wait() close(errs) }() - r.wg.Add(len(r.runners)) for _, runner := range r.runners { - go func(runner RunnerCloser) { + go func(runner Runner) { defer r.wg.Done() - err := runner.Run(ctx, func() { - // Signal that runner is ready - readyCh <- struct{}{} - }) - if r.options.ErrorSignaler != nil && !r.closing.Load() { - r.options.ErrorSignaler(err) + + // Wait for the runner to be ready + go func() { + select { + case <-RunnerReady(runner): + readyCh <- struct{}{} + case <-ctx.Done(): + } + }() + + go forwardErrorSignal(ctx, runner, r.closed, r.errSignal) + + err := runner.Run(ctx) + if err != nil && !r.closing.Load() { + r.errSignal.Notify() } errs <- err }(runner) @@ -310,6 +292,9 @@ func (r *ParallelPipeline) Run(ctx context.Context, ready ReadyFunc) error { // It partially implement RunnerCloser interface. // The it returns when all runner's Close methods are returned. func (r *ParallelPipeline) Close(ctx context.Context) error { + r.closeOnce.Do(func() { + close(r.closed) + }) r.closing.Store(true) var ( closeErrors = make(ErrorCh, len(r.runners)) @@ -320,7 +305,7 @@ func (r *ParallelPipeline) Close(ctx context.Context) error { var runner = r.runners[i] go func() { defer wg.Done() - if err := runner.Close(ctx); err != nil { + if err := RunnerClose(ctx, runner); err != nil { closeErrors <- err } }() @@ -340,36 +325,50 @@ func (r *ParallelPipeline) With(oo ...PipelineOption) *ParallelPipeline { // The runners are started and closed in serial fashion. // The Run or Close methods needs to return and only then next runner is evaluated type SerialPipeline struct { - runners []RunnerCloser - options *PipelineOptions - closing atomic.Bool + runners []Runner + options *PipelineOptions + closing atomic.Bool + closed chan struct{} + closeOnce sync.Once + signal *Signal + errSignal *Signal } // Pipeline creates a serial RunnerCloser executor. // When started it will execute Run method on given runners one by one with given order. // When closed it will execute Close method on given runners in revered order to achieve graceful shutdown sequence -func Pipeline(runners ...RunnerCloser) *SerialPipeline { +func Pipeline(runners ...Runner) *SerialPipeline { return &SerialPipeline{ - runners: runners, - options: &PipelineOptions{}, + runners: runners, + options: &PipelineOptions{}, + closed: make(chan struct{}), + signal: NewSignal(), + errSignal: NewSignal(), } } +func (r *SerialPipeline) Errored() <-chan struct{} { + return r.errSignal.C() +} + +func (r *SerialPipeline) Ready() <-chan struct{} { + return r.signal.C() +} + // Run executes Run method on internal runners one by one with given order. // It partially implement RunnerCloser interface -func (r *SerialPipeline) Run(ctx context.Context, ready ReadyFunc) error { +func (r *SerialPipeline) Run(ctx context.Context) error { var ( wg sync.WaitGroup errs = make(ErrorCh, len(r.runners)) readyCh = make(chan struct{}, 1) + errored atomic.Bool ) - // started go routine - wg.Add(1) + // orchestration go routine go func() { defer wg.Done() var index = 0 - var errored atomic.Bool for { select { case _, ok := <-readyCh: @@ -377,48 +376,70 @@ func (r *SerialPipeline) Run(ctx context.Context, ready ReadyFunc) error { if !ok { return } + // when all runners are running we cal report that pipeline is ready if index == len(r.runners) { - ready() - return - } - // when we are closing we need to mark remaining workers as finished - if r.closing.Load() || errored.Load() { + r.signal.Notify() return } - runner := r.runners[index] - index++ - // runner go routine - wg.Add(1) - go func() { - var once sync.Once - defer wg.Done() - err := runner.Run(ctx, func() { - // worker is ready we can start with next one - once.Do(func() { - if !r.closing.Load() && !errored.Load() { + + // We need to check those again since select does not guarantee the priority + select { + case <-r.closed: + case <-ctx.Done(): + default: + runner := r.runners[index] + index++ + + wg.Add(1) + // runner go routine + go func() { + defer wg.Done() + if errored.Load() && r.closing.Load() { + return + } + + wg.Add(1) + // Wait for the runner to become ready + // ready checking goroutine + go func() { + defer wg.Done() + select { + case <-r.closed: + case <-ctx.Done(): + case <-RunnerReady(runner): readyCh <- struct{}{} } - }) - }) - if r.options.ErrorSignaler != nil && !r.closing.Load() { - r.options.ErrorSignaler(err) - } - if err != nil { - errored.Store(true) - close(readyCh) - errs <- err - } - }() + }() + + go forwardErrorSignal(ctx, runner, r.closed, r.errSignal) + + err := runner.Run(ctx) + if err != nil && !r.closing.Load() { + fmt.Println("ERROER", err) + r.errSignal.Notify() + } + if err != nil { + errored.Store(true) + errs <- err + } + }() + } + case <-r.closed: + return case <-ctx.Done(): return } } }() // Lets start first worker + wg.Add(1) + readyCh <- struct{}{} wg.Wait() + close(errs) + close(readyCh) return errors.Join(errs.Errors()...) } @@ -426,14 +447,17 @@ func (r *SerialPipeline) Run(ctx context.Context, ready ReadyFunc) error { // Close executes Close method on internal runners in revered order to achieve graceful shutdown sequence // It partially implement RunnerCloser interface func (r *SerialPipeline) Close(ctx context.Context) error { - r.closing.Store(true) var closeErrors []error - for i := len(r.runners) - 1; i >= 0; i-- { - var runner = r.runners[i] - if err := runner.Close(ctx); err != nil { - closeErrors = append(closeErrors, err) + r.closeOnce.Do(func() { + close(r.closed) + for i := len(r.runners) - 1; i >= 0; i-- { + var runner = r.runners[i] + if err := RunnerClose(ctx, runner); err != nil { + closeErrors = append(closeErrors, err) + } } - } + }) + r.closing.Store(true) return errors.Join(closeErrors...) } @@ -444,7 +468,7 @@ func (r *SerialPipeline) With(oo ...PipelineOption) *SerialPipeline { } // Start will execute given runner with optional configuration -func Start(ctx context.Context, runner RunnerCloser, opts ...Option) error { +func Start(ctx context.Context, runner Runner, opts ...Option) error { var ( options = &Options{} ) @@ -476,7 +500,7 @@ func Start(ctx context.Context, runner RunnerCloser, opts ...Option) error { defer closeCancel() // go routine handling close async so it can be canceled go func() { - err := runner.Close(closeCtx) + err := RunnerClose(closeCtx, runner) startCancel(closeCtx.Err()) errorCh <- err closeChannels() @@ -507,12 +531,21 @@ func Start(ctx context.Context, runner RunnerCloser, opts ...Option) error { closers.start(errorCh, &chanWriters, options.closers...) + if propagator, ok := runner.(ErrorNotifier); ok { + go func() { + select { + case <-startCtx.Done(): + return + case <-propagator.Errored(): + go terminate(ctx, true) + } + }() + } + // runner go routine go func() { defer chanWriters.Done() - err := runner.Run(startCtx, func() { - // We might expose ready callback later via pipeline options - }) + err := runner.Run(startCtx) if err != nil { // runner sequence had a problems calling close errorCh <- err diff --git a/orchestration_test.go b/orchestration_test.go index 8f9e775..f09a9c2 100644 --- a/orchestration_test.go +++ b/orchestration_test.go @@ -11,9 +11,8 @@ import ( "gotest.tools/v3/assert" ) -func reportingRunner(name string, fce ...func()) plumber.RunnerCloser { - return plumber.GracefulRunner(func(ctx context.Context, ready plumber.ReadyFunc) error { - ready() +func reportingRunner(name string, fce ...func()) plumber.Runner { + return plumber.GracefulRunner(func(ctx context.Context) error { fmt.Println("runner", name, "started") for _, f := range fce { f() @@ -26,9 +25,8 @@ func reportingRunner(name string, fce ...func()) plumber.RunnerCloser { } // nolint: unparam //Why: not yet -func reportingBlockingRunner(name string, fce ...func()) plumber.RunnerCloser { - return plumber.GracefulRunner(func(ctx context.Context, ready plumber.ReadyFunc) error { - ready() +func reportingBlockingRunner(name string, fce ...func()) plumber.Runner { + return plumber.GracefulRunner(func(ctx context.Context) error { fmt.Println("runner", name, "started") for _, f := range fce { f() @@ -41,14 +39,14 @@ func reportingBlockingRunner(name string, fce ...func()) plumber.RunnerCloser { }) } -func erroringRunner(name string) plumber.RunnerCloser { - return plumber.SimpleRunner(func(ctx context.Context) error { +func erroringRunner(name string) plumber.Runner { + return plumber.NewRunner(func(ctx context.Context) error { return errors.New("runner " + name + " failed") }) } // nolint: unused //Why: not yet -func erroringCloser(name string) plumber.RunnerCloser { +func erroringCloser(name string) plumber.Runner { return plumber.Closer(func(ctx context.Context) error { return errors.New("runner " + name + " failed") }) @@ -100,16 +98,13 @@ func TestPipelineErrors(t *testing.T) { func TestPipelineSignalerClosing(t *testing.T) { ctx := context.Background() - signaler := plumber.NewErrorSignaler() - err := plumber.Start(ctx, plumber.Pipeline( reportingRunner("runner 1", func() { // Let other job to start as well time.Sleep(10 * time.Millisecond) }), - plumber.Runner(func(ctx context.Context, ready plumber.ReadyFunc) error { - ready() + plumber.NewRunner(func(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() @@ -117,9 +112,8 @@ func TestPipelineSignalerClosing(t *testing.T) { return errors.New("runner failed") } }), - ).With(plumber.Signaler(signaler)), + ), plumber.TTL(50*time.Millisecond), - plumber.Closing(signaler), ) // Context should be canceled since: // - first runner immediately closes @@ -130,15 +124,68 @@ func TestPipelineSignalerClosing(t *testing.T) { assert.Assert(t, errors.Is(err, context.Canceled)) } -func TestPipelineRunnerContextCanceled(t *testing.T) { +func TestPipelineCloseOnError(t *testing.T) { ctx := context.Background() - signaler := plumber.NewErrorSignaler() + closed := make(chan time.Time, 3) + started := time.Now() + + reportingClose := func(ctx context.Context) error { + closed <- time.Now() + return nil + } + + err := plumber.Start(ctx, + plumber.Pipeline( + plumber.NewRunner(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(25 * time.Second): + return errors.New("runner 1 timeout") + } + }, plumber.WithClose(reportingClose)), + plumber.Pipeline( + plumber.NewRunner(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(25 * time.Second): + return errors.New("runner 3 timeout") + } + }, plumber.WithClose(reportingClose)), + plumber.NewRunner(func(ctx context.Context) error { + return errors.New("runner 2 failed") + }, plumber.WithClose(reportingClose)), + ), + ), + plumber.TTL(3*time.Second), + ) + close(closed) + // Context should be canceled since: + // - first runner immediately closes + // - close sequence is initiated + // - closing is immediately done + // - start context is canceled + fmt.Println(err) + n := 0 + for tm := range closed { + n++ + //time.Now(). + diff := tm.Sub(started) + fmt.Println(diff) + assert.Assert(t, diff < 100*time.Millisecond) + } + assert.Equal(t, n, 3) +} + +func TestPipelineRunnerContextCanceled(t *testing.T) { + ctx := context.Background() err := plumber.Start(ctx, plumber.Pipeline( reportingBlockingRunner("runner 1"), - plumber.Runner(func(ctx context.Context, ready plumber.ReadyFunc) error { + plumber.ReadyRunner(func(ctx context.Context, ready plumber.ReadyFunc) error { ready() select { case <-ctx.Done(): @@ -147,10 +194,9 @@ func TestPipelineRunnerContextCanceled(t *testing.T) { return errors.New("runner failed") } }), - ).With(plumber.Signaler(signaler)), + ), plumber.TTL(10*time.Millisecond), plumber.CloseTimeout(10*time.Millisecond), - plumber.Closing(signaler), ) assert.ErrorContains(t, err, "context canceled") } @@ -158,7 +204,7 @@ func TestPipelineRunnerContextCanceled(t *testing.T) { func TestParallelPipeline(t *testing.T) { ctx := context.Background() - runner := plumber.Runner(func(ctx context.Context, ready plumber.ReadyFunc) error { + runner := plumber.ReadyRunner(func(ctx context.Context, ready plumber.ReadyFunc) error { ready() time.Sleep(10 * time.Millisecond) return nil diff --git a/plumber.go b/plumber.go index 45d8385..e121490 100644 --- a/plumber.go +++ b/plumber.go @@ -209,7 +209,7 @@ func (d *D[T]) Wrap(wrappers ...func(T) T) *D[T] { // It is meant to be supplied into the Pipeline() type R[T any] struct { D[T] - runnable RunnerCloser + runnable Runner } // Resolve returns a callback providing a resolution orchestrator @@ -223,14 +223,14 @@ func (r *R[T]) Resolve(callback func(*ResolutionR[T])) *R[T] { } // Run executes Run method on value and satisfies partially the RunnerCloser interface -func (r *R[T]) Run(ctx context.Context, ready ReadyFunc) error { +func (r *R[T]) Run(ctx context.Context) error { if err := r.D.Error(); err != nil { return err } if r.runnable == nil { return fmt.Errorf("Runnable %s not resolved", &r.D) } - return r.runnable.Run(ctx, ready) + return r.runnable.Run(ctx) } // Close executes Close method on value and satisfies partially the RunnerCloser interface @@ -241,7 +241,7 @@ func (r *R[T]) Close(ctx context.Context) error { if r.runnable == nil { return fmt.Errorf("Runnable %s not resolved", &r.D) } - return r.runnable.Close(ctx) + return RunnerClose(ctx, r.runnable) } // Resolution is value resolution orchestrator @@ -287,14 +287,14 @@ func (rr *ResolutionR[T]) Error(err error) { // Resolved ends the resolution with given runnable value // This instance will be executed once a R included int the started pipeline -func (rr *ResolutionR[T]) Resolve(v RunnerCloser) { +func (rr *ResolutionR[T]) Resolve(v Runner) { rr.resolution.Resolve(v.(T)) rr.r.runnable = v } // ResolveAdapter ends the resolution with given value and runnable adapter // that will be executed once a R is included int the started pipeline -func (rr *ResolutionR[T]) ResolveAdapter(v T, runnable RunnerCloser) { +func (rr *ResolutionR[T]) ResolveAdapter(v T, runnable Runner) { rr.resolution.Resolve(v) rr.r.runnable = runnable } diff --git a/plumber_test.go b/plumber_test.go index 087e776..f9305e2 100644 --- a/plumber_test.go +++ b/plumber_test.go @@ -236,8 +236,7 @@ func TestExamplePipeline(t *testing.T) { fmt.Println("pipeline is closing") return nil }), - plumber.GracefulRunner(func(ctx context.Context, ready plumber.ReadyFunc) error { - ready() + plumber.GracefulRunner(func(ctx context.Context) error { fmt.Println("Task 1 starting") <-ctx.Done() return nil @@ -247,12 +246,12 @@ func TestExamplePipeline(t *testing.T) { }), // The parallel pipeline all task are stared and closed in parallel. plumber.Parallel( - plumber.SimpleRunner(func(ctx context.Context) error { + plumber.NewRunner(func(ctx context.Context) error { fmt.Println("Task 2 starting") <-ctx.Done() return nil }), - plumber.SimpleRunner(func(ctx context.Context) error { + plumber.NewRunner(func(ctx context.Context) error { fmt.Println("Task 3 starting") <-ctx.Done() return nil @@ -325,9 +324,8 @@ func fitHTTP(a *App) { http.HandleFunc("/hello", a.HTTP.HelloHandler.Instance()) http.HandleFunc("/echo", a.HTTP.EchoHandler.Instance()) - r.ResolveAdapter(httpServer, plumber.GracefulRunner(func(ctx context.Context, ready plumber.ReadyFunc) error { + r.ResolveAdapter(httpServer, plumber.GracefulRunner(func(ctx context.Context) error { // ready is async to give time to server start - go ready() fmt.Println("HTTP server is starting") if err := httpServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { err = fmt.Errorf("HTTP server ListenAndServe Error: %w", err) diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..897c0bf --- /dev/null +++ b/runner.go @@ -0,0 +1,138 @@ +// Copyright 2024 Outreach Corporation. All Rights Reserved. + +// Description: Runner interfaces +package plumber + +import ( + "context" +) + +// Runner describes basic runnable unit. Runner can be started. +// This interface is used as a common denominator used in api but, +// it is more that recommended to implement other interface Closable +// and optionally Readyable. +type Runner interface { + Run(ctx context.Context) error +} + +// Readyable describes Runner that can signal whether it is ready. +// This is useful when Runners needs to be execute with the Pipeline sequentially. +type Readyable interface { + Ready() <-chan struct{} +} + +// Closeble describes Runner that can be graceful closed. Close method must be idempotent. +// Once called runner is required exit from main Run method within defined duration otherwise run context will be canceled. +// Close can block for configured duration. When exceeded close context is canceled +type Closeble interface { + Close(ctx context.Context) error +} + +// ErrorNotifier describes Runner that can report that error has occurred before it returns from the main Run method. +// This is signal might bubble up the execution tree and on top it might be used to start Close sequence. +type ErrorNotifier interface { + Errored() <-chan struct{} +} + +// SmartRunner implements all interfaces that makes the runner good citizen +type SmartRunner interface { + Runner + Readyable + Closeble +} + +// RunnerOptions holds runner optional callbacks +type RunnerOptions struct { + close func(ctx context.Context) error + ready func() <-chan struct{} +} + +func (o *RunnerOptions) apply(opts ...RunnerOption) { + o.ready = func() <-chan struct{} { + return closedCh + } + for _, op := range opts { + op(o) + } +} + +// RunnerOption is option pattern function +type RunnerOption func(*RunnerOptions) + +func WithReady(s *Signal) RunnerOption { + return func(ro *RunnerOptions) { + ro.ready = func() <-chan struct{} { + return s.C() + } + } +} + +func WithClose(closeFunc func(context.Context) error) RunnerOption { + return func(ro *RunnerOptions) { + ro.close = closeFunc + } +} + +// runner represent a struct that complies with Runner interfaces +type runner struct { + options RunnerOptions + run func(ctx context.Context) error +} + +// NewRunner returns an instance of the runner. Optionally supplied options might redefine other Runner method Close and Ready +func NewRunner(run func(ctx context.Context) error, opts ...RunnerOption) SmartRunner { + r := &runner{ + run: run, + } + r.options.apply(opts...) + return r +} + +// Ready signals that runner is ready +func (r *runner) Ready() <-chan struct{} { + return r.options.ready() +} + +// Run executes a task +func (r *runner) Run(ctx context.Context) error { + return r.run(ctx) +} + +// Close runner. Once called runner is required to exit from main Run method +// within defined duration otherwise run context will be canceled. +// Close can block for configured duration. When exceeded close context is canceled +func (r *runner) Close(ctx context.Context) error { + if r.options.close != nil { + return r.options.close(ctx) + } + return nil +} + +// GracefulRunner is runner supporting Run and Close methods +func GracefulRunner(run, closeFn func(ctx context.Context) error) Runner { + return NewRunner(run, WithClose(closeFn)) +} + +// closedCh is a ready made closed channel +var closedCh = func() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +}() + +// RunnerReady return channel that can be used to check if runner is ready. +// When channel is closed runner can be considered ready. +func RunnerReady(runner Runner) <-chan struct{} { + if r, ok := runner.(Readyable); ok { + return r.Ready() + } + return closedCh +} + +// RunnerClose calls Close method on given Runner when supported +func RunnerClose(ctx context.Context, runner Runner) error { + if r, ok := runner.(Closeble); ok { + return r.Close(ctx) + } + return nil +} diff --git a/runner_test.go b/runner_test.go new file mode 100644 index 0000000..6a80b3d --- /dev/null +++ b/runner_test.go @@ -0,0 +1,42 @@ +package plumber_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/getoutreach/plumber" +) + +func TestRunner(t *testing.T) { + signal := plumber.NewSignal() + + r := plumber.NewRunner( + func(ctx context.Context) error { + return nil + }, + plumber.WithClose(func(ctx context.Context) error { + return nil + }), + plumber.WithReady(signal), + ) + + ctx := context.Background() + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + + go func() { + time.Sleep(1 * time.Second) + signal.Notify() + }() + + defer cancel() + + select { + case <-ctx.Done(): + fmt.Println("Context") + case <-plumber.RunnerReady(r): + fmt.Println("Ready") + } +} diff --git a/signal.go b/signal.go new file mode 100644 index 0000000..bade3ef --- /dev/null +++ b/signal.go @@ -0,0 +1,48 @@ +// Copyright 2024 Outreach Corporation. All Rights Reserved. + +// Description: Signal to notify when certain even occures. + +package plumber + +import ( + "context" + "sync" +) + +// Signal is and helper struct that broadcast a notification when certain even occures. +type Signal struct { + sync.Once + ch chan struct{} +} + +func (s *Signal) Notify() { + s.Once.Do( + func() { + close(s.ch) + }, + ) +} + +func (s *Signal) C() <-chan struct{} { + return s.ch +} + +func NewSignal() *Signal { + return &Signal{ + ch: make(chan struct{}), + } +} + +// forwardErrorSignal forward information that a Runner errored to given signal instance +func forwardErrorSignal(ctx context.Context, runner Runner, closed <-chan struct{}, signal *Signal) { + if notifier, ok := runner.(ErrorNotifier); ok { + select { + case <-closed: + return + case <-ctx.Done(): + return + case <-notifier.Errored(): + signal.Notify() + } + } +}