Skip to content

Commit

Permalink
feat: support in/out port in http (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park authored Nov 24, 2023
1 parent 4b66c51 commit 9e8b547
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 30 deletions.
4 changes: 2 additions & 2 deletions examples/echo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name: http
address: :8000
links:
io:
out:
- name: router
port: in

Expand All @@ -15,4 +15,4 @@
links:
out[0]:
- name: http
port: io
port: in
71 changes: 57 additions & 14 deletions pkg/plugin/networkx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type (
listener net.Listener
listenerNetwork string
ioPort *port.Port
inPort *port.Port
outPort *port.Port
errPort *port.Port
mu sync.RWMutex
}
Expand Down Expand Up @@ -267,6 +269,8 @@ func NewHTTPNode(config HTTPNodeConfig) *HTTPNode {
server: new(http.Server),
listenerNetwork: "tcp",
ioPort: port.New(),
inPort: port.New(),
outPort: port.New(),
errPort: port.New(),
}
n.server.Handler = n
Expand All @@ -288,6 +292,10 @@ func (n *HTTPNode) Port(name string) (*port.Port, bool) {
switch name {
case node.PortIO:
return n.ioPort, true
case node.PortIn:
return n.inPort, true
case node.PortOut:
return n.outPort, true
case node.PortErr:
return n.errPort, true
default:
Expand Down Expand Up @@ -349,6 +357,8 @@ func (n *HTTPNode) Close() error {
return err
}
n.ioPort.Close()
n.inPort.Close()
n.outPort.Close()
n.errPort.Close()

return nil
Expand All @@ -372,8 +382,9 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}()

outStream := n.ioPort.Open(proc)
inStream := n.ioPort.Open(proc)
ioStream := n.ioPort.Open(proc)
inStream := n.inPort.Open(proc)
outStream := n.outPort.Open(proc)

req, err := n.request(r)
if err != nil {
Expand All @@ -386,24 +397,56 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
outPck := packet.New(outPayload)
outStream.Send(outPck)

inPck, ok := <-inStream.Receive()
if !ok {
_ = n.response(r, w, n.errorPayload(proc, ServiceUnavailable))
if ioStream.Links() > 0 {
proc.Stack().Push(outPck.ID(), ioStream.ID())
ioStream.Send(outPck)
}
if outStream.Links() > 0 {
proc.Stack().Push(outPck.ID(), outStream.ID())
outStream.Send(outPck)
}
if ioStream.Links()+outStream.Links() == 0 {
return
}
proc.Stack().Clear(inPck.ID())

inPayload := inPck.Payload()
for {
var stream *port.Stream
var inPck *packet.Packet
var ok bool
select {
case inPck, ok = <-inStream.Receive():
stream = inStream
case inPck, ok = <-outStream.Receive():
stream = outStream
case inPck, ok = <-ioStream.Receive():
stream = ioStream
}
if !ok {
_ = n.response(r, w, n.errorPayload(proc, ServiceUnavailable))
return
}

var res HTTPPayload
if err := primitive.Unmarshal(inPayload, &res); err != nil {
res.Body = inPayload
}
if stream == outStream || stream == ioStream {
if _, ok := proc.Stack().Pop(inPck.ID(), stream.ID()); !ok {
continue
}
} else {
proc.Stack().Clear(inPck.ID())
}

inPayload := inPck.Payload()

var res HTTPPayload
if err := primitive.Unmarshal(inPayload, &res); err != nil {
res.Body = inPayload
}

if err := n.response(r, w, res); err != nil {
_ = n.response(r, w, n.errorPayload(proc, InternalServerError))
}

if err := n.response(r, w, res); err != nil {
_ = n.response(r, w, n.errorPayload(proc, InternalServerError))
break
}
}

Expand Down
41 changes: 27 additions & 14 deletions pkg/plugin/networkx/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func TestHTTPNode_Port(t *testing.T) {
assert.True(t, ok)
assert.NotNil(t, p)

p, ok = n.Port(node.PortIn)
assert.True(t, ok)
assert.NotNil(t, p)

p, ok = n.Port(node.PortOut)
assert.True(t, ok)
assert.NotNil(t, p)

p, ok = n.Port(node.PortErr)
assert.True(t, ok)
assert.NotNil(t, p)
Expand Down Expand Up @@ -69,7 +77,7 @@ func TestHTTPNode_StartAndClose(t *testing.T) {
}

func TestHTTPNode_ServeHTTP(t *testing.T) {
t.Run("Hello World", func(t *testing.T) {
t.Run("IO", func(t *testing.T) {
n := NewHTTPNode(HTTPNodeConfig{})
defer func() { _ = n.Close() }()

Expand Down Expand Up @@ -105,29 +113,34 @@ func TestHTTPNode_ServeHTTP(t *testing.T) {
assert.Equal(t, "Hello World!", w.Body.String())
})

t.Run("HTTPError", func(t *testing.T) {
t.Run("In/Out", func(t *testing.T) {
n := NewHTTPNode(HTTPNodeConfig{})
defer func() { _ = n.Close() }()

httpErr := NotFound
in := port.New()
inPort, _ := n.Port(node.PortIn)
inPort.Link(in)

io := port.New()
ioPort, _ := n.Port(node.PortIO)
ioPort.Link(io)
out := port.New()
outPort, _ := n.Port(node.PortOut)
outPort.Link(out)

io.AddInitHook(port.InitHookFunc(func(proc *process.Process) {
ioStream := io.Open(proc)
out.AddInitHook(port.InitHookFunc(func(proc *process.Process) {
inStream := in.Open(proc)
outStream := out.Open(proc)

for {
inPck, ok := <-ioStream.Receive()
inPck, ok := <-outStream.Receive()
if !ok {
return
}

outPayload, _ := primitive.MarshalText(httpErr)
outPck := packet.New(outPayload)
outPck := packet.New(primitive.NewMap(
primitive.NewString("body"), primitive.NewString("Hello World!"),
primitive.NewString("status"), primitive.NewInt(200),
))
proc.Stack().Link(inPck.ID(), outPck.ID())
ioStream.Send(outPck)
inStream.Send(outPck)
}
}))

Expand All @@ -136,8 +149,8 @@ func TestHTTPNode_ServeHTTP(t *testing.T) {

n.ServeHTTP(w, r)

assert.Equal(t, httpErr.Status, w.Result().StatusCode)
assert.Equal(t, 200, w.Result().StatusCode)
assert.Equal(t, TextPlainCharsetUTF8, w.Header().Get(HeaderContentType))
assert.Equal(t, httpErr.Body.Interface(), w.Body.String())
assert.Equal(t, "Hello World!", w.Body.String())
})
}

0 comments on commit 9e8b547

Please sign in to comment.