Skip to content

Commit

Permalink
refactor: remove benbjohnson/immutable dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 26, 2024
1 parent bbdba1d commit 80699fc
Show file tree
Hide file tree
Showing 23 changed files with 589 additions and 305 deletions.
8 changes: 4 additions & 4 deletions ext/pkg/io/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ func (n *PrintNode) action(_ *process.Process, inPck *packet.Packet) (*packet.Pa
defer n.mu.RUnlock()

var args []any
format, ok := types.Pick[string](inPck.Payload())
format, ok := types.Get[string](inPck.Payload())
if !ok {
payload, ok := inPck.Payload().(types.Slice)
if !ok {
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
}
format, ok = types.Pick[string](payload, 0)
format, ok = types.Get[string](payload, 0)
if !ok {
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
}
Expand All @@ -115,12 +115,12 @@ func (n *DynPrintNode) action(_ *process.Process, inPck *packet.Packet) (*packet
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
}

filename, ok := types.Pick[string](payload, 0)
filename, ok := types.Get[string](payload, 0)
if !ok {
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
}

format, ok := types.Pick[string](payload, 1)
format, ok := types.Get[string](payload, 1)
if !ok {
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
}
Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/io/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (n *ScanNode) action(_ *process.Process, inPck *packet.Packet) (*packet.Pac
n.mu.RLock()
defer n.mu.RUnlock()

format, ok := types.Pick[string](inPck.Payload())
format, ok := types.Get[string](inPck.Payload())
if !ok {
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
}
Expand Down Expand Up @@ -118,12 +118,12 @@ func (n *DynScanNode) action(_ *process.Process, inPck *packet.Packet) (*packet.
n.mu.RLock()
defer n.mu.RUnlock()

filename, ok := types.Pick[string](inPck.Payload(), 0)
filename, ok := types.Get[string](inPck.Payload(), 0)
if !ok {
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
}

format, ok := types.Pick[string](inPck.Payload(), 1)
format, ok := types.Get[string](inPck.Payload(), 1)
if !ok {
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
}
Expand Down
10 changes: 5 additions & 5 deletions ext/pkg/io/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (n *SQLNode) action(proc *process.Process, inPck *packet.Packet) (*packet.P

ctx := proc.Context()

query, ok := types.Pick[string](inPck.Payload())
query, ok := types.Get[string](inPck.Payload())
if !ok {
query, ok = types.Pick[string](inPck.Payload(), 0)
query, ok = types.Get[string](inPck.Payload(), 0)
}
if !ok {
return nil, packet.New(types.NewError(encoding.ErrUnsupportedType))
Expand Down Expand Up @@ -129,16 +129,16 @@ func (n *SQLNode) action(proc *process.Process, inPck *packet.Packet) (*packet.P

var rows *sqlx.Rows
if len(stmt.Params) == 0 {
args, _ := types.Pick[[]any](inPck.Payload(), 1)
args, _ := types.Get[[]any](inPck.Payload(), 1)
if rows, err = tx.QueryxContext(ctx, query, args...); err != nil {
return nil, packet.New(types.NewError(err))
}
} else {
var arg any
var ok bool
arg, ok = types.Pick[map[string]any](inPck.Payload(), 1)
arg, ok = types.Get[map[string]any](inPck.Payload(), 1)
if !ok {
arg, _ = types.Pick[[]map[string]any](inPck.Payload(), 1)
arg, _ = types.Get[[]map[string]any](inPck.Payload(), 1)
}

query, args, err := tx.BindNamed(query, arg)
Expand Down
4 changes: 2 additions & 2 deletions ext/pkg/io/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestSQLNode_SendAndReceive(t *testing.T) {

select {
case outPck := <-inWriter.Receive():
assert.Equal(t, types.NewSlice(), outPck.Payload())
assert.Equal(t, nil, outPck.Payload().Interface())
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestSQLNode_SendAndReceive(t *testing.T) {

select {
case outPck := <-inWriter.Receive():
assert.Equal(t, types.NewSlice(), outPck.Payload())
assert.Equal(t, nil, outPck.Payload().Interface())
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
Expand Down
10 changes: 5 additions & 5 deletions ext/pkg/mime/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er

writeField := func(obj types.Map, key types.Value) error {
if key, ok := key.(types.String); ok {
value := obj.GetOr(key, nil)
value := obj.Get(key)

var elements types.Slice
if v, ok := value.(types.Slice); ok {
Expand Down Expand Up @@ -135,7 +135,7 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er
if value, ok := value.(types.Map); ok {
for key := range value.Range() {
if key, ok := key.(types.String); ok {
value := value.GetOr(key, nil)
value := value.Get(key)

var elements types.Slice
if v, ok := value.(types.Slice); ok {
Expand All @@ -145,16 +145,16 @@ func Encode(writer io.Writer, value types.Value, header textproto.MIMEHeader) er
}

for _, element := range elements.Values() {
data, ok := types.Pick[types.Value](element, "data")
data, ok := types.Get[types.Value](element, "data")
if !ok {
data = element
}
filename, ok := types.Pick[string](element, "filename")
filename, ok := types.Get[string](element, "filename")
if !ok {
filename = key.String()
}

header, _ := types.Pick[types.Value](element, "header")
header, _ := types.Get[types.Value](element, "header")

h := textproto.MIMEHeader{}
_ = types.Unmarshal(header, &h)
Expand Down
18 changes: 9 additions & 9 deletions ext/pkg/mime/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ func TestEncode(t *testing.T) {
),
whenType: MultipartFormData + "; boundary=MyBoundary",
expect: []byte("--MyBoundary\r\n" +
"Content-Disposition: form-data; name=\"test\"; filename=\"test\"\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"Content-Disposition: form-data; name=\"test\"\r\n" +
"\r\n" +
"test\r\n" +
"--MyBoundary\r\n" +
"Content-Disposition: form-data; name=\"test\"\r\n" +
"Content-Disposition: form-data; name=\"test\"; filename=\"test\"\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"\r\n" +
"test\r\n" +
"--MyBoundary--\r\n"),
Expand All @@ -83,12 +83,12 @@ func TestEncode(t *testing.T) {
),
whenType: MultipartFormData + "; boundary=MyBoundary",
expect: []byte("--MyBoundary\r\n" +
"Content-Disposition: form-data; name=\"test\"; filename=\"test\"\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"Content-Disposition: form-data; name=\"test\"\r\n" +
"\r\n" +
"test\r\n" +
"--MyBoundary\r\n" +
"Content-Disposition: form-data; name=\"test\"\r\n" +
"Content-Disposition: form-data; name=\"test\"; filename=\"test\"\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"\r\n" +
"test\r\n" +
"--MyBoundary--\r\n"),
Expand All @@ -112,12 +112,12 @@ func TestEncode(t *testing.T) {
),
whenType: MultipartFormData + "; boundary=MyBoundary",
expect: []byte("--MyBoundary\r\n" +
"Content-Disposition: form-data; name=\"test\"; filename=\"test\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"Content-Disposition: form-data; name=\"test\"\r\n" +
"\r\n" +
"test\r\n" +
"--MyBoundary\r\n" +
"Content-Disposition: form-data; name=\"test\"\r\n" +
"Content-Disposition: form-data; name=\"test\"; filename=\"test\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n" +
"test\r\n" +
"--MyBoundary--\r\n"),
Expand Down
2 changes: 1 addition & 1 deletion ext/pkg/network/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
type ListenNodeSpec struct {
spec.Meta `map:",inline"`
Protocol string `map:"protocol" validate:"required"`
Host string `map:"host,omitempty" validate:"hostname|ip"`
Host string `map:"host,omitempty"`
Port int `map:"port" validate:"required"`
Cert string `map:"cert,omitempty"`
Key string `map:"key,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions ext/pkg/network/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func (n *RouteNode) action(_ *process.Process, inPck *packet.Packet) ([]*packet.
return nil, nil
}

method, _ := types.Pick[string](inPayload, "method")
path, _ := types.Pick[string](inPayload, "path")
method, _ := types.Get[string](inPayload, "method")
path, _ := types.Get[string](inPayload, "path")

route, paramValues := n.find(method, path)
if route == nil {
Expand Down
2 changes: 1 addition & 1 deletion ext/pkg/network/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestRouteNode_SendAndReceive(t *testing.T) {

select {
case outPck := <-outReader.Read():
params, _ := types.Pick[map[string]string](outPck.Payload(), "params")
params, _ := types.Get[map[string]string](outPck.Payload(), "params")
assert.Equal(t, tt.expectParams, params)
outReader.Receive(outPck)
case <-ctx.Done():
Expand Down
5 changes: 0 additions & 5 deletions ext/pkg/system/error.go

This file was deleted.

4 changes: 3 additions & 1 deletion ext/pkg/system/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type SignalNode struct {

const KindSignal = "signal"

var ErrInvalidTopic = errors.New("topic is invalid")

// NewSignalNodeCodec creates a codec for compiling SignalNodeSpec into SignalNode instances.
func NewSignalNodeCodec(signals map[string]func(context.Context) (<-chan any, error)) scheme.Codec {
if signals == nil {
Expand All @@ -41,7 +43,7 @@ func NewSignalNodeCodec(signals map[string]func(context.Context) (<-chan any, er
return scheme.CodecWithType[*SignalNodeSpec](func(spec *SignalNodeSpec) (node.Node, error) {
fn, ok := signals[spec.Topic]
if !ok {
return nil, errors.WithStack(ErrInvalidOperation)
return nil, errors.WithStack(ErrInvalidTopic)
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 2 additions & 0 deletions ext/pkg/system/syscall.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type SyscallNode struct {

const KindSyscall = "syscall"

var ErrInvalidOperation = errors.New("operation is invalid")

// NewSyscallNodeCodec returns a codec for SyscallNodeSpec.
func NewSyscallNodeCodec(functions map[string]func(ctx context.Context, arguments []any) ([]any, error)) scheme.Codec {
if functions == nil {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/siyul-park/uniflow
go 1.23.4

require (
github.com/benbjohnson/immutable v0.4.3
github.com/go-faker/faker/v4 v4.5.0
github.com/go-playground/validator/v10 v10.23.0
github.com/gofrs/uuid v4.4.0+incompatible
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/benbjohnson/immutable v0.4.3 h1:GYHcksoJ9K6HyAUpGxwZURrbTkXA0Dh4otXGqbhdrjA=
github.com/benbjohnson/immutable v0.4.3/go.mod h1:qJIKKSmdqz1tVzNtst1DZzvaqOU1onk1rc03IeM3Owk=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
3 changes: 2 additions & 1 deletion pkg/port/listener.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package port

import (
"github.com/siyul-park/uniflow/pkg/process"
"sync"

"github.com/siyul-park/uniflow/pkg/process"
)

// Listener is an interface for handling process events.
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheme/scheme.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package scheme

import (
"github.com/go-playground/validator/v10"
"github.com/gofrs/uuid"
"reflect"
"slices"
"sync"

"github.com/go-playground/validator/v10"
"github.com/gofrs/uuid"

"github.com/pkg/errors"
"github.com/siyul-park/uniflow/pkg/encoding"
"github.com/siyul-park/uniflow/pkg/node"
Expand Down
18 changes: 15 additions & 3 deletions pkg/types/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/base64"
"hash/fnv"
"reflect"
"sync"
"unsafe"

"github.com/pkg/errors"
Expand All @@ -17,6 +18,8 @@ type Binary = *binary_

type binary_ struct {
value []byte
hash uint64
mu sync.Mutex
}

var _ Value = (Binary)(nil)
Expand Down Expand Up @@ -53,9 +56,15 @@ func (b Binary) Kind() Kind {

// Hash returns the precomputed hash code.
func (b Binary) Hash() uint64 {
h := fnv.New64a()
h.Write(b.value)
return h.Sum64()
b.mu.Lock()
defer b.mu.Unlock()

if b.hash == 0 {
h := fnv.New64a()
h.Write(b.value)
b.hash = h.Sum64()
}
return b.hash
}

// Interface converts Binary to a byte slice.
Expand All @@ -66,6 +75,9 @@ func (b Binary) Interface() any {
// Equal checks whether another Object is equal to this Binary instance.
func (b Binary) Equal(other Value) bool {
if o, ok := other.(Binary); ok {
if b.Hash() != o.Hash() {
return false
}
return bytes.Equal(b.value, o.value)
}
return false
Expand Down
8 changes: 4 additions & 4 deletions pkg/types/pick.go → pkg/types/getter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package types

// Pick extracts a value from a nested structure using the provided paths.
func Pick[T any](obj Value, paths ...any) (T, bool) {
// Get extracts a value from a nested structure using the provided paths.
func Get[T any](obj Value, paths ...any) (T, bool) {
var val T
cur := obj
for _, path := range paths {
Expand All @@ -13,8 +13,8 @@ func Pick[T any](obj Value, paths ...any) (T, bool) {
switch p := p.(type) {
case String:
if v, ok := cur.(Map); ok {
child, ok := v.Get(p)
if !ok {
child := v.Get(p)
if child == nil {
return val, false
}
cur = child
Expand Down
Loading

0 comments on commit 80699fc

Please sign in to comment.