diff --git a/cmd/pkg/cli/apply.go b/cmd/pkg/cli/apply.go index fb824619f..e04ba46e1 100644 --- a/cmd/pkg/cli/apply.go +++ b/cmd/pkg/cli/apply.go @@ -67,7 +67,7 @@ func runApplyCommand(config ApplyConfig) func(cmd *cobra.Command, args []string) } } - exists, err := config.SpecStore.Load(ctx, specs...) + ok, err := config.SpecStore.Load(ctx, specs...) if err != nil { return err } @@ -75,7 +75,7 @@ func runApplyCommand(config ApplyConfig) func(cmd *cobra.Command, args []string) var inserts []spec.Spec var updates []spec.Spec for _, sp := range specs { - if match := resourcebase.Match(sp, exists...); len(match) > 0 { + if match := resourcebase.Match(sp, ok...); len(match) > 0 { sp.SetID(match[0].GetID()) updates = append(updates, sp) } else { @@ -103,7 +103,7 @@ func runApplyCommand(config ApplyConfig) func(cmd *cobra.Command, args []string) } } - exists, err := config.SecretStore.Load(ctx, secrets...) + ok, err := config.SecretStore.Load(ctx, secrets...) if err != nil { return err } @@ -111,7 +111,7 @@ func runApplyCommand(config ApplyConfig) func(cmd *cobra.Command, args []string) var inserts []*secret.Secret var updates []*secret.Secret for _, sec := range secrets { - if match := resourcebase.Match(sec, exists...); len(match) > 0 { + if match := resourcebase.Match(sec, ok...); len(match) > 0 { sec.SetID(match[0].GetID()) updates = append(updates, sec) } else { diff --git a/cmd/pkg/cli/debug.go b/cmd/pkg/cli/debug.go index 2b7065020..47a629efc 100644 --- a/cmd/pkg/cli/debug.go +++ b/cmd/pkg/cli/debug.go @@ -253,7 +253,7 @@ func (m *debugModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { var proc *process.Process if len(args) > 1 { id, _ := uuid.FromString(args[1]) - proc, _ = m.agent.Process(id) + proc = m.agent.Process(id) } else { proc = m.debugger.Process() } @@ -277,14 +277,14 @@ func (m *debugModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { var proc *process.Process if len(args) > 1 { id, _ := uuid.FromString(args[1]) - proc, _ = m.agent.Process(id) + proc = m.agent.Process(id) } else { proc = m.debugger.Process() } var frames []*agent.Frame if proc != nil { - frames, _ = m.agent.Frames(proc.ID()) + frames = m.agent.Frames(proc.ID()) } if frames == nil { diff --git a/docs/user_extensions.md b/docs/user_extensions.md index dd9706f42..da5c180d6 100644 --- a/docs/user_extensions.md +++ b/docs/user_extensions.md @@ -334,12 +334,10 @@ This code creates a new runtime environment using the provided schema, hook, spe To integrate the runtime environment with existing services and build an executable, you need to set up the environment to run continuously or in a simpler execution mode. -### Continuous Execution - -Running the runtime environment continuously allows it to handle external requests promptly. This approach is suitable for scenarios where ongoing workflow execution is needed. - ```go func main() { + ctx := context.TODO() + specStore := spec.NewStore() secretStore := secret.NewStore() @@ -372,39 +370,10 @@ func main() { _ = r.Close() }() - r.Listen(context.TODO()) + r.Watch(ctx) + r.Load(ctx) + r.Reconcile(ctx) } ``` This code keeps the runtime environment running and responsive to external signals. It uses `os.Signal` to listen for termination signals and safely shuts down the environment. - -### Simple Execution - -In some cases, you may prefer a simpler execution mode where the runtime environment runs only when needed and then exits. This approach is useful for scenarios where you don't need continuous operation. - -```go -r := runtime.New(runtime.Config{ - Namespace: "default", - Schema: scheme, - Hook: hook, - SpecStore: specStore, - SecretStore: secretStore, -}) -defer r.Close() - -r.Load(ctx) // Load all resources - -symbols, _ := r.Load(ctx, &spec.Meta{ - Name: "main", -}) - -sb := symbols[0] - -in := port.NewOut() -defer in.Close() - -in.Link(sb.In(node.PortIn)) - -payload := types.NewString(faker.Word()) -payload, err := port.Call(in, payload) -``` diff --git a/docs/user_extensions_kr.md b/docs/user_extensions_kr.md index 1b1fa97f3..fb49d0401 100644 --- a/docs/user_extensions_kr.md +++ b/docs/user_extensions_kr.md @@ -334,12 +334,10 @@ defer r.Close() 이제 만든 런타임 환경을 기존 서비스에 통합하고, 다시 빌드하여 실행 파일을 생성해야 합니다. -### 지속 실행 - -런타임 환경을 지속적으로 유지하면 외부 요청에 즉시 대응할 수 있습니다. 각 런타임 환경은 독립적인 컨테이너에서 실행되며, 지속적인 워크플로우 실행이 필요한 시나리오에 적합합니다. - ```go func main() { + ctx := context.TODO() + specStore := spec.NewStore() secretStore := secret.NewStore() @@ -372,39 +370,10 @@ func main() { _ = r.Close() }() - r.Listen(context.TODO()) + r.Watch(ctx) + r.Load(ctx) + r.Reconcile(ctx) } ``` 위 코드에서는 런타임 환경을 지속적으로 실행하여 외부 신호에 반응하도록 설정합니다. `os.Signal`을 통해 종료 신호를 수신하면 런타임 환경을 안전하게 종료합니다. - -### 단순 실행 - -때로는 런타임 환경을 지속적으로 유지하는 대신, 필요할 때만 실행하고 종료하는 간단한 방식이 더 적합할 수 있습니다. 이럴 때는 단순 실행 방식을 사용할 수 있습니다. - -```go -r := runtime.New(runtime.Config{ - Namespace: "default", - Schema: scheme, - Hook: hook, - SpecStore: specStore, - SecretStore: secretStore, -}) -defer r.Close() - -r.Load(ctx) // 모든 리소스 로드 - -symbols, _ := r.Load(ctx, &spec.Meta{ - Name: "main", -}) - -sb := symbols[0] - -in := port.NewOut() -defer in.Close() - -in.Link(sb.In(node.PortIn)) - -payload := types.NewString(faker.Word()) -payload, err := port.Call(in, payload) -``` diff --git a/driver/mongo/pkg/secret/store.go b/driver/mongo/pkg/secret/store.go index 04760727b..3a8c81cc6 100644 --- a/driver/mongo/pkg/secret/store.go +++ b/driver/mongo/pkg/secret/store.go @@ -152,18 +152,18 @@ func (s *Store) Swap(ctx context.Context, secrets ...*secret.Secret) (int, error } defer cursor.Close(ctx) - exists := make(map[uuid.UUID]*secret.Secret) + ok := make(map[uuid.UUID]*secret.Secret) for cursor.Next(ctx) { sec := &secret.Secret{} if err := cursor.Decode(&sec); err != nil { return 0, err } - exists[sec.GetID()] = sec + ok[sec.GetID()] = sec } count := 0 for _, sec := range secrets { - exist, ok := exists[sec.GetID()] + exist, ok := ok[sec.GetID()] if !ok { continue } diff --git a/driver/mongo/pkg/spec/store.go b/driver/mongo/pkg/spec/store.go index 98b03ffc8..f4c07e58b 100644 --- a/driver/mongo/pkg/spec/store.go +++ b/driver/mongo/pkg/spec/store.go @@ -166,18 +166,18 @@ func (s *Store) Swap(ctx context.Context, specs ...spec.Spec) (int, error) { } defer cursor.Close(ctx) - exists := make(map[uuid.UUID]spec.Spec) + ok := make(map[uuid.UUID]spec.Spec) for cursor.Next(ctx) { sp := &spec.Unstructured{} if err := cursor.Decode(sp); err != nil { return 0, err } - exists[sp.GetID()] = sp + ok[sp.GetID()] = sp } count := 0 for _, sp := range specs { - exist, ok := exists[sp.GetID()] + exist, ok := ok[sp.GetID()] if !ok { continue } diff --git a/ext/pkg/control/builder_test.go b/ext/pkg/control/builder_test.go index b3fa6ded3..fcbdd8ed3 100644 --- a/ext/pkg/control/builder_test.go +++ b/ext/pkg/control/builder_test.go @@ -49,11 +49,8 @@ func TestAddToScheme(t *testing.T) { for _, tt := range tests { t.Run(tt, func(t *testing.T) { - _, ok := s.KnownType(tt) - assert.True(t, ok) - - _, ok = s.Codec(tt) - assert.True(t, ok) + assert.NotNil(t, s.KnownType(tt)) + assert.NotNil(t, s.Codec(tt)) }) } } diff --git a/ext/pkg/io/builder_test.go b/ext/pkg/io/builder_test.go index caa562a00..9ee1527c1 100644 --- a/ext/pkg/io/builder_test.go +++ b/ext/pkg/io/builder_test.go @@ -17,11 +17,8 @@ func TestAddToScheme(t *testing.T) { for _, tt := range tests { t.Run(tt, func(t *testing.T) { - _, ok := s.KnownType(tt) - assert.True(t, ok) - - _, ok = s.Codec(tt) - assert.True(t, ok) + assert.NotNil(t, s.KnownType(tt)) + assert.NotNil(t, s.Codec(tt)) }) } } diff --git a/ext/pkg/network/builder_test.go b/ext/pkg/network/builder_test.go index 0678ccc51..f445ab193 100644 --- a/ext/pkg/network/builder_test.go +++ b/ext/pkg/network/builder_test.go @@ -46,11 +46,8 @@ func TestAddToScheme(t *testing.T) { for _, tt := range tests { t.Run(tt, func(t *testing.T) { - _, ok := s.KnownType(tt) - assert.True(t, ok) - - _, ok = s.Codec(tt) - assert.True(t, ok) + assert.NotNil(t, s.KnownType(tt)) + assert.NotNil(t, s.Codec(tt)) }) } } diff --git a/ext/pkg/system/builder_test.go b/ext/pkg/system/builder_test.go index d70cffc09..471a1dc34 100644 --- a/ext/pkg/system/builder_test.go +++ b/ext/pkg/system/builder_test.go @@ -17,11 +17,8 @@ func TestAddToScheme(t *testing.T) { for _, tt := range tests { t.Run(tt, func(t *testing.T) { - _, ok := s.KnownType(tt) - assert.True(t, ok) - - _, ok = s.Codec(tt) - assert.True(t, ok) + assert.NotNil(t, s.KnownType(tt)) + assert.NotNil(t, s.Codec(tt)) }) } } diff --git a/ext/pkg/system/syscall.go b/ext/pkg/system/syscall.go index 65fbffff3..b3293ab27 100644 --- a/ext/pkg/system/syscall.go +++ b/ext/pkg/system/syscall.go @@ -46,14 +46,14 @@ func UpdateNodes(s spec.Store) func(context.Context, []spec.Spec) ([]spec.Spec, func DeleteNodes(s spec.Store) func(context.Context, []spec.Spec) ([]spec.Spec, error) { return func(ctx context.Context, specs []spec.Spec) ([]spec.Spec, error) { - exists, err := s.Load(ctx, specs...) + ok, err := s.Load(ctx, specs...) if err != nil { return nil, err } - if _, err := s.Delete(ctx, exists...); err != nil { + if _, err := s.Delete(ctx, ok...); err != nil { return nil, err } - return exists, nil + return ok, nil } } @@ -84,13 +84,13 @@ func UpdateSecrets(s secret.Store) func(context.Context, []*secret.Secret) ([]*s func DeleteSecrets(s secret.Store) func(context.Context, []*secret.Secret) ([]*secret.Secret, error) { return func(ctx context.Context, secrets []*secret.Secret) ([]*secret.Secret, error) { - exists, err := s.Load(ctx, secrets...) + ok, err := s.Load(ctx, secrets...) if err != nil { return nil, err } - if _, err := s.Delete(ctx, exists...); err != nil { + if _, err := s.Delete(ctx, ok...); err != nil { return nil, err } - return exists, nil + return ok, nil } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 3688b436b..1c21e43de 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -46,7 +46,6 @@ func (a *Agent) Watch(watcher Watcher) bool { return false } } - a.watchers = append(a.watchers, watcher) return true } @@ -78,12 +77,11 @@ func (a *Agent) Symbols() []*symbol.Symbol { } // Symbol retrieves a symbol by UUID, returning the symbol and a boolean indicating existence. -func (a *Agent) Symbol(id uuid.UUID) (*symbol.Symbol, bool) { +func (a *Agent) Symbol(id uuid.UUID) *symbol.Symbol { a.mu.RLock() defer a.mu.RUnlock() - sym, exists := a.symbols[id] - return sym, exists + return a.symbols[id] } // Processes returns all managed processes. @@ -99,24 +97,19 @@ func (a *Agent) Processes() []*process.Process { } // Process retrieves a process by UUID, returning the process and a boolean indicating existence. -func (a *Agent) Process(id uuid.UUID) (*process.Process, bool) { +func (a *Agent) Process(id uuid.UUID) *process.Process { a.mu.RLock() defer a.mu.RUnlock() - proc, exists := a.processes[id] - return proc, exists + return a.processes[id] } // Frames retrieves frames for a specific process UUID, returning frames and a boolean indicating existence. -func (a *Agent) Frames(id uuid.UUID) ([]*Frame, bool) { +func (a *Agent) Frames(id uuid.UUID) []*Frame { a.mu.RLock() defer a.mu.RUnlock() - frames, exists := a.frames[id] - if !exists { - return nil, false - } - return frames[:], true + return append([]*Frame(nil), a.frames[id]...) } // Load adds a symbol and its hooks to the agent. @@ -200,33 +193,33 @@ func (a *Agent) Close() { func (a *Agent) accept(proc *process.Process) { a.mu.Lock() - _, exists := a.processes[proc.ID()] + _, ok := a.processes[proc.ID()] + if ok { + a.mu.Unlock() + return + } - if !exists { - a.processes[proc.ID()] = proc + a.processes[proc.ID()] = proc - proc.AddExitHook(process.ExitFunc(func(err error) { - a.mu.Lock() - defer a.mu.Unlock() + proc.AddExitHook(process.ExitFunc(func(err error) { + a.mu.Lock() + defer a.mu.Unlock() - delete(a.processes, proc.ID()) - delete(a.frames, proc.ID()) - })) + delete(a.processes, proc.ID()) + delete(a.frames, proc.ID()) + })) - if _, exists := a.frames[proc.ID()]; !exists { - a.frames[proc.ID()] = nil - } + if _, ok := a.frames[proc.ID()]; !ok { + a.frames[proc.ID()] = nil } - watchers := a.watchers[:] + watchers := a.watchers a.mu.Unlock() - if !exists { - for i := len(watchers) - 1; i >= 0; i-- { - watcher := watchers[i] - watcher.OnProcess(proc) - } + for i := len(watchers) - 1; i >= 0; i-- { + watcher := watchers[i] + watcher.OnProcess(proc) } } @@ -255,7 +248,7 @@ func (a *Agent) hooks(proc *process.Process, sym *symbol.Symbol, in *port.InPort a.frames[proc.ID()] = append(a.frames[proc.ID()], frame) } - watchers := a.watchers[:] + watchers := a.watchers a.mu.Unlock() @@ -289,7 +282,7 @@ func (a *Agent) hooks(proc *process.Process, sym *symbol.Symbol, in *port.InPort a.frames[proc.ID()] = append(a.frames[proc.ID()], frame) } - watchers := a.watchers[:] + watchers := a.watchers a.mu.Unlock() diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 9716e6a64..c7e895175 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -52,11 +52,8 @@ func TestAgent_Symbol(t *testing.T) { a.Load(sb) defer a.Unload(sb) - _, ok := a.Symbol(sb.ID()) - assert.True(t, ok) - - sbs := a.Symbols() - assert.Contains(t, sbs, sb) + assert.Equal(t, sb, a.Symbol(sb.ID())) + assert.Contains(t, a.Symbols(), sb) } func TestAgent_Process(t *testing.T) { @@ -67,11 +64,8 @@ func TestAgent_Process(t *testing.T) { a.Watch(NewProcessWatcher(func(proc *process.Process) { defer close(done) - _, ok := a.Process(proc.ID()) - assert.True(t, ok) - - procs := a.Processes() - assert.Contains(t, procs, proc) + assert.Equal(t, proc, a.Process(proc.ID())) + assert.Contains(t, a.Processes(), proc) })) sb := &symbol.Symbol{ @@ -86,6 +80,7 @@ func TestAgent_Process(t *testing.T) { defer sb.Close() in := sb.In(node.PortIn) + out := sb.Out(node.PortOut) a.Load(sb) defer a.Unload(sb) @@ -94,6 +89,7 @@ func TestAgent_Process(t *testing.T) { defer proc.Exit(nil) in.Open(proc) + out.Open(proc) <-done } @@ -104,11 +100,9 @@ func TestAgent_Frames(t *testing.T) { count := 0 a.Watch(NewFrameWatcher(func(frame *Frame) { - frames, ok := a.Frames(frame.Process.ID()) - assert.True(t, ok) - assert.Contains(t, frames, frame) - count += 1 + + assert.Contains(t, a.Frames(frame.Process.ID()), frame) })) sb := &symbol.Symbol{ @@ -124,10 +118,14 @@ func TestAgent_Frames(t *testing.T) { } defer sb.Close() - out := port.NewOut() + in := port.NewOut() + defer in.Close() + + out := port.NewIn() defer out.Close() - out.Link(sb.In(node.PortIn)) + in.Link(sb.In(node.PortIn)) + sb.Out(node.PortOut).Link(out) a.Load(sb) defer a.Unload(sb) @@ -135,12 +133,16 @@ func TestAgent_Frames(t *testing.T) { proc := process.New() defer proc.Exit(nil) - writer := out.Open(proc) + inWriter := in.Open(proc) + outReader := out.Open(proc) pck := packet.New(nil) - writer.Write(pck) - <-writer.Receive() + inWriter.Write(pck) + <-outReader.Read() + + outReader.Receive(pck) + <-inWriter.Receive() - assert.Equal(t, 2, count) + assert.Equal(t, 4, count) } diff --git a/pkg/agent/frame.go b/pkg/agent/frame.go index 4c82a162b..657d85efc 100644 --- a/pkg/agent/frame.go +++ b/pkg/agent/frame.go @@ -9,14 +9,17 @@ import ( "github.com/siyul-park/uniflow/pkg/symbol" ) -// Frame represents a processing frame that links a process to its input and output packets. +// Frame represents a processing unit that links a process with its input and output packets. type Frame struct { - Process *process.Process // The process associated with this frame. - Symbol *symbol.Symbol // The symbol being processed. - InPort *port.InPort // The input port that received the packet. - OutPort *port.OutPort // The output port that will send the packet. - InPck *packet.Packet // The input packet being processed. - OutPck *packet.Packet // The output packet generated from processing. - InTime time.Time // The timestamp when the input packet was received. - OutTime time.Time // The timestamp when the output packet was sent. + Process *process.Process // The associated process handling this frame. + Symbol *symbol.Symbol // The symbol or metadata relevant to the current processing state. + + InPort *port.InPort // Input port that receives the packet. + OutPort *port.OutPort // Output port that sends the processed packet. + + InPck *packet.Packet // The incoming packet being processed. + OutPck *packet.Packet // The outgoing packet generated after processing. + + InTime time.Time // Timestamp when the input packet was received. + OutTime time.Time // Timestamp when the output packet was sent. } diff --git a/pkg/debug/breakpoint.go b/pkg/debug/breakpoint.go index edef7f467..b92e9d115 100644 --- a/pkg/debug/breakpoint.go +++ b/pkg/debug/breakpoint.go @@ -15,10 +15,12 @@ type Breakpoint struct { symbol *symbol.Symbol inPort *port.InPort outPort *port.OutPort - frame *agent.Frame - next chan *agent.Frame - done chan *agent.Frame - mu sync.RWMutex + current *agent.Frame + in chan *agent.Frame + out chan *agent.Frame + done chan struct{} + rmu sync.RWMutex + wmu sync.Mutex } var _ agent.Watcher = (*Breakpoint)(nil) @@ -26,8 +28,9 @@ var _ agent.Watcher = (*Breakpoint)(nil) // NewBreakpoint creates a new Breakpoint with optional configurations. func NewBreakpoint(options ...func(*Breakpoint)) *Breakpoint { b := &Breakpoint{ - next: make(chan *agent.Frame), - done: make(chan *agent.Frame), + in: make(chan *agent.Frame), + out: make(chan *agent.Frame), + done: make(chan struct{}), } for _, opt := range options { opt(b) @@ -59,35 +62,45 @@ func WithOutPort(port *port.OutPort) func(*Breakpoint) { func (b *Breakpoint) Next() bool { b.Done() - frame, ok := <-b.next + b.rmu.Lock() + defer b.rmu.Unlock() - b.mu.Lock() - b.frame = frame - b.mu.Unlock() - return ok + if b.current != nil { + return false + } + + select { + case b.current = <-b.in: + return true + case <-b.done: + return false + } } // Done completes the current frame's processing. func (b *Breakpoint) Done() bool { - b.mu.Lock() - frame := b.frame - b.frame = nil - b.mu.Unlock() + b.rmu.Lock() + defer b.rmu.Unlock() - if frame == nil { - return false + if b.current == nil { + return true } - b.done <- frame - return true + select { + case b.out <- b.current: + b.current = nil + return true + case <-b.done: + return false + } } // Frame returns the current frame under lock protection. func (b *Breakpoint) Frame() *agent.Frame { - b.mu.RLock() - defer b.mu.RUnlock() + b.rmu.RLock() + defer b.rmu.RUnlock() - return b.frame + return b.current } // Process returns the associated process. @@ -113,8 +126,15 @@ func (b *Breakpoint) OutPort() *port.OutPort { // OnFrame processes an incoming frame and synchronizes it. func (b *Breakpoint) OnFrame(frame *agent.Frame) { if b.matches(frame) { - b.next <- frame - <-b.done + select { + case b.in <- frame: + case <-b.done: + } + + select { + case <-b.out: + case <-b.done: + } } } @@ -123,13 +143,21 @@ func (b *Breakpoint) OnProcess(*process.Process) {} // Close cleans up resources. func (b *Breakpoint) Close() { - b.mu.Lock() - defer b.mu.Unlock() + b.wmu.Lock() + defer b.wmu.Unlock() - b.frame = nil + select { + case <-b.done: + return + default: + } - close(b.next) close(b.done) + + b.rmu.Lock() + defer b.rmu.Unlock() + + b.current = nil } func (b *Breakpoint) matches(frame *agent.Frame) bool { diff --git a/pkg/debug/debugger.go b/pkg/debug/debugger.go index 8f13d3071..3b3018afc 100644 --- a/pkg/debug/debugger.go +++ b/pkg/debug/debugger.go @@ -14,24 +14,25 @@ type Debugger struct { agent *agent.Agent breakpoints []*Breakpoint current *Breakpoint - nexts chan *Breakpoint + in chan *Breakpoint done chan struct{} - mu sync.RWMutex + rmu sync.RWMutex + wmu sync.RWMutex } // NewDebugger creates a new Debugger instance with the specified agent. func NewDebugger(agent *agent.Agent) *Debugger { return &Debugger{ agent: agent, - nexts: make(chan *Breakpoint), + in: make(chan *Breakpoint), done: make(chan struct{}), } } // AddBreakpoint adds a breakpoint and starts monitoring it. func (d *Debugger) AddBreakpoint(bp *Breakpoint) bool { - d.mu.Lock() - defer d.mu.Unlock() + d.wmu.Lock() + defer d.wmu.Unlock() for _, b := range d.breakpoints { if b == bp { @@ -48,8 +49,8 @@ func (d *Debugger) AddBreakpoint(bp *Breakpoint) bool { // RemoveBreakpoint deletes the specified breakpoint. func (d *Debugger) RemoveBreakpoint(bp *Breakpoint) bool { - d.mu.Lock() - defer d.mu.Unlock() + d.wmu.Lock() + defer d.wmu.Unlock() for i, b := range d.breakpoints { if b == bp { @@ -65,23 +66,23 @@ func (d *Debugger) RemoveBreakpoint(bp *Breakpoint) bool { // Breakpoints returns all registered breakpoints. func (d *Debugger) Breakpoints() []*Breakpoint { - d.mu.RLock() - defer d.mu.RUnlock() + d.wmu.RLock() + defer d.wmu.RUnlock() return d.breakpoints[:] } // Pause blocks until a breakpoint is hit or monitoring is done. func (d *Debugger) Pause(ctx context.Context) bool { - d.mu.Lock() - defer d.mu.Unlock() + d.rmu.Lock() + defer d.rmu.Unlock() if d.current != nil { return true } select { - case d.current = <-d.nexts: + case d.current = <-d.in: return true case <-d.done: return false @@ -92,15 +93,15 @@ func (d *Debugger) Pause(ctx context.Context) bool { // Step continues execution until the next breakpoint is hit. func (d *Debugger) Step(ctx context.Context) bool { - d.mu.Lock() - defer d.mu.Unlock() + d.rmu.Lock() + defer d.rmu.Unlock() if d.current != nil { go d.next(d.current) } select { - case d.current = <-d.nexts: + case d.current = <-d.in: return true case <-d.done: return false @@ -111,16 +112,16 @@ func (d *Debugger) Step(ctx context.Context) bool { // Breakpoint returns the currently active breakpoint. func (d *Debugger) Breakpoint() *Breakpoint { - d.mu.RLock() - defer d.mu.RUnlock() + d.rmu.RLock() + defer d.rmu.RUnlock() return d.current } // Frame returns the frame of the current breakpoint. func (d *Debugger) Frame() *agent.Frame { - d.mu.RLock() - defer d.mu.RUnlock() + d.rmu.RLock() + defer d.rmu.RUnlock() if d.current != nil { return d.current.Frame() @@ -130,8 +131,8 @@ func (d *Debugger) Frame() *agent.Frame { // Process retrieves the process linked to the current breakpoint. func (d *Debugger) Process() *process.Process { - d.mu.RLock() - defer d.mu.RUnlock() + d.rmu.RLock() + defer d.rmu.RUnlock() if d.current != nil { frame := d.current.Frame() @@ -144,8 +145,8 @@ func (d *Debugger) Process() *process.Process { // Symbol retrieves the symbol for the frame at the current breakpoint. func (d *Debugger) Symbol() *symbol.Symbol { - d.mu.RLock() - defer d.mu.RUnlock() + d.rmu.RLock() + defer d.rmu.RUnlock() if d.current != nil { frame := d.current.Frame() @@ -159,28 +160,32 @@ func (d *Debugger) Symbol() *symbol.Symbol { // Close stops monitoring breakpoints and releases resources. func (d *Debugger) Close() { - d.mu.Lock() - defer d.mu.Unlock() + d.wmu.Lock() + defer d.wmu.Unlock() select { case <-d.done: return default: - close(d.done) + } - for _, bp := range d.breakpoints { - bp.Close() - } - d.breakpoints = nil + close(d.done) - close(d.nexts) + for _, bp := range d.breakpoints { + bp.Close() } + d.breakpoints = nil + + d.rmu.Lock() + defer d.rmu.Unlock() + + d.current = nil } func (d *Debugger) next(bp *Breakpoint) { if bp.Next() { select { - case d.nexts <- bp: + case d.in <- bp: case <-d.done: } } diff --git a/pkg/encoding/assembler.go b/pkg/encoding/assembler.go index 8c1758805..99949c4f4 100644 --- a/pkg/encoding/assembler.go +++ b/pkg/encoding/assembler.go @@ -68,21 +68,22 @@ func (a *EncodeAssembler[S, T]) Compile(typ reflect.Type) (Encoder[S, T], error) return enc.(Encoder[S, T]), nil } - encoderGroup := NewEncoderGroup[S, T]() - a.encoders.Store(typ, encoderGroup) - + encoders := make([]Encoder[S, T], 0, len(a.compilers)) for _, compiler := range a.compilers { if enc, err := compiler.Compile(typ); err == nil { - encoderGroup.Add(enc) + encoders = append(encoders, enc) } } - - if encoderGroup.Len() == 0 { - a.encoders.Delete(typ) + if len(encoders) == 0 { return nil, errors.WithStack(ErrUnsupportedType) } - return encoderGroup, nil + group := NewEncoderGroup[S, T]() + for _, enc := range encoders { + group.Add(enc) + } + a.encoders.Store(typ, group) + return group, nil } // NewDecodeAssembler creates a new DecodeAssembler instance. @@ -127,7 +128,7 @@ func (a *DecodeAssembler[S, T]) Compile(typ reflect.Type) (Decoder[S, unsafe.Poi return dec.(Decoder[S, unsafe.Pointer]), nil } - var decoders []Decoder[S, unsafe.Pointer] + decoders := make([]Decoder[S, unsafe.Pointer], 0, len(a.compilers)) for _, compiler := range a.compilers { if dec, err := compiler.Compile(typ); err == nil { decoders = append(decoders, dec) @@ -137,11 +138,10 @@ func (a *DecodeAssembler[S, T]) Compile(typ reflect.Type) (Decoder[S, unsafe.Poi return nil, errors.WithStack(ErrUnsupportedType) } - decoderGroup := NewDecoderGroup[S, unsafe.Pointer]() + group := NewDecoderGroup[S, unsafe.Pointer]() for _, dec := range decoders { - decoderGroup.Add(dec) + group.Add(dec) } - - a.decoders.Store(typ, decoderGroup) - return decoderGroup, nil + a.decoders.Store(typ, group) + return group, nil } diff --git a/pkg/encoding/assembler_test.go b/pkg/encoding/assembler_test.go index 90b89c56f..97ebd20f8 100644 --- a/pkg/encoding/assembler_test.go +++ b/pkg/encoding/assembler_test.go @@ -23,7 +23,7 @@ func TestEncodeAssembler_Compile(t *testing.T) { a := NewEncodeAssembler[any, any]() a.Add(EncodeCompilerFunc[any, any](func(typ reflect.Type) (Encoder[any, any], error) { if typ.Kind() == reflect.String { - return EncodeFunc[any, any](func(source any) (any, error) { + return EncodeFunc(func(source any) (any, error) { return source, nil }), nil } @@ -40,7 +40,7 @@ func TestEncodeAssembler_Encode(t *testing.T) { a := NewEncodeAssembler[any, any]() a.Add(EncodeCompilerFunc[any, any](func(typ reflect.Type) (Encoder[any, any], error) { if typ.Kind() == reflect.String { - return EncodeFunc[any, any](func(source any) (any, error) { + return EncodeFunc(func(source any) (any, error) { return source, nil }), nil } @@ -66,7 +66,7 @@ func TestDecodeAssembler_Compile(t *testing.T) { a := NewDecodeAssembler[any, any]() a.Add(DecodeCompilerFunc[any](func(typ reflect.Type) (Decoder[any, unsafe.Pointer], error) { if typ.Kind() == reflect.Pointer && typ.Elem().Kind() == reflect.String { - return DecodeFunc[any, unsafe.Pointer](func(source any, target unsafe.Pointer) error { + return DecodeFunc(func(source any, target unsafe.Pointer) error { return nil }), nil } @@ -83,7 +83,7 @@ func TestDecodeAssembler_Decode(t *testing.T) { a := NewDecodeAssembler[any, any]() a.Add(DecodeCompilerFunc[any](func(typ reflect.Type) (Decoder[any, unsafe.Pointer], error) { if typ.Kind() == reflect.Pointer && typ.Elem().Kind() == reflect.String { - return DecodeFunc[any, unsafe.Pointer](func(source any, target unsafe.Pointer) error { + return DecodeFunc(func(source any, target unsafe.Pointer) error { if s, ok := source.(*string); ok { *(*string)(target) = *s return nil diff --git a/pkg/encoding/decoder.go b/pkg/encoding/decoder.go index 95dbc0db7..25d5348e5 100644 --- a/pkg/encoding/decoder.go +++ b/pkg/encoding/decoder.go @@ -6,12 +6,18 @@ type Decoder[S, T any] interface { Decode(source S, target T) error } -// DecodeFunc is a function type that implements the Decoder interface. -type DecodeFunc[S, T any] func(source S, target T) error +type decoder[S, T any] struct { + decode func(source S, target T) error +} + +var _ Decoder[any, any] = (*decoder[any, any])(nil) -var _ Decoder[any, any] = (DecodeFunc[any, any])(nil) +// DecodeFunc returns a Decoder implemented via a function. +func DecodeFunc[S, T any](decode func(source S, target T) error) Decoder[S, T] { + return &decoder[S, T]{decode: decode} +} -// Decode calls the underlying function to perform decoding. -func (f DecodeFunc[S, T]) Decode(source S, target T) error { - return f(source, target) +// Decode calls the underlying decode function. +func (d *decoder[S, T]) Decode(source S, target T) error { + return d.decode(source, target) } diff --git a/pkg/encoding/encoder.go b/pkg/encoding/encoder.go index 4edd40586..1b9b2e306 100644 --- a/pkg/encoding/encoder.go +++ b/pkg/encoding/encoder.go @@ -6,12 +6,18 @@ type Encoder[S, T any] interface { Encode(source S) (T, error) } -// EncodeFunc is a function type that implements the Encoder interface. -type EncodeFunc[S, T any] func(source S) (T, error) +type encoder[S, T any] struct { + encode func(source S) (T, error) +} + +var _ Encoder[any, any] = (*encoder[any, any])(nil) -var _ Encoder[any, any] = (EncodeFunc[any, any])(nil) +// EncodeFunc creates an Encoder instance from a provided function. +func EncodeFunc[S, T any](encode func(source S) (T, error)) Encoder[S, T] { + return &encoder[S, T]{encode: encode} +} -// Encode calls the underlying function to perform encoding. -func (f EncodeFunc[S, T]) Encode(source S) (T, error) { - return f(source) +// Encode calls the underlying encode function. +func (e *encoder[S, T]) Encode(source S) (T, error) { + return e.encode(source) } diff --git a/pkg/encoding/group.go b/pkg/encoding/group.go index 26184b091..8d7932bbb 100644 --- a/pkg/encoding/group.go +++ b/pkg/encoding/group.go @@ -34,11 +34,17 @@ func NewEncoderGroup[S, T any]() *EncoderGroup[S, T] { } // Add adds an encoder to the group. -func (g *EncoderGroup[S, T]) Add(encoder Encoder[S, T]) { +func (g *EncoderGroup[S, T]) Add(encoder Encoder[S, T]) bool { g.mu.Lock() defer g.mu.Unlock() + for _, enc := range g.encoders { + if enc == encoder { + return false + } + } g.encoders = append(g.encoders, encoder) + return true } // Len returns the number of encoders in the group. @@ -67,11 +73,17 @@ func (g *EncoderGroup[S, T]) Encode(source S) (T, error) { } // Add adds a decoder to the group. -func (g *DecoderGroup[S, T]) Add(decoder Decoder[S, T]) { +func (g *DecoderGroup[S, T]) Add(decoder Decoder[S, T]) bool { g.mu.Lock() defer g.mu.Unlock() + for _, dec := range g.decoders { + if dec == decoder { + return false + } + } g.decoders = append(g.decoders, decoder) + return true } // Len returns the number of decoders in the group. diff --git a/pkg/encoding/group_test.go b/pkg/encoding/group_test.go index 08d27fb7e..799e7c169 100644 --- a/pkg/encoding/group_test.go +++ b/pkg/encoding/group_test.go @@ -9,6 +9,37 @@ import ( "github.com/stretchr/testify/assert" ) +func TestNewEncoderGroup(t *testing.T) { + e := NewEncoderGroup[any, any]() + assert.NotNil(t, e) +} + +func TestEncoderGroup_Add(t *testing.T) { + e := NewEncoderGroup[any, any]() + e.Add(EncodeFunc(func(source any) (any, error) { + return nil, errors.WithStack(ErrUnsupportedType) + })) + assert.Equal(t, 1, e.Len()) +} + +func TestEncoderGroup_Encode(t *testing.T) { + e := NewEncoderGroup[string, string]() + + suffix := faker.UUIDHyphenated() + e.Add(EncodeFunc(func(source string) (string, error) { + if source == "" { + return "", errors.WithStack(ErrUnsupportedType) + } + return source + suffix, nil + })) + + v := faker.UUIDHyphenated() + res, err := e.Encode(v) + + assert.NoError(t, err) + assert.Equal(t, v+suffix, res) +} + func TestNewDecoderGroup(t *testing.T) { e := NewDecoderGroup[any, any]() assert.NotNil(t, e) @@ -16,7 +47,7 @@ func TestNewDecoderGroup(t *testing.T) { func TestDecoderGroup_Add(t *testing.T) { e := NewDecoderGroup[any, any]() - e.Add(DecodeFunc[any, any](func(_ any, _ any) error { + e.Add(DecodeFunc(func(_ any, _ any) error { return errors.WithStack(ErrUnsupportedType) })) @@ -30,7 +61,7 @@ func TestDecoderGroup_Decode(t *testing.T) { var res string suffix := faker.UUIDHyphenated() - e.Add(DecodeFunc[any, any](func(source any, target any) error { + e.Add(DecodeFunc(func(source any, target any) error { if s, ok := source.(string); ok { if t, ok := target.(*string); ok { if strings.HasSuffix(s, suffix) { diff --git a/pkg/hook/builder.go b/pkg/hook/builder.go index 1468a7365..9f5c8a5ca 100644 --- a/pkg/hook/builder.go +++ b/pkg/hook/builder.go @@ -11,9 +11,9 @@ func NewBuilder(registers ...Register) Builder { } // AddToHooks adds all registered hook functions to the provided Hook instance. -func (b Builder) AddToHooks(h *Hook) error { +func (b Builder) AddToHooks(hook *Hook) error { for _, f := range b { - if err := f.AddToHooks(h); err != nil { + if err := f.AddToHooks(hook); err != nil { return err } } diff --git a/pkg/node/manytoone.go b/pkg/node/manytoone.go index 90e5b8177..17a585a9e 100644 --- a/pkg/node/manytoone.go +++ b/pkg/node/manytoone.go @@ -74,9 +74,9 @@ func (n *ManyToOneNode) Out(name string) *port.OutPort { return n.outPort case PortErr: return n.errPort + default: + return nil } - - return nil } // Close closes all ports and releases resources. diff --git a/pkg/node/manytoone_test.go b/pkg/node/manytoone_test.go index fc17f1aec..0ea6ca8d7 100644 --- a/pkg/node/manytoone_test.go +++ b/pkg/node/manytoone_test.go @@ -21,7 +21,7 @@ func TestNewManyToOneNode(t *testing.T) { func TestManyToOneNode_Port(t *testing.T) { n := NewManyToOneNode(nil) - assert.NotNil(t, n) + defer n.Close() assert.NotNil(t, n.In(PortWithIndex(PortIn, 0))) assert.NotNil(t, n.Out(PortOut)) diff --git a/pkg/node/node.go b/pkg/node/node.go index 287337b54..67cc8ad57 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -4,10 +4,7 @@ import "github.com/siyul-park/uniflow/pkg/port" // Node represents a unit that processes packets with input and output ports. type Node interface { - // In returns the input port identified by 'name'. - In(name string) *port.InPort - // Out returns the output port identified by 'name'. - Out(name string) *port.OutPort - // Close terminates the node and returns any encountered error. - Close() error + In(name string) *port.InPort // Returns the input port by name. + Out(name string) *port.OutPort // Returns the output port by name. + Close() error // Closes the node and returns any error. } diff --git a/pkg/node/onetomany.go b/pkg/node/onetomany.go index 2fd0a5d2c..1ba21a4b3 100644 --- a/pkg/node/onetomany.go +++ b/pkg/node/onetomany.go @@ -47,8 +47,8 @@ func (n *OneToManyNode) In(name string) *port.InPort { case PortIn: return n.inPort default: + return nil } - return nil } // Out returns the output port with the specified name. diff --git a/pkg/node/onetoone.go b/pkg/node/onetoone.go index d162681e8..1563b16e6 100644 --- a/pkg/node/onetoone.go +++ b/pkg/node/onetoone.go @@ -42,8 +42,8 @@ func (n *OneToOneNode) In(name string) *port.InPort { case PortIn: return n.inPort default: + return nil } - return nil } // Out returns the output port for the specified name. @@ -54,8 +54,8 @@ func (n *OneToOneNode) Out(name string) *port.OutPort { case PortErr: return n.errPort default: + return nil } - return nil } // Close closes all ports and releases resources. diff --git a/pkg/node/port.go b/pkg/node/port.go index 60cf83dd4..ba7ec0661 100644 --- a/pkg/node/port.go +++ b/pkg/node/port.go @@ -17,27 +17,25 @@ const ( var portExp = regexp.MustCompile(`(\w+)\[(\d+)\]`) -// PortWithIndex returns the full port name formatted as "name[index]". +// PortWithIndex formats the port name as "name[index]". func PortWithIndex(name string, index int) string { return fmt.Sprintf("%s[%d]", name, index) } -// NameOfPort extracts and returns the base name from a port name formatted as "name[index]". +// NameOfPort extracts the base name from a port name formatted as "name[index]". func NameOfPort(name string) string { - groups := portExp.FindStringSubmatch(name) - if groups == nil { - return name + if groups := portExp.FindStringSubmatch(name); groups != nil { + return groups[1] } - return groups[1] + return name } -// IndexOfPort extracts the index from a port name formatted as "name[index]" and returns it with a boolean indicating success. +// IndexOfPort extracts the index from a port name formatted as "name[index]". func IndexOfPort(name string) (int, bool) { - groups := portExp.FindStringSubmatch(name) - if groups == nil || len(groups) < 2 { - return 0, false + if groups := portExp.FindStringSubmatch(name); len(groups) == 3 { + if index, err := strconv.Atoi(groups[2]); err == nil { + return index, true + } } - - index, err := strconv.Atoi(groups[2]) - return index, err == nil + return 0, false } diff --git a/pkg/packet/hook.go b/pkg/packet/hook.go index 4e00b4de2..bafa41aeb 100644 --- a/pkg/packet/hook.go +++ b/pkg/packet/hook.go @@ -1,8 +1,8 @@ package packet -// Hook defines an interface for handling packets. +// Hook defines an interface for processing packets. type Hook interface { - // Handle processes the given packet. + // Handle processes the specified packet. Handle(*Packet) } @@ -12,7 +12,7 @@ type hook struct { var _ Hook = (*hook)(nil) -// HookFunc creates a new Hook from the provided function. +// HookFunc creates a new Hook using the provided function. func HookFunc(handle func(*Packet)) Hook { return &hook{handle: handle} } diff --git a/pkg/packet/packet_test.go b/pkg/packet/packet_test.go index 897109eb3..d416fe143 100644 --- a/pkg/packet/packet_test.go +++ b/pkg/packet/packet_test.go @@ -13,7 +13,7 @@ func TestNew(t *testing.T) { } func TestMerge(t *testing.T) { - t.Run("EOF", func(t *testing.T) { + t.Run("None", func(t *testing.T) { res := Merge([]*Packet{None, None}) assert.Equal(t, None, res) }) diff --git a/pkg/packet/reader.go b/pkg/packet/reader.go index b77e85fb1..f6eeccc55 100644 --- a/pkg/packet/reader.go +++ b/pkg/packet/reader.go @@ -93,7 +93,6 @@ func (r *Reader) AddOutboundHook(hook Hook) bool { return false } } - r.outboundHooks = append(r.outboundHooks, hook) return true } diff --git a/pkg/packet/writer.go b/pkg/packet/writer.go index 97e347f13..a72a48921 100644 --- a/pkg/packet/writer.go +++ b/pkg/packet/writer.go @@ -118,7 +118,6 @@ func (w *Writer) AddOutboundHook(hook Hook) bool { return false } } - w.outboundHooks = append(w.outboundHooks, hook) return true } diff --git a/pkg/port/inport.go b/pkg/port/inport.go index c5ed8905f..b03066c04 100644 --- a/pkg/port/inport.go +++ b/pkg/port/inport.go @@ -32,12 +32,11 @@ func (p *InPort) AddHook(hook Hook) bool { return false } } - p.hooks = append(p.hooks, hook) return true } -// RemoveHook removes a hook from the port if it exists. +// RemoveHook removes a hook from the port if it ok. func (p *InPort) RemoveHook(hook Hook) bool { p.mu.Lock() defer p.mu.Unlock() @@ -61,18 +60,16 @@ func (p *InPort) AddListener(listener Listener) bool { return false } } - p.listeners = append(p.listeners, listener) return true } // Open prepares the input port for a given process and returns a reader. -// If a reader for the process already exists, it is returned. Otherwise, a new reader is created. func (p *InPort) Open(proc *process.Process) *packet.Reader { p.mu.Lock() - reader, exists := p.readers[proc] - if exists { + reader, ok := p.readers[proc] + if ok { p.mu.Unlock() return reader } diff --git a/pkg/port/outport.go b/pkg/port/outport.go index 60e9e5377..0a643d50e 100644 --- a/pkg/port/outport.go +++ b/pkg/port/outport.go @@ -59,7 +59,6 @@ func (p *OutPort) AddHook(hook Hook) bool { return false } } - p.hooks = append(p.hooks, hook) return true } @@ -88,7 +87,6 @@ func (p *OutPort) AddListener(listener Listener) bool { return false } } - p.listeners = append(p.listeners, listener) return true } @@ -123,7 +121,6 @@ func (p *OutPort) Unlink(in *InPort) { } // Open opens the output port for the given process and returns a writer. -// It connects the writer to all linked input ports and starts data listeners. func (p *OutPort) Open(proc *process.Process) *packet.Writer { writer, ok := func() (*packet.Writer, bool) { p.mu.Lock() diff --git a/pkg/process/local.go b/pkg/process/local.go index 284ddd23b..ec75094ec 100644 --- a/pkg/process/local.go +++ b/pkg/process/local.go @@ -21,14 +21,14 @@ func NewLocal[T any]() *Local[T] { func (l *Local[T]) Watch(proc *Process, watch func(T)) { l.mu.Lock() - v, exists := l.data[proc] - if !exists { + v, ok := l.data[proc] + if !ok { l.watches[proc] = append(l.watches[proc], watch) } l.mu.Unlock() - if exists { + if ok { watch(v) } } @@ -50,24 +50,24 @@ func (l *Local[T]) Load(proc *Process) (T, bool) { l.mu.RLock() defer l.mu.RUnlock() - val, exists := l.data[proc] - return val, exists + val, ok := l.data[proc] + return val, ok } // Store associates a value with the specified process. func (l *Local[T]) Store(proc *Process, val T) { l.mu.Lock() - _, exists := l.data[proc] + _, ok := l.data[proc] l.data[proc] = val - if !exists { + if !ok { proc.AddExitHook(ExitFunc(func(err error) { l.Delete(proc) })) } - watches := l.watches[proc] + watches := l.watches[proc][:] delete(l.watches, proc) l.mu.Unlock() @@ -82,12 +82,12 @@ func (l *Local[T]) Delete(proc *Process) bool { l.mu.Lock() defer l.mu.Unlock() - _, exists := l.data[proc] + _, ok := l.data[proc] delete(l.data, proc) delete(l.watches, proc) - return exists + return ok } // LoadOrStore retrieves the value for the specified process or stores a new value if absent. @@ -95,7 +95,7 @@ func (l *Local[T]) LoadOrStore(proc *Process, val func() (T, error)) (T, error) l.mu.Lock() defer l.mu.Unlock() - if v, exists := l.data[proc]; exists { + if v, ok := l.data[proc]; ok { return v, nil } diff --git a/pkg/process/process.go b/pkg/process/process.go index cb7938520..62bd0700a 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -166,20 +166,19 @@ func (p *Process) Exit(err error) { // AddExitHook adds an exit hook to run when the process terminates. func (p *Process) AddExitHook(hook ExitHook) bool { - p.mu.Lock() - defer p.mu.Unlock() - - if p.status == StatusTerminated { - go hook.Exit(p.err) + if p.Status() == StatusTerminated { + hook.Exit(p.Err()) return false } + p.mu.Lock() + defer p.mu.Unlock() + for _, h := range p.exitHooks { if h == hook { return false } } - p.exitHooks = append(p.exitHooks, hook) return true } diff --git a/pkg/resource/store.go b/pkg/resource/store.go index 873735e3a..d8224dcef 100644 --- a/pkg/resource/store.go +++ b/pkg/resource/store.go @@ -220,7 +220,7 @@ func (s *store[T]) match(resource T, examples ...T) bool { } func (s *store[T]) insert(res T) bool { - if _, exists := s.data[res.GetID()]; exists { + if _, ok := s.data[res.GetID()]; ok { return false } diff --git a/pkg/resource/store_test.go b/pkg/resource/store_test.go index 0aec4bbea..efa11078e 100644 --- a/pkg/resource/store_test.go +++ b/pkg/resource/store_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/go-faker/faker/v4" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" ) @@ -33,6 +34,7 @@ func TestStore_Watch(t *testing.T) { meta := &Meta{ ID: uuid.Must(uuid.NewV7()), Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } _, _ = st.Store(ctx, meta) @@ -47,10 +49,14 @@ func TestStore_Load(t *testing.T) { st := NewStore[*Meta]() meta1 := &Meta{ - ID: uuid.Must(uuid.NewV7()), + ID: uuid.Must(uuid.NewV7()), + Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } meta2 := &Meta{ - ID: uuid.Must(uuid.NewV7()), + ID: uuid.Must(uuid.NewV7()), + Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } count, err := st.Store(ctx, meta1, meta2) @@ -69,10 +75,14 @@ func TestStore_Store(t *testing.T) { st := NewStore[*Meta]() meta1 := &Meta{ - ID: uuid.Must(uuid.NewV7()), + ID: uuid.Must(uuid.NewV7()), + Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } meta2 := &Meta{ - ID: uuid.Must(uuid.NewV7()), + ID: uuid.Must(uuid.NewV7()), + Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } count, err := st.Store(ctx, meta1, meta2) @@ -91,10 +101,14 @@ func TestStore_Swap(t *testing.T) { st := NewStore[*Meta]() meta1 := &Meta{ - ID: uuid.Must(uuid.NewV7()), + ID: uuid.Must(uuid.NewV7()), + Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } meta2 := &Meta{ - ID: uuid.Must(uuid.NewV7()), + ID: uuid.Must(uuid.NewV7()), + Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } count, err := st.Store(ctx, meta1, meta2) @@ -117,10 +131,14 @@ func TestStore_Delete(t *testing.T) { st := NewStore[*Meta]() meta1 := &Meta{ - ID: uuid.Must(uuid.NewV7()), + ID: uuid.Must(uuid.NewV7()), + Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } meta2 := &Meta{ - ID: uuid.Must(uuid.NewV7()), + ID: uuid.Must(uuid.NewV7()), + Namespace: DefaultNamespace, + Name: faker.UUIDHyphenated(), } count, err := st.Store(ctx, meta1, meta2) diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index d58a745d5..70e3a24d6 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -116,7 +116,7 @@ func TestRuntime_Reconcile(t *testing.T) { assert.NoError(t, ctx.Err()) return default: - if sb, ok := a.Symbol(meta.GetID()); ok { + if sb := a.Symbol(meta.GetID()); sb != nil { assert.Equal(t, meta.GetID(), sb.ID()) assert.Equal(t, sec.Data, sb.Env()["key"][0].Value) return @@ -135,7 +135,7 @@ func TestRuntime_Reconcile(t *testing.T) { assert.NoError(t, ctx.Err()) return default: - if _, ok := a.Symbol(meta.GetID()); !ok { + if sb := a.Symbol(meta.GetID()); sb == nil { return } } @@ -206,7 +206,7 @@ func TestRuntime_Reconcile(t *testing.T) { assert.NoError(t, ctx.Err()) return default: - if sb, ok := a.Symbol(meta.GetID()); ok { + if sb := a.Symbol(meta.GetID()); sb != nil { if sec.Data == sb.Env()["key"][0].Value { return } @@ -225,7 +225,7 @@ func TestRuntime_Reconcile(t *testing.T) { assert.NoError(t, ctx.Err()) return default: - if _, ok := a.Symbol(meta.GetID()); !ok { + if sb := a.Symbol(meta.GetID()); sb == nil { return } } diff --git a/pkg/scheme/scheme.go b/pkg/scheme/scheme.go index da09c8a35..05c938da0 100644 --- a/pkg/scheme/scheme.go +++ b/pkg/scheme/scheme.go @@ -37,7 +37,7 @@ func (s *Scheme) AddKnownType(kind string, sp spec.Spec) bool { s.mu.Lock() defer s.mu.Unlock() - if _, exists := s.types[kind]; exists { + if _, ok := s.types[kind]; ok { return false } s.types[kind] = reflect.TypeOf(sp) @@ -49,7 +49,7 @@ func (s *Scheme) RemoveKnownType(kind string) bool { s.mu.Lock() defer s.mu.Unlock() - if _, exists := s.types[kind]; exists { + if _, ok := s.types[kind]; ok { delete(s.types, kind) return true } @@ -57,12 +57,11 @@ func (s *Scheme) RemoveKnownType(kind string) bool { } // KnownType retrieves the type of the spec associated with the given kind. -func (s *Scheme) KnownType(kind string) (reflect.Type, bool) { +func (s *Scheme) KnownType(kind string) reflect.Type { s.mu.RLock() defer s.mu.RUnlock() - typ, exists := s.types[kind] - return typ, exists + return s.types[kind] } // AddCodec associates a codec with a specific kind and returns true if successful. @@ -70,7 +69,7 @@ func (s *Scheme) AddCodec(kind string, codec Codec) bool { s.mu.Lock() defer s.mu.Unlock() - if _, exists := s.codecs[kind]; exists { + if _, ok := s.codecs[kind]; ok { return false } s.codecs[kind] = codec @@ -82,7 +81,7 @@ func (s *Scheme) RemoveCodec(kind string) bool { s.mu.Lock() defer s.mu.Unlock() - if _, exists := s.codecs[kind]; exists { + if _, ok := s.codecs[kind]; ok { delete(s.codecs, kind) return true } @@ -90,12 +89,11 @@ func (s *Scheme) RemoveCodec(kind string) bool { } // Codec retrieves the codec associated with the given kind. -func (s *Scheme) Codec(kind string) (Codec, bool) { +func (s *Scheme) Codec(kind string) Codec { s.mu.RLock() defer s.mu.RUnlock() - codec, exists := s.codecs[kind] - return codec, exists + return s.codecs[kind] } // Compile decodes the given spec into node using the associated codec. @@ -103,8 +101,8 @@ func (s *Scheme) Compile(sp spec.Spec) (node.Node, error) { s.mu.RLock() defer s.mu.RUnlock() - codec, exists := s.Codec(sp.GetKind()) - if !exists { + codec := s.Codec(sp.GetKind()) + if codec == nil { return nil, errors.WithStack(encoding.ErrUnsupportedType) } return codec.Compile(sp) @@ -206,8 +204,8 @@ func (s *Scheme) Decode(sp spec.Spec) (spec.Spec, error) { return nil, err } - typ, exists := s.types[sp.GetKind()] - if !exists { + typ, ok := s.types[sp.GetKind()] + if !ok { return sp, nil } diff --git a/pkg/scheme/scheme_test.go b/pkg/scheme/scheme_test.go index 55b3e8565..e2b191f54 100644 --- a/pkg/scheme/scheme_test.go +++ b/pkg/scheme/scheme_test.go @@ -17,18 +17,14 @@ func TestScheme_KnownType(t *testing.T) { ok := s.AddKnownType(kind, &spec.Meta{}) assert.True(t, ok) - - _, ok = s.KnownType(kind) - assert.True(t, ok) + assert.NotNil(t, s.KnownType(kind)) ok = s.AddKnownType(kind, &spec.Meta{}) assert.False(t, ok) ok = s.RemoveKnownType(kind) assert.True(t, ok) - - _, ok = s.KnownType(kind) - assert.False(t, ok) + assert.Nil(t, s.KnownType(kind)) ok = s.RemoveKnownType(kind) assert.False(t, ok) @@ -44,18 +40,14 @@ func TestScheme_Codec(t *testing.T) { ok := s.AddCodec(kind, c) assert.True(t, ok) - - _, ok = s.Codec(kind) - assert.True(t, ok) + assert.NotNil(t, s.Codec(kind)) ok = s.AddCodec(kind, c) assert.False(t, ok) ok = s.RemoveCodec(kind) assert.True(t, ok) - - _, ok = s.Codec(kind) - assert.False(t, ok) + assert.Nil(t, s.Codec(kind)) ok = s.RemoveCodec(kind) assert.False(t, ok) diff --git a/pkg/symbol/loader.go b/pkg/symbol/loader.go index c74f518e9..d29e9e7a1 100644 --- a/pkg/symbol/loader.go +++ b/pkg/symbol/loader.go @@ -110,14 +110,14 @@ func (l *Loader) Load(ctx context.Context, specs ...spec.Spec) error { for _, id := range l.table.Keys() { sb, ok := l.table.Lookup(id) if ok && len(resource.Match(sb.Spec, examples...)) > 0 { - exists := false + ok := false for _, s := range symbols { if s.ID() == id { - exists = true + ok = true break } } - if !exists { + if !ok { if _, err := l.table.Free(id); err != nil { return err }