Skip to content

Commit 38b94b5

Browse files
committed
refactor: improve readable
1 parent c3bb2b0 commit 38b94b5

File tree

5 files changed

+76
-87
lines changed

5 files changed

+76
-87
lines changed

pkg/loader/reconciler.go

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,30 @@ import (
77
"github.com/siyul-park/uniflow/pkg/storage"
88
)
99

10-
type (
11-
// ReconcilerConfig holds the configuration for the Reconciler.
12-
ReconcilerConfig struct {
13-
Storage *storage.Storage // Storage is the storage used by the Reconciler.
14-
Loader *Loader // Loader is used to load scheme.Spec into the symbol.Table.
15-
Filter *storage.Filter // Filter is the filter for tracking changes to the scheme.Spec.
16-
}
10+
11+
// ReconcilerConfig holds the configuration for the Reconciler.
12+
type ReconcilerConfig struct {
13+
Storage *storage.Storage // Storage is the storage used by the Reconciler.
14+
Loader *Loader // Loader is used to load scheme.Spec into the symbol.Table.
15+
Filter *storage.Filter // Filter is the filter for tracking changes to the scheme.Spec.
16+
}
1717

18-
// Reconciler keeps the symbol.Table up to date by tracking changes to scheme.Spec.
19-
Reconciler struct {
20-
storage *storage.Storage
21-
loader *Loader
22-
filter *storage.Filter
23-
stream *storage.Stream
24-
done chan struct{}
25-
mu sync.Mutex
26-
}
27-
)
18+
// Reconciler keeps the symbol.Table up to date by tracking changes to scheme.Spec.
19+
type Reconciler struct {
20+
storage *storage.Storage
21+
loader *Loader
22+
filter *storage.Filter
23+
stream *storage.Stream
24+
done chan struct{}
25+
mu sync.Mutex
26+
}
2827

2928
// NewReconciler creates a new Reconciler with the given configuration.
3029
func NewReconciler(config ReconcilerConfig) *Reconciler {
31-
storage := config.Storage
32-
loader := config.Loader
33-
filter := config.Filter
34-
3530
return &Reconciler{
36-
storage: storage,
37-
loader: loader,
38-
filter: filter,
31+
storage: config.Storage,
32+
loader: config.Loader,
33+
filter: config.Filter,
3934
done: make(chan struct{}),
4035
}
4136
}

pkg/node/node.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,10 @@ type Node interface {
1111
Port(name string) (*port.Port, bool)
1212
Close() error
1313
}
14+
15+
const (
16+
PortIO = "io"
17+
PortIn = "in"
18+
PortOut = "out"
19+
PortErr = "error"
20+
)

pkg/node/onetomany.go

Lines changed: 43 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,25 @@ import (
99
"github.com/siyul-park/uniflow/pkg/process"
1010
)
1111

12-
// OneToManyNodeConfig is a config for ActionNode.
13-
type OneToManyNodeConfig struct {
14-
ID ulid.ULID
15-
Action func(*process.Process, *packet.Packet) ([]*packet.Packet, *packet.Packet)
16-
}
17-
18-
// OneToManyNode provide process *packet.Packet one source and many distance.
19-
type OneToManyNode struct {
20-
id ulid.ULID
21-
action func(*process.Process, *packet.Packet) ([]*packet.Packet, *packet.Packet)
22-
inPort *port.Port
23-
outPorts []*port.Port
24-
errPort *port.Port
25-
mu sync.RWMutex
26-
}
12+
// OneToManyNodeConfig holds the configuration for OneToManyNode.
13+
type OneToManyNodeConfig struct {
14+
ID ulid.ULID
15+
Action func(*process.Process, *packet.Packet) ([]*packet.Packet, *packet.Packet)
16+
}
2717

18+
// OneToManyNode represents a node that processes *packet.Packet with one input and many outputs.
19+
type OneToManyNode struct {
20+
id ulid.ULID
21+
action func(*process.Process, *packet.Packet) ([]*packet.Packet, *packet.Packet)
22+
inPort *port.Port
23+
outPorts []*port.Port
24+
errPort *port.Port
25+
mu sync.RWMutex
26+
}
2827

2928
var _ Node = (*OneToManyNode)(nil)
3029

31-
// NewOneToManyNode returns a new OneToManyNode.
30+
// NewOneToManyNode creates a new OneToManyNode with the given configuration.
3231
func NewOneToManyNode(config OneToManyNodeConfig) *OneToManyNode {
3332
id := config.ID
3433
action := config.Action
@@ -63,13 +62,15 @@ func NewOneToManyNode(config OneToManyNodeConfig) *OneToManyNode {
6362
return n
6463
}
6564

65+
// ID returns the ID of the OneToManyNode.
6666
func (n *OneToManyNode) ID() ulid.ULID {
6767
n.mu.RLock()
6868
defer n.mu.RUnlock()
6969

7070
return n.id
7171
}
7272

73+
// Port returns the specified port of the OneToManyNode.
7374
func (n *OneToManyNode) Port(name string) (*port.Port, bool) {
7475
n.mu.Lock()
7576
defer n.mu.Unlock()
@@ -80,30 +81,30 @@ func (n *OneToManyNode) Port(name string) (*port.Port, bool) {
8081
case PortErr:
8182
return n.errPort, true
8283
default:
83-
}
84-
85-
if i, ok := port.GetIndex(PortOut, name); ok {
86-
for j := 0; j <= i; j++ {
87-
if len(n.outPorts) <= j {
88-
outPort := port.New()
89-
outPort.AddInitHook(port.InitHookFunc(func(proc *process.Process) {
90-
n.mu.RLock()
91-
defer n.mu.RUnlock()
92-
93-
outStream := outPort.Open(proc)
94-
95-
n.backward(proc, outStream)
96-
}))
97-
n.outPorts = append(n.outPorts, outPort)
84+
if i, ok := port.GetIndex(PortOut, name); ok {
85+
for j := 0; j <= i; j++ {
86+
if len(n.outPorts) <= j {
87+
outPort := port.New()
88+
outPort.AddInitHook(port.InitHookFunc(func(proc *process.Process) {
89+
n.mu.RLock()
90+
defer n.mu.RUnlock()
91+
92+
outStream := outPort.Open(proc)
93+
94+
n.backward(proc, outStream)
95+
}))
96+
n.outPorts = append(n.outPorts, outPort)
97+
}
9898
}
99-
}
10099

101-
return n.outPorts[i], true
100+
return n.outPorts[i], true
101+
}
102102
}
103103

104104
return nil, false
105105
}
106106

107+
// Close closes all ports of the OneToManyNode.
107108
func (n *OneToManyNode) Close() error {
108109
n.mu.Lock()
109110
defer n.mu.Unlock()
@@ -146,30 +147,21 @@ func (n *OneToManyNode) forward(proc *process.Process) {
146147
inStream.Send(errPck)
147148
}
148149
} else if len(outPcks) > 0 && len(outStreams) > 0 {
149-
var ok bool
150150
for i, outPck := range outPcks {
151-
if len(outStreams) <= i {
152-
break
153-
}
154-
if outPck == nil {
151+
if len(outStreams) <= i || outPck == nil {
155152
continue
156153
}
157154
outStream := outStreams[i]
158-
159-
if outStream.Links() > 0 {
160-
if outPck == inPck {
161-
outPck = packet.New(outPck.Payload())
162-
}
163-
proc.Stack().Link(inPck.ID(), outPck.ID())
164-
proc.Stack().Push(outPck.ID(), inStream.ID())
165-
outStream.Send(outPck)
166-
167-
ok = true
155+
if outStream.Links() == 0 {
156+
continue
168157
}
169-
}
170158

171-
if !ok {
172-
proc.Stack().Clear(inPck.ID())
159+
if outPck == inPck {
160+
outPck = packet.New(outPck.Payload())
161+
}
162+
proc.Stack().Link(inPck.ID(), outPck.ID())
163+
proc.Stack().Push(outPck.ID(), inStream.ID())
164+
outStream.Send(outPck)
173165
}
174166
} else {
175167
proc.Stack().Clear(inPck.ID())

pkg/node/onetoone.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import (
99
"github.com/siyul-park/uniflow/pkg/process"
1010
)
1111

12-
// OneToOneNodeConfig is a config for ActionNode.
12+
// OneToOneNodeConfig holds the configuration for OneToOneNode.
1313
type OneToOneNodeConfig struct {
1414
ID ulid.ULID
1515
Action func(*process.Process, *packet.Packet) (*packet.Packet, *packet.Packet)
1616
}
1717

18-
// OneToOneNode provide process *packet.Packet one source and onde distance.
18+
// OneToOneNode represents a node that processes *packet.Packet with one input and one output.
1919
type OneToOneNode struct {
2020
id ulid.ULID
2121
action func(*process.Process, *packet.Packet) (*packet.Packet, *packet.Packet)
@@ -28,7 +28,7 @@ type OneToOneNode struct {
2828

2929
var _ Node = (*OneToOneNode)(nil)
3030

31-
// NewOneToOneNode returns a new OneToOneNode.
31+
// NewOneToOneNode creates a new OneToOneNode with the given configuration.
3232
func NewOneToOneNode(config OneToOneNodeConfig) *OneToOneNode {
3333
id := config.ID
3434
action := config.Action
@@ -88,13 +88,15 @@ func NewOneToOneNode(config OneToOneNodeConfig) *OneToOneNode {
8888
return n
8989
}
9090

91+
// ID returns the ID of the OneToOneNode.
9192
func (n *OneToOneNode) ID() ulid.ULID {
9293
n.mu.RLock()
9394
defer n.mu.RUnlock()
9495

9596
return n.id
9697
}
9798

99+
// Port returns the specified port of the OneToOneNode.
98100
func (n *OneToOneNode) Port(name string) (*port.Port, bool) {
99101
n.mu.RLock()
100102
defer n.mu.RUnlock()
@@ -114,6 +116,7 @@ func (n *OneToOneNode) Port(name string) (*port.Port, bool) {
114116
return nil, false
115117
}
116118

119+
// Close closes all ports of the OneToOneNode.
117120
func (n *OneToOneNode) Close() error {
118121
n.mu.Lock()
119122
defer n.mu.Unlock()
@@ -126,7 +129,7 @@ func (n *OneToOneNode) Close() error {
126129
return nil
127130
}
128131

129-
func (n *OneToOneNode) forward(proc *process.Process, inStream *port.Stream, outStream *port.Stream) {
132+
func (n *OneToOneNode) forward(proc *process.Process, inStream, outStream *port.Stream) {
130133
n.mu.RLock()
131134
defer n.mu.RUnlock()
132135

pkg/node/ports.go

Lines changed: 0 additions & 8 deletions
This file was deleted.

0 commit comments

Comments
 (0)