From b4c24c06e84407bd852848e69971c10b9b1876e3 Mon Sep 17 00:00:00 2001 From: siyul-park Date: Thu, 30 Nov 2023 21:12:08 -0500 Subject: [PATCH] refactor: port --- pkg/port/array.go | 8 ++--- pkg/port/inithook.go | 65 ---------------------------------------- pkg/port/pipe.go | 45 ++++++++++++++-------------- pkg/port/port.go | 33 ++++++++++---------- pkg/port/stream.go | 43 +++++++++++++------------- pkg/symbol/loadhook.go | 1 - pkg/symbol/unloadhook.go | 1 - 7 files changed, 62 insertions(+), 134 deletions(-) diff --git a/pkg/port/array.go b/pkg/port/array.go index 4bbc69d4..649abfbc 100644 --- a/pkg/port/array.go +++ b/pkg/port/array.go @@ -6,7 +6,7 @@ import ( "strconv" ) -// GetIndex is return index of the given port. +// GetIndex returns the index of the given port. func GetIndex(source string, target string) (int, bool) { regex, err := regexp.Compile(source + `\[(\d+)\]`) if err != nil { @@ -16,14 +16,14 @@ func GetIndex(source string, target string) (int, bool) { if len(groups) == 0 { return 0, false } - i, err := strconv.Atoi(groups[0][1]) + index, err := strconv.Atoi(groups[0][1]) if err != nil { return 0, false } - return i, true + return index, true } -// SetIndex is return full port name of the given port and index. +// SetIndex returns the full port name of the given port and index. func SetIndex(source string, index int) string { return fmt.Sprintf(source+"[%d]", index) } diff --git a/pkg/port/inithook.go b/pkg/port/inithook.go index 470e9bd2..a1bc3cad 100644 --- a/pkg/port/inithook.go +++ b/pkg/port/inithook.go @@ -1,8 +1,6 @@ package port import ( - "sync" - "github.com/siyul-park/uniflow/pkg/process" ) @@ -13,73 +11,10 @@ type ( } InitHookFunc func(proc *process.Process) - - // InitOnceHook is a hook that runs only once per process.process. - InitOnceHook struct { - init InitHook - processes map[*process.Process]struct{} - mu sync.RWMutex - } ) var _ InitHook = InitHookFunc(func(proc *process.Process) {}) -var _ InitHook = &InitOnceHook{} func (h InitHookFunc) Init(proc *process.Process) { h(proc) } - -// InitOnce returns a new InitOnceHook. -func InitOnce(h InitHook) *InitOnceHook { - return &InitOnceHook{ - init: h, - processes: make(map[*process.Process]struct{}), - } -} - -func (h *InitOnceHook) Init(proc *process.Process) { - if ok := func() bool { - h.mu.RLock() - defer h.mu.RUnlock() - - _, ok := h.processes[proc] - return !ok - }(); !ok { - return - } - - if ok := func() bool { - h.mu.Lock() - defer h.mu.Unlock() - - _, ok := h.processes[proc] - if ok { - return false - } - - h.processes[proc] = struct{}{} - go func() { - <-proc.Done() - - h.mu.Lock() - defer h.mu.Unlock() - - delete(h.processes, proc) - }() - - return true - }(); !ok { - return - } - - h.init.Init(proc) -} - -func (h *InitOnceHook) Close() { - h.mu.Lock() - defer h.mu.Unlock() - - for proc := range h.processes { - delete(h.processes, proc) - } -} diff --git a/pkg/port/pipe.go b/pkg/port/pipe.go index 70b2db9a..8b21a9a2 100644 --- a/pkg/port/pipe.go +++ b/pkg/port/pipe.go @@ -6,24 +6,15 @@ import ( "github.com/siyul-park/uniflow/pkg/packet" ) -type ( - // ReadPipe is a Pipe that can be Receive Packet. - ReadPipe struct { - in chan *packet.Packet - out chan *packet.Packet - done chan struct{} - mu sync.RWMutex - } - - // WritePipe is a Pipe that can be Send Packet. - WritePipe struct { - links []*ReadPipe - done chan struct{} - mu sync.RWMutex - } -) +// ReadPipe represents a unidirectional pipe for receiving packets. +type ReadPipe struct { + in chan *packet.Packet + out chan *packet.Packet + done chan struct{} + mu sync.RWMutex +} -// NewReadPipe returns a new ReadPipe. +// NewReadPipe creates a new ReadPipe instance. func NewReadPipe() *ReadPipe { p := &ReadPipe{ in: make(chan *packet.Packet), @@ -70,7 +61,7 @@ func NewReadPipe() *ReadPipe { return p } -// Receive returns a channel that receives Packet. +// Receive returns a channel that receives packets. func (p *ReadPipe) Receive() <-chan *packet.Packet { return p.out } @@ -81,7 +72,7 @@ func (p *ReadPipe) Done() <-chan struct{} { } // Close closes the ReadPipe. -// Packet that are not processed will be discard. +// Unprocessed packets will be discarded. func (p *ReadPipe) Close() { p.mu.Lock() defer p.mu.Unlock() @@ -96,6 +87,7 @@ func (p *ReadPipe) Close() { close(p.in) } +// send sends a packet through the pipe. func (p *ReadPipe) send(pck *packet.Packet) { p.mu.RLock() defer p.mu.RUnlock() @@ -107,7 +99,14 @@ func (p *ReadPipe) send(pck *packet.Packet) { } } -// NewWritePipe returns a new WritePipe. +// WritePipe represents a unidirectional pipe for sending packets. +type WritePipe struct { + links []*ReadPipe + done chan struct{} + mu sync.RWMutex +} + +// NewWritePipe creates a new WritePipe instance. func NewWritePipe() *WritePipe { return &WritePipe{ links: nil, @@ -116,7 +115,7 @@ func NewWritePipe() *WritePipe { } } -// Send a Packet to all linked ReadPipe. +// Send sends a packet to all linked ReadPipe instances. func (p *WritePipe) Send(pck *packet.Packet) { p.mu.Lock() defer p.mu.Unlock() @@ -133,7 +132,7 @@ func (p *WritePipe) Send(pck *packet.Packet) { wg.Wait() } -// Link a ReadPipe to enable communication with each other. +// Link links a ReadPipe to enable communication with each other. func (p *WritePipe) Link(pipe *ReadPipe) { p.mu.Lock() defer p.mu.Unlock() @@ -175,7 +174,7 @@ func (p *WritePipe) Done() <-chan struct{} { } // Close closes the WritePipe. -// Packet that are not processed will be discard. +// Unprocessed packets will be discarded. func (p *WritePipe) Close() { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/port/port.go b/pkg/port/port.go index 651b1d5e..577e2405 100644 --- a/pkg/port/port.go +++ b/pkg/port/port.go @@ -6,16 +6,14 @@ import ( "github.com/siyul-park/uniflow/pkg/process" ) -type ( - // Port is a linking terminal that allows *packet.Packet to be exchanged. - Port struct { - streams map[*process.Process]*Stream - links []*Port - initHooks []InitHook - done chan struct{} - mu sync.RWMutex - } -) +// Port is a linking terminal that allows *packet.Packet to be exchanged. +type Port struct { + streams map[*process.Process]*Stream + links []*Port + initHooks []InitHook + done chan struct{} + mu sync.RWMutex +} // New returns a new Port. func New() *Port { @@ -25,7 +23,7 @@ func New() *Port { } } -// AddInitHook adds a InitHook. +// AddInitHook adds an InitHook to the Port. func (p *Port) AddInitHook(hook InitHook) { p.mu.Lock() defer p.mu.Unlock() @@ -33,7 +31,7 @@ func (p *Port) AddInitHook(hook InitHook) { p.initHooks = append(p.initHooks, hook) } -// Link connects two Port to enable communication with each other. +// Link connects two Ports to enable communication with each other. func (p *Port) Link(port *Port) { p.link(port) port.link(p) @@ -45,7 +43,7 @@ func (p *Port) Unlink(port *Port) { port.unlink(p) } -// Links return length of linked. +// Links returns the number of linked Ports. func (p *Port) Links() int { p.mu.RLock() defer p.mu.RUnlock() @@ -53,8 +51,9 @@ func (p *Port) Links() int { return len(p.links) } -// Open Stream to communicate. For each process, Stream is opened independently. -// When Process is closed, Stream is also closed. Stream Send and Receive Packet to Broadcast to all other Port connected to the Port. +// Open creates or returns an existing Stream for communication with a process. +// The Stream is closed when the associated Process or Port is closed. +// It broadcasts sent and received packets to all other Ports connected to it. func (p *Port) Open(proc *process.Process) *Stream { select { case <-proc.Done(): @@ -133,8 +132,8 @@ func (p *Port) Done() <-chan struct{} { return p.done } -// Close the Port. -// All Stream currently open will also be shut down and any Packet that are not processed will be discard. +// Close closes the Port. +// All Streams currently open will also be shut down, and any unprocessed packets will be discarded. func (p *Port) Close() { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/port/stream.go b/pkg/port/stream.go index bcf9f83e..b9458c88 100644 --- a/pkg/port/stream.go +++ b/pkg/port/stream.go @@ -7,19 +7,17 @@ import ( "github.com/siyul-park/uniflow/pkg/packet" ) -type ( - // Stream is a channel where you can exchange *packet.Packet. - Stream struct { - id ulid.ULID - read *ReadPipe - write *WritePipe - links []*Stream - done chan struct{} - mu sync.RWMutex - } -) +// Stream represents a communication channel for exchanging *packet.Packet. +type Stream struct { + id ulid.ULID + read *ReadPipe + write *WritePipe + links []*Stream + done chan struct{} + mu sync.RWMutex +} -// NewStream returns a new Stream. +// NewStream creates a new Stream instance. func NewStream() *Stream { return &Stream{ id: ulid.Make(), @@ -29,51 +27,48 @@ func NewStream() *Stream { } } -// ID returns the ID. +// ID returns the Stream's ID. func (s *Stream) ID() ulid.ULID { s.mu.RLock() defer s.mu.RUnlock() - return s.id } -// Send sends a Packet to linked Stream. +// Send sends a *packet.Packet to linked Streams. func (s *Stream) Send(pck *packet.Packet) { s.write.Send(pck) } -// Receive receives a Packet from linked Stream. +// Receive returns a channel for receiving *packet.Packet from linked Streams. func (s *Stream) Receive() <-chan *packet.Packet { return s.read.Receive() } -// Link connects two Stream to enable communication with each other. +// Link connects two Streams for communication. func (s *Stream) Link(stream *Stream) { s.link(stream) stream.link(s) } -// Unlink removes the linked Stream from being able to communicate further. +// Unlink disconnects two linked Streams. func (s *Stream) Unlink(stream *Stream) { s.unlink(stream) stream.unlink(s) } -// Links returns length of linked. +// Links returns the number of linked Streams. func (s *Stream) Links() int { s.mu.RLock() defer s.mu.RUnlock() - return len(s.links) } -// Done returns a channel which is closed when the Stream is closed. +// Done returns a channel that's closed when the Stream is closed. func (s *Stream) Done() <-chan struct{} { return s.done } -// Close closes the Stream. -// Shut down and any Packet that are not processed will be discard. +// Close closes the Stream, discarding any unprocessed packets. func (s *Stream) Close() { s.mu.Lock() defer s.mu.Unlock() @@ -89,6 +84,7 @@ func (s *Stream) Close() { s.write.Close() } +// link connects the current Stream with another for communication. func (s *Stream) link(stream *Stream) { if stream == s { return @@ -107,6 +103,7 @@ func (s *Stream) link(stream *Stream) { s.write.Link(stream.read) } +// unlink disconnects the current Stream from another. func (s *Stream) unlink(stream *Stream) { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/symbol/loadhook.go b/pkg/symbol/loadhook.go index 7002a171..005a1fa5 100644 --- a/pkg/symbol/loadhook.go +++ b/pkg/symbol/loadhook.go @@ -13,7 +13,6 @@ type ( LoadHookFunc func(n node.Node) error ) -// Ensure that LoadHookFunc implements the LoadHook interface. var _ LoadHook = LoadHookFunc(func(n node.Node) error { return nil }) // Load is the implementation of the Load method for LoadHookFunc. diff --git a/pkg/symbol/unloadhook.go b/pkg/symbol/unloadhook.go index 937740e7..2dc61295 100644 --- a/pkg/symbol/unloadhook.go +++ b/pkg/symbol/unloadhook.go @@ -13,7 +13,6 @@ type ( UnloadHookFunc func(n node.Node) error ) -// Ensure that UnloadHookFunc implements the UnloadHook interface. var _ UnloadHook = UnloadHookFunc(func(n node.Node) error { return nil }) // Unload is the implementation of the Unload method for UnloadHookFunc.