diff --git a/pkg/node/onetomany_test.go b/pkg/node/onetomany_test.go index 7b9855e8..8d15a1a4 100644 --- a/pkg/node/onetomany_test.go +++ b/pkg/node/onetomany_test.go @@ -64,7 +64,7 @@ func TestOneToManyNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) outStream := out.Open(proc) @@ -109,7 +109,7 @@ func TestOneToManyNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) errStream := err.Open(proc) diff --git a/pkg/node/onetoone_test.go b/pkg/node/onetoone_test.go index fdb3fe1f..4b31c383 100644 --- a/pkg/node/onetoone_test.go +++ b/pkg/node/onetoone_test.go @@ -65,7 +65,7 @@ func TestOneToOneNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -102,7 +102,7 @@ func TestOneToOneNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) errStream := err.Open(proc) @@ -142,7 +142,7 @@ func TestOneToOneNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) outStream := out.Open(proc) @@ -188,7 +188,7 @@ func TestOneToOneNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) errStream := err.Open(proc) diff --git a/pkg/plugin/controllx/snippet_test.go b/pkg/plugin/controllx/snippet_test.go index 29915706..f0d7fd53 100644 --- a/pkg/plugin/controllx/snippet_test.go +++ b/pkg/plugin/controllx/snippet_test.go @@ -44,7 +44,7 @@ function main(inPayload: any): any { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -80,7 +80,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -114,7 +114,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -146,7 +146,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -185,7 +185,7 @@ function main(inPayload: any): any { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -222,7 +222,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -255,7 +255,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -288,7 +288,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) diff --git a/pkg/plugin/controllx/switch_test.go b/pkg/plugin/controllx/switch_test.go index 22fc7a5e..ac97a2e1 100644 --- a/pkg/plugin/controllx/switch_test.go +++ b/pkg/plugin/controllx/switch_test.go @@ -79,7 +79,7 @@ func TestSwitchNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) outStream := out.Open(proc) diff --git a/pkg/plugin/networkx/http.go b/pkg/plugin/networkx/http.go index 9d675772..e7d7c455 100644 --- a/pkg/plugin/networkx/http.go +++ b/pkg/plugin/networkx/http.go @@ -394,13 +394,13 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { proc := process.New() defer func() { proc.Stack().Wait() - proc.Close() + proc.Exit() }() go func() { select { case <-r.Context().Done(): - proc.Close() + proc.Exit() case <-proc.Done(): } }() diff --git a/pkg/plugin/networkx/router_test.go b/pkg/plugin/networkx/router_test.go index db5efcbc..6c4af490 100644 --- a/pkg/plugin/networkx/router_test.go +++ b/pkg/plugin/networkx/router_test.go @@ -71,7 +71,7 @@ func TestRouterNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) outStream := out.Open(proc) diff --git a/pkg/plugin/systemx/reflect_test.go b/pkg/plugin/systemx/reflect_test.go index 5ef1ec87..71b3eeb0 100644 --- a/pkg/plugin/systemx/reflect_test.go +++ b/pkg/plugin/systemx/reflect_test.go @@ -64,7 +64,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -108,7 +108,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -146,7 +146,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -190,7 +190,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) diff --git a/pkg/port/port_test.go b/pkg/port/port_test.go index 34093fdb..ea4aba6c 100644 --- a/pkg/port/port_test.go +++ b/pkg/port/port_test.go @@ -94,7 +94,7 @@ func TestPort_Open(t *testing.T) { proc := process.New() stream := port.Open(proc) - proc.Close() + proc.Exit() select { case <-stream.Done(): @@ -105,7 +105,7 @@ func TestPort_Open(t *testing.T) { t.Run("process closed", func(t *testing.T) { proc := process.New() - proc.Close() + proc.Exit() stream := port.Open(proc) diff --git a/pkg/process/process.go b/pkg/process/process.go index 2a2bee77..7aadb151 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -6,15 +6,14 @@ import ( "github.com/oklog/ulid/v2" ) -type ( - // Process is a processing unit that isolates data processing with others. - Process struct { - id ulid.ULID - stack *Stack - done chan struct{} - mu sync.RWMutex - } -) +// Process is a processing unit that isolates data processing from others. + +type Process struct { + id ulid.ULID + stack *Stack + done chan struct{} + mu sync.RWMutex +} // New creates a new Process. func New() *Process { @@ -26,7 +25,7 @@ func New() *Process { } } -// ID returns the ID. +// ID returns the ID of the process. func (p *Process) ID() ulid.ULID { p.mu.RLock() defer p.mu.RUnlock() @@ -34,7 +33,7 @@ func (p *Process) ID() ulid.ULID { return p.id } -// Stack returns a Stack +// Stack returns a process's stack. func (p *Process) Stack() *Stack { p.mu.RLock() defer p.mu.RUnlock() @@ -42,13 +41,13 @@ func (p *Process) Stack() *Stack { return p.stack } -// Done returns a channel that is closed when is closed. +// Done returns a channel that is closed when the process is closed. func (p *Process) Done() <-chan struct{} { return p.done } -// Close closes the Process. -func (p *Process) Close() { +// Exit closes the process. +func (p *Process) Exit() { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/process/process_test.go b/pkg/process/process_test.go index 84e1e3aa..7107fe58 100644 --- a/pkg/process/process_test.go +++ b/pkg/process/process_test.go @@ -9,21 +9,21 @@ import ( func TestNew(t *testing.T) { proc := New() - defer proc.Close() + defer proc.Exit() assert.NotNil(t, proc) } func TestProcess_ID(t *testing.T) { proc := New() - defer proc.Close() + defer proc.Exit() assert.NotEqual(t, ulid.ULID{}, proc.ID()) } func TestProcess_Stack(t *testing.T) { proc := New() - defer proc.Close() + defer proc.Exit() assert.NotNil(t, proc.Stack()) } @@ -37,7 +37,7 @@ func TestProcess_Close(t *testing.T) { default: } - proc.Close() + proc.Exit() select { case <-proc.Done(): diff --git a/pkg/process/stack.go b/pkg/process/stack.go index 46b6a280..93238e29 100644 --- a/pkg/process/stack.go +++ b/pkg/process/stack.go @@ -6,19 +6,17 @@ import ( "github.com/oklog/ulid/v2" ) -type ( - // Stack is trace object. - Stack struct { - stems map[ulid.ULID][]ulid.ULID - leaves map[ulid.ULID][]ulid.ULID - stacks map[ulid.ULID][]ulid.ULID - heads map[ulid.ULID][]ulid.ULID - wait sync.RWMutex - mu sync.RWMutex - } -) +// Stack is a data structure that manages relationships between ULIDs in a trace. +type Stack struct { + stems map[ulid.ULID][]ulid.ULID + leaves map[ulid.ULID][]ulid.ULID + stacks map[ulid.ULID][]ulid.ULID + heads map[ulid.ULID][]ulid.ULID + wait sync.RWMutex + mu sync.RWMutex +} -// NewStack returns a new Stack. +// NewStack creates a new Stack instance. func NewStack() *Stack { return &Stack{ stems: make(map[ulid.ULID][]ulid.ULID), @@ -28,7 +26,7 @@ func NewStack() *Stack { } } -// Link adds an relation. +// Link establishes a relationship between two ULIDs, a stem, and a leaf. func (s *Stack) Link(stem, leaf ulid.ULID) { s.mu.Lock() defer s.mu.Unlock() @@ -51,7 +49,7 @@ func (s *Stack) Link(stem, leaf ulid.ULID) { s.leaves[stem] = append(s.leaves[stem], leaf) } -// Unlink deletes an relation. +// Unlink removes a relationship between a stem and a leaf. func (s *Stack) Unlink(stem, leaf ulid.ULID) { s.mu.Lock() defer s.mu.Unlock() @@ -84,7 +82,7 @@ func (s *Stack) Unlink(stem, leaf ulid.ULID) { } } -// Push pushes the value. +// Push adds a value to the stack associated with a key. func (s *Stack) Push(key, value ulid.ULID) { s.mu.Lock() defer s.mu.Unlock() @@ -96,7 +94,7 @@ func (s *Stack) Push(key, value ulid.ULID) { s.wait.RLock() } -// Pop pops the value. +// Pop removes and returns the top value from the stack associated with a key. func (s *Stack) Pop(key, value ulid.ULID) (ulid.ULID, bool) { s.mu.Lock() defer s.mu.Unlock() @@ -169,7 +167,7 @@ func (s *Stack) Pop(key, value ulid.ULID) (ulid.ULID, bool) { return ulid.ULID{}, false } -// Clear removes a link from the child. +// Clear removes links from the child associated with a key. func (s *Stack) Clear(key ulid.ULID) { s.mu.Lock() defer s.mu.Unlock() @@ -219,7 +217,7 @@ func (s *Stack) Clear(key ulid.ULID) { } } -// Len return the number of values. +// Len returns the number of values in the stack associated with a key. func (s *Stack) Len(key ulid.ULID) int { s.mu.RLock() defer s.mu.RUnlock() @@ -263,13 +261,13 @@ func (s *Stack) Len(key ulid.ULID) int { return count } -// Wait blocks until is empty. +// Wait blocks until the stack is empty. func (s *Stack) Wait() { s.wait.Lock() defer s.wait.Unlock() } -// Close closes all resources. +// Close releases all resources associated with the Stack. func (s *Stack) Close() { s.mu.Lock() defer s.mu.Unlock()