diff --git a/cmd/go.mod b/cmd/go.mod index 79add477..d9ded7e4 100644 --- a/cmd/go.mod +++ b/cmd/go.mod @@ -19,6 +19,7 @@ require ( ) require ( + cel.dev/expr v0.18.0 // indirect github.com/atotto/clipboard v0.1.4 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/charmbracelet/lipgloss v1.0.0 // indirect @@ -34,7 +35,7 @@ require ( github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/benbjohnson/immutable v0.4.3 // indirect github.com/charmbracelet/bubbles v0.20.0 - github.com/charmbracelet/bubbletea v1.1.2 + github.com/charmbracelet/bubbletea v1.2.0 github.com/charmbracelet/x/ansi v0.4.5 // indirect github.com/charmbracelet/x/term v0.2.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -45,7 +46,7 @@ require ( github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/go-sourcemap/sourcemap v2.1.4+incompatible // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/cel-go v0.21.0 // indirect + github.com/google/cel-go v0.22.0 // indirect github.com/google/pprof v0.0.0-20241101162523-b92577c0c142 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/cmd/go.sum b/cmd/go.sum index c2f7429d..aba8907e 100644 --- a/cmd/go.sum +++ b/cmd/go.sum @@ -1,3 +1,5 @@ +cel.dev/expr v0.18.0 h1:CJ6drgk+Hf96lkLikr4rFf19WrU0BOWEihyZnI2TAzo= +cel.dev/expr v0.18.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= @@ -16,8 +18,8 @@ github.com/benbjohnson/immutable v0.4.3 h1:GYHcksoJ9K6HyAUpGxwZURrbTkXA0Dh4otXGq github.com/benbjohnson/immutable v0.4.3/go.mod h1:qJIKKSmdqz1tVzNtst1DZzvaqOU1onk1rc03IeM3Owk= github.com/charmbracelet/bubbles v0.20.0 h1:jSZu6qD8cRQ6k9OMfR1WlM+ruM8fkPWkHvQWD9LIutE= github.com/charmbracelet/bubbles v0.20.0/go.mod h1:39slydyswPy+uVOHZ5x/GjwVAFkCsV8IIVy+4MhzwwU= -github.com/charmbracelet/bubbletea v1.1.2 h1:naQXF2laRxyLyil/i7fxdpiz1/k06IKquhm4vBfHsIc= -github.com/charmbracelet/bubbletea v1.1.2/go.mod h1:9HIU/hBV24qKjlehyj8z1r/tR9TYTQEag+cWZnuXo8E= +github.com/charmbracelet/bubbletea v1.2.0 h1:WYHclJaFDOz4dPxiGx7owwb8P4000lYPcuXPIALS5Z8= +github.com/charmbracelet/bubbletea v1.2.0/go.mod h1:viLoDL7hG4njLJSKU2gw7kB3LSEmWsrM80rO1dBJWBI= github.com/charmbracelet/lipgloss v1.0.0 h1:O7VkGDvqEdGi93X+DeqsQ7PKHDgtQfF8j8/O2qFMQNg= github.com/charmbracelet/lipgloss v1.0.0/go.mod h1:U5fy9Z+C38obMs+T+tJqst9VGzlOYGj4ri9reL3qUlo= github.com/charmbracelet/x/ansi v0.4.5 h1:LqK4vwBNaXw2AyGIICa5/29Sbdq58GbGdFngSexTdRM= @@ -53,8 +55,8 @@ github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/cel-go v0.21.0 h1:cl6uW/gxN+Hy50tNYvI691+sXxioCnstFzLp2WO4GCI= -github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc= +github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= +github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20241101162523-b92577c0c142 h1:sAGdeJj0bnMgUNVeUpp6AYlVdCt3/GdI3pGRqsNSQLs= diff --git a/cmd/pkg/cli/start_test.go b/cmd/pkg/cli/start_test.go index f9e9f3d1..aea55e66 100644 --- a/cmd/pkg/cli/start_test.go +++ b/cmd/pkg/cli/start_test.go @@ -42,7 +42,7 @@ func TestStartCommand_Execute(t *testing.T) { s.AddCodec(kind, codec) t.Run("NoFlag", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() meta := &spec.Meta{ @@ -87,7 +87,7 @@ func TestStartCommand_Execute(t *testing.T) { }) t.Run(flagDebug, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() meta := &spec.Meta{ @@ -134,7 +134,7 @@ func TestStartCommand_Execute(t *testing.T) { }) t.Run(flagFromSpecs, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() filename := "specs.json" @@ -181,7 +181,7 @@ func TestStartCommand_Execute(t *testing.T) { }) t.Run(flagFromSecrets, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() filename := "secrets.json" @@ -228,7 +228,7 @@ func TestStartCommand_Execute(t *testing.T) { }) t.Run(flagFromCharts, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() filename := "charts.json" diff --git a/examples/helloworld.yaml b/examples/helloworld.yaml index f45ad1f9..427f7369 100644 --- a/examples/helloworld.yaml +++ b/examples/helloworld.yaml @@ -1,10 +1,3 @@ -- kind: nop - name: init - ports: - init: - - name: hello_world - port: in - - kind: block name: hello_world specs: @@ -13,3 +6,7 @@ code: Hello, World! - kind: print filename: /dev/stdout + ports: + init: + - name: hello_world + port: in diff --git a/examples/httpproxy.yaml b/examples/httpproxy.yaml index 72ce1af2..ce24c70f 100644 --- a/examples/httpproxy.yaml +++ b/examples/httpproxy.yaml @@ -4,11 +4,12 @@ port: '{{ .PORT }}' ports: out: - - name: router + - name: proxy port: in env: PORT: data: '{{ .PORT }}' + - kind: proxy name: proxy urls: [https://www.google.com/] diff --git a/ext/README.md b/ext/README.md index 90166234..5addcad7 100644 --- a/ext/README.md +++ b/ext/README.md @@ -8,13 +8,13 @@ Built-in extensions enable efficient processing of various tasks and maximize sy Precisely manage data flow. -- **[Call Node](./docs/call_node.md)**: Processes input packets and distributes results to multiple output ports, allowing for reusable data flows. - **[Block Node](./docs/block_node.md)**: Systematically manages complex data processing flows and executes multiple sub-nodes sequentially. - **[Fork Node](./docs/fork_node.md)**: Asynchronously branches data flows to perform independent tasks in parallel. - **[If Node](./docs/if_node.md)**: Evaluates conditions to split packets into two paths. - **[Loop Node](./docs/loop_node.md)**: Divides input packets into multiple sub-packets for repeated processing. - **[Merge Node](./docs/merge_node.md)**: Combines multiple input packets into one. - **[NOP Node](./docs/nop_node.md)**: Responds to input packets with an empty packet, without any processing. +- **[Pipe Node](./docs/pipe_node.md)**: Processes input packets and distributes results to multiple output ports, allowing for reusable data flows. - **[Reduce Node](./docs/reduce_node.md)**: Repeatedly processes input data to produce a single output value, useful for data aggregation. - **[Session Node](./docs/session_node.md)**: Stores and manages process information, maintaining session continuity. - **[Snippet Node](./docs/snippet_node.md)**: Executes code snippets written in various programming languages to process input packets. diff --git a/ext/README_kr.md b/ext/README_kr.md index a670f16d..5db094d2 100644 --- a/ext/README_kr.md +++ b/ext/README_kr.md @@ -8,13 +8,13 @@ 데이터 흐름을 정밀하게 제어합니다. -- **[Call 노드](./docs/call_node_kr.md)**: 입력 패킷을 처리하고 결과를 여러 출력 포트로 전달하여 데이터 흐름을 재사용합니다. - **[Block 노드](./docs/block_node_kr.md)**: 복잡한 데이터 처리 흐름을 체계적으로 관리하며, 여러 하위 노드를 순차적으로 실행합니다. - **[Fork 노드](./docs/fork_node_kr.md)**: 데이터 흐름을 비동기적으로 분기하여 독립적인 작업을 병렬로 수행합니다. - **[If 노드](./docs/if_node_kr.md)**: 조건을 평가하여 패킷을 두 경로로 분기합니다. - **[Loop 노드](./docs/loop_node_kr.md)**: 입력 패킷을 여러 하위 패킷으로 나누어 반복 처리합니다. - **[Merge 노드](./docs/merge_node_kr.md)**: 여러 입력 패킷을 하나로 통합합니다. - **[NOP 노드](./docs/nop_node_kr.md)**: 입력 패킷을 처리하지 않고 빈 패킷으로 응답합니다. +- **[Pipe 노드](./docs/pipe_node_kr.md)**: 입력 패킷을 처리하고 결과를 여러 출력 포트로 전달하여 데이터 흐름을 재사용합니다. - **[Reduce 노드](./docs/reduce_node_kr.md)**: 입력 데이터를 반복적으로 연산하여 하나의 출력 값을 생성합니다. 데이터 집계에 유용합니다. - **[Session 노드](./docs/session_node_kr.md)**: 프로세스 정보를 저장하고 관리하여 세션을 유지합니다. - **[Snippet 노드](./docs/snippet_node_kr.md)**: 다양한 프로그래밍 언어로 작성된 코드 스니펫을 실행하여 입력 패킷을 처리합니다. diff --git a/ext/docs/call_node.md b/ext/docs/pipe_node.md similarity index 88% rename from ext/docs/call_node.md rename to ext/docs/pipe_node.md index 5a461065..d905dabc 100644 --- a/ext/docs/call_node.md +++ b/ext/docs/pipe_node.md @@ -1,6 +1,6 @@ -# Call Node +# Pipe Node -**The Call Node** processes the input packet and delivers the result to multiple output ports. This node allows for the reuse of data processing flows and is useful for modularizing complex tasks. +**The Pipe Node** processes the input packet and delivers the result to multiple output ports. This node allows for the reuse of data processing flows and is useful for modularizing complex tasks. ## Specification @@ -16,7 +16,7 @@ ## Example ```yaml -- kind: call +- kind: pipe ports: out[0]: - name: origin diff --git a/ext/docs/call_node_kr.md b/ext/docs/pipe_node_kr.md similarity index 88% rename from ext/docs/call_node_kr.md rename to ext/docs/pipe_node_kr.md index a156a01c..694b4fae 100644 --- a/ext/docs/call_node_kr.md +++ b/ext/docs/pipe_node_kr.md @@ -1,6 +1,6 @@ -# Call 노드 +# Pipe 노드 -**Call 노드**는 입력된 패킷을 처리하고 그 결과를 여러 출력 포트로 전달하는 역할을 합니다. 이 노드는 데이터 처리 흐름을 호출하여 재사용할 수 있으며, 복잡한 작업을 모듈화하는 데 유용합니다. +**Pipe 노드**는 입력된 패킷을 처리하고 그 결과를 여러 출력 포트로 전달하는 역할을 합니다. 이 노드는 데이터 처리 흐름을 호출하여 재사용할 수 있으며, 복잡한 작업을 모듈화하는 데 유용합니다. ## 명세 @@ -16,7 +16,7 @@ ## 예시 ```yaml -- kind: call +- kind: pipe ports: out[0]: - name: origin diff --git a/ext/go.mod b/ext/go.mod index f19428b3..ceb8a354 100644 --- a/ext/go.mod +++ b/ext/go.mod @@ -8,7 +8,7 @@ require ( github.com/evanw/esbuild v0.24.0 github.com/go-faker/faker/v4 v4.5.0 github.com/gofrs/uuid v4.4.0+incompatible - github.com/google/cel-go v0.21.0 + github.com/google/cel-go v0.22.0 github.com/gorilla/websocket v1.5.3 github.com/iancoleman/strcase v0.3.0 github.com/jmoiron/sqlx v1.4.0 @@ -20,6 +20,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require cel.dev/expr v0.18.0 // indirect + require ( github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/benbjohnson/immutable v0.4.3 // indirect diff --git a/ext/go.sum b/ext/go.sum index 0f046725..391c6cf1 100644 --- a/ext/go.sum +++ b/ext/go.sum @@ -1,3 +1,5 @@ +cel.dev/expr v0.18.0 h1:CJ6drgk+Hf96lkLikr4rFf19WrU0BOWEihyZnI2TAzo= +cel.dev/expr v0.18.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= @@ -27,8 +29,8 @@ github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpv github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= -github.com/google/cel-go v0.21.0 h1:cl6uW/gxN+Hy50tNYvI691+sXxioCnstFzLp2WO4GCI= -github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc= +github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= +github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20241101162523-b92577c0c142 h1:sAGdeJj0bnMgUNVeUpp6AYlVdCt3/GdI3pGRqsNSQLs= diff --git a/ext/pkg/control/builder.go b/ext/pkg/control/builder.go index 1d5d9282..261dc35b 100644 --- a/ext/pkg/control/builder.go +++ b/ext/pkg/control/builder.go @@ -43,8 +43,8 @@ func AddToScheme(module *language.Module, lang string) scheme.Register { s.AddKnownType(KindBlock, &BlockNodeSpec{}) s.AddCodec(KindBlock, NewBlockNodeCodec(s)) - s.AddKnownType(KindCall, &CallNodeSpec{}) - s.AddCodec(KindCall, NewCallNodeCodec()) + s.AddKnownType(KindPipe, &PipeNodeSpec{}) + s.AddCodec(KindPipe, NewPipeNodeCodec()) s.AddKnownType(KindFork, &ForkNodeSpec{}) s.AddCodec(KindFork, NewForkNodeCodec()) diff --git a/ext/pkg/control/builder_test.go b/ext/pkg/control/builder_test.go index 578c4673..26b10d01 100644 --- a/ext/pkg/control/builder_test.go +++ b/ext/pkg/control/builder_test.go @@ -45,7 +45,7 @@ func TestAddToScheme(t *testing.T) { err := AddToScheme(m, text.Language).AddToScheme(s) assert.NoError(t, err) - tests := []string{KindCall, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch} + tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch} for _, tt := range tests { t.Run(tt, func(t *testing.T) { diff --git a/ext/pkg/control/call.go b/ext/pkg/control/pipe.go similarity index 74% rename from ext/pkg/control/call.go rename to ext/pkg/control/pipe.go index 46688298..ed0b84bc 100644 --- a/ext/pkg/control/call.go +++ b/ext/pkg/control/pipe.go @@ -10,33 +10,33 @@ import ( "github.com/siyul-park/uniflow/pkg/types" ) -// CallNodeSpec holds the specification for creating a CallNode. -type CallNodeSpec struct { +// PipeNodeSpec holds the specification for creating a PipeNode. +type PipeNodeSpec struct { spec.Meta `map:",inline"` } -// CallNode processes an input packet and sends the result to multiple output ports. -type CallNode struct { +// PipeNode processes an input packet and sends the result to multiple output ports. +type PipeNode struct { tracer *packet.Tracer inPort *port.InPort outPorts []*port.OutPort errPort *port.OutPort } -const KindCall = "call" +const KindPipe = "pipe" -var _ node.Node = (*CallNode)(nil) +var _ node.Node = (*PipeNode)(nil) -// NewCallNodeCodec creates a new codec for CallNodeSpec. -func NewCallNodeCodec() scheme.Codec { - return scheme.CodecWithType(func(spec *CallNodeSpec) (node.Node, error) { - return NewCallNode(), nil +// NewPipeNodeCodec creates a new codec for PipeNodeSpec. +func NewPipeNodeCodec() scheme.Codec { + return scheme.CodecWithType(func(spec *PipeNodeSpec) (node.Node, error) { + return NewPipeNode(), nil }) } -// NewCallNode creates a new CallNode. -func NewCallNode() *CallNode { - n := &CallNode{ +// NewPipeNode creates a new PipeNode. +func NewPipeNode() *PipeNode { + n := &PipeNode{ tracer: packet.NewTracer(), inPort: port.NewIn(), outPorts: []*port.OutPort{port.NewOut(), port.NewOut()}, @@ -52,7 +52,7 @@ func NewCallNode() *CallNode { } // In returns the input port with the specified name. -func (n *CallNode) In(name string) *port.InPort { +func (n *PipeNode) In(name string) *port.InPort { switch name { case node.PortIn: return n.inPort @@ -62,7 +62,7 @@ func (n *CallNode) In(name string) *port.InPort { } // Out returns the output port with the specified name. -func (n *CallNode) Out(name string) *port.OutPort { +func (n *PipeNode) Out(name string) *port.OutPort { switch name { case node.PortOut: return n.outPorts[0] @@ -80,7 +80,7 @@ func (n *CallNode) Out(name string) *port.OutPort { } // Close closes all ports associated with the node. -func (n *CallNode) Close() error { +func (n *PipeNode) Close() error { n.inPort.Close() for _, outPort := range n.outPorts { outPort.Close() @@ -90,7 +90,7 @@ func (n *CallNode) Close() error { return nil } -func (n *CallNode) forward(proc *process.Process) { +func (n *PipeNode) forward(proc *process.Process) { inReader := n.inPort.Open(proc) outWriter0 := n.outPorts[0].Open(proc) outWriter1 := n.outPorts[1].Open(proc) @@ -112,7 +112,7 @@ func (n *CallNode) forward(proc *process.Process) { } } -func (n *CallNode) backward(index int) port.Listener { +func (n *PipeNode) backward(index int) port.Listener { return port.ListenFunc(func(proc *process.Process) { outWriter := n.outPorts[index].Open(proc) @@ -122,7 +122,7 @@ func (n *CallNode) backward(index int) port.Listener { }) } -func (n *CallNode) catch(proc *process.Process) { +func (n *PipeNode) catch(proc *process.Process) { errWriter := n.errPort.Open(proc) for backPck := range errWriter.Receive() { diff --git a/ext/pkg/control/call_test.go b/ext/pkg/control/pipe_test.go similarity index 90% rename from ext/pkg/control/call_test.go rename to ext/pkg/control/pipe_test.go index 15454e72..a4bcbb18 100644 --- a/ext/pkg/control/call_test.go +++ b/ext/pkg/control/pipe_test.go @@ -15,10 +15,10 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCallNodeCodec_Decode(t *testing.T) { - codec := NewCallNodeCodec() +func TestPipeNodeCodec_Decode(t *testing.T) { + codec := NewPipeNodeCodec() - spec := &CallNodeSpec{} + spec := &PipeNodeSpec{} n, err := codec.Compile(spec) assert.NoError(t, err) @@ -26,14 +26,14 @@ func TestCallNodeCodec_Decode(t *testing.T) { assert.NoError(t, n.Close()) } -func TestNewCallNode(t *testing.T) { - n := NewCallNode() +func TestNewPipeNode(t *testing.T) { + n := NewPipeNode() assert.NotNil(t, n) assert.NoError(t, n.Close()) } -func TestCallNode_Port(t *testing.T) { - n := NewCallNode() +func TestPipeNode_Port(t *testing.T) { + n := NewPipeNode() defer n.Close() assert.NotNil(t, n.In(node.PortIn)) @@ -43,7 +43,7 @@ func TestCallNode_Port(t *testing.T) { assert.NotNil(t, n.Out(node.PortWithIndex(node.PortOut, 1))) } -func TestCallNode_SendAndReceive(t *testing.T) { +func TestPipeNode_SendAndReceive(t *testing.T) { t.Run("SingleInputToMultipleOutputs", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second) defer cancel() @@ -53,7 +53,7 @@ func TestCallNode_SendAndReceive(t *testing.T) { }) defer n1.Close() - n2 := NewCallNode() + n2 := NewPipeNode() defer n2.Close() n2.Out(node.PortWithIndex(node.PortOut, 0)).Link(n1.In(node.PortIn)) @@ -100,7 +100,7 @@ func TestCallNode_SendAndReceive(t *testing.T) { }) defer n1.Close() - n2 := NewCallNode() + n2 := NewPipeNode() defer n2.Close() n2.Out(node.PortWithIndex(node.PortOut, 0)).Link(n1.In(node.PortIn)) @@ -139,13 +139,13 @@ func TestCallNode_SendAndReceive(t *testing.T) { }) } -func BenchmarkCallNode_SendAndReceive(b *testing.B) { +func BenchmarkPipeNode_SendAndReceive(b *testing.B) { n1 := node.NewOneToOneNode(func(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) { return inPck, nil }) defer n1.Close() - n2 := NewCallNode() + n2 := NewPipeNode() defer n2.Close() n2.Out(node.PortWithIndex(node.PortOut, 0)).Link(n1.In(node.PortIn)) diff --git a/pkg/process/process_test.go b/pkg/process/process_test.go index 7a159ba6..9f5d7fb3 100644 --- a/pkg/process/process_test.go +++ b/pkg/process/process_test.go @@ -133,7 +133,7 @@ func TestProcess_Wait(t *testing.T) { child := proc.Fork() defer child.Exit(nil) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() done := make(chan struct{}) diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 304a81ad..bd131488 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -55,7 +55,7 @@ func TestRuntime_Load(t *testing.T) { func TestRuntime_Reconcile(t *testing.T) { t.Run("Spec", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() s := scheme.New() @@ -122,7 +122,7 @@ func TestRuntime_Reconcile(t *testing.T) { }) t.Run("Secret", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() s := scheme.New() @@ -202,7 +202,7 @@ func TestRuntime_Reconcile(t *testing.T) { }) t.Run("Chart", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() s := scheme.New() @@ -269,3 +269,177 @@ func TestRuntime_Reconcile(t *testing.T) { } }) } + +func BenchmarkRuntime_Reconcile(b *testing.B) { + b.Run("Spec", func(b *testing.B) { + ctx := context.TODO() + + s := scheme.New() + kind := faker.UUIDHyphenated() + + s.AddKnownType(kind, &spec.Meta{}) + s.AddCodec(kind, scheme.CodecFunc(func(spec spec.Spec) (node.Node, error) { + return node.NewOneToOneNode(nil), nil + })) + + specStore := spec.NewStore() + secretStore := secret.NewStore() + chartStore := chart.NewStore() + + h := hook.New() + symbols := make(chan *symbol.Symbol) + + h.AddLoadHook(symbol.LoadFunc(func(sb *symbol.Symbol) error { + symbols <- sb + return nil + })) + + r := New(Config{ + Scheme: s, + Hook: h, + SpecStore: specStore, + SecretStore: secretStore, + ChartStore: chartStore, + }) + defer r.Close() + + err := r.Watch(ctx) + assert.NoError(b, err) + + go r.Reconcile(ctx) + + for i := 0; i < b.N; i++ { + meta := &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: kind, + Namespace: resource.DefaultNamespace, + } + + specStore.Store(ctx, meta) + + select { + case <-symbols: + case <-ctx.Done(): + } + } + }) + + b.Run("Secret", func(b *testing.B) { + ctx := context.TODO() + + s := scheme.New() + kind := faker.UUIDHyphenated() + + s.AddKnownType(kind, &spec.Meta{}) + s.AddCodec(kind, scheme.CodecFunc(func(spec spec.Spec) (node.Node, error) { + return node.NewOneToOneNode(nil), nil + })) + + specStore := spec.NewStore() + secretStore := secret.NewStore() + chartStore := chart.NewStore() + + h := hook.New() + symbols := make(chan *symbol.Symbol) + + h.AddLoadHook(symbol.LoadFunc(func(sb *symbol.Symbol) error { + symbols <- sb + return nil + })) + + r := New(Config{ + Scheme: s, + Hook: h, + SpecStore: specStore, + SecretStore: secretStore, + ChartStore: chartStore, + }) + + err := r.Watch(ctx) + assert.NoError(b, err) + + go r.Reconcile(ctx) + + for i := 0; i < b.N; i++ { + scrt := &secret.Secret{ + ID: uuid.Must(uuid.NewV7()), + Data: faker.Word(), + } + meta := &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: kind, + Namespace: resource.DefaultNamespace, + Env: map[string][]spec.Value{ + "key": { + { + ID: scrt.GetID(), + Data: "{{ . }}", + }, + }, + }, + } + + specStore.Store(ctx, meta) + secretStore.Store(ctx, scrt) + + select { + case <-symbols: + case <-ctx.Done(): + } + } + }) + + b.Run("Chart", func(b *testing.B) { + ctx := context.TODO() + + s := scheme.New() + kind := faker.UUIDHyphenated() + + specStore := spec.NewStore() + secretStore := secret.NewStore() + chartStore := chart.NewStore() + + h := hook.New() + symbols := make(chan *symbol.Symbol) + + h.AddLoadHook(symbol.LoadFunc(func(sb *symbol.Symbol) error { + symbols <- sb + return nil + })) + + r := New(Config{ + Scheme: s, + Hook: h, + SpecStore: specStore, + SecretStore: secretStore, + ChartStore: chartStore, + }) + defer r.Close() + + err := r.Watch(ctx) + assert.NoError(b, err) + + go r.Reconcile(ctx) + + for i := 0; i < b.N; i++ { + meta := &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: kind, + Namespace: resource.DefaultNamespace, + } + chrt := &chart.Chart{ + ID: uuid.Must(uuid.NewV7()), + Namespace: resource.DefaultNamespace, + Name: kind, + } + + specStore.Store(ctx, meta) + chartStore.Store(ctx, chrt) + + select { + case <-symbols: + case <-ctx.Done(): + } + } + }) +}