Skip to content

Commit

Permalink
refactor: call node -> pipe node
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 7, 2024
1 parent b35118a commit 5c9b067
Show file tree
Hide file tree
Showing 17 changed files with 247 additions and 68 deletions.
5 changes: 3 additions & 2 deletions cmd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
)

require (
cel.dev/expr v0.18.0 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/lipgloss v1.0.0 // indirect
Expand All @@ -34,7 +35,7 @@ require (
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/benbjohnson/immutable v0.4.3 // indirect
github.com/charmbracelet/bubbles v0.20.0
github.com/charmbracelet/bubbletea v1.1.2
github.com/charmbracelet/bubbletea v1.2.0
github.com/charmbracelet/x/ansi v0.4.5 // indirect
github.com/charmbracelet/x/term v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand All @@ -45,7 +46,7 @@ require (
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.4+incompatible // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/cel-go v0.21.0 // indirect
github.com/google/cel-go v0.22.0 // indirect
github.com/google/pprof v0.0.0-20241101162523-b92577c0c142 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions cmd/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
cel.dev/expr v0.18.0 h1:CJ6drgk+Hf96lkLikr4rFf19WrU0BOWEihyZnI2TAzo=
cel.dev/expr v0.18.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
Expand All @@ -16,8 +18,8 @@ github.com/benbjohnson/immutable v0.4.3 h1:GYHcksoJ9K6HyAUpGxwZURrbTkXA0Dh4otXGq
github.com/benbjohnson/immutable v0.4.3/go.mod h1:qJIKKSmdqz1tVzNtst1DZzvaqOU1onk1rc03IeM3Owk=
github.com/charmbracelet/bubbles v0.20.0 h1:jSZu6qD8cRQ6k9OMfR1WlM+ruM8fkPWkHvQWD9LIutE=
github.com/charmbracelet/bubbles v0.20.0/go.mod h1:39slydyswPy+uVOHZ5x/GjwVAFkCsV8IIVy+4MhzwwU=
github.com/charmbracelet/bubbletea v1.1.2 h1:naQXF2laRxyLyil/i7fxdpiz1/k06IKquhm4vBfHsIc=
github.com/charmbracelet/bubbletea v1.1.2/go.mod h1:9HIU/hBV24qKjlehyj8z1r/tR9TYTQEag+cWZnuXo8E=
github.com/charmbracelet/bubbletea v1.2.0 h1:WYHclJaFDOz4dPxiGx7owwb8P4000lYPcuXPIALS5Z8=
github.com/charmbracelet/bubbletea v1.2.0/go.mod h1:viLoDL7hG4njLJSKU2gw7kB3LSEmWsrM80rO1dBJWBI=
github.com/charmbracelet/lipgloss v1.0.0 h1:O7VkGDvqEdGi93X+DeqsQ7PKHDgtQfF8j8/O2qFMQNg=
github.com/charmbracelet/lipgloss v1.0.0/go.mod h1:U5fy9Z+C38obMs+T+tJqst9VGzlOYGj4ri9reL3qUlo=
github.com/charmbracelet/x/ansi v0.4.5 h1:LqK4vwBNaXw2AyGIICa5/29Sbdq58GbGdFngSexTdRM=
Expand Down Expand Up @@ -53,8 +55,8 @@ github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/cel-go v0.21.0 h1:cl6uW/gxN+Hy50tNYvI691+sXxioCnstFzLp2WO4GCI=
github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc=
github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g=
github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20241101162523-b92577c0c142 h1:sAGdeJj0bnMgUNVeUpp6AYlVdCt3/GdI3pGRqsNSQLs=
Expand Down
10 changes: 5 additions & 5 deletions cmd/pkg/cli/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestStartCommand_Execute(t *testing.T) {
s.AddCodec(kind, codec)

t.Run("NoFlag", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

meta := &spec.Meta{
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestStartCommand_Execute(t *testing.T) {
})

t.Run(flagDebug, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

meta := &spec.Meta{
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestStartCommand_Execute(t *testing.T) {
})

t.Run(flagFromSpecs, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

filename := "specs.json"
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestStartCommand_Execute(t *testing.T) {
})

t.Run(flagFromSecrets, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

filename := "secrets.json"
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestStartCommand_Execute(t *testing.T) {
})

t.Run(flagFromCharts, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

filename := "charts.json"
Expand Down
11 changes: 4 additions & 7 deletions examples/helloworld.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
- kind: nop
name: init
ports:
init:
- name: hello_world
port: in

- kind: block
name: hello_world
specs:
Expand All @@ -13,3 +6,7 @@
code: Hello, World!
- kind: print
filename: /dev/stdout
ports:
init:
- name: hello_world
port: in
3 changes: 2 additions & 1 deletion examples/httpproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
port: '{{ .PORT }}'
ports:
out:
- name: router
- name: proxy
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: proxy
name: proxy
urls: [https://www.google.com/]
2 changes: 1 addition & 1 deletion ext/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ Built-in extensions enable efficient processing of various tasks and maximize sy

Precisely manage data flow.

- **[Call Node](./docs/call_node.md)**: Processes input packets and distributes results to multiple output ports, allowing for reusable data flows.
- **[Block Node](./docs/block_node.md)**: Systematically manages complex data processing flows and executes multiple sub-nodes sequentially.
- **[Fork Node](./docs/fork_node.md)**: Asynchronously branches data flows to perform independent tasks in parallel.
- **[If Node](./docs/if_node.md)**: Evaluates conditions to split packets into two paths.
- **[Loop Node](./docs/loop_node.md)**: Divides input packets into multiple sub-packets for repeated processing.
- **[Merge Node](./docs/merge_node.md)**: Combines multiple input packets into one.
- **[NOP Node](./docs/nop_node.md)**: Responds to input packets with an empty packet, without any processing.
- **[Pipe Node](./docs/pipe_node.md)**: Processes input packets and distributes results to multiple output ports, allowing for reusable data flows.
- **[Reduce Node](./docs/reduce_node.md)**: Repeatedly processes input data to produce a single output value, useful for data aggregation.
- **[Session Node](./docs/session_node.md)**: Stores and manages process information, maintaining session continuity.
- **[Snippet Node](./docs/snippet_node.md)**: Executes code snippets written in various programming languages to process input packets.
Expand Down
2 changes: 1 addition & 1 deletion ext/README_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

데이터 흐름을 정밀하게 제어합니다.

- **[Call 노드](./docs/call_node_kr.md)**: 입력 패킷을 처리하고 결과를 여러 출력 포트로 전달하여 데이터 흐름을 재사용합니다.
- **[Block 노드](./docs/block_node_kr.md)**: 복잡한 데이터 처리 흐름을 체계적으로 관리하며, 여러 하위 노드를 순차적으로 실행합니다.
- **[Fork 노드](./docs/fork_node_kr.md)**: 데이터 흐름을 비동기적으로 분기하여 독립적인 작업을 병렬로 수행합니다.
- **[If 노드](./docs/if_node_kr.md)**: 조건을 평가하여 패킷을 두 경로로 분기합니다.
- **[Loop 노드](./docs/loop_node_kr.md)**: 입력 패킷을 여러 하위 패킷으로 나누어 반복 처리합니다.
- **[Merge 노드](./docs/merge_node_kr.md)**: 여러 입력 패킷을 하나로 통합합니다.
- **[NOP 노드](./docs/nop_node_kr.md)**: 입력 패킷을 처리하지 않고 빈 패킷으로 응답합니다.
- **[Pipe 노드](./docs/pipe_node_kr.md)**: 입력 패킷을 처리하고 결과를 여러 출력 포트로 전달하여 데이터 흐름을 재사용합니다.
- **[Reduce 노드](./docs/reduce_node_kr.md)**: 입력 데이터를 반복적으로 연산하여 하나의 출력 값을 생성합니다. 데이터 집계에 유용합니다.
- **[Session 노드](./docs/session_node_kr.md)**: 프로세스 정보를 저장하고 관리하여 세션을 유지합니다.
- **[Snippet 노드](./docs/snippet_node_kr.md)**: 다양한 프로그래밍 언어로 작성된 코드 스니펫을 실행하여 입력 패킷을 처리합니다.
Expand Down
6 changes: 3 additions & 3 deletions ext/docs/call_node.md → ext/docs/pipe_node.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Call Node
# Pipe Node

**The Call Node** processes the input packet and delivers the result to multiple output ports. This node allows for the reuse of data processing flows and is useful for modularizing complex tasks.
**The Pipe Node** processes the input packet and delivers the result to multiple output ports. This node allows for the reuse of data processing flows and is useful for modularizing complex tasks.

## Specification

Expand All @@ -16,7 +16,7 @@
## Example

```yaml
- kind: call
- kind: pipe
ports:
out[0]:
- name: origin
Expand Down
6 changes: 3 additions & 3 deletions ext/docs/call_node_kr.md → ext/docs/pipe_node_kr.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Call 노드
# Pipe 노드

**Call 노드**는 입력된 패킷을 처리하고 그 결과를 여러 출력 포트로 전달하는 역할을 합니다. 이 노드는 데이터 처리 흐름을 호출하여 재사용할 수 있으며, 복잡한 작업을 모듈화하는 데 유용합니다.
**Pipe 노드**는 입력된 패킷을 처리하고 그 결과를 여러 출력 포트로 전달하는 역할을 합니다. 이 노드는 데이터 처리 흐름을 호출하여 재사용할 수 있으며, 복잡한 작업을 모듈화하는 데 유용합니다.

## 명세

Expand All @@ -16,7 +16,7 @@
## 예시

```yaml
- kind: call
- kind: pipe
ports:
out[0]:
- name: origin
Expand Down
4 changes: 3 additions & 1 deletion ext/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/evanw/esbuild v0.24.0
github.com/go-faker/faker/v4 v4.5.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/google/cel-go v0.21.0
github.com/google/cel-go v0.22.0
github.com/gorilla/websocket v1.5.3
github.com/iancoleman/strcase v0.3.0
github.com/jmoiron/sqlx v1.4.0
Expand All @@ -20,6 +20,8 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require cel.dev/expr v0.18.0 // indirect

require (
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/benbjohnson/immutable v0.4.3 // indirect
Expand Down
6 changes: 4 additions & 2 deletions ext/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
cel.dev/expr v0.18.0 h1:CJ6drgk+Hf96lkLikr4rFf19WrU0BOWEihyZnI2TAzo=
cel.dev/expr v0.18.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
Expand Down Expand Up @@ -27,8 +29,8 @@ github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpv
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/google/cel-go v0.21.0 h1:cl6uW/gxN+Hy50tNYvI691+sXxioCnstFzLp2WO4GCI=
github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc=
github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g=
github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20241101162523-b92577c0c142 h1:sAGdeJj0bnMgUNVeUpp6AYlVdCt3/GdI3pGRqsNSQLs=
Expand Down
4 changes: 2 additions & 2 deletions ext/pkg/control/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func AddToScheme(module *language.Module, lang string) scheme.Register {
s.AddKnownType(KindBlock, &BlockNodeSpec{})
s.AddCodec(KindBlock, NewBlockNodeCodec(s))

s.AddKnownType(KindCall, &CallNodeSpec{})
s.AddCodec(KindCall, NewCallNodeCodec())
s.AddKnownType(KindPipe, &PipeNodeSpec{})
s.AddCodec(KindPipe, NewPipeNodeCodec())

s.AddKnownType(KindFork, &ForkNodeSpec{})
s.AddCodec(KindFork, NewForkNodeCodec())
Expand Down
2 changes: 1 addition & 1 deletion ext/pkg/control/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAddToScheme(t *testing.T) {
err := AddToScheme(m, text.Language).AddToScheme(s)
assert.NoError(t, err)

tests := []string{KindCall, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch}
tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch}

for _, tt := range tests {
t.Run(tt, func(t *testing.T) {
Expand Down
38 changes: 19 additions & 19 deletions ext/pkg/control/call.go → ext/pkg/control/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,33 @@ import (
"github.com/siyul-park/uniflow/pkg/types"
)

// CallNodeSpec holds the specification for creating a CallNode.
type CallNodeSpec struct {
// PipeNodeSpec holds the specification for creating a PipeNode.
type PipeNodeSpec struct {
spec.Meta `map:",inline"`
}

// CallNode processes an input packet and sends the result to multiple output ports.
type CallNode struct {
// PipeNode processes an input packet and sends the result to multiple output ports.
type PipeNode struct {
tracer *packet.Tracer
inPort *port.InPort
outPorts []*port.OutPort
errPort *port.OutPort
}

const KindCall = "call"
const KindPipe = "pipe"

var _ node.Node = (*CallNode)(nil)
var _ node.Node = (*PipeNode)(nil)

// NewCallNodeCodec creates a new codec for CallNodeSpec.
func NewCallNodeCodec() scheme.Codec {
return scheme.CodecWithType(func(spec *CallNodeSpec) (node.Node, error) {
return NewCallNode(), nil
// NewPipeNodeCodec creates a new codec for PipeNodeSpec.
func NewPipeNodeCodec() scheme.Codec {
return scheme.CodecWithType(func(spec *PipeNodeSpec) (node.Node, error) {
return NewPipeNode(), nil
})
}

// NewCallNode creates a new CallNode.
func NewCallNode() *CallNode {
n := &CallNode{
// NewPipeNode creates a new PipeNode.
func NewPipeNode() *PipeNode {
n := &PipeNode{
tracer: packet.NewTracer(),
inPort: port.NewIn(),
outPorts: []*port.OutPort{port.NewOut(), port.NewOut()},
Expand All @@ -52,7 +52,7 @@ func NewCallNode() *CallNode {
}

// In returns the input port with the specified name.
func (n *CallNode) In(name string) *port.InPort {
func (n *PipeNode) In(name string) *port.InPort {
switch name {
case node.PortIn:
return n.inPort
Expand All @@ -62,7 +62,7 @@ func (n *CallNode) In(name string) *port.InPort {
}

// Out returns the output port with the specified name.
func (n *CallNode) Out(name string) *port.OutPort {
func (n *PipeNode) Out(name string) *port.OutPort {
switch name {
case node.PortOut:
return n.outPorts[0]
Expand All @@ -80,7 +80,7 @@ func (n *CallNode) Out(name string) *port.OutPort {
}

// Close closes all ports associated with the node.
func (n *CallNode) Close() error {
func (n *PipeNode) Close() error {
n.inPort.Close()
for _, outPort := range n.outPorts {
outPort.Close()
Expand All @@ -90,7 +90,7 @@ func (n *CallNode) Close() error {
return nil
}

func (n *CallNode) forward(proc *process.Process) {
func (n *PipeNode) forward(proc *process.Process) {
inReader := n.inPort.Open(proc)
outWriter0 := n.outPorts[0].Open(proc)
outWriter1 := n.outPorts[1].Open(proc)
Expand All @@ -112,7 +112,7 @@ func (n *CallNode) forward(proc *process.Process) {
}
}

func (n *CallNode) backward(index int) port.Listener {
func (n *PipeNode) backward(index int) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
outWriter := n.outPorts[index].Open(proc)

Expand All @@ -122,7 +122,7 @@ func (n *CallNode) backward(index int) port.Listener {
})
}

func (n *CallNode) catch(proc *process.Process) {
func (n *PipeNode) catch(proc *process.Process) {
errWriter := n.errPort.Open(proc)

for backPck := range errWriter.Receive() {
Expand Down
Loading

0 comments on commit 5c9b067

Please sign in to comment.