Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: plugin package for improved readability and documentation #27

Merged
merged 3 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/plugin/controllx/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/siyul-park/uniflow/pkg/scheme"
)

// AddToScheme returns a function that adds types and codecs to the provided scheme.
func AddToScheme() func(*scheme.Scheme) error {
return func(s *scheme.Scheme) error {
s.AddKnownType(KindSnippet, &SnippetSpec{})
Expand Down
50 changes: 27 additions & 23 deletions pkg/plugin/controllx/snippet.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,47 @@ import (
"github.com/xiatechs/jsonata-go"
)

type (
SnippetNodeConfig struct {
ID ulid.ULID
Lang string
Code string
}

SnippetNode struct {
*node.OneToOneNode
run func(any) (any, error)
}
// SnippetNodeConfig holds the configuration for creating a SnippetNode.
type SnippetNodeConfig struct {
ID ulid.ULID
Lang string
Code string
}

SnippetSpec struct {
scheme.SpecMeta `map:",inline"`
Lang string `map:"lang"`
Code string `map:"code"`
}
// SnippetNode represents a node that runs a snippet of code.
type SnippetNode struct {
*node.OneToOneNode
run func(any) (any, error)
}

fieldNameMapper struct{}
)
// SnippetSpec represents the specification for the SnippetNode.
type SnippetSpec struct {
scheme.SpecMeta `map:",inline"`
Lang string `map:"lang"`
Code string `map:"code"`
}

const (
KindSnippet = "snippet"
)
// KindSnippet is the kind identifier for SnippetNode.
const KindSnippet = "snippet"

// Supported programming languages for snippets.
const (
LangTypescript = "typescript"
LangJavascript = "javascript"
LangJSON = "json"
LangJSONata = "jsonata"
)

var _ node.Node = &SnippetNode{}

// Errors related to snippet execution.
var (
ErrEntryPointNotUndeclared = errors.New("entry point is undeclared")
ErrNotSupportedLanguage = errors.New("language is not supported")
)

var _ node.Node = (*SnippetNode)(nil)
var _ scheme.Spec = (*SnippetSpec)(nil)

// NewSnippetNode creates a new SnippetNode with the given configuration.
func NewSnippetNode(config SnippetNodeConfig) (*SnippetNode, error) {
defer func() { _ = recover() }()

Expand Down Expand Up @@ -178,6 +180,8 @@ func compile(lang, code string) (func(any) (any, error), error) {
}
}

type fieldNameMapper struct{}

func (*fieldNameMapper) FieldName(t reflect.Type, f reflect.StructField) string {
return strcase.ToLowerCamel(f.Name)
}
Expand Down
57 changes: 31 additions & 26 deletions pkg/plugin/controllx/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,42 @@ import (
"github.com/xiatechs/jsonata-go"
)

type (
SwitchNodeConfig struct {
ID ulid.ULID
}
// SwitchNodeConfig holds the configuration for creating a SwitchNode.
type SwitchNodeConfig struct {
ID ulid.ULID
}

SwitchNode struct {
*node.OneToManyNode
conditions []condition
mu sync.RWMutex
}
// SwitchNode represents a node that switches packets based on conditions.
type SwitchNode struct {
*node.OneToManyNode
conditions []condition
mu sync.RWMutex
}

SwitchSpec struct {
scheme.SpecMeta `map:",inline"`
Match []Condition `map:"match"`
}
// SwitchSpec represents the specification for the SwitchNode.
type SwitchSpec struct {
scheme.SpecMeta `map:",inline"`
Match []Condition `map:"match"`
}

Condition struct {
When string `map:"when"`
Port string `map:"port"`
}
// Condition represents a condition for the SwitchNode.
type Condition struct {
When string `map:"when"`
Port string `map:"port"`
}

condition struct {
when *jsonata.Expr
port string
}
)
type condition struct {
when *jsonata.Expr
port string
}

const (
KindSwitch = "switch"
)
// KindSwitch is the kind identifier for SwitchNode.
const KindSwitch = "switch"

var _ node.Node = &SwitchNode{}
var _ node.Node = (*SwitchNode)(nil)
var _ scheme.Spec = (*SwitchSpec)(nil)

// NewSwitchNode creates a new SwitchNode with the given configuration.
func NewSwitchNode(config SwitchNodeConfig) *SwitchNode {
id := config.ID

Expand All @@ -58,6 +61,7 @@ func NewSwitchNode(config SwitchNodeConfig) *SwitchNode {
return n
}

// Add adds a new condition to the SwitchNode.
func (n *SwitchNode) Add(when string, port string) error {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -71,6 +75,7 @@ func (n *SwitchNode) Add(when string, port string) error {
return nil
}

// Close closes the SwitchNode and clears its conditions.
func (n *SwitchNode) Close() error {
n.mu.Lock()
defer n.mu.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions pkg/plugin/networkx/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/siyul-park/uniflow/pkg/symbol"
)

// AddToHooks returns a function that adds hooks to the given hook.Hook.
func AddToHooks() func(*hook.Hook) error {
return func(h *hook.Hook) error {
h.AddLoadHook(symbol.LoadHookFunc(func(n node.Node) error {
Expand All @@ -26,6 +27,7 @@ func AddToHooks() func(*hook.Hook) error {
}
return nil
}))

h.AddUnloadHook(symbol.UnloadHookFunc(func(n node.Node) error {
if n, ok := n.(*HTTPNode); ok {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand All @@ -35,10 +37,12 @@ func AddToHooks() func(*hook.Hook) error {
}
return nil
}))

return nil
}
}

// AddToScheme returns a function that adds types and codecs to the provided scheme.
func AddToScheme() func(*scheme.Scheme) error {
return func(s *scheme.Scheme) error {
s.AddKnownType(KindHTTP, &HTTPSpec{})
Expand All @@ -54,9 +58,11 @@ func AddToScheme() func(*scheme.Scheme) error {
n := NewRouterNode(RouterNodeConfig{
ID: spec.ID,
})

for _, r := range spec.Routes {
n.Add(r.Method, r.Path, r.Port)
}

return n, nil
}))

Expand Down
102 changes: 47 additions & 55 deletions pkg/plugin/networkx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,53 +23,46 @@ import (
"github.com/siyul-park/uniflow/pkg/scheme"
)

type (
// HTTPNodeConfig represents the configuration of an HTTP node.
HTTPNodeConfig struct {
ID ulid.ULID
Address string
}

// HTTPNode represents a node based on the HTTP protocol.
HTTPNode struct {
id ulid.ULID
address string
server *http.Server
listener net.Listener
listenerNetwork string
ioPort *port.Port
inPort *port.Port
outPort *port.Port
errPort *port.Port
mu sync.RWMutex
}

// HTTPSpec represents the specification of an HTTP node.
HTTPSpec struct {
scheme.SpecMeta `map:",inline"`
Address string `map:"address"`
}

// HTTPPayload represents the payload for HTTP requests and responses.
HTTPPayload struct {
Proto string `map:"proto,omitempty"`
Path string `map:"path,omitempty"`
Method string `map:"method,omitempty"`
Header http.Header `map:"header,omitempty"`
Query url.Values `map:"query,omitempty"`
Cookies []*http.Cookie `map:"cookies,omitempty"`
Body primitive.Value `map:"body,omitempty"`
Status int `map:"status"`
}

tcpKeepAliveListener struct {
*net.TCPListener
}
)
// HTTPNodeConfig represents the configuration of an HTTP node.
type HTTPNodeConfig struct {
ID ulid.ULID
Address string
}

const (
KindHTTP = "http"
)
// HTTPNode represents a node based on the HTTP protocol.
type HTTPNode struct {
id ulid.ULID
address string
server *http.Server
listener net.Listener
listenerNetwork string
ioPort *port.Port
inPort *port.Port
outPort *port.Port
errPort *port.Port
mu sync.RWMutex
}

// HTTPSpec represents the specification of an HTTP node.
type HTTPSpec struct {
scheme.SpecMeta `map:",inline"`
Address string `map:"address"`
}

// HTTPPayload represents the payload for HTTP requests and responses.
type HTTPPayload struct {
Proto string `map:"proto,omitempty"`
Path string `map:"path,omitempty"`
Method string `map:"method,omitempty"`
Header http.Header `map:"header,omitempty"`
Query url.Values `map:"query,omitempty"`
Cookies []*http.Cookie `map:"cookies,omitempty"`
Body primitive.Value `map:"body,omitempty"`
Status int `map:"status"`
}

// KindHTTP is the kind identifier for HTTPNode.
const KindHTTP = "http"

// Commonly used HTTP header constants.
const (
Expand Down Expand Up @@ -205,16 +198,11 @@ var (
NetworkAuthenticationRequired = NewHTTPPayload(http.StatusNetworkAuthenticationRequired) // HTTP 511 Network Authentication Required
)

var (
ErrInvalidListenerNetwork = errors.New("invalid listener network")
)

var _ node.Node = &HTTPNode{}
var _ node.Node = (*HTTPNode)(nil)
var _ http.Handler = &HTTPNode{}
var _ scheme.Spec = (*HTTPSpec)(nil)

var (
forbiddenResponseHeaderRegexps []*regexp.Regexp
)
var forbiddenResponseHeaderRegexps []*regexp.Regexp

func init() {
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
Expand Down Expand Up @@ -604,9 +592,13 @@ func isForbiddenResponseHeader(header string) bool {
return forbidden
}

type tcpKeepAliveListener struct {
*net.TCPListener
}

func newListener(address, network string) (*tcpKeepAliveListener, error) {
if network != "tcp" && network != "tcp4" && network != "tcp6" {
return nil, ErrInvalidListenerNetwork
return nil, errors.New("invalid listener network")
}
l, err := net.Listen(network, address)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/plugin/networkx/mime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/siyul-park/uniflow/pkg/primitive"
)

// MIME content types
const (
ApplicationJSON = "application/json"
ApplicationJSONCharsetUTF8 = ApplicationJSON + "; " + charsetUTF8
Expand All @@ -37,10 +38,9 @@ const (
OctetStream = "application/octet-stream"
)

const (
charsetUTF8 = "charset=utf-8"
)
const charsetUTF8 = "charset=utf-8"

// MarshalMIME marshals a primitive.Value to MIME format.
func MarshalMIME(value primitive.Value, typ *string) ([]byte, error) {
if typ == nil {
content := ""
Expand Down Expand Up @@ -202,6 +202,7 @@ func MarshalMIME(value primitive.Value, typ *string) ([]byte, error) {
return nil, errors.WithStack(encoding.ErrUnsupportedValue)
}

// UnmarshalMIME unmarshals MIME data to a primitive.Value.
func UnmarshalMIME(data []byte, typ *string) (primitive.Value, error) {
if len(data) == 0 {
return nil, nil
Expand Down
Loading
Loading