feat: add retry node
siyul-park committed Nov 15, 2024
1 parent 621bee3 commit 93efc29
Showing 10 changed files with 335 additions and 22 deletions.
1 change: 1 addition & 0 deletions ext/README.md
@@ -16,6 +16,7 @@ Precisely manage data flow.
- **[NOP Node](./docs/nop_node.md)**: Responds to input packets with an empty packet, without any processing.
- **[Pipe Node](./docs/pipe_node.md)**: Processes input packets and distributes results to multiple output ports, allowing for reusable data flows.
- **[Reduce Node](./docs/reduce_node.md)**: Repeatedly processes input data to produce a single output value, useful for data aggregation.
- **[Retry Node](./docs/retry_node.md)**: Retries packet processing a specified number of times upon encountering errors.
- **[Session Node](./docs/session_node.md)**: Stores and manages process information, maintaining session continuity.
- **[Snippet Node](./docs/snippet_node.md)**: Executes code snippets written in various programming languages to process input packets.
- **[Split Node](./docs/split_node.md)**: Splits input packets into multiple parts for processing.
1 change: 1 addition & 0 deletions ext/README_kr.md
@@ -16,6 +16,7 @@
- **[NOP Node](./docs/nop_node_kr.md)**: Responds to input packets with an empty packet, without any processing.
- **[Pipe Node](./docs/pipe_node_kr.md)**: Processes input packets and passes the results to multiple output ports, making data flows reusable.
- **[Reduce Node](./docs/reduce_node_kr.md)**: Repeatedly operates on input data to produce a single output value; useful for data aggregation.
- **[Retry Node](./docs/retry_node_kr.md)**: Retries packet processing a specified number of times when an error occurs.
- **[Session Node](./docs/session_node_kr.md)**: Stores and manages process information to maintain sessions.
- **[Snippet Node](./docs/snippet_node_kr.md)**: Executes code snippets written in various programming languages to process input packets.
- **[Split Node](./docs/split_node_kr.md)**: Splits input packets into multiple parts for processing.
20 changes: 20 additions & 0 deletions ext/docs/retry_node.md
@@ -0,0 +1,20 @@
# Retry Node

**The Retry Node** retries packet processing when an error occurs. It is useful for tasks prone to transient failures: repeated attempts improve the chance of success, and once the retries are exhausted the packet is routed to the error output port.

## Specification

- **limit**: Specifies the maximum number of retry attempts for processing a packet in case of failure. Once the retry limit is exceeded, the packet is routed to the error output port.

## Ports

- **in**: Receives the input packet and initiates processing. If processing fails, the packet is retried until the `limit` is reached.
- **out**: Outputs the packet if processing is successful within the retry limit.
- **error**: Routes the packet to the error output if it exceeds the retry limit without success.

## Example

```yaml
- kind: retry
  limit: 3 # Retry up to 3 times
```
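
For a fuller picture, the sketch below shows a retry node wired between an upstream task that may fail and a fallback on the error port. The `ports` wiring syntax and the `snippet` node fields follow the conventions used in the other node docs in this repository; the node names and code bodies are purely illustrative and not part of this commit.

```yaml
- kind: retry
  name: retry
  limit: 3
  ports:
    out:
      - name: task        # each attempt is forwarded here
        port: in
    error:
      - name: fallback    # receives the error once the retries are exhausted
        port: in

- kind: snippet
  name: task
  language: javascript
  code: |
    export default function (input) {
      // may throw on transient failures
      return input;
    }

- kind: snippet
  name: fallback
  language: javascript
  code: |
    export default function (err) {
      return { error: 'giving up after retries' };
    }
```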
20 changes: 20 additions & 0 deletions ext/docs/retry_node_kr.md
@@ -0,0 +1,20 @@
# Retry Node

**The Retry Node** retries packet processing when an error occurs. It is useful for tasks where transient errors can occur: retries improve the chance of success, and once the retries are exhausted the packet is sent to the error output port.

## Specification

- **limit**: Sets the maximum number of times a packet is retried when an error occurs. Once the retry count is exceeded, the packet is sent to the error output port.

## Ports

- **in**: Receives an input packet and starts processing. If processing fails, it is retried until `limit` is reached.
- **out**: Outputs the original input packet when processing succeeds within the retry count.
- **error**: Outputs packets that failed after exceeding the retry count.

## Example

```yaml
- kind: retry
  limit: 3 # Retry up to 3 times
```
3 changes: 3 additions & 0 deletions ext/pkg/control/builder.go
@@ -64,6 +64,9 @@ func AddToScheme(module *language.Module, lang string) scheme.Register {
		s.AddKnownType(KindReduce, &ReduceNodeSpec{})
		s.AddCodec(KindReduce, NewReduceNodeCodec(expr))

		s.AddKnownType(KindRetry, &RetryNodeSpec{})
		s.AddCodec(KindRetry, NewRetryNodeCodec())

		s.AddKnownType(KindSession, &SessionNodeSpec{})
		s.AddCodec(KindSession, NewSessionNodeCodec())

2 changes: 1 addition & 1 deletion ext/pkg/control/builder_test.go
@@ -45,7 +45,7 @@ func TestAddToScheme(t *testing.T) {
	err := AddToScheme(m, text.Language).AddToScheme(s)
	assert.NoError(t, err)

	tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait}
	tests := []string{KindPipe, KindFork, KindIf, KindLoop, KindMerge, KindNOP, KindReduce, KindRetry, KindSession, KindSnippet, KindSplit, KindSwitch, KindWait}

	for _, tt := range tests {
		t.Run(tt, func(t *testing.T) {
145 changes: 145 additions & 0 deletions ext/pkg/control/retry.go
@@ -0,0 +1,145 @@
package control

import (
	"github.com/siyul-park/uniflow/pkg/node"
	"github.com/siyul-park/uniflow/pkg/packet"
	"github.com/siyul-park/uniflow/pkg/port"
	"github.com/siyul-park/uniflow/pkg/process"
	"github.com/siyul-park/uniflow/pkg/scheme"
	"github.com/siyul-park/uniflow/pkg/spec"
	"github.com/siyul-park/uniflow/pkg/types"
	"sync"
	"sync/atomic"
)

// RetryNodeSpec defines the configuration for RetryNode.
type RetryNodeSpec struct {
	spec.Meta `map:",inline"`
	Limit     int `map:"limit"`
}

// RetryNode attempts to process packets up to a specified retry limit.
type RetryNode struct {
	limit   int
	tracer  *packet.Tracer
	inPort  *port.InPort
	outPort *port.OutPort
	errPort *port.OutPort
}

var _ node.Node = (*RetryNode)(nil)

const KindRetry = "retry"

// NewRetryNodeCodec creates a codec to build RetryNode from RetryNodeSpec.
func NewRetryNodeCodec() scheme.Codec {
	return scheme.CodecWithType(func(spec *RetryNodeSpec) (node.Node, error) {
		return NewRetryNode(spec.Limit), nil
	})
}

// NewRetryNode initializes a RetryNode with the given retry limit.
func NewRetryNode(limit int) *RetryNode {
	n := &RetryNode{
		limit:   limit,
		tracer:  packet.NewTracer(),
		inPort:  port.NewIn(),
		outPort: port.NewOut(),
		errPort: port.NewOut(),
	}

	n.inPort.AddListener(port.ListenFunc(n.forward))
	n.outPort.AddListener(port.ListenFunc(n.backward))
	n.errPort.AddListener(port.ListenFunc(n.catch))

	return n
}

// In returns the input port based on the given name.
func (n *RetryNode) In(name string) *port.InPort {
	if name == node.PortIn {
		return n.inPort
	}
	return nil
}

// Out returns the output port based on the given name.
func (n *RetryNode) Out(name string) *port.OutPort {
	switch name {
	case node.PortOut:
		return n.outPort
	case node.PortErr:
		return n.errPort
	default:
		return nil
	}
}

// Close shuts down all ports and releases resources.
func (n *RetryNode) Close() error {
	n.inPort.Close()
	n.outPort.Close()
	n.errPort.Close()
	n.tracer.Close()
	return nil
}

// forward reads packets from the input port, sends them to the output port,
// and re-sends a packet whenever its response is an error, up to the
// configured limit.
func (n *RetryNode) forward(proc *process.Process) {
	inReader := n.inPort.Open(proc)
	outWriter := n.outPort.Open(proc)
	errWriter := n.errPort.Open(proc)

	// Tracks how many times each input packet has been retried.
	attempts := &sync.Map{}

	for inPck := range inReader.Read() {
		n.tracer.Read(inReader, inPck)

		var hook packet.Hook
		hook = packet.HookFunc(func(backPck *packet.Packet) {
			// A non-error response is passed through unchanged.
			if _, ok := backPck.Payload().(types.Error); !ok {
				n.tracer.Transform(inPck, backPck)
				n.tracer.Reduce(backPck)
				return
			}

			actual, _ := attempts.LoadOrStore(inPck, &atomic.Uint32{})
			count := actual.(*atomic.Uint32)

			// Atomically bump the attempt counter; once the limit is reached,
			// route the error to the error port instead of retrying.
			for {
				v := count.Load()

				if int(v) == n.limit {
					n.tracer.Transform(inPck, backPck)
					n.tracer.Write(errWriter, backPck)
					return
				}

				if count.CompareAndSwap(v, v+1) {
					break
				}
			}

			// Retry: re-register this hook and send the packet out again.
			n.tracer.AddHook(inPck, hook)
			n.tracer.Write(outWriter, inPck)
		})

		n.tracer.AddHook(inPck, hook)
		n.tracer.Write(outWriter, inPck)
	}
}

// backward relays responses received on the output port back to the tracer.
func (n *RetryNode) backward(proc *process.Process) {
	outWriter := n.outPort.Open(proc)

	for backPck := range outWriter.Receive() {
		n.tracer.Receive(outWriter, backPck)
	}
}

// catch relays responses received on the error port back to the tracer.
func (n *RetryNode) catch(proc *process.Process) {
	errWriter := n.errPort.Open(proc)

	for backPck := range errWriter.Receive() {
		n.tracer.Receive(errWriter, backPck)
	}
}
113 changes: 113 additions & 0 deletions ext/pkg/control/retry_test.go
@@ -0,0 +1,113 @@
package control

import (
	"context"
	"github.com/go-faker/faker/v4"
	"github.com/pkg/errors"
	"github.com/siyul-park/uniflow/pkg/node"
	"github.com/siyul-park/uniflow/pkg/packet"
	"github.com/siyul-park/uniflow/pkg/port"
	"github.com/siyul-park/uniflow/pkg/process"
	"github.com/siyul-park/uniflow/pkg/types"
	"github.com/stretchr/testify/assert"
	"testing"
	"time"
)

func TestRetryNodeCodec_Decode(t *testing.T) {
	codec := NewRetryNodeCodec()

	spec := &RetryNodeSpec{
		Limit: 0,
	}

	n, err := codec.Compile(spec)
	assert.NoError(t, err)
	assert.NotNil(t, n)
	assert.NoError(t, n.Close())
}

func TestNewRetryNode(t *testing.T) {
	n := NewRetryNode(0)
	assert.NotNil(t, n)
	assert.NoError(t, n.Close())
}

func TestRetryNode_Port(t *testing.T) {
	n := NewRetryNode(0)
	defer n.Close()

	assert.NotNil(t, n.In(node.PortIn))
	assert.NotNil(t, n.Out(node.PortOut))
	assert.NotNil(t, n.Out(node.PortErr))
}

func TestRetryNode_SendAndReceive(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
	defer cancel()

	limit := 2

	n1 := NewRetryNode(limit)
	defer n1.Close()

	// n2 always fails, so every attempt comes back as an error packet.
	count := 0
	n2 := node.NewOneToOneNode(func(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) {
		count += 1
		return nil, packet.New(types.NewError(errors.New(faker.Sentence())))
	})
	defer n2.Close()

	n1.Out(node.PortOut).Link(n2.In(node.PortIn))

	in := port.NewOut()
	in.Link(n1.In(node.PortIn))

	proc := process.New()
	defer proc.Exit(nil)

	inWriter := in.Open(proc)

	inPayload := types.NewString(faker.Word())
	inPck := packet.New(inPayload)

	inWriter.Write(inPck)

	select {
	case outPck := <-inWriter.Receive():
		// The initial attempt plus `limit` retries, then the error is returned.
		assert.Equal(t, limit+1, count)
		assert.IsType(t, outPck.Payload(), types.NewError(nil))
	case <-ctx.Done():
		assert.Fail(t, ctx.Err().Error())
	}
}

func BenchmarkRetryNode_SendAndReceive(b *testing.B) {
	n1 := NewRetryNode(1)
	defer n1.Close()

	n2 := node.NewOneToOneNode(func(_ *process.Process, inPck *packet.Packet) (*packet.Packet, *packet.Packet) {
		return nil, packet.New(types.NewError(errors.New(faker.Sentence())))
	})
	defer n2.Close()

	n1.Out(node.PortOut).Link(n2.In(node.PortIn))

	in := port.NewOut()
	in.Link(n1.In(node.PortIn))

	proc := process.New()
	defer proc.Exit(nil)

	inWriter := in.Open(proc)

	inPayload := types.NewString(faker.Word())
	inPck := packet.New(inPayload)

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		inWriter.Write(inPck)
		<-inWriter.Receive()
	}
}
4 changes: 2 additions & 2 deletions ext/pkg/control/wait.go
@@ -9,10 +9,10 @@ import (
	"time"
)

// WaitNodeSpec defines the configuration for WaitNode, including a delay duration.
// WaitNodeSpec defines the configuration for WaitNode.
type WaitNodeSpec struct {
	spec.Meta `map:",inline"`
	Interval  time.Duration `map:"timeout"`
	Interval  time.Duration `map:"interval"`
}

// WaitNode adds a delay to packet processing, using a specified interval.
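
Note the tag change on `WaitNodeSpec`: the `map` key moves from `timeout` to `interval`, so wait specs must now use the new key. A minimal sketch, assuming the decoder accepts a Go-style duration string (whether numeric values are also accepted is not shown in this commit):

```yaml
- kind: wait
  interval: 500ms # previously written as `timeout: 500ms`
```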
