diff --git a/pkg/plugin/networkx/http.go b/pkg/plugin/networkx/http.go index 1ebccce7..c24001e1 100644 --- a/pkg/plugin/networkx/http.go +++ b/pkg/plugin/networkx/http.go @@ -24,14 +24,11 @@ import ( // HTTPNode represents a node based on the HTTP protocol. type HTTPNode struct { + *node.OneToOneNode address string server *http.Server listener net.Listener listenerNetwork string - ioPort *port.Port - inPort *port.Port - outPort *port.Port - errPort *port.Port mu sync.RWMutex } @@ -245,13 +242,10 @@ func init() { // NewHTTPNode creates a new HTTPNode. func NewHTTPNode(address string) *HTTPNode { n := &HTTPNode{ + OneToOneNode: node.NewOneToOneNode(nil), address: address, server: new(http.Server), listenerNetwork: "tcp", - ioPort: port.New(), - inPort: port.New(), - outPort: port.New(), - errPort: port.New(), } n.server.Handler = n @@ -263,19 +257,7 @@ func (n *HTTPNode) Port(name string) (*port.Port, bool) { n.mu.RLock() defer n.mu.RUnlock() - switch name { - case node.PortIO: - return n.ioPort, true - case node.PortIn: - return n.inPort, true - case node.PortOut: - return n.outPort, true - case node.PortErr: - return n.errPort, true - default: - } - - return nil, false + return n.OneToOneNode.Port(name) } // ListenerAddr returns the address of the listener associated with the HTTPNode. @@ -336,18 +318,13 @@ func (n *HTTPNode) Shutdown(ctx context.Context) error { // Close closes the HTTPNode. func (n *HTTPNode) Close() error { - n.mu.RLock() - defer n.mu.RUnlock() + n.mu.Lock() + defer n.mu.Unlock() if err := n.server.Close(); err != nil { return err } - n.ioPort.Close() - n.inPort.Close() - n.outPort.Close() - n.errPort.Close() - - return nil + return n.OneToOneNode.Close() } // ServeHTTP handles HTTP requests for the HTTPNode. @@ -373,9 +350,13 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { } }() - ioStream := n.ioPort.Open(proc) - inStream := n.inPort.Open(proc) - outStream := n.outPort.Open(proc) + ioPort, _ := n.Port(node.PortIO) + inPort, _ := n.Port(node.PortIn) + outPort, _ := n.Port(node.PortOut) + + ioStream := ioPort.Open(proc) + inStream := inPort.Open(proc) + outStream := outPort.Open(proc) req, err := n.loadPayload(r) if err != nil { @@ -528,13 +509,15 @@ func (n *HTTPNode) storePayload(w http.ResponseWriter, res HTTPPayload) error { } func (n *HTTPNode) handleErrorPayload(proc *process.Process, err HTTPPayload) HTTPPayload { - if n.errPort.Links() == 0 { + errPort, _ := n.Port(node.PortErr) + + if errPort.Links() == 0 { return err } errPayload, _ := primitive.MarshalText(err) errPck := packet.New(errPayload) - errStream := n.errPort.Open(proc) + errStream := errPort.Open(proc) errStream.Send(errPck) outPck, ok := <-errStream.Receive()