Skip to content

Commit

Permalink
Merge pull request #3 from getoutreach/refactoring
Browse files Browse the repository at this point in the history
fix: refactoring
  • Loading branch information
pavelsmejkal authored Jun 13, 2024
2 parents 31b2950 + 5efd019 commit dd8fb37
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 43 deletions.
8 changes: 8 additions & 0 deletions closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ func (o *Options) Apply(oo ...Option) *Options {
return o
}

// closeContext returns ready made close context with or without timeout
func (o *Options) closeContext(ctx context.Context) (context.Context, context.CancelFunc) {
if o.CloseTimeout > 0 {
return context.WithTimeout(ctx, o.CloseTimeout)
}
return context.WithCancel(ctx)
}

// Closer registers new closer
func (o *Options) Closer(closer func(context.Context) error) *Options {
o.closers = append(o.closers, closer)
Expand Down
104 changes: 61 additions & 43 deletions orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,45 +444,40 @@ func (r *SerialPipeline) With(oo ...PipelineOption) *SerialPipeline {
// Start will execute given runner with optional configuration
func Start(ctx context.Context, runner RunnerCloser, opts ...Option) error {
var (
options = &Options{}
closerCancel func()
options = &Options{}
)
startCtx, startCancel := context.WithCancelCause(ctx)
defer startCancel(nil)

var erg, closerCtx = errgroup.WithContext(startCtx)
closerCtx, closerCancel = context.WithCancel(closerCtx)
defer closerCancel()
closers := newCloserContext(startCtx)
defer closers.cancel()

var (
errorCh = make(ErrorCh, 3)
closeCh = make(chan struct{}, 1)
once sync.Once
closerGroup sync.WaitGroup
closeRunner = func(ctx context.Context, initiateClose bool) {
errorCh = make(ErrorCh, 3)
closeCh = make(chan struct{}, 1)
once sync.Once
chanWriters sync.WaitGroup
closeChannels = func() {
closers.cancel()
close(closeCh)
chanWriters.Wait()
// We can really terminate since all channel writers are done
close(errorCh)
}
terminate = func(ctx context.Context, initiateClose bool) {
once.Do(func() {
if !initiateClose {
closerCancel()
closerGroup.Wait()
// We can really terminate since all channel writers are done
close(errorCh)
closeChannels()
return
}
closeCtx, closeCancel := context.WithCancel(ctx)
closeCtx, closeCancel := options.closeContext(ctx)
defer closeCancel()
if options.CloseTimeout > 0 {
closeCtx, closeCancel = context.WithTimeout(closeCtx, options.CloseTimeout)
defer closeCancel()
}
// go routine handling close async so it can be canceled
go func() {
err := runner.Close(closeCtx)
startCancel(closeCtx.Err())
errorCh <- err
closerCancel()
close(closeCh)
closerGroup.Wait()
// We can really terminate since all channel writers are done
close(errorCh)
closeChannels()
}()
select {
// Wait for close to finish
Expand All @@ -499,47 +494,70 @@ func Start(ctx context.Context, runner RunnerCloser, opts ...Option) error {
})
}
)
closerGroup.Add(2)
chanWriters.Add(2)

options.close = func() {
go func() {
closeRunner(ctx, true)
terminate(ctx, true)
}()
}
options.Apply(opts...)

for _, closer := range options.closers {
closer := closer
erg.Go(func() error {
return closer(closerCtx)
})
}

// closers go routine
go func() {
defer closerGroup.Done()
err := erg.Wait()
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
errorCh <- err
}
}()
closers.start(errorCh, &chanWriters, options.closers...)

// runner go routine
go func() {
defer closerGroup.Done()
defer chanWriters.Done()
err := runner.Run(startCtx, func() {
// We might expose ready callback later via pipeline options
})
if err != nil {
// runner sequence had a problems calling close
errorCh <- err
}
go closeRunner(ctx, false)
go terminate(ctx, false)
}()

return errors.Join(errorCh.Wait(ctx)...)
}

// newCloserContext return instance of *closerContext
func newCloserContext(startCtx context.Context) *closerContext {
var erg, closerCtx = errgroup.WithContext(startCtx)
closerCtx, closerCancel := context.WithCancel(closerCtx)
return &closerContext{
erg: erg,
cancel: closerCancel,
ctx: closerCtx,
}
}

// closerContext a helper struct managing pipeline closers
type closerContext struct {
erg *errgroup.Group
ctx context.Context
cancel func()
}

// start starts closers functions and captures an error
func (c *closerContext) start(errorCh chan error, chanWriters *sync.WaitGroup, closers ...func(context.Context) error) {
for _, closer := range closers {
closer := closer
c.erg.Go(func() error {
return closer(c.ctx)
})
}

// closers go routine
go func() {
defer chanWriters.Done()
err := c.erg.Wait()
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
errorCh <- err
}
}()
}

// PipelineOptions holds a pipeline options
type PipelineOptions struct {
ErrorSignaler func(error)
Expand Down

0 comments on commit dd8fb37

Please sign in to comment.