Skip to content

Commit

Permalink
refactor: port
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 1, 2023
1 parent 8b9971a commit b4c24c0
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 134 deletions.
8 changes: 4 additions & 4 deletions pkg/port/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strconv"
)

// GetIndex is return index of the given port.
// GetIndex returns the index of the given port.
func GetIndex(source string, target string) (int, bool) {
regex, err := regexp.Compile(source + `\[(\d+)\]`)
if err != nil {
Expand All @@ -16,14 +16,14 @@ func GetIndex(source string, target string) (int, bool) {
if len(groups) == 0 {
return 0, false
}
i, err := strconv.Atoi(groups[0][1])
index, err := strconv.Atoi(groups[0][1])
if err != nil {
return 0, false
}
return i, true
return index, true
}

// SetIndex is return full port name of the given port and index.
// SetIndex returns the full port name of the given port and index.
func SetIndex(source string, index int) string {
return fmt.Sprintf(source+"[%d]", index)
}
65 changes: 0 additions & 65 deletions pkg/port/inithook.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package port

import (
"sync"

"github.com/siyul-park/uniflow/pkg/process"
)

Expand All @@ -13,73 +11,10 @@ type (
}

InitHookFunc func(proc *process.Process)

// InitOnceHook is a hook that runs only once per process.process.
InitOnceHook struct {
init InitHook
processes map[*process.Process]struct{}
mu sync.RWMutex
}
)

var _ InitHook = InitHookFunc(func(proc *process.Process) {})
var _ InitHook = &InitOnceHook{}

func (h InitHookFunc) Init(proc *process.Process) {
h(proc)
}

// InitOnce returns a new InitOnceHook.
func InitOnce(h InitHook) *InitOnceHook {
return &InitOnceHook{
init: h,
processes: make(map[*process.Process]struct{}),
}
}

func (h *InitOnceHook) Init(proc *process.Process) {
if ok := func() bool {
h.mu.RLock()
defer h.mu.RUnlock()

_, ok := h.processes[proc]
return !ok
}(); !ok {
return
}

if ok := func() bool {
h.mu.Lock()
defer h.mu.Unlock()

_, ok := h.processes[proc]
if ok {
return false
}

h.processes[proc] = struct{}{}
go func() {
<-proc.Done()

h.mu.Lock()
defer h.mu.Unlock()

delete(h.processes, proc)
}()

return true
}(); !ok {
return
}

h.init.Init(proc)
}

func (h *InitOnceHook) Close() {
h.mu.Lock()
defer h.mu.Unlock()

for proc := range h.processes {
delete(h.processes, proc)
}
}
45 changes: 22 additions & 23 deletions pkg/port/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,15 @@ import (
"github.com/siyul-park/uniflow/pkg/packet"
)

type (
// ReadPipe is a Pipe that can be Receive Packet.
ReadPipe struct {
in chan *packet.Packet
out chan *packet.Packet
done chan struct{}
mu sync.RWMutex
}

// WritePipe is a Pipe that can be Send Packet.
WritePipe struct {
links []*ReadPipe
done chan struct{}
mu sync.RWMutex
}
)
// ReadPipe represents a unidirectional pipe for receiving packets.
type ReadPipe struct {
in chan *packet.Packet
out chan *packet.Packet
done chan struct{}
mu sync.RWMutex
}

// NewReadPipe returns a new ReadPipe.
// NewReadPipe creates a new ReadPipe instance.
func NewReadPipe() *ReadPipe {
p := &ReadPipe{
in: make(chan *packet.Packet),
Expand Down Expand Up @@ -70,7 +61,7 @@ func NewReadPipe() *ReadPipe {
return p
}

// Receive returns a channel that receives Packet.
// Receive returns a channel that receives packets.
func (p *ReadPipe) Receive() <-chan *packet.Packet {
return p.out
}
Expand All @@ -81,7 +72,7 @@ func (p *ReadPipe) Done() <-chan struct{} {
}

// Close closes the ReadPipe.
// Packet that are not processed will be discard.
// Unprocessed packets will be discarded.
func (p *ReadPipe) Close() {
p.mu.Lock()
defer p.mu.Unlock()
Expand All @@ -96,6 +87,7 @@ func (p *ReadPipe) Close() {
close(p.in)
}

// send sends a packet through the pipe.
func (p *ReadPipe) send(pck *packet.Packet) {
p.mu.RLock()
defer p.mu.RUnlock()
Expand All @@ -107,7 +99,14 @@ func (p *ReadPipe) send(pck *packet.Packet) {
}
}

// NewWritePipe returns a new WritePipe.
// WritePipe represents a unidirectional pipe for sending packets.
type WritePipe struct {
links []*ReadPipe
done chan struct{}
mu sync.RWMutex
}

// NewWritePipe creates a new WritePipe instance.
func NewWritePipe() *WritePipe {
return &WritePipe{
links: nil,
Expand All @@ -116,7 +115,7 @@ func NewWritePipe() *WritePipe {
}
}

// Send a Packet to all linked ReadPipe.
// Send sends a packet to all linked ReadPipe instances.
func (p *WritePipe) Send(pck *packet.Packet) {
p.mu.Lock()
defer p.mu.Unlock()
Expand All @@ -133,7 +132,7 @@ func (p *WritePipe) Send(pck *packet.Packet) {
wg.Wait()
}

// Link a ReadPipe to enable communication with each other.
// Link links a ReadPipe to enable communication with each other.
func (p *WritePipe) Link(pipe *ReadPipe) {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down Expand Up @@ -175,7 +174,7 @@ func (p *WritePipe) Done() <-chan struct{} {
}

// Close closes the WritePipe.
// Packet that are not processed will be discard.
// Unprocessed packets will be discarded.
func (p *WritePipe) Close() {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
33 changes: 16 additions & 17 deletions pkg/port/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import (
"github.com/siyul-park/uniflow/pkg/process"
)

type (
// Port is a linking terminal that allows *packet.Packet to be exchanged.
Port struct {
streams map[*process.Process]*Stream
links []*Port
initHooks []InitHook
done chan struct{}
mu sync.RWMutex
}
)
// Port is a linking terminal that allows *packet.Packet to be exchanged.
type Port struct {
streams map[*process.Process]*Stream
links []*Port
initHooks []InitHook
done chan struct{}
mu sync.RWMutex
}

// New returns a new Port.
func New() *Port {
Expand All @@ -25,15 +23,15 @@ func New() *Port {
}
}

// AddInitHook adds a InitHook.
// AddInitHook adds an InitHook to the Port.
func (p *Port) AddInitHook(hook InitHook) {
p.mu.Lock()
defer p.mu.Unlock()

p.initHooks = append(p.initHooks, hook)
}

// Link connects two Port to enable communication with each other.
// Link connects two Ports to enable communication with each other.
func (p *Port) Link(port *Port) {
p.link(port)
port.link(p)
Expand All @@ -45,16 +43,17 @@ func (p *Port) Unlink(port *Port) {
port.unlink(p)
}

// Links return length of linked.
// Links returns the number of linked Ports.
func (p *Port) Links() int {
p.mu.RLock()
defer p.mu.RUnlock()

return len(p.links)
}

// Open Stream to communicate. For each process, Stream is opened independently.
// When Process is closed, Stream is also closed. Stream Send and Receive Packet to Broadcast to all other Port connected to the Port.
// Open creates or returns an existing Stream for communication with a process.
// The Stream is closed when the associated Process or Port is closed.
// It broadcasts sent and received packets to all other Ports connected to it.
func (p *Port) Open(proc *process.Process) *Stream {
select {
case <-proc.Done():
Expand Down Expand Up @@ -133,8 +132,8 @@ func (p *Port) Done() <-chan struct{} {
return p.done
}

// Close the Port.
// All Stream currently open will also be shut down and any Packet that are not processed will be discard.
// Close closes the Port.
// All Streams currently open will also be shut down, and any unprocessed packets will be discarded.
func (p *Port) Close() {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
43 changes: 20 additions & 23 deletions pkg/port/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ import (
"github.com/siyul-park/uniflow/pkg/packet"
)

type (
// Stream is a channel where you can exchange *packet.Packet.
Stream struct {
id ulid.ULID
read *ReadPipe
write *WritePipe
links []*Stream
done chan struct{}
mu sync.RWMutex
}
)
// Stream represents a communication channel for exchanging *packet.Packet.
type Stream struct {
id ulid.ULID
read *ReadPipe
write *WritePipe
links []*Stream
done chan struct{}
mu sync.RWMutex
}

// NewStream returns a new Stream.
// NewStream creates a new Stream instance.
func NewStream() *Stream {
return &Stream{
id: ulid.Make(),
Expand All @@ -29,51 +27,48 @@ func NewStream() *Stream {
}
}

// ID returns the ID.
// ID returns the Stream's ID.
func (s *Stream) ID() ulid.ULID {
s.mu.RLock()
defer s.mu.RUnlock()

return s.id
}

// Send sends a Packet to linked Stream.
// Send sends a *packet.Packet to linked Streams.
func (s *Stream) Send(pck *packet.Packet) {
s.write.Send(pck)
}

// Receive receives a Packet from linked Stream.
// Receive returns a channel for receiving *packet.Packet from linked Streams.
func (s *Stream) Receive() <-chan *packet.Packet {
return s.read.Receive()
}

// Link connects two Stream to enable communication with each other.
// Link connects two Streams for communication.
func (s *Stream) Link(stream *Stream) {
s.link(stream)
stream.link(s)
}

// Unlink removes the linked Stream from being able to communicate further.
// Unlink disconnects two linked Streams.
func (s *Stream) Unlink(stream *Stream) {
s.unlink(stream)
stream.unlink(s)
}

// Links returns length of linked.
// Links returns the number of linked Streams.
func (s *Stream) Links() int {
s.mu.RLock()
defer s.mu.RUnlock()

return len(s.links)
}

// Done returns a channel which is closed when the Stream is closed.
// Done returns a channel that's closed when the Stream is closed.
func (s *Stream) Done() <-chan struct{} {
return s.done
}

// Close closes the Stream.
// Shut down and any Packet that are not processed will be discard.
// Close closes the Stream, discarding any unprocessed packets.
func (s *Stream) Close() {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -89,6 +84,7 @@ func (s *Stream) Close() {
s.write.Close()
}

// link connects the current Stream with another for communication.
func (s *Stream) link(stream *Stream) {
if stream == s {
return
Expand All @@ -107,6 +103,7 @@ func (s *Stream) link(stream *Stream) {
s.write.Link(stream.read)
}

// unlink disconnects the current Stream from another.
func (s *Stream) unlink(stream *Stream) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
Loading

0 comments on commit b4c24c0

Please sign in to comment.