Skip to content

Commit

Permalink
refactor: network
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 1, 2023
1 parent 7ef76ef commit 1a25aa4
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 98 deletions.
6 changes: 6 additions & 0 deletions pkg/plugin/networkx/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/siyul-park/uniflow/pkg/symbol"
)

// AddToHooks returns a function that adds hooks for HTTPNode to the given hook.Hook.
func AddToHooks() func(*hook.Hook) error {
return func(h *hook.Hook) error {
h.AddLoadHook(symbol.LoadHookFunc(func(n node.Node) error {
Expand All @@ -26,6 +27,7 @@ func AddToHooks() func(*hook.Hook) error {
}
return nil
}))

h.AddUnloadHook(symbol.UnloadHookFunc(func(n node.Node) error {
if n, ok := n.(*HTTPNode); ok {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand All @@ -35,10 +37,12 @@ func AddToHooks() func(*hook.Hook) error {
}
return nil
}))

return nil
}
}

// AddToScheme returns a function that adds schemes for HTTPNode and RouterNode to the given scheme.Scheme.
func AddToScheme() func(*scheme.Scheme) error {
return func(s *scheme.Scheme) error {
s.AddKnownType(KindHTTP, &HTTPSpec{})
Expand All @@ -54,9 +58,11 @@ func AddToScheme() func(*scheme.Scheme) error {
n := NewRouterNode(RouterNodeConfig{
ID: spec.ID,
})

for _, r := range spec.Routes {
n.Add(r.Method, r.Path, r.Port)
}

return n, nil
}))

Expand Down
102 changes: 47 additions & 55 deletions pkg/plugin/networkx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,53 +23,46 @@ import (
"github.com/siyul-park/uniflow/pkg/scheme"
)

type (
// HTTPNodeConfig represents the configuration of an HTTP node.
HTTPNodeConfig struct {
ID ulid.ULID
Address string
}

// HTTPNode represents a node based on the HTTP protocol.
HTTPNode struct {
id ulid.ULID
address string
server *http.Server
listener net.Listener
listenerNetwork string
ioPort *port.Port
inPort *port.Port
outPort *port.Port
errPort *port.Port
mu sync.RWMutex
}

// HTTPSpec represents the specification of an HTTP node.
HTTPSpec struct {
scheme.SpecMeta `map:",inline"`
Address string `map:"address"`
}

// HTTPPayload represents the payload for HTTP requests and responses.
HTTPPayload struct {
Proto string `map:"proto,omitempty"`
Path string `map:"path,omitempty"`
Method string `map:"method,omitempty"`
Header http.Header `map:"header,omitempty"`
Query url.Values `map:"query,omitempty"`
Cookies []*http.Cookie `map:"cookies,omitempty"`
Body primitive.Value `map:"body,omitempty"`
Status int `map:"status"`
}

tcpKeepAliveListener struct {
*net.TCPListener
}
)
// HTTPNodeConfig represents the configuration of an HTTP node.
type HTTPNodeConfig struct {
ID ulid.ULID
Address string
}

const (
KindHTTP = "http"
)
// HTTPNode represents a node based on the HTTP protocol.
type HTTPNode struct {
id ulid.ULID
address string
server *http.Server
listener net.Listener
listenerNetwork string
ioPort *port.Port
inPort *port.Port
outPort *port.Port
errPort *port.Port
mu sync.RWMutex
}

// HTTPSpec represents the specification of an HTTP node.
type HTTPSpec struct {
scheme.SpecMeta `map:",inline"`
Address string `map:"address"`
}

// HTTPPayload represents the payload for HTTP requests and responses.
type HTTPPayload struct {
Proto string `map:"proto,omitempty"`
Path string `map:"path,omitempty"`
Method string `map:"method,omitempty"`
Header http.Header `map:"header,omitempty"`
Query url.Values `map:"query,omitempty"`
Cookies []*http.Cookie `map:"cookies,omitempty"`
Body primitive.Value `map:"body,omitempty"`
Status int `map:"status"`
}

// KindHTTP is the kind identifier for HTTPNode.
const KindHTTP = "http"

// Commonly used HTTP header constants.
const (
Expand Down Expand Up @@ -205,16 +198,11 @@ var (
NetworkAuthenticationRequired = NewHTTPPayload(http.StatusNetworkAuthenticationRequired) // HTTP 511 Network Authentication Required
)

var (
ErrInvalidListenerNetwork = errors.New("invalid listener network")
)

var _ node.Node = &HTTPNode{}
var _ node.Node = (*HTTPNode)(nil)
var _ http.Handler = &HTTPNode{}
var _ scheme.Spec = (*HTTPSpec)(nil)

var (
forbiddenResponseHeaderRegexps []*regexp.Regexp
)
var forbiddenResponseHeaderRegexps []*regexp.Regexp

func init() {
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
Expand Down Expand Up @@ -604,9 +592,13 @@ func isForbiddenResponseHeader(header string) bool {
return forbidden
}

type tcpKeepAliveListener struct {
*net.TCPListener
}

func newListener(address, network string) (*tcpKeepAliveListener, error) {
if network != "tcp" && network != "tcp4" && network != "tcp6" {
return nil, ErrInvalidListenerNetwork
return nil, errors.New("invalid listener network")
}
l, err := net.Listen(network, address)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/plugin/networkx/mime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/siyul-park/uniflow/pkg/primitive"
)

// MIME content types
const (
ApplicationJSON = "application/json"
ApplicationJSONCharsetUTF8 = ApplicationJSON + "; " + charsetUTF8
Expand All @@ -37,10 +38,9 @@ const (
OctetStream = "application/octet-stream"
)

const (
charsetUTF8 = "charset=utf-8"
)
const charsetUTF8 = "charset=utf-8"

// MarshalMIME marshals a primitive.Value to MIME format.
func MarshalMIME(value primitive.Value, typ *string) ([]byte, error) {
if typ == nil {
content := ""
Expand Down Expand Up @@ -202,6 +202,7 @@ func MarshalMIME(value primitive.Value, typ *string) ([]byte, error) {
return nil, errors.WithStack(encoding.ErrUnsupportedValue)
}

// UnmarshalMIME unmarshals MIME data to a primitive.Value.
func UnmarshalMIME(data []byte, typ *string) (primitive.Value, error) {
if len(data) == 0 {
return nil, nil
Expand Down
86 changes: 46 additions & 40 deletions pkg/plugin/networkx/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,51 @@ import (
"github.com/siyul-park/uniflow/pkg/scheme"
)

type (
RouterNodeConfig struct {
ID ulid.ULID
}
// RouterNodeConfig holds the configuration for RouterNode.
type RouterNodeConfig struct {
ID ulid.ULID
}

RouterNode struct {
*node.OneToManyNode
tree *route
mu sync.RWMutex
}
// RouterNode represents a router node that handles routing based on HTTP methods, paths, and ports.
type RouterNode struct {
*node.OneToManyNode
tree *route
mu sync.RWMutex
}

RouterSpec struct {
scheme.SpecMeta `map:",inline"`
Routes []RouteInfo `map:"routes"`
}
// RouterSpec defines the specification for the router node.
type RouterSpec struct {
scheme.SpecMeta `map:",inline"`
Routes []RouteInfo `map:"routes"`
}

RouteInfo struct {
Method string `map:"method"`
Path string `map:"path"`
Port string `map:"port"`
}
// RouteInfo holds information about an individual route.
type RouteInfo struct {
Method string `map:"method"`
Path string `map:"path"`
Port string `map:"port"`
}

route struct {
kind routeKind
prefix string
parent *route
staticChildren []*route
paramChild *route
anyChild *route
paramNames []string
methods map[string]string
}
routeKind uint8
)
type route struct {
kind routeKind
prefix string
parent *route
staticChildren []*route
paramChild *route
anyChild *route
paramNames []string
methods map[string]string
}

type routeKind uint8

// KindRouter is the kind identifier for RouterNode.
const KindRouter = "router"

const (
KindRouter = "router"
KeyMethod = "method"
KeyPath = "path"
KeyParams = "params"
)

const (
Expand All @@ -62,14 +70,10 @@ const (
anyLabel = byte('*')
)

const (
KeyMethod = "method"
KeyPath = "path"
KeyParams = "params"
)

var _ node.Node = &RouterNode{}
var _ node.Node = (*RouterNode)(nil)
var _ scheme.Spec = (*RouterSpec)(nil)

// NewRouterNode creates a new instance of RouterNode with the given configuration.
func NewRouterNode(config RouterNodeConfig) *RouterNode {
id := config.ID

Expand All @@ -86,6 +90,7 @@ func NewRouterNode(config RouterNodeConfig) *RouterNode {
return n
}

// Add adds a new route to the router based on the provided HTTP method, path, and port.
func (n *RouterNode) Add(method, path, port string) {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -100,7 +105,7 @@ func (n *RouterNode) Add(method, path, port string) {
var paramNames []string

for i, lcpIndex := 0, len(path); i < lcpIndex; i++ {
if path[i] == ':' {
if path[i] == paramLabel {
if i > 0 && path[i-1] == '\\' {
path = path[:i-1] + path[i:]
i--
Expand All @@ -122,7 +127,7 @@ func (n *RouterNode) Add(method, path, port string) {
} else {
n.insert(method, path[:i], paramKind, nil, "")
}
} else if path[i] == '*' {
} else if path[i] == anyLabel {
n.insert(method, path[:i], staticKind, nil, "")
paramNames = append(paramNames, "*")
n.insert(method, path[:i+1], anyKind, paramNames, port)
Expand All @@ -132,6 +137,7 @@ func (n *RouterNode) Add(method, path, port string) {
n.insert(method, path, staticKind, paramNames, port)
}

// Close resets the router's tree when closing the node.
func (n *RouterNode) Close() error {
n.tree = &route{
methods: map[string]string{},
Expand Down

0 comments on commit 1a25aa4

Please sign in to comment.