From 5efd0193ca0aa7d99a8fd626e2962793f1c7aaaf Mon Sep 17 00:00:00 2001 From: Pavel Smejkal Date: Tue, 7 May 2024 14:21:52 +0200 Subject: [PATCH] fix: refactoring --- closer.go | 8 ++++ orchestration.go | 104 +++++++++++++++++++++++++++-------------------- 2 files changed, 69 insertions(+), 43 deletions(-) diff --git a/closer.go b/closer.go index e92098f..79e041e 100644 --- a/closer.go +++ b/closer.go @@ -27,6 +27,14 @@ func (o *Options) Apply(oo ...Option) *Options { return o } +// closeContext returns ready made close context with or without timeout +func (o *Options) closeContext(ctx context.Context) (context.Context, context.CancelFunc) { + if o.CloseTimeout > 0 { + return context.WithTimeout(ctx, o.CloseTimeout) + } + return context.WithCancel(ctx) +} + // Closer registers new closer func (o *Options) Closer(closer func(context.Context) error) *Options { o.closers = append(o.closers, closer) diff --git a/orchestration.go b/orchestration.go index 9c2fc34..b6c7423 100644 --- a/orchestration.go +++ b/orchestration.go @@ -429,45 +429,40 @@ func (r *SerialPipeline) With(oo ...PipelineOption) *SerialPipeline { // Start will execute given runner with optional configuration func Start(ctx context.Context, runner RunnerCloser, opts ...Option) error { var ( - options = &Options{} - closerCancel func() + options = &Options{} ) startCtx, startCancel := context.WithCancelCause(ctx) defer startCancel(nil) - var erg, closerCtx = errgroup.WithContext(startCtx) - closerCtx, closerCancel = context.WithCancel(closerCtx) - defer closerCancel() + closers := newCloserContext(startCtx) + defer closers.cancel() var ( - errorCh = make(ErrorCh, 3) - closeCh = make(chan struct{}, 1) - once sync.Once - closerGroup sync.WaitGroup - closeRunner = func(ctx context.Context, initiateClose bool) { + errorCh = make(ErrorCh, 3) + closeCh = make(chan struct{}, 1) + once sync.Once + chanWriters sync.WaitGroup + closeChannels = func() { + closers.cancel() + close(closeCh) + chanWriters.Wait() + // We can really terminate since all channel writers are done + close(errorCh) + } + terminate = func(ctx context.Context, initiateClose bool) { once.Do(func() { if !initiateClose { - closerCancel() - closerGroup.Wait() - // We can really terminate since all channel writers are done - close(errorCh) + closeChannels() return } - closeCtx, closeCancel := context.WithCancel(ctx) + closeCtx, closeCancel := options.closeContext(ctx) defer closeCancel() - if options.CloseTimeout > 0 { - closeCtx, closeCancel = context.WithTimeout(closeCtx, options.CloseTimeout) - defer closeCancel() - } + // go routine handling close async so it can be canceled go func() { err := runner.Close(closeCtx) startCancel(closeCtx.Err()) errorCh <- err - closerCancel() - close(closeCh) - closerGroup.Wait() - // We can really terminate since all channel writers are done - close(errorCh) + closeChannels() }() select { // Wait for close to finish @@ -484,34 +479,20 @@ func Start(ctx context.Context, runner RunnerCloser, opts ...Option) error { }) } ) - closerGroup.Add(2) + chanWriters.Add(2) options.close = func() { go func() { - closeRunner(ctx, true) + terminate(ctx, true) }() } options.Apply(opts...) - for _, closer := range options.closers { - closer := closer - erg.Go(func() error { - return closer(closerCtx) - }) - } - - // closers go routine - go func() { - defer closerGroup.Done() - err := erg.Wait() - if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { - errorCh <- err - } - }() + closers.start(errorCh, &chanWriters, options.closers...) // runner go routine go func() { - defer closerGroup.Done() + defer chanWriters.Done() err := runner.Run(startCtx, func() { // We might expose ready callback later via pipeline options }) @@ -519,12 +500,49 @@ func Start(ctx context.Context, runner RunnerCloser, opts ...Option) error { // runner sequence had a problems calling close errorCh <- err } - go closeRunner(ctx, false) + go terminate(ctx, false) }() return errors.Join(errorCh.Wait(ctx)...) } +// newCloserContext return instance of *closerContext +func newCloserContext(startCtx context.Context) *closerContext { + var erg, closerCtx = errgroup.WithContext(startCtx) + closerCtx, closerCancel := context.WithCancel(closerCtx) + return &closerContext{ + erg: erg, + cancel: closerCancel, + ctx: closerCtx, + } +} + +// closerContext a helper struct managing pipeline closers +type closerContext struct { + erg *errgroup.Group + ctx context.Context + cancel func() +} + +// start starts closers functions and captures an error +func (c *closerContext) start(errorCh chan error, chanWriters *sync.WaitGroup, closers ...func(context.Context) error) { + for _, closer := range closers { + closer := closer + c.erg.Go(func() error { + return closer(c.ctx) + }) + } + + // closers go routine + go func() { + defer chanWriters.Done() + err := c.erg.Wait() + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + errorCh <- err + } + }() +} + // PipelineOptions holds a pipeline options type PipelineOptions struct { ErrorSignaler func(error)