Skip to content

Commit da2c7db

Browse files
authored
test: add more benchmark (#34)
1 parent 38eb5f6 commit da2c7db

File tree

17 files changed

+562
-191
lines changed

17 files changed

+562
-191
lines changed

pkg/loader/reconciler.go

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,29 @@ import (
77
"github.com/siyul-park/uniflow/pkg/storage"
88
)
99

10-
type (
11-
// ReconcilerConfig holds the configuration for the Reconciler.
12-
ReconcilerConfig struct {
13-
Storage *storage.Storage // Storage is the storage used by the Reconciler.
14-
Loader *Loader // Loader is used to load scheme.Spec into the symbol.Table.
15-
Filter *storage.Filter // Filter is the filter for tracking changes to the scheme.Spec.
16-
}
10+
// ReconcilerConfig holds the configuration for the Reconciler.
11+
type ReconcilerConfig struct {
12+
Storage *storage.Storage // Storage is the storage used by the Reconciler.
13+
Loader *Loader // Loader is used to load scheme.Spec into the symbol.Table.
14+
Filter *storage.Filter // Filter is the filter for tracking changes to the scheme.Spec.
15+
}
1716

18-
// Reconciler keeps the symbol.Table up to date by tracking changes to scheme.Spec.
19-
Reconciler struct {
20-
storage *storage.Storage
21-
loader *Loader
22-
filter *storage.Filter
23-
stream *storage.Stream
24-
done chan struct{}
25-
mu sync.Mutex
26-
}
27-
)
17+
// Reconciler keeps the symbol.Table up to date by tracking changes to scheme.Spec.
18+
type Reconciler struct {
19+
storage *storage.Storage
20+
loader *Loader
21+
filter *storage.Filter
22+
stream *storage.Stream
23+
done chan struct{}
24+
mu sync.Mutex
25+
}
2826

2927
// NewReconciler creates a new Reconciler with the given configuration.
3028
func NewReconciler(config ReconcilerConfig) *Reconciler {
31-
storage := config.Storage
32-
loader := config.Loader
33-
filter := config.Filter
34-
3529
return &Reconciler{
36-
storage: storage,
37-
loader: loader,
38-
filter: filter,
30+
storage: config.Storage,
31+
loader: config.Loader,
32+
filter: config.Filter,
3933
done: make(chan struct{}),
4034
}
4135
}

pkg/node/error.go

Lines changed: 0 additions & 8 deletions
This file was deleted.

pkg/node/node.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@ import (
55
"github.com/siyul-park/uniflow/pkg/port"
66
)
77

8-
type (
9-
// Node is an operational unit that processes *packet.Packet.
10-
Node interface {
11-
ID() ulid.ULID
12-
Port(name string) (*port.Port, bool)
13-
Close() error
14-
}
8+
// Node is an operational unit that processes *packet.Packet.
9+
type Node interface {
10+
ID() ulid.ULID
11+
Port(name string) (*port.Port, bool)
12+
Close() error
13+
}
14+
15+
const (
16+
PortIO = "io"
17+
PortIn = "in"
18+
PortOut = "out"
19+
PortErr = "error"
1520
)

pkg/node/onetomany.go

Lines changed: 43 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,25 @@ import (
99
"github.com/siyul-park/uniflow/pkg/process"
1010
)
1111

12-
type (
13-
// OneToManyNodeConfig is a config for ActionNode.
14-
OneToManyNodeConfig struct {
15-
ID ulid.ULID
16-
Action func(*process.Process, *packet.Packet) ([]*packet.Packet, *packet.Packet)
17-
}
12+
// OneToManyNodeConfig holds the configuration for OneToManyNode.
13+
type OneToManyNodeConfig struct {
14+
ID ulid.ULID
15+
Action func(*process.Process, *packet.Packet) ([]*packet.Packet, *packet.Packet)
16+
}
1817

19-
// OneToManyNode provide process *packet.Packet one source and many distance.
20-
OneToManyNode struct {
21-
id ulid.ULID
22-
action func(*process.Process, *packet.Packet) ([]*packet.Packet, *packet.Packet)
23-
inPort *port.Port
24-
outPorts []*port.Port
25-
errPort *port.Port
26-
mu sync.RWMutex
27-
}
28-
)
18+
// OneToManyNode represents a node that processes *packet.Packet with one input and many outputs.
19+
type OneToManyNode struct {
20+
id ulid.ULID
21+
action func(*process.Process, *packet.Packet) ([]*packet.Packet, *packet.Packet)
22+
inPort *port.Port
23+
outPorts []*port.Port
24+
errPort *port.Port
25+
mu sync.RWMutex
26+
}
2927

3028
var _ Node = (*OneToManyNode)(nil)
3129

32-
// NewOneToManyNode returns a new OneToManyNode.
30+
// NewOneToManyNode creates a new OneToManyNode with the given configuration.
3331
func NewOneToManyNode(config OneToManyNodeConfig) *OneToManyNode {
3432
id := config.ID
3533
action := config.Action
@@ -64,13 +62,15 @@ func NewOneToManyNode(config OneToManyNodeConfig) *OneToManyNode {
6462
return n
6563
}
6664

65+
// ID returns the ID of the OneToManyNode.
6766
func (n *OneToManyNode) ID() ulid.ULID {
6867
n.mu.RLock()
6968
defer n.mu.RUnlock()
7069

7170
return n.id
7271
}
7372

73+
// Port returns the specified port of the OneToManyNode.
7474
func (n *OneToManyNode) Port(name string) (*port.Port, bool) {
7575
n.mu.Lock()
7676
defer n.mu.Unlock()
@@ -81,30 +81,30 @@ func (n *OneToManyNode) Port(name string) (*port.Port, bool) {
8181
case PortErr:
8282
return n.errPort, true
8383
default:
84-
}
85-
86-
if i, ok := port.GetIndex(PortOut, name); ok {
87-
for j := 0; j <= i; j++ {
88-
if len(n.outPorts) <= j {
89-
outPort := port.New()
90-
outPort.AddInitHook(port.InitHookFunc(func(proc *process.Process) {
91-
n.mu.RLock()
92-
defer n.mu.RUnlock()
93-
94-
outStream := outPort.Open(proc)
95-
96-
n.backward(proc, outStream)
97-
}))
98-
n.outPorts = append(n.outPorts, outPort)
84+
if i, ok := port.GetIndex(PortOut, name); ok {
85+
for j := 0; j <= i; j++ {
86+
if len(n.outPorts) <= j {
87+
outPort := port.New()
88+
outPort.AddInitHook(port.InitHookFunc(func(proc *process.Process) {
89+
n.mu.RLock()
90+
defer n.mu.RUnlock()
91+
92+
outStream := outPort.Open(proc)
93+
94+
n.backward(proc, outStream)
95+
}))
96+
n.outPorts = append(n.outPorts, outPort)
97+
}
9998
}
100-
}
10199

102-
return n.outPorts[i], true
100+
return n.outPorts[i], true
101+
}
103102
}
104103

105104
return nil, false
106105
}
107106

107+
// Close closes all ports of the OneToManyNode.
108108
func (n *OneToManyNode) Close() error {
109109
n.mu.Lock()
110110
defer n.mu.Unlock()
@@ -147,30 +147,21 @@ func (n *OneToManyNode) forward(proc *process.Process) {
147147
inStream.Send(errPck)
148148
}
149149
} else if len(outPcks) > 0 && len(outStreams) > 0 {
150-
var ok bool
151150
for i, outPck := range outPcks {
152-
if len(outStreams) <= i {
153-
break
154-
}
155-
if outPck == nil {
151+
if len(outStreams) <= i || outPck == nil {
156152
continue
157153
}
158154
outStream := outStreams[i]
159-
160-
if outStream.Links() > 0 {
161-
if outPck == inPck {
162-
outPck = packet.New(outPck.Payload())
163-
}
164-
proc.Stack().Link(inPck.ID(), outPck.ID())
165-
proc.Stack().Push(outPck.ID(), inStream.ID())
166-
outStream.Send(outPck)
167-
168-
ok = true
155+
if outStream.Links() == 0 {
156+
continue
169157
}
170-
}
171158

172-
if !ok {
173-
proc.Stack().Clear(inPck.ID())
159+
if outPck == inPck {
160+
outPck = packet.New(outPck.Payload())
161+
}
162+
proc.Stack().Link(inPck.ID(), outPck.ID())
163+
proc.Stack().Push(outPck.ID(), inStream.ID())
164+
outStream.Send(outPck)
174165
}
175166
} else {
176167
proc.Stack().Clear(inPck.ID())

pkg/node/onetomany_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,36 @@ func TestOneToManyNode_Send(t *testing.T) {
130130
}
131131
})
132132
}
133+
134+
func BenchmarkOneToManyNode_Send(b *testing.B) {
135+
n := NewOneToManyNode(OneToManyNodeConfig{
136+
Action: func(_ *process.Process, inPck *packet.Packet) ([]*packet.Packet, *packet.Packet) {
137+
return []*packet.Packet{inPck}, nil
138+
},
139+
})
140+
defer func() { _ = n.Close() }()
141+
142+
in := port.New()
143+
inPort, _ := n.Port(PortIn)
144+
inPort.Link(in)
145+
146+
out := port.New()
147+
outPort, _ := n.Port(port.SetIndex(PortOut, 0))
148+
outPort.Link(out)
149+
150+
proc := process.New()
151+
defer proc.Exit(nil)
152+
153+
inStream := in.Open(proc)
154+
outStream := out.Open(proc)
155+
156+
inPayload := primitive.NewString(faker.UUIDHyphenated())
157+
inPck := packet.New(inPayload)
158+
159+
b.ResetTimer()
160+
161+
for i := 0; i < b.N; i++ {
162+
inStream.Send(inPck)
163+
<-outStream.Receive()
164+
}
165+
}

pkg/node/onetoone.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,26 @@ import (
99
"github.com/siyul-park/uniflow/pkg/process"
1010
)
1111

12-
type (
13-
// OneToOneNodeConfig is a config for ActionNode.
14-
OneToOneNodeConfig struct {
15-
ID ulid.ULID
16-
Action func(*process.Process, *packet.Packet) (*packet.Packet, *packet.Packet)
17-
}
12+
// OneToOneNodeConfig holds the configuration for OneToOneNode.
13+
type OneToOneNodeConfig struct {
14+
ID ulid.ULID
15+
Action func(*process.Process, *packet.Packet) (*packet.Packet, *packet.Packet)
16+
}
1817

19-
// OneToOneNode provide process *packet.Packet one source and onde distance.
20-
OneToOneNode struct {
21-
id ulid.ULID
22-
action func(*process.Process, *packet.Packet) (*packet.Packet, *packet.Packet)
23-
ioPort *port.Port
24-
inPort *port.Port
25-
outPort *port.Port
26-
errPort *port.Port
27-
mu sync.RWMutex
28-
}
29-
)
18+
// OneToOneNode represents a node that processes *packet.Packet with one input and one output.
19+
type OneToOneNode struct {
20+
id ulid.ULID
21+
action func(*process.Process, *packet.Packet) (*packet.Packet, *packet.Packet)
22+
ioPort *port.Port
23+
inPort *port.Port
24+
outPort *port.Port
25+
errPort *port.Port
26+
mu sync.RWMutex
27+
}
3028

3129
var _ Node = (*OneToOneNode)(nil)
3230

33-
// NewOneToOneNode returns a new OneToOneNode.
31+
// NewOneToOneNode creates a new OneToOneNode with the given configuration.
3432
func NewOneToOneNode(config OneToOneNodeConfig) *OneToOneNode {
3533
id := config.ID
3634
action := config.Action
@@ -90,13 +88,15 @@ func NewOneToOneNode(config OneToOneNodeConfig) *OneToOneNode {
9088
return n
9189
}
9290

91+
// ID returns the ID of the OneToOneNode.
9392
func (n *OneToOneNode) ID() ulid.ULID {
9493
n.mu.RLock()
9594
defer n.mu.RUnlock()
9695

9796
return n.id
9897
}
9998

99+
// Port returns the specified port of the OneToOneNode.
100100
func (n *OneToOneNode) Port(name string) (*port.Port, bool) {
101101
n.mu.RLock()
102102
defer n.mu.RUnlock()
@@ -116,6 +116,7 @@ func (n *OneToOneNode) Port(name string) (*port.Port, bool) {
116116
return nil, false
117117
}
118118

119+
// Close closes all ports of the OneToOneNode.
119120
func (n *OneToOneNode) Close() error {
120121
n.mu.Lock()
121122
defer n.mu.Unlock()
@@ -128,7 +129,7 @@ func (n *OneToOneNode) Close() error {
128129
return nil
129130
}
130131

131-
func (n *OneToOneNode) forward(proc *process.Process, inStream *port.Stream, outStream *port.Stream) {
132+
func (n *OneToOneNode) forward(proc *process.Process, inStream, outStream *port.Stream) {
132133
n.mu.RLock()
133134
defer n.mu.RUnlock()
134135

0 commit comments

Comments
 (0)