diff --git a/pkg/node/onetomany_test.go b/pkg/node/onetomany_test.go index 8d15a1a4..b0df1d11 100644 --- a/pkg/node/onetomany_test.go +++ b/pkg/node/onetomany_test.go @@ -64,7 +64,7 @@ func TestOneToManyNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) inStream := in.Open(proc) outStream := out.Open(proc) @@ -109,7 +109,7 @@ func TestOneToManyNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) inStream := in.Open(proc) errStream := err.Open(proc) diff --git a/pkg/node/onetoone_test.go b/pkg/node/onetoone_test.go index 4b31c383..e63a4b6d 100644 --- a/pkg/node/onetoone_test.go +++ b/pkg/node/onetoone_test.go @@ -65,7 +65,7 @@ func TestOneToOneNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -102,7 +102,7 @@ func TestOneToOneNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) errStream := err.Open(proc) @@ -142,7 +142,7 @@ func TestOneToOneNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) inStream := in.Open(proc) outStream := out.Open(proc) @@ -188,7 +188,7 @@ func TestOneToOneNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) inStream := in.Open(proc) errStream := err.Open(proc) diff --git a/pkg/packet/error.go b/pkg/packet/error.go index 0713b4b7..47b70235 100644 --- a/pkg/packet/error.go +++ b/pkg/packet/error.go @@ -1,10 +1,15 @@ package packet -import "github.com/siyul-park/uniflow/pkg/primitive" +import ( + "errors" -// NewError creates a new Packet to represent an error. -// It takes an error and an optional cause Packet and constructs a Packet with error details. -func NewError(err error, cause *Packet) *Packet { + "github.com/siyul-park/uniflow/pkg/primitive" +) + +// WithError creates a new Packet representing an error with the given error and optional cause. +// It constructs a Packet with error details, including the error message. +// If a cause is provided, it is attached to the error packet. +func WithError(err error, cause *Packet) *Packet { pairs := []primitive.Value{ primitive.NewString("error"), primitive.NewString(err.Error()), @@ -17,9 +22,14 @@ func NewError(err error, cause *Packet) *Packet { return New(primitive.NewMap(pairs...)) } -// IsError checks if the given Packet represents an error. -func IsError(pck *Packet) bool { +// AsError extracts the error from a Packet, returning it along with a boolean indicating whether +// the Packet contains error information. If the Packet does not represent an error, the +// returned error is nil, and the boolean is false. +func AsError(pck *Packet) (error, bool) { payload := pck.Payload() - _, ok := primitive.Pick[string](payload, "error") - return ok + err, ok := primitive.Pick[string](payload, "error") + if !ok { + return nil, false + } + return errors.New(err), true } diff --git a/pkg/packet/error_test.go b/pkg/packet/error_test.go index 43db384a..03e4a3c1 100644 --- a/pkg/packet/error_test.go +++ b/pkg/packet/error_test.go @@ -9,11 +9,11 @@ import ( "github.com/stretchr/testify/assert" ) -func TestNewError(t *testing.T) { +func TestWithError(t *testing.T) { err := errors.New(faker.Sentence()) pck1 := New(primitive.NewString(faker.Word())) - pck2 := NewError(err, pck1) + pck2 := WithError(err, pck1) assert.NotNil(t, pck2) assert.NotZero(t, pck2.ID()) @@ -24,12 +24,13 @@ func TestNewError(t *testing.T) { assert.Equal(t, pck1.Payload(), payload.GetOr(primitive.NewString("cause"), nil)) } -func TestIsError(t *testing.T) { +func TestAsError(t *testing.T) { err := errors.New(faker.Sentence()) pck1 := New(primitive.NewString(faker.Word())) - pck2 := NewError(err, pck1) + pck2 := WithError(err, pck1) - assert.True(t, IsError(pck2)) - assert.False(t, IsError(pck1)) + err1, ok := AsError(pck2) + assert.True(t, ok) + assert.Equal(t, err.Error(), err1.Error()) } diff --git a/pkg/plugin/controllx/snippet.go b/pkg/plugin/controllx/snippet.go index 899ad470..3476d9ea 100644 --- a/pkg/plugin/controllx/snippet.go +++ b/pkg/plugin/controllx/snippet.go @@ -92,9 +92,9 @@ func (n *SnippetNode) action(proc *process.Process, inPck *packet.Packet) (*pack } if output, err := n.run(input); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } else if outPayload, err := primitive.MarshalText(output); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } else { return packet.New(outPayload), nil } diff --git a/pkg/plugin/controllx/snippet_test.go b/pkg/plugin/controllx/snippet_test.go index f0d7fd53..2fd03b40 100644 --- a/pkg/plugin/controllx/snippet_test.go +++ b/pkg/plugin/controllx/snippet_test.go @@ -44,7 +44,7 @@ function main(inPayload: any): any { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -80,7 +80,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -114,7 +114,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -146,7 +146,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -185,7 +185,7 @@ function main(inPayload: any): any { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -222,7 +222,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -255,7 +255,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -288,7 +288,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) diff --git a/pkg/plugin/controllx/switch.go b/pkg/plugin/controllx/switch.go index 89b0f855..41dbdfa7 100644 --- a/pkg/plugin/controllx/switch.go +++ b/pkg/plugin/controllx/switch.go @@ -106,5 +106,5 @@ func (n *SwitchNode) action(proc *process.Process, inPck *packet.Packet) ([]*pac } } - return nil, packet.NewError(node.ErrDiscardPacket, inPck) + return nil, packet.WithError(node.ErrDiscardPacket, inPck) } diff --git a/pkg/plugin/controllx/switch_test.go b/pkg/plugin/controllx/switch_test.go index ac97a2e1..96666a88 100644 --- a/pkg/plugin/controllx/switch_test.go +++ b/pkg/plugin/controllx/switch_test.go @@ -79,7 +79,7 @@ func TestSwitchNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) inStream := in.Open(proc) outStream := out.Open(proc) diff --git a/pkg/plugin/networkx/http.go b/pkg/plugin/networkx/http.go index b9599120..03361a5d 100644 --- a/pkg/plugin/networkx/http.go +++ b/pkg/plugin/networkx/http.go @@ -382,13 +382,13 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { proc := process.New() defer func() { proc.Stack().Wait() - proc.Exit() + proc.Exit(nil) }() go func() { select { case <-r.Context().Done(): - proc.Exit() + proc.Exit(r.Context().Err()) case <-proc.Done(): } }() @@ -451,7 +451,7 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { var res HTTPPayload if err := primitive.Unmarshal(inPayload, &res); err != nil { - if packet.IsError(inPck) { + if _, ok := packet.AsError(inPck); ok { res = InternalServerError } else { res.Body = inPayload diff --git a/pkg/plugin/networkx/router.go b/pkg/plugin/networkx/router.go index 940c126d..3863fa4b 100644 --- a/pkg/plugin/networkx/router.go +++ b/pkg/plugin/networkx/router.go @@ -151,15 +151,15 @@ func (n *RouterNode) action(proc *process.Process, inPck *packet.Packet) ([]*pac inPayload, ok := inPck.Payload().(*primitive.Map) if !ok { - return nil, packet.NewError(node.ErrInvalidPacket, inPck) + return nil, packet.WithError(node.ErrInvalidPacket, inPck) } method, ok := primitive.Pick[string](inPayload, KeyMethod) if !ok { - return nil, packet.NewError(node.ErrInvalidPacket, inPck) + return nil, packet.WithError(node.ErrInvalidPacket, inPck) } path, ok := primitive.Pick[string](inPayload, KeyPath) if !ok { - return nil, packet.NewError(node.ErrInvalidPacket, inPck) + return nil, packet.WithError(node.ErrInvalidPacket, inPck) } pre, cur, values := n.find(method, path) diff --git a/pkg/plugin/networkx/router_test.go b/pkg/plugin/networkx/router_test.go index 6c4af490..74f5e7b1 100644 --- a/pkg/plugin/networkx/router_test.go +++ b/pkg/plugin/networkx/router_test.go @@ -71,7 +71,7 @@ func TestRouterNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) inStream := in.Open(proc) outStream := out.Open(proc) diff --git a/pkg/plugin/systemx/reflect.go b/pkg/plugin/systemx/reflect.go index 2bc25582..e600db47 100644 --- a/pkg/plugin/systemx/reflect.go +++ b/pkg/plugin/systemx/reflect.go @@ -92,12 +92,12 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack case OPDelete: filter, err := examplesToFilter(examples) if err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } specs, err := n.storage.FindMany(ctx, filter) if err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } var ids []ulid.ULID @@ -106,14 +106,14 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack } if _, err := n.storage.DeleteMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...)); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } if len(specs) == 0 { return nil, inPck } if outPayload, err := specsToExamples(specs, batch); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } else { return packet.New(outPayload), nil } @@ -122,40 +122,40 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack ids, err := n.storage.InsertMany(ctx, specs) if err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } specs, err = n.storage.FindMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...), &database.FindOptions{ Limit: lo.ToPtr(len(ids)), }) if err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } if len(specs) == 0 { return nil, inPck } if outPayload, err := specsToExamples(specs, batch); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } else { return packet.New(outPayload), nil } case OPSelect: filter, err := examplesToFilter(examples) if err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } specs, err := n.storage.FindMany(ctx, filter) if err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } if len(specs) == 0 { return nil, inPck } if outPayload, err := specsToExamples(specs, batch); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } else { return packet.New(outPayload), nil } @@ -177,14 +177,14 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack Limit: lo.ToPtr(len(ids)), }) if err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } var merges []scheme.Spec for _, spec := range specs { unstructured := scheme.NewUnstructured(nil) if err := unstructured.Marshal(spec); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } patch := patches[spec.GetID()] @@ -198,21 +198,21 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack } if _, err := n.storage.UpdateMany(ctx, merges); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } specs, err = n.storage.FindMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...), &database.FindOptions{ Limit: lo.ToPtr(len(ids)), }) if err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } if len(specs) == 0 { return nil, inPck } if outPayload, err := specsToExamples(specs, batch); err != nil { - return nil, packet.NewError(err, inPck) + return nil, packet.WithError(err, inPck) } else { return packet.New(outPayload), nil } diff --git a/pkg/plugin/systemx/reflect_test.go b/pkg/plugin/systemx/reflect_test.go index 71b3eeb0..68c2f345 100644 --- a/pkg/plugin/systemx/reflect_test.go +++ b/pkg/plugin/systemx/reflect_test.go @@ -64,7 +64,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -108,7 +108,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -146,7 +146,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) @@ -190,7 +190,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Exit() + defer proc.Exit(nil) ioStream := io.Open(proc) diff --git a/pkg/port/port_test.go b/pkg/port/port_test.go index ea4aba6c..2b92d5ce 100644 --- a/pkg/port/port_test.go +++ b/pkg/port/port_test.go @@ -94,7 +94,7 @@ func TestPort_Open(t *testing.T) { proc := process.New() stream := port.Open(proc) - proc.Exit() + proc.Exit(nil) select { case <-stream.Done(): @@ -105,7 +105,7 @@ func TestPort_Open(t *testing.T) { t.Run("process closed", func(t *testing.T) { proc := process.New() - proc.Exit() + proc.Exit(nil) stream := port.Open(proc) diff --git a/pkg/process/process.go b/pkg/process/process.go index 7aadb151..cfdcfe1c 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -7,10 +7,10 @@ import ( ) // Process is a processing unit that isolates data processing from others. - type Process struct { id ulid.ULID stack *Stack + err error done chan struct{} mu sync.RWMutex } @@ -41,13 +41,21 @@ func (p *Process) Stack() *Stack { return p.stack } +// Err returns the last error encountered by the process. +func (p *Process) Err() error { + p.mu.RLock() + defer p.mu.RUnlock() + + return p.err +} + // Done returns a channel that is closed when the process is closed. func (p *Process) Done() <-chan struct{} { return p.done } -// Exit closes the process. -func (p *Process) Exit() { +// Exit closes the process with an optional error. +func (p *Process) Exit(err error) { p.mu.Lock() defer p.mu.Unlock() @@ -57,7 +65,7 @@ func (p *Process) Exit() { default: } + p.err = err close(p.done) - p.stack.Close() } diff --git a/pkg/process/process_test.go b/pkg/process/process_test.go index 7107fe58..9155b03d 100644 --- a/pkg/process/process_test.go +++ b/pkg/process/process_test.go @@ -9,21 +9,21 @@ import ( func TestNew(t *testing.T) { proc := New() - defer proc.Exit() + defer proc.Exit(nil) assert.NotNil(t, proc) } func TestProcess_ID(t *testing.T) { proc := New() - defer proc.Exit() + defer proc.Exit(nil) assert.NotEqual(t, ulid.ULID{}, proc.ID()) } func TestProcess_Stack(t *testing.T) { proc := New() - defer proc.Exit() + defer proc.Exit(nil) assert.NotNil(t, proc.Stack()) } @@ -37,7 +37,7 @@ func TestProcess_Close(t *testing.T) { default: } - proc.Exit() + proc.Exit(nil) select { case <-proc.Done():