Skip to content

Commit

Permalink
refactor: use one to one node in http
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 9, 2023
1 parent 23e8181 commit fc55acf
Showing 1 changed file with 17 additions and 34 deletions.
51 changes: 17 additions & 34 deletions pkg/plugin/networkx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ import (

// HTTPNode represents a node based on the HTTP protocol.
type HTTPNode struct {
*node.OneToOneNode
address string
server *http.Server
listener net.Listener
listenerNetwork string
ioPort *port.Port
inPort *port.Port
outPort *port.Port
errPort *port.Port
mu sync.RWMutex
}

Expand Down Expand Up @@ -245,13 +242,10 @@ func init() {
// NewHTTPNode creates a new HTTPNode.
func NewHTTPNode(address string) *HTTPNode {
n := &HTTPNode{
OneToOneNode: node.NewOneToOneNode(nil),
address: address,
server: new(http.Server),
listenerNetwork: "tcp",
ioPort: port.New(),
inPort: port.New(),
outPort: port.New(),
errPort: port.New(),
}
n.server.Handler = n

Expand All @@ -263,19 +257,7 @@ func (n *HTTPNode) Port(name string) (*port.Port, bool) {
n.mu.RLock()
defer n.mu.RUnlock()

switch name {
case node.PortIO:
return n.ioPort, true
case node.PortIn:
return n.inPort, true
case node.PortOut:
return n.outPort, true
case node.PortErr:
return n.errPort, true
default:
}

return nil, false
return n.OneToOneNode.Port(name)
}

// ListenerAddr returns the address of the listener associated with the HTTPNode.
Expand Down Expand Up @@ -336,18 +318,13 @@ func (n *HTTPNode) Shutdown(ctx context.Context) error {

// Close closes the HTTPNode.
func (n *HTTPNode) Close() error {
n.mu.RLock()
defer n.mu.RUnlock()
n.mu.Lock()
defer n.mu.Unlock()

if err := n.server.Close(); err != nil {
return err
}
n.ioPort.Close()
n.inPort.Close()
n.outPort.Close()
n.errPort.Close()

return nil
return n.OneToOneNode.Close()
}

// ServeHTTP handles HTTP requests for the HTTPNode.
Expand All @@ -373,9 +350,13 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}()

ioStream := n.ioPort.Open(proc)
inStream := n.inPort.Open(proc)
outStream := n.outPort.Open(proc)
ioPort, _ := n.Port(node.PortIO)
inPort, _ := n.Port(node.PortIn)
outPort, _ := n.Port(node.PortOut)

ioStream := ioPort.Open(proc)
inStream := inPort.Open(proc)
outStream := outPort.Open(proc)

req, err := n.loadPayload(r)
if err != nil {
Expand Down Expand Up @@ -528,13 +509,15 @@ func (n *HTTPNode) storePayload(w http.ResponseWriter, res HTTPPayload) error {
}

func (n *HTTPNode) handleErrorPayload(proc *process.Process, err HTTPPayload) HTTPPayload {
if n.errPort.Links() == 0 {
errPort, _ := n.Port(node.PortErr)

if errPort.Links() == 0 {
return err
}

errPayload, _ := primitive.MarshalText(err)
errPck := packet.New(errPayload)
errStream := n.errPort.Open(proc)
errStream := errPort.Open(proc)
errStream.Send(errPck)

outPck, ok := <-errStream.Receive()
Expand Down

0 comments on commit fc55acf

Please sign in to comment.