-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.go
133 lines (109 loc) · 3.1 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package pipeline
import (
"context"
"errors"
"fmt"
"io"
"slices"
"strconv"
"sync"
"github.com/hedisam/pipeline/chans"
"github.com/hedisam/pipeline/stage"
)
// Source is a pipeline input that yields payloads one at a time.
//
// The pipeline reads each source from its own goroutine, calling Next
// repeatedly. Returning an error that matches io.EOF (per errors.Is) marks the
// source as exhausted and ends reading quietly; any other error is reported as
// a pipeline error and also stops reading from that source.
type Source interface {
// Next returns the next payload, or a non-nil error when none can be produced.
Next(ctx context.Context) (any, error)
}
// Sink is the function that consumes the pipeline's final output. It is called
// once for each payload received from the last stage (or directly from the
// sources when no stages are given). Returning a non-nil error stops the sink
// and is reported by the pipeline wrapped with a "sink: " prefix.
type Sink func(ctx context.Context, out any) error
// Pipeline is the pipeline orchestrator. It holds the input sources and the
// sink; the processing stages are supplied on each call to Run.
type Pipeline struct {
// sources are the inputs read by Run; each one is read in its own goroutine
// and their payloads are sent to one shared channel.
sources []Source
// sink is called with every payload that reaches the end of the pipeline.
sink Sink
}
// Option configures a Pipeline during construction. NewPipeline applies the
// given options in order, after the initial source and the sink have been set.
type Option func(p *Pipeline)
// WithSources returns an Option that appends the given sources to the ones the
// Pipeline already has. Use it when the pipeline reads from more than one
// source; calling it with no sources leaves the Pipeline unchanged.
func WithSources(sources ...Source) Option {
	// Appending an empty list returns the slice untouched, so no length check
	// is required before the append.
	appendSources := func(p *Pipeline) {
		p.sources = append(p.sources, sources...)
	}
	return appendSources
}
// NewPipeline builds a Pipeline that reads from src and delivers its final
// output to sink. src is always the first source; further sources can be added
// with WithSources. Options are applied in the order they are given.
func NewPipeline(src Source, sink Sink, options ...Option) *Pipeline {
	pl := new(Pipeline)
	pl.sources = []Source{src}
	pl.sink = sink

	for apply := range slices.Values(options) {
		apply(pl)
	}

	return pl
}
// Run starts the sources, the given stages and the sink, chaining them so that
// each step reads from the output channel of the previous one. Stage i is
// invoked with its index (as a decimal string) as its second argument.
// If no stage is provided, data is passed directly from the source(s) to the sink.
//
// Run blocks on the merged error channels of all components and returns the
// first value received from them. The derived context is cancelled when Run
// returns.
//
// NOTE(review): presumably the result is nil both when every error channel
// closes without an error and when ctx is done first — confirm against the
// chans package.
func (p *Pipeline) Run(ctx context.Context, stages ...stage.Runner) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// One error channel per stage, plus one for the sources and one for the sink.
	errStreams := make([]<-chan error, 0, len(stages)+2)

	// stream always holds the output of the most recently started step.
	stream, srcErrs := p.startSources(ctx)
	errStreams = append(errStreams, srcErrs)

	for idx, run := range stages {
		var stageErrs <-chan error
		stream, stageErrs = run(ctx, strconv.Itoa(idx), stream)
		errStreams = append(errStreams, stageErrs)
	}

	errStreams = append(errStreams, p.startSink(ctx, stream))

	firstErr, _ := chans.ReceiveOrDone(ctx, chans.FanIn(ctx, errStreams...))
	return firstErr
}
// startSources launches one goroutine per configured source and returns the
// shared payload channel together with the shared error channel.
//
// Each goroutine keeps calling Next on its source and forwards the payloads.
// It stops when Next returns an error or when a payload send reports failure.
// An error matching io.EOF ends the source silently; any other error is
// wrapped with the source index and offered on the error channel.
// Both channels are closed once every source goroutine has finished.
func (p *Pipeline) startSources(ctx context.Context) (<-chan any, <-chan error) {
	var wg sync.WaitGroup
	payloads := make(chan any)
	failures := make(chan error)

	// pump drains a single source until it is exhausted, fails, or a send is refused.
	pump := func(idx int, s Source) {
		defer wg.Done()
		for {
			item, err := s.Next(ctx)
			switch {
			case err == nil:
				if !chans.SendOrDone(ctx, payloads, item) {
					return
				}
			case errors.Is(err, io.EOF):
				// Normal end of this source; nothing to report.
				return
			default:
				_ = chans.SendOrDone(ctx, failures, fmt.Errorf("next payload from source %d: %w", idx, err))
				return
			}
		}
	}

	wg.Add(len(p.sources))
	for idx, s := range p.sources {
		go pump(idx, s)
	}

	// Close the shared channels only after the last sender is done.
	go func() {
		wg.Wait()
		close(payloads)
		close(failures)
	}()

	return payloads, failures
}
// startSink launches a goroutine that passes every payload received from in to
// the configured sink, and returns the channel on which a sink failure is
// reported.
//
// The first non-nil error from the sink is wrapped with a "sink: " prefix,
// offered on the returned channel, and ends the goroutine. The channel is
// closed when the goroutine exits.
func (p *Pipeline) startSink(ctx context.Context, in <-chan any) <-chan error {
	failures := make(chan error)

	go func() {
		defer close(failures)

		for item := range chans.ReceiveOrDoneSeq(ctx, in) {
			sinkErr := p.sink(ctx, item)
			if sinkErr == nil {
				continue
			}
			_ = chans.SendOrDone(ctx, failures, fmt.Errorf("sink: %w", sinkErr))
			return
		}
	}()

	return failures
}