Skip to content

Commit

Permalink
refactor: process
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 1, 2023
1 parent 46a2c47 commit 8b9971a
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 62 deletions.
4 changes: 2 additions & 2 deletions pkg/node/onetomany_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestOneToManyNode_Send(t *testing.T) {
outPort.Link(out)

proc := process.New()
defer proc.Close()
defer proc.Exit()

inStream := in.Open(proc)
outStream := out.Open(proc)
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestOneToManyNode_Send(t *testing.T) {
errPort.Link(err)

proc := process.New()
defer proc.Close()
defer proc.Exit()

inStream := in.Open(proc)
errStream := err.Open(proc)
Expand Down
8 changes: 4 additions & 4 deletions pkg/node/onetoone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestOneToOneNode_Send(t *testing.T) {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -102,7 +102,7 @@ func TestOneToOneNode_Send(t *testing.T) {
errPort.Link(err)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)
errStream := err.Open(proc)
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestOneToOneNode_Send(t *testing.T) {
outPort.Link(out)

proc := process.New()
defer proc.Close()
defer proc.Exit()

inStream := in.Open(proc)
outStream := out.Open(proc)
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestOneToOneNode_Send(t *testing.T) {
errPort.Link(err)

proc := process.New()
defer proc.Close()
defer proc.Exit()

inStream := in.Open(proc)
errStream := err.Open(proc)
Expand Down
16 changes: 8 additions & 8 deletions pkg/plugin/controllx/snippet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function main(inPayload: any): any {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -80,7 +80,7 @@ function main(inPayload) {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -114,7 +114,7 @@ function main(inPayload) {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -146,7 +146,7 @@ function main(inPayload) {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -185,7 +185,7 @@ function main(inPayload: any): any {

for i := 0; i < b.N; i++ {
proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -222,7 +222,7 @@ function main(inPayload) {

for i := 0; i < b.N; i++ {
proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -255,7 +255,7 @@ function main(inPayload) {

for i := 0; i < b.N; i++ {
proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -288,7 +288,7 @@ function main(inPayload) {

for i := 0; i < b.N; i++ {
proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/controllx/switch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestSwitchNode_Send(t *testing.T) {
outPort.Link(out)

proc := process.New()
defer proc.Close()
defer proc.Exit()

inStream := in.Open(proc)
outStream := out.Open(proc)
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugin/networkx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,13 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
proc := process.New()
defer func() {
proc.Stack().Wait()
proc.Close()
proc.Exit()
}()

go func() {
select {
case <-r.Context().Done():
proc.Close()
proc.Exit()
case <-proc.Done():
}
}()
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/networkx/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestRouterNode_Send(t *testing.T) {
outPort.Link(out)

proc := process.New()
defer proc.Close()
defer proc.Exit()

inStream := in.Open(proc)
outStream := out.Open(proc)
Expand Down
8 changes: 4 additions & 4 deletions pkg/plugin/systemx/reflect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestReflectNode_Send(t *testing.T) {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -108,7 +108,7 @@ func TestReflectNode_Send(t *testing.T) {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestReflectNode_Send(t *testing.T) {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down Expand Up @@ -190,7 +190,7 @@ func TestReflectNode_Send(t *testing.T) {
ioPort.Link(io)

proc := process.New()
defer proc.Close()
defer proc.Exit()

ioStream := io.Open(proc)

Expand Down
4 changes: 2 additions & 2 deletions pkg/port/port_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestPort_Open(t *testing.T) {
proc := process.New()
stream := port.Open(proc)

proc.Close()
proc.Exit()

select {
case <-stream.Done():
Expand All @@ -105,7 +105,7 @@ func TestPort_Open(t *testing.T) {

t.Run("process closed", func(t *testing.T) {
proc := process.New()
proc.Close()
proc.Exit()

stream := port.Open(proc)

Expand Down
27 changes: 13 additions & 14 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"github.com/oklog/ulid/v2"
)

type (
// Process is a processing unit that isolates data processing with others.
Process struct {
id ulid.ULID
stack *Stack
done chan struct{}
mu sync.RWMutex
}
)
// Process is a processing unit that isolates data processing from others.

type Process struct {
id ulid.ULID
stack *Stack
done chan struct{}
mu sync.RWMutex
}

// New creates a new Process.
func New() *Process {
Expand All @@ -26,29 +25,29 @@ func New() *Process {
}
}

// ID returns the ID.
// ID returns the ID of the process.
func (p *Process) ID() ulid.ULID {
p.mu.RLock()
defer p.mu.RUnlock()

return p.id
}

// Stack returns a Stack
// Stack returns a process's stack.
func (p *Process) Stack() *Stack {
p.mu.RLock()
defer p.mu.RUnlock()

return p.stack
}

// Done returns a channel that is closed when is closed.
// Done returns a channel that is closed when the process is closed.
func (p *Process) Done() <-chan struct{} {
return p.done
}

// Close closes the Process.
func (p *Process) Close() {
// Exit closes the process.
func (p *Process) Exit() {
p.mu.Lock()
defer p.mu.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions pkg/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,21 @@ import (

func TestNew(t *testing.T) {
proc := New()
defer proc.Close()
defer proc.Exit()

assert.NotNil(t, proc)
}

func TestProcess_ID(t *testing.T) {
proc := New()
defer proc.Close()
defer proc.Exit()

assert.NotEqual(t, ulid.ULID{}, proc.ID())
}

func TestProcess_Stack(t *testing.T) {
proc := New()
defer proc.Close()
defer proc.Exit()

assert.NotNil(t, proc.Stack())
}
Expand All @@ -37,7 +37,7 @@ func TestProcess_Close(t *testing.T) {
default:
}

proc.Close()
proc.Exit()

select {
case <-proc.Done():
Expand Down
Loading

0 comments on commit 8b9971a

Please sign in to comment.