Skip to content

Commit

Permalink
feat: allow process exit with error
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 2, 2023
1 parent f890c83 commit 3468419
Show file tree
Hide file tree
Showing 16 changed files with 87 additions and 68 deletions.
4 changes: 2 additions & 2 deletions pkg/node/onetomany_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestOneToManyNode_Send(t *testing.T) {
outPort.Link(out)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

inStream := in.Open(proc)
outStream := out.Open(proc)
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestOneToManyNode_Send(t *testing.T) {
errPort.Link(err)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

inStream := in.Open(proc)
errStream := err.Open(proc)
Expand Down
8 changes: 4 additions & 4 deletions pkg/node/onetoone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestOneToOneNode_Send(t *testing.T) {
ioPort.Link(io)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down Expand Up @@ -102,7 +102,7 @@ func TestOneToOneNode_Send(t *testing.T) {
errPort.Link(err)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)
errStream := err.Open(proc)
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestOneToOneNode_Send(t *testing.T) {
outPort.Link(out)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

inStream := in.Open(proc)
outStream := out.Open(proc)
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestOneToOneNode_Send(t *testing.T) {
errPort.Link(err)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

inStream := in.Open(proc)
errStream := err.Open(proc)
Expand Down
26 changes: 18 additions & 8 deletions pkg/packet/error.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package packet

import "github.com/siyul-park/uniflow/pkg/primitive"
import (
"errors"

// NewError creates a new Packet to represent an error.
// It takes an error and an optional cause Packet and constructs a Packet with error details.
func NewError(err error, cause *Packet) *Packet {
"github.com/siyul-park/uniflow/pkg/primitive"
)

// WithError creates a new Packet representing an error with the given error and optional cause.
// It constructs a Packet with error details, including the error message.
// If a cause is provided, it is attached to the error packet.
func WithError(err error, cause *Packet) *Packet {
pairs := []primitive.Value{
primitive.NewString("error"),
primitive.NewString(err.Error()),
Expand All @@ -17,9 +22,14 @@ func NewError(err error, cause *Packet) *Packet {
return New(primitive.NewMap(pairs...))
}

// IsError checks if the given Packet represents an error.
func IsError(pck *Packet) bool {
// AsError extracts the error from a Packet, returning it along with a boolean indicating whether
// the Packet contains error information. If the Packet does not represent an error, the
// returned error is nil, and the boolean is false.
func AsError(pck *Packet) (error, bool) {
payload := pck.Payload()
_, ok := primitive.Pick[string](payload, "error")
return ok
err, ok := primitive.Pick[string](payload, "error")
if !ok {
return nil, false
}
return errors.New(err), true
}
13 changes: 7 additions & 6 deletions pkg/packet/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/stretchr/testify/assert"
)

func TestNewError(t *testing.T) {
func TestWithError(t *testing.T) {
err := errors.New(faker.Sentence())

pck1 := New(primitive.NewString(faker.Word()))
pck2 := NewError(err, pck1)
pck2 := WithError(err, pck1)

assert.NotNil(t, pck2)
assert.NotZero(t, pck2.ID())
Expand All @@ -24,12 +24,13 @@ func TestNewError(t *testing.T) {
assert.Equal(t, pck1.Payload(), payload.GetOr(primitive.NewString("cause"), nil))
}

func TestIsError(t *testing.T) {
func TestAsError(t *testing.T) {
err := errors.New(faker.Sentence())

pck1 := New(primitive.NewString(faker.Word()))
pck2 := NewError(err, pck1)
pck2 := WithError(err, pck1)

assert.True(t, IsError(pck2))
assert.False(t, IsError(pck1))
err1, ok := AsError(pck2)
assert.True(t, ok)
assert.Equal(t, err.Error(), err1.Error())
}
4 changes: 2 additions & 2 deletions pkg/plugin/controllx/snippet.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func (n *SnippetNode) action(proc *process.Process, inPck *packet.Packet) (*pack
}

if output, err := n.run(input); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
} else if outPayload, err := primitive.MarshalText(output); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
} else {
return packet.New(outPayload), nil
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/plugin/controllx/snippet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function main(inPayload: any): any {
ioPort.Link(io)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down Expand Up @@ -80,7 +80,7 @@ function main(inPayload) {
ioPort.Link(io)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down Expand Up @@ -114,7 +114,7 @@ function main(inPayload) {
ioPort.Link(io)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down Expand Up @@ -146,7 +146,7 @@ function main(inPayload) {
ioPort.Link(io)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down Expand Up @@ -185,7 +185,7 @@ function main(inPayload: any): any {

for i := 0; i < b.N; i++ {
proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down Expand Up @@ -222,7 +222,7 @@ function main(inPayload) {

for i := 0; i < b.N; i++ {
proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down Expand Up @@ -255,7 +255,7 @@ function main(inPayload) {

for i := 0; i < b.N; i++ {
proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down Expand Up @@ -288,7 +288,7 @@ function main(inPayload) {

for i := 0; i < b.N; i++ {
proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

ioStream := io.Open(proc)

Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/controllx/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,5 @@ func (n *SwitchNode) action(proc *process.Process, inPck *packet.Packet) ([]*pac
}
}

return nil, packet.NewError(node.ErrDiscardPacket, inPck)
return nil, packet.WithError(node.ErrDiscardPacket, inPck)
}
2 changes: 1 addition & 1 deletion pkg/plugin/controllx/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestSwitchNode_Send(t *testing.T) {
outPort.Link(out)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

inStream := in.Open(proc)
outStream := out.Open(proc)
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/networkx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,13 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
proc := process.New()
defer func() {
proc.Stack().Wait()
proc.Exit()
proc.Exit(nil)
}()

go func() {
select {
case <-r.Context().Done():
proc.Exit()
proc.Exit(r.Context().Err())
case <-proc.Done():
}
}()
Expand Down Expand Up @@ -451,7 +451,7 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {

var res HTTPPayload
if err := primitive.Unmarshal(inPayload, &res); err != nil {
if packet.IsError(inPck) {
if _, ok := packet.AsError(inPck); ok {
res = InternalServerError
} else {
res.Body = inPayload
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/networkx/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,15 @@ func (n *RouterNode) action(proc *process.Process, inPck *packet.Packet) ([]*pac

inPayload, ok := inPck.Payload().(*primitive.Map)
if !ok {
return nil, packet.NewError(node.ErrInvalidPacket, inPck)
return nil, packet.WithError(node.ErrInvalidPacket, inPck)
}
method, ok := primitive.Pick[string](inPayload, KeyMethod)
if !ok {
return nil, packet.NewError(node.ErrInvalidPacket, inPck)
return nil, packet.WithError(node.ErrInvalidPacket, inPck)
}
path, ok := primitive.Pick[string](inPayload, KeyPath)
if !ok {
return nil, packet.NewError(node.ErrInvalidPacket, inPck)
return nil, packet.WithError(node.ErrInvalidPacket, inPck)
}

pre, cur, values := n.find(method, path)
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/networkx/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestRouterNode_Send(t *testing.T) {
outPort.Link(out)

proc := process.New()
defer proc.Exit()
defer proc.Exit(nil)

inStream := in.Open(proc)
outStream := out.Open(proc)
Expand Down
30 changes: 15 additions & 15 deletions pkg/plugin/systemx/reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack
case OPDelete:
filter, err := examplesToFilter(examples)
if err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

specs, err := n.storage.FindMany(ctx, filter)
if err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

var ids []ulid.ULID
Expand All @@ -106,14 +106,14 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack
}

if _, err := n.storage.DeleteMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...)); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

if len(specs) == 0 {
return nil, inPck
}
if outPayload, err := specsToExamples(specs, batch); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
} else {
return packet.New(outPayload), nil
}
Expand All @@ -122,40 +122,40 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack

ids, err := n.storage.InsertMany(ctx, specs)
if err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

specs, err = n.storage.FindMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...), &database.FindOptions{
Limit: lo.ToPtr(len(ids)),
})
if err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

if len(specs) == 0 {
return nil, inPck
}
if outPayload, err := specsToExamples(specs, batch); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
} else {
return packet.New(outPayload), nil
}
case OPSelect:
filter, err := examplesToFilter(examples)
if err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

specs, err := n.storage.FindMany(ctx, filter)
if err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

if len(specs) == 0 {
return nil, inPck
}
if outPayload, err := specsToExamples(specs, batch); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
} else {
return packet.New(outPayload), nil
}
Expand All @@ -177,14 +177,14 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack
Limit: lo.ToPtr(len(ids)),
})
if err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

var merges []scheme.Spec
for _, spec := range specs {
unstructured := scheme.NewUnstructured(nil)
if err := unstructured.Marshal(spec); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

patch := patches[spec.GetID()]
Expand All @@ -198,21 +198,21 @@ func (n *ReflectNode) action(proc *process.Process, inPck *packet.Packet) (*pack
}

if _, err := n.storage.UpdateMany(ctx, merges); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

specs, err = n.storage.FindMany(ctx, storage.Where[ulid.ULID](scheme.KeyID).IN(ids...), &database.FindOptions{
Limit: lo.ToPtr(len(ids)),
})
if err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
}

if len(specs) == 0 {
return nil, inPck
}
if outPayload, err := specsToExamples(specs, batch); err != nil {
return nil, packet.NewError(err, inPck)
return nil, packet.WithError(err, inPck)
} else {
return packet.New(outPayload), nil
}
Expand Down
Loading

0 comments on commit 3468419

Please sign in to comment.