Skip to content

Commit

Permalink
refactor: expose port pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Jan 4, 2025
1 parent e62abc7 commit bf4b3de
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 62 deletions.
3 changes: 2 additions & 1 deletion ext/pkg/network/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package network
import (
"context"
"crypto/tls"
"github.com/go-faker/faker/v4"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/go-faker/faker/v4"

"golang.org/x/net/http2"

"github.com/siyul-park/uniflow/pkg/node"
Expand Down
39 changes: 39 additions & 0 deletions pkg/port/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package port

import (
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/process"
)

// Pipe is a connection between two ports.
func Pipe() (*InPort, *OutPort) {
inPort, outPort := NewIn(), NewOut()

inPort.AddListener(ListenFunc(func(proc *process.Process) {
reader := inPort.Open(proc)
var writer *packet.Writer

for inPck := range reader.Read() {
if writer == nil {
writer = outPort.Open(proc)
}
if writer.Write(inPck) == 0 {
reader.Receive(inPck)
}
}
}))

outPort.AddListener(ListenFunc(func(proc *process.Process) {
var reader *packet.Reader
writer := outPort.Open(proc)

for backPck := range writer.Receive() {
if reader == nil {
reader = inPort.Open(proc)
}
reader.Receive(backPck)
}
}))

return inPort, outPort
}
45 changes: 45 additions & 0 deletions pkg/port/pipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package port

import (
"context"
"testing"
"time"

"github.com/go-faker/faker/v4"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/types"
"github.com/stretchr/testify/assert"
)

func TestPipe(t *testing.T) {
inPort0, outPort0 := Pipe()
defer inPort0.Close()
defer outPort0.Close()

inPort1, outPort1 := NewIn(), NewOut()
defer inPort1.Close()
defer outPort1.Close()

outPort0.Link(inPort1)
outPort1.Link(inPort0)

proc := process.New()
defer proc.Exit(nil)

inWriter := outPort1.Open(proc)
outReader := inPort1.Open(proc)

ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

inPck := packet.New(types.NewString(faker.UUIDHyphenated()))
inWriter.Write(inPck)

select {
case outPck := <-outReader.Read():
assert.Equal(t, inPck.Payload(), outPck.Payload())
case <-ctx.Done():
assert.NoError(t, ctx.Err())
}
}
70 changes: 9 additions & 61 deletions pkg/symbol/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"sync"

"github.com/siyul-park/uniflow/pkg/node"
"github.com/siyul-park/uniflow/pkg/packet"
"github.com/siyul-park/uniflow/pkg/port"
"github.com/siyul-park/uniflow/pkg/process"
"github.com/siyul-park/uniflow/pkg/spec"
)

Expand Down Expand Up @@ -58,25 +56,15 @@ func (n *Cluster) Inbound(source string, target spec.Port) bool {
return false
}

inPort, ok1 := n.inPorts[source]
if !ok1 {
inPort = port.NewIn()
_, ok1 := n._inPorts[source]
outPort, ok2 := n.outPorts[source]
if !ok1 || !ok2 {
var inPort *port.InPort
inPort, outPort = port.Pipe()
n.inPorts[source] = inPort
}

outPort, ok2 := n._outPorts[source]
if !ok2 {
outPort = port.NewOut()
n._outPorts[source] = outPort
}

if !ok1 {
inPort.AddListener(n.inbound(inPort, outPort))
}
if !ok2 {
outPort.AddListener(n.outbound(inPort, outPort))
}

outPort.Link(prt)
return true
}
Expand All @@ -103,24 +91,14 @@ func (n *Cluster) Outbound(source string, target spec.Port) bool {
}

inPort, ok1 := n._inPorts[source]
if !ok1 {
inPort = port.NewIn()
_, ok2 := n.outPorts[source]
if !ok1 || !ok2 {
var outPort *port.OutPort
inPort, outPort = port.Pipe()
n._inPorts[source] = inPort
}

outPort, ok2 := n.outPorts[source]
if !ok2 {
outPort = port.NewOut()
n.outPorts[source] = outPort
}

if !ok1 {
inPort.AddListener(n.inbound(inPort, outPort))
}
if !ok2 {
outPort.AddListener(n.outbound(inPort, outPort))
}

prt.Link(inPort)
return true
}
Expand Down Expand Up @@ -205,33 +183,3 @@ func (n *Cluster) Close() error {
}
return nil
}

func (n *Cluster) inbound(inPort *port.InPort, outPort *port.OutPort) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
reader := inPort.Open(proc)
var writer *packet.Writer

for inPck := range reader.Read() {
if writer == nil {
writer = outPort.Open(proc)
}
if writer.Write(inPck) == 0 {
reader.Receive(inPck)
}
}
})
}

func (n *Cluster) outbound(inPort *port.InPort, outPort *port.OutPort) port.Listener {
return port.ListenFunc(func(proc *process.Process) {
var reader *packet.Reader
writer := outPort.Open(proc)

for backPck := range writer.Receive() {
if reader == nil {
reader = inPort.Open(proc)
}
reader.Receive(backPck)
}
})
}

0 comments on commit bf4b3de

Please sign in to comment.