From 93efc29d3af5ad7f589a02f4c7401299848aff5a Mon Sep 17 00:00:00 2001 From: siyual-park Date: Fri, 15 Nov 2024 10:25:33 +0900 Subject: [PATCH] feat: add retry node --- ext/README.md | 1 + ext/README_kr.md | 1 + ext/docs/retry_node.md | 20 +++++ ext/docs/retry_node_kr.md | 20 +++++ ext/pkg/control/builder.go | 3 + ext/pkg/control/builder_test.go | 2 +- ext/pkg/control/retry.go | 145 ++++++++++++++++++++++++++++++++ ext/pkg/control/retry_test.go | 113 +++++++++++++++++++++++++ ext/pkg/control/wait.go | 4 +- pkg/packet/tracer.go | 48 ++++++----- 10 files changed, 335 insertions(+), 22 deletions(-) create mode 100644 ext/docs/retry_node.md create mode 100644 ext/docs/retry_node_kr.md create mode 100644 ext/pkg/control/retry.go create mode 100644 ext/pkg/control/retry_test.go diff --git a/ext/README.md b/ext/README.md index 778efb63..993bad96 100644 --- a/ext/README.md +++ b/ext/README.md @@ -16,6 +16,7 @@ Precisely manage data flow. - **[NOP Node](./docs/nop_node.md)**: Responds to input packets with an empty packet, without any processing. - **[Pipe Node](./docs/pipe_node.md)**: Processes input packets and distributes results to multiple output ports, allowing for reusable data flows. - **[Reduce Node](./docs/reduce_node.md)**: Repeatedly processes input data to produce a single output value, useful for data aggregation. +- **[Retry Node](./docs/retry_node.md)**: Retries packet processing a specified number of times upon encountering errors. - **[Session Node](./docs/session_node.md)**: Stores and manages process information, maintaining session continuity. - **[Snippet Node](./docs/snippet_node.md)**: Executes code snippets written in various programming languages to process input packets. - **[Split Node](./docs/split_node.md)**: Splits input packets into multiple parts for processing. diff --git a/ext/README_kr.md b/ext/README_kr.md index fa80d803..fcc90b86 100644 --- a/ext/README_kr.md +++ b/ext/README_kr.md @@ -16,6 +16,7 @@ - **[NOP 노드](./docs/nop_node_kr.md)**: 입력 패킷을 처리하지 않고 빈 패킷으로 응답합니다. - **[Pipe 노드](./docs/pipe_node_kr.md)**: 입력 패킷을 처리하고 결과를 여러 출력 포트로 전달하여 데이터 흐름을 재사용합니다. - **[Reduce 노드](./docs/reduce_node_kr.md)**: 입력 데이터를 반복적으로 연산하여 하나의 출력 값을 생성합니다. 데이터 집계에 유용합니다. +- **[Retry 노드](./docs/retry_node_kr.md)**: 오류가 발생하면 지정된 횟수만큼 패킷 처리를 재시도합니다. - **[Session 노드](./docs/session_node_kr.md)**: 프로세스 정보를 저장하고 관리하여 세션을 유지합니다. - **[Snippet 노드](./docs/snippet_node_kr.md)**: 다양한 프로그래밍 언어로 작성된 코드 스니펫을 실행하여 입력 패킷을 처리합니다. - **[Split 노드](./docs/split_node_kr.md)**: 입력 패킷을 여러 개로 나누어 처리합니다. diff --git a/ext/docs/retry_node.md b/ext/docs/retry_node.md new file mode 100644 index 00000000..8750abec --- /dev/null +++ b/ext/docs/retry_node.md @@ -0,0 +1,20 @@ +# Retry Node + +**The Retry Node** retries packet processing multiple times in case of errors. This node is useful for tasks prone to temporary failures, providing multiple attempts to improve the chances of success before ultimately sending the packet to an error output if the retries are exhausted. + +## Specification + +- **limit**: Specifies the maximum number of retry attempts for processing a packet in case of failure. Once the retry limit is exceeded, the packet is routed to the error output port. + +## Ports + +- **in**: Receives the input packet and initiates processing. The packet will be retried until the `limit` is reached if processing fails. +- **out**: Outputs the packet if processing is successful within the retry limit. +- **error**: Routes the packet to the error output if it exceeds the retry limit without success. + +## Example + +```yaml +- kind: retry + limit: 3 # Retry up to 3 times +``` diff --git a/ext/docs/retry_node_kr.md b/ext/docs/retry_node_kr.md new file mode 100644 index 00000000..af9f6ed4 --- /dev/null +++ b/ext/docs/retry_node_kr.md @@ -0,0 +1,20 @@ +# Retry Node + +**Retry Node**는 오류 발생 시 패킷 처리를 여러 번 시도합니다. 이 노드는 일시적인 오류가 발생할 수 있는 작업에 유용하며, 재시도를 통해 성공 가능성을 높이고, 재시도가 끝나면 오류 출력 포트로 패킷을 전송합니다. + +## 명세 + +- **limit**: 오류가 발생할 경우 패킷을 최대 몇 번까지 재시도할지 설정합니다. 재시도 횟수를 초과하면 패킷은 오류 출력 포트로 전송됩니다. + +## 포트 + +- **in**: 입력 패킷을 받아 처리를 시작합니다. 처리 실패 시 `limit`에 도달할 때까지 재시도를 시도합니다. +- **out**: 재시도 횟수 내에 처리에 성공하면 원래의 입력 패킷을 출력합니다. +- **error**: 재시도 횟수를 초과하여 처리에 실패한 패킷을 출력합니다. + +## 예시 + +```yaml +- kind: retry + limit: 3 # 최대 3회 재시도 +``` diff --git a/ext/pkg/control/builder.go b/ext/pkg/control/builder.go index ec20df59..91cb9298 100644 --- a/ext/pkg/control/builder.go +++ b/ext/pkg/control/builder.go @@ -64,6 +64,9 @@ func AddToScheme(module *language.Module, lang string) scheme.Register { s.AddKnownType(KindReduce, &ReduceNodeSpec{}) s.AddCodec(KindReduce, NewReduceNodeCodec(expr)) + s.AddKnownType(KindRetry, &RetryNodeSpec{}) + s.AddCodec(KindRetry, NewRetryNodeCodec()) + s.AddKnownType(KindSession, &SessionNodeSpec{}) s.AddCodec(KindSession, NewSessionNodeCodec()) diff --git a/ext/pkg/control/builder_test.go b/ext/pkg/control/builder_test.go index 26f2f571..51a94d2e 100644 --- a/ext/pkg/control/builder_test.go +++ b/ext/pkg/control/builder_test.go @@ -45,7 +45,7 @@ func TestAddToScheme(t *testing.T) { err := AddToScheme(m, text.Language).AddToScheme(s) assert.NoError(t, err) - tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait} + tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindRetry, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait} for _, tt := range tests { t.Run(tt, func(t *testing.T) { diff --git a/ext/pkg/control/retry.go b/ext/pkg/control/retry.go new file mode 100644 index 00000000..4b3cc440 --- /dev/null +++ b/ext/pkg/control/retry.go @@ -0,0 +1,145 @@ +package control + +import ( + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" + "github.com/siyul-park/uniflow/pkg/types" + "sync" + "sync/atomic" +) + +// RetryNodeSpec defines the configuration for RetryNode. +type RetryNodeSpec struct { + spec.Meta `map:",inline"` + Limit int `map:"limit"` +} + +// RetryNode attempts to process packets up to a specified retry limit. +type RetryNode struct { + limit int + tracer *packet.Tracer + inPort *port.InPort + outPort *port.OutPort + errPort *port.OutPort +} + +var _ node.Node = (*RetryNode)(nil) + +const KindRetry = "retry" + +// NewRetryNodeCodec creates a codec to build RetryNode from RetryNodeSpec. +func NewRetryNodeCodec() scheme.Codec { + return scheme.CodecWithType(func(spec *RetryNodeSpec) (node.Node, error) { + return NewRetryNode(spec.Limit), nil + }) +} + +// NewRetryNode initializes a RetryNode with the given retry limit. +func NewRetryNode(limit int) *RetryNode { + n := &RetryNode{ + limit: limit, + tracer: packet.NewTracer(), + inPort: port.NewIn(), + outPort: port.NewOut(), + errPort: port.NewOut(), + } + + n.inPort.AddListener(port.ListenFunc(n.forward)) + n.outPort.AddListener(port.ListenFunc(n.backward)) + n.errPort.AddListener(port.ListenFunc(n.catch)) + + return n +} + +// In returns the input port based on the given name. +func (n *RetryNode) In(name string) *port.InPort { + if name == node.PortIn { + return n.inPort + } + return nil +} + +// Out returns the output port based on the given name. +func (n *RetryNode) Out(name string) *port.OutPort { + switch name { + case node.PortOut: + return n.outPort + case node.PortErr: + return n.errPort + default: + return nil + } +} + +// Close shuts down all ports and releases resources. +func (n *RetryNode) Close() error { + n.inPort.Close() + n.outPort.Close() + n.errPort.Close() + n.tracer.Close() + return nil +} + +func (n *RetryNode) forward(proc *process.Process) { + inReader := n.inPort.Open(proc) + outWriter := n.outPort.Open(proc) + errWriter := n.errPort.Open(proc) + + attempts := &sync.Map{} + + for inPck := range inReader.Read() { + n.tracer.Read(inReader, inPck) + + var hook packet.Hook + hook = packet.HookFunc(func(backPck *packet.Packet) { + if _, ok := backPck.Payload().(types.Error); !ok { + n.tracer.Transform(inPck, backPck) + n.tracer.Reduce(backPck) + return + } + + actual, _ := attempts.LoadOrStore(inPck, &atomic.Uint32{}) + count := actual.(*atomic.Uint32) + + for { + v := count.Load() + + if int(v) == n.limit { + n.tracer.Transform(inPck, backPck) + n.tracer.Write(errWriter, backPck) + return + } + + if count.CompareAndSwap(v, v+1) { + break + } + } + + n.tracer.AddHook(inPck, hook) + n.tracer.Write(outWriter, inPck) + }) + + n.tracer.AddHook(inPck, hook) + n.tracer.Write(outWriter, inPck) + } +} + +func (n *RetryNode) backward(proc *process.Process) { + outWriter := n.outPort.Open(proc) + + for backPck := range outWriter.Receive() { + n.tracer.Receive(outWriter, backPck) + } +} + +func (n *RetryNode) catch(proc *process.Process) { + errWriter := n.errPort.Open(proc) + + for backPck := range errWriter.Receive() { + n.tracer.Receive(errWriter, backPck) + } +} diff --git a/ext/pkg/control/retry_test.go b/ext/pkg/control/retry_test.go new file mode 100644 index 00000000..264fc1dd --- /dev/null +++ b/ext/pkg/control/retry_test.go @@ -0,0 +1,113 @@ +package control + +import ( + "context" + "github.com/go-faker/faker/v4" + "github.com/pkg/errors" + "github.com/siyul-park/uniflow/pkg/node" + "github.com/siyul-park/uniflow/pkg/packet" + "github.com/siyul-park/uniflow/pkg/port" + "github.com/siyul-park/uniflow/pkg/process" + "github.com/siyul-park/uniflow/pkg/types" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestRetryNodeCodec_Decode(t *testing.T) { + codec := NewRetryNodeCodec() + + spec := &RetryNodeSpec{ + Limit: 0, + } + + n, err := codec.Compile(spec) + assert.NoError(t, err) + assert.NotNil(t, n) + assert.NoError(t, n.Close()) +} + +func TestNewRetryNode(t *testing.T) { + n := NewRetryNode(0) + assert.NotNil(t, n) + assert.NoError(t, n.Close()) +} + +func TestRetryNode_Port(t *testing.T) { + n := NewRetryNode(0) + defer n.Close() + + assert.NotNil(t, n.In(node.PortIn)) + assert.NotNil(t, n.Out(node.PortOut)) + assert.NotNil(t, n.Out(node.PortErr)) +} + +func TestRetryNode_SendAndReceive(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + limit := 2 + + n1 := NewRetryNode(limit) + defer n1.Close() + + count := 0 + n2 := node.NewOneToOneNode(func(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) { + count += 1 + return nil, packet.New(types.NewError(errors.New(faker.Sentence()))) + }) + defer n2.Close() + + n1.Out(node.PortOut).Link(n2.In(node.PortIn)) + + in := port.NewOut() + in.Link(n1.In(node.PortIn)) + + proc := process.New() + defer proc.Exit(nil) + + inWriter := in.Open(proc) + + inPayload := types.NewString(faker.Word()) + inPck := packet.New(inPayload) + + inWriter.Write(inPck) + + select { + case outPck := <-inWriter.Receive(): + assert.Equal(t, limit+1, count) + assert.IsType(t, outPck.Payload(), types.NewError(nil)) + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + } +} + +func BenchmarkRetryNode_SendAndReceive(b *testing.B) { + n1 := NewRetryNode(1) + defer n1.Close() + + n2 := node.NewOneToOneNode(func(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) { + return nil, packet.New(types.NewError(errors.New(faker.Sentence()))) + }) + defer n2.Close() + + n1.Out(node.PortOut).Link(n2.In(node.PortIn)) + + in := port.NewOut() + in.Link(n1.In(node.PortIn)) + + proc := process.New() + defer proc.Exit(nil) + + inWriter := in.Open(proc) + + inPayload := types.NewString(faker.Word()) + inPck := packet.New(inPayload) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + inWriter.Write(inPck) + <-inWriter.Receive() + } +} diff --git a/ext/pkg/control/wait.go b/ext/pkg/control/wait.go index 8449ee4b..84165899 100644 --- a/ext/pkg/control/wait.go +++ b/ext/pkg/control/wait.go @@ -9,10 +9,10 @@ import ( "time" ) -// WaitNodeSpec defines the configuration for WaitNode, including a delay duration. +// WaitNodeSpec defines the configuration for WaitNode. type WaitNodeSpec struct { spec.Meta `map:",inline"` - Interval time.Duration `map:"timeout"` + Interval time.Duration `map:"interval"` } // WaitNode adds a delay to packet processing, using a specified interval. diff --git a/pkg/packet/tracer.go b/pkg/packet/tracer.go index 7902d19c..4f24cd20 100644 --- a/pkg/packet/tracer.go +++ b/pkg/packet/tracer.go @@ -59,6 +59,7 @@ func (t *Tracer) Reduce(pck *Packet) { t.mu.Lock() defer t.mu.Unlock() + t.reduce(pck, pck) t.handle(pck) t.receive(pck) } @@ -82,7 +83,7 @@ func (t *Tracer) Write(writer *Writer, pck *Packet) { t.writes[writer] = append(t.writes[writer], pck) t.receives[pck] = append(t.receives[pck], nil) } else { - t.receives[pck] = append(t.receives[pck], pck) + t.reduce(pck, pck) t.handle(pck) t.receive(pck) } @@ -105,24 +106,7 @@ func (t *Tracer) Receive(writer *Writer, pck *Packet) { delete(t.writes, writer) } - targets := t.targets[write] - receives := t.receives[write] - - offset := 0 - for i := 0; i < len(targets); i++ { - if receives[i+offset] != nil { - i-- - offset++ - } - } - - for i := len(targets) + offset; i < len(receives); i++ { - if receives[i] == nil { - receives[i] = pck - break - } - } - + t.reduce(write, pck) t.handle(write) t.receive(write) } @@ -145,6 +129,32 @@ func (t *Tracer) Close() { t.reader = make(map[*Packet]*Reader) } +func (t *Tracer) reduce(source, target *Packet) { + targets := t.targets[source] + receives := t.receives[source] + + offset := 0 + for i := 0; i < len(targets); i++ { + if receives[i+offset] != nil { + i-- + offset++ + } + } + + ok := false + for i := len(targets) + offset; i < len(receives); i++ { + if receives[i] == nil { + receives[i] = target + ok = true + break + } + } + if !ok { + receives = append(receives, target) + t.receives[source] = receives + } +} + func (t *Tracer) receive(pck *Packet) { receives := t.receives[pck]