diff --git a/cmd/resource/builder_test.go b/cmd/resource/builder_test.go index 55de0d80..ff63deab 100644 --- a/cmd/resource/builder_test.go +++ b/cmd/resource/builder_test.go @@ -22,7 +22,7 @@ func TestBuilder_Build(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { @@ -40,7 +40,7 @@ func TestBuilder_Build(t *testing.T) { builder := NewBuilder(). Scheme(s). - Namespace(scheme.NamespaceDefault). + Namespace(scheme.DefaultNamespace). FS(fsys). Filename(filename) diff --git a/cmd/resource/scheme.go b/cmd/resource/scheme.go index f1b0f53e..e3ecf830 100644 --- a/cmd/resource/scheme.go +++ b/cmd/resource/scheme.go @@ -50,7 +50,7 @@ func (c *SpecCodec) Decode(data any) (scheme.Spec, error) { if c.namespace != "" { unstructured.SetNamespace(c.namespace) } else { - unstructured.SetNamespace(scheme.NamespaceDefault) + unstructured.SetNamespace(scheme.DefaultNamespace) } } diff --git a/cmd/uniflow/apply/cmd_test.go b/cmd/uniflow/apply/cmd_test.go index e3241ed3..2c51a38c 100644 --- a/cmd/uniflow/apply/cmd_test.go +++ b/cmd/uniflow/apply/cmd_test.go @@ -32,7 +32,7 @@ func TestExecute(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { diff --git a/cmd/uniflow/start/cmd_test.go b/cmd/uniflow/start/cmd_test.go index 11b6f21f..8ac66b7f 100644 --- a/cmd/uniflow/start/cmd_test.go +++ b/cmd/uniflow/start/cmd_test.go @@ -35,7 +35,7 @@ func TestExecute(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { diff --git a/pkg/hook/builder.go b/pkg/hook/builder.go index f4e4d2b0..cafc5c3a 100644 --- a/pkg/hook/builder.go +++ b/pkg/hook/builder.go @@ -1,18 +1,16 @@ package hook -type ( - // Builder builds a new Hooks. - Builder []func(*Hook) error -) +// Builder builds a new Hooks. +type Builder []func(*Hook) error // NewBuilder returns a new HooksBuilder. func NewBuilder(funcs ...func(*Hook) error) Builder { return Builder(funcs) } -// AddToHooks adds all registered hook to s. -func (b *Builder) AddToHooks(h *Hook) error { - for _, f := range *b { +// AddToHooks adds all registered hooks to h. +func (b Builder) AddToHooks(h *Hook) error { + for _, f := range b { if err := f(h); err != nil { return err } @@ -20,13 +18,13 @@ func (b *Builder) AddToHooks(h *Hook) error { return nil } -// Register adds one or more hook. +// Register adds one or more hooks. func (b *Builder) Register(funcs ...func(*Hook) error) { *b = append(*b, funcs...) } // Build returns a new Hooks containing the registered hooks. -func (b *Builder) Build() (*Hook, error) { +func (b Builder) Build() (*Hook, error) { h := New() if err := b.AddToHooks(h); err != nil { return nil, err diff --git a/pkg/hook/hook.go b/pkg/hook/hook.go index aabf50ea..95b7654e 100644 --- a/pkg/hook/hook.go +++ b/pkg/hook/hook.go @@ -7,24 +7,22 @@ import ( "github.com/siyul-park/uniflow/pkg/symbol" ) -type ( - // Hook is a collection of hook functions. - Hook struct { - loadHooks []symbol.LoadHook - unloadHooks []symbol.UnloadHook - mu sync.RWMutex - } -) +// Hook is a collection of hook functions for loading and unloading nodes. +type Hook struct { + loadHooks []symbol.LoadHook + unloadHooks []symbol.UnloadHook + mu sync.RWMutex +} var _ symbol.LoadHook = &Hook{} var _ symbol.UnloadHook = &Hook{} -// New returns a new Hooks. +// New creates a new Hook instance. func New() *Hook { return &Hook{} } -// AddLoadHook adds a LoadHook. +// AddLoadHook adds a LoadHook function to the Hook. func (h *Hook) AddLoadHook(hook symbol.LoadHook) { h.mu.Lock() defer h.mu.Unlock() @@ -32,7 +30,7 @@ func (h *Hook) AddLoadHook(hook symbol.LoadHook) { h.loadHooks = append(h.loadHooks, hook) } -// AddUnloadHook adds a UnloadHook. +// AddUnloadHook adds an UnloadHook function to the Hook. func (h *Hook) AddUnloadHook(hook symbol.UnloadHook) { h.mu.Lock() defer h.mu.Unlock() @@ -40,7 +38,7 @@ func (h *Hook) AddUnloadHook(hook symbol.UnloadHook) { h.unloadHooks = append(h.unloadHooks, hook) } -// Load runs LoadHooks. +// Load executes LoadHooks on the provided node. func (h *Hook) Load(n node.Node) error { h.mu.RLock() defer h.mu.RUnlock() @@ -53,7 +51,7 @@ func (h *Hook) Load(n node.Node) error { return nil } -// Unload runs UnloadHooks. +// Unload executes UnloadHooks on the provided node. func (h *Hook) Unload(n node.Node) error { h.mu.RLock() defer h.mu.RUnlock() diff --git a/pkg/loader/loader.go b/pkg/loader/loader.go index 17ebd88c..d9ad80e6 100644 --- a/pkg/loader/loader.go +++ b/pkg/loader/loader.go @@ -14,24 +14,22 @@ import ( "github.com/siyul-park/uniflow/pkg/symbol" ) -type ( - // Config represents the configuration for the Loader. - Config struct { - Namespace string // Namespace is the namespace used by the Loader. - Table *symbol.Table // Table is the symbol table for managing symbols. - Scheme *scheme.Scheme // Scheme is the scheme used by the Loader. - Storage *storage.Storage // Storage is the storage used by the Loader. - } +// Config represents the configuration for the Loader. +type Config struct { + Namespace string // Namespace is the namespace used by the Loader. + Table *symbol.Table // Table is the symbol table for managing symbols. + Scheme *scheme.Scheme // Scheme is the scheme used by the Loader. + Storage *storage.Storage // Storage is the storage used by the Loader. +} - // Loader loads scheme.Spec into the symbol.Table. - Loader struct { - namespace string - scheme *scheme.Scheme - table *symbol.Table - storage *storage.Storage - mu sync.RWMutex - } -) +// Loader loads scheme.Spec into the symbol.Table. +type Loader struct { + namespace string + scheme *scheme.Scheme + table *symbol.Table + storage *storage.Storage + mu sync.RWMutex +} // New returns a new Loader. func New(config Config) *Loader { @@ -48,30 +46,34 @@ func New(config Config) *Loader { } } -// LoadOne loads a single scheme.Spec from the storage.Storage +// LoadOne loads a single scheme.Spec from the storage.Storage. +// It processes the specified ID and recursively loads linked scheme.Spec. +// If the loader is associated with a namespace, it uses that namespace. +// The loaded nodes are added to the symbol table for future reference. func (ld *Loader) LoadOne(ctx context.Context, id ulid.ULID) (node.Node, error) { ld.mu.Lock() defer ld.mu.Unlock() namespace := ld.namespace - queue := []any{id} + for len(queue) > 0 { prev := queue queue = nil - exists := map[any]bool{} - var filter *storage.Filter + for _, key := range prev { - if k, ok := key.(ulid.ULID); ok { + switch k := key.(type) { + case ulid.ULID: exists[k] = false filter = filter.Or(storage.Where[ulid.ULID](scheme.KeyID).EQ(k)) - } else if k, ok := key.(string); ok { + case string: exists[k] = false filter = filter.Or(storage.Where[string](scheme.KeyName).EQ(k)) } } + if namespace != "" { filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(namespace)) } @@ -139,9 +141,15 @@ func (ld *Loader) LoadOne(ctx context.Context, id ulid.ULID) (node.Node, error) } } -// LoadAll loads all scheme.Spec from the storage.Storage +// LoadAll loads all scheme.Spec from the storage.Storage. +// It loads all available scheme.Spec and adds them to the symbol table for future reference. +// If the loader is associated with a namespace, it filters the loading based on that namespace. func (ld *Loader) LoadAll(ctx context.Context) ([]node.Node, error) { + ld.mu.Lock() + defer ld.mu.Unlock() + var filter *storage.Filter + if ld.namespace != "" { filter = filter.And(storage.Where[string](scheme.KeyNamespace).EQ(ld.namespace)) } diff --git a/pkg/loader/loader_test.go b/pkg/loader/loader_test.go index 5bada763..f0881a16 100644 --- a/pkg/loader/loader_test.go +++ b/pkg/loader/loader_test.go @@ -36,12 +36,12 @@ func TestLoader_LoadOne(t *testing.T) { spec1 := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } spec2 := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortIO: { { @@ -95,7 +95,7 @@ func TestLoader_LoadOne(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { @@ -139,7 +139,7 @@ func TestLoader_LoadOne(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { @@ -187,12 +187,12 @@ func TestLoader_LoadAll(t *testing.T) { spec1 := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } spec2 := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortIO: { { diff --git a/pkg/loader/reconciler.go b/pkg/loader/reconciler.go index bb393478..13ccecbd 100644 --- a/pkg/loader/reconciler.go +++ b/pkg/loader/reconciler.go @@ -8,14 +8,14 @@ import ( ) type ( - // ReconcilerConfig is a config for for the Reconciler. + // ReconcilerConfig holds the configuration for the Reconciler. ReconcilerConfig struct { - Storage *storage.Storage - Loader *Loader - Filter *storage.Filter + Storage *storage.Storage // Storage is the storage used by the Reconciler. + Loader *Loader // Loader is used to load scheme.Spec into the symbol.Table. + Filter *storage.Filter // Filter is the filter for tracking changes to the scheme.Spec. } - // Reconciler keeps up to date symbol.Table by tracking changes to the scheme.Spec. + // Reconciler keeps the symbol.Table up to date by tracking changes to scheme.Spec. Reconciler struct { storage *storage.Storage loader *Loader @@ -26,7 +26,7 @@ type ( } ) -// NewReconciler returns a new Reconciler. +// NewReconciler creates a new Reconciler with the given configuration. func NewReconciler(config ReconcilerConfig) *Reconciler { storage := config.Storage loader := config.Loader @@ -40,13 +40,13 @@ func NewReconciler(config ReconcilerConfig) *Reconciler { } } -// Watch starts to watch the changes. +// Watch starts watching for changes to scheme.Spec. func (r *Reconciler) Watch(ctx context.Context) error { _, err := r.watch(ctx) return err } -// Reconcile starts to reflects the changes. +// Reconcile reflects changes to scheme.Spec in the symbol.Table. func (r *Reconciler) Reconcile(ctx context.Context) error { stream, err := r.watch(ctx) if err != nil { @@ -76,7 +76,7 @@ func (r *Reconciler) Reconcile(ctx context.Context) error { } } -// Close closes the Reconciler. +// Close stops the Reconciler and closes the associated stream. func (r *Reconciler) Close() error { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/loader/reconciler_test.go b/pkg/loader/reconciler_test.go index 349d6ec3..06e652ef 100644 --- a/pkg/loader/reconciler_test.go +++ b/pkg/loader/reconciler_test.go @@ -51,7 +51,7 @@ func TestReconciler_Reconcile(t *testing.T) { m := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } codec := scheme.CodecFunc(func(spec scheme.Spec) (node.Node, error) { diff --git a/pkg/node/onetomany_test.go b/pkg/node/onetomany_test.go index 7b9855e8..8d15a1a4 100644 --- a/pkg/node/onetomany_test.go +++ b/pkg/node/onetomany_test.go @@ -64,7 +64,7 @@ func TestOneToManyNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) outStream := out.Open(proc) @@ -109,7 +109,7 @@ func TestOneToManyNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) errStream := err.Open(proc) diff --git a/pkg/node/onetoone_test.go b/pkg/node/onetoone_test.go index fdb3fe1f..4b31c383 100644 --- a/pkg/node/onetoone_test.go +++ b/pkg/node/onetoone_test.go @@ -65,7 +65,7 @@ func TestOneToOneNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -102,7 +102,7 @@ func TestOneToOneNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) errStream := err.Open(proc) @@ -142,7 +142,7 @@ func TestOneToOneNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) outStream := out.Open(proc) @@ -188,7 +188,7 @@ func TestOneToOneNode_Send(t *testing.T) { errPort.Link(err) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) errStream := err.Open(proc) diff --git a/pkg/packet/packet.go b/pkg/packet/packet.go index 2eaebdea..d5da2acd 100644 --- a/pkg/packet/packet.go +++ b/pkg/packet/packet.go @@ -5,18 +5,20 @@ import ( "github.com/siyul-park/uniflow/pkg/primitive" ) -type ( - // Packet is a formalized block of data. - Packet struct { - id ulid.ULID - payload primitive.Value - } -) +// Packet represents a formalized block of data. +type Packet struct { + id ulid.ULID + payload primitive.Value +} -// NewError return a new Packet for error. +// NewError creates a new Packet to represent an error. +// It takes an error and an optional cause Packet and constructs a Packet with error details. func NewError(err error, cause *Packet) *Packet { - var pairs []primitive.Value - pairs = append(pairs, primitive.NewString("error"), primitive.NewString(err.Error())) + pairs := []primitive.Value{ + primitive.NewString("error"), + primitive.NewString(err.Error()), + } + if cause != nil { pairs = append(pairs, primitive.NewString("cause"), cause.Payload()) } @@ -24,7 +26,8 @@ func NewError(err error, cause *Packet) *Packet { return New(primitive.NewMap(pairs...)) } -// New returns a new Packet. +// New creates a new Packet with the given payload. +// It generates a new unique ID for the Packet. func New(payload primitive.Value) *Packet { return &Packet{ id: ulid.Make(), @@ -32,12 +35,12 @@ func New(payload primitive.Value) *Packet { } } -// ID returns the ID of the Packet -func (pck *Packet) ID() ulid.ULID { - return pck.id +// ID returns the unique identifier (ID) of the Packet. +func (p *Packet) ID() ulid.ULID { + return p.id } -// Payload returns the payload of the Packet. -func (pck *Packet) Payload() primitive.Value { - return pck.payload +// Payload returns the data payload of the Packet. +func (p *Packet) Payload() primitive.Value { + return p.payload } diff --git a/pkg/plugin/controllx/snippet_test.go b/pkg/plugin/controllx/snippet_test.go index 29915706..f0d7fd53 100644 --- a/pkg/plugin/controllx/snippet_test.go +++ b/pkg/plugin/controllx/snippet_test.go @@ -44,7 +44,7 @@ function main(inPayload: any): any { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -80,7 +80,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -114,7 +114,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -146,7 +146,7 @@ function main(inPayload) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -185,7 +185,7 @@ function main(inPayload: any): any { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -222,7 +222,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -255,7 +255,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -288,7 +288,7 @@ function main(inPayload) { for i := 0; i < b.N; i++ { proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) diff --git a/pkg/plugin/controllx/switch_test.go b/pkg/plugin/controllx/switch_test.go index 22fc7a5e..ac97a2e1 100644 --- a/pkg/plugin/controllx/switch_test.go +++ b/pkg/plugin/controllx/switch_test.go @@ -79,7 +79,7 @@ func TestSwitchNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) outStream := out.Open(proc) diff --git a/pkg/plugin/networkx/http.go b/pkg/plugin/networkx/http.go index 9d675772..e7d7c455 100644 --- a/pkg/plugin/networkx/http.go +++ b/pkg/plugin/networkx/http.go @@ -394,13 +394,13 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) { proc := process.New() defer func() { proc.Stack().Wait() - proc.Close() + proc.Exit() }() go func() { select { case <-r.Context().Done(): - proc.Close() + proc.Exit() case <-proc.Done(): } }() diff --git a/pkg/plugin/networkx/router_test.go b/pkg/plugin/networkx/router_test.go index db5efcbc..6c4af490 100644 --- a/pkg/plugin/networkx/router_test.go +++ b/pkg/plugin/networkx/router_test.go @@ -71,7 +71,7 @@ func TestRouterNode_Send(t *testing.T) { outPort.Link(out) proc := process.New() - defer proc.Close() + defer proc.Exit() inStream := in.Open(proc) outStream := out.Open(proc) diff --git a/pkg/plugin/systemx/reflect_test.go b/pkg/plugin/systemx/reflect_test.go index 5ef1ec87..71b3eeb0 100644 --- a/pkg/plugin/systemx/reflect_test.go +++ b/pkg/plugin/systemx/reflect_test.go @@ -64,7 +64,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -108,7 +108,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -146,7 +146,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) @@ -190,7 +190,7 @@ func TestReflectNode_Send(t *testing.T) { ioPort.Link(io) proc := process.New() - defer proc.Close() + defer proc.Exit() ioStream := io.Open(proc) diff --git a/pkg/port/array.go b/pkg/port/array.go index 4bbc69d4..649abfbc 100644 --- a/pkg/port/array.go +++ b/pkg/port/array.go @@ -6,7 +6,7 @@ import ( "strconv" ) -// GetIndex is return index of the given port. +// GetIndex returns the index of the given port. func GetIndex(source string, target string) (int, bool) { regex, err := regexp.Compile(source + `\[(\d+)\]`) if err != nil { @@ -16,14 +16,14 @@ func GetIndex(source string, target string) (int, bool) { if len(groups) == 0 { return 0, false } - i, err := strconv.Atoi(groups[0][1]) + index, err := strconv.Atoi(groups[0][1]) if err != nil { return 0, false } - return i, true + return index, true } -// SetIndex is return full port name of the given port and index. +// SetIndex returns the full port name of the given port and index. func SetIndex(source string, index int) string { return fmt.Sprintf(source+"[%d]", index) } diff --git a/pkg/port/inithook.go b/pkg/port/inithook.go index 470e9bd2..a1bc3cad 100644 --- a/pkg/port/inithook.go +++ b/pkg/port/inithook.go @@ -1,8 +1,6 @@ package port import ( - "sync" - "github.com/siyul-park/uniflow/pkg/process" ) @@ -13,73 +11,10 @@ type ( } InitHookFunc func(proc *process.Process) - - // InitOnceHook is a hook that runs only once per process.process. - InitOnceHook struct { - init InitHook - processes map[*process.Process]struct{} - mu sync.RWMutex - } ) var _ InitHook = InitHookFunc(func(proc *process.Process) {}) -var _ InitHook = &InitOnceHook{} func (h InitHookFunc) Init(proc *process.Process) { h(proc) } - -// InitOnce returns a new InitOnceHook. -func InitOnce(h InitHook) *InitOnceHook { - return &InitOnceHook{ - init: h, - processes: make(map[*process.Process]struct{}), - } -} - -func (h *InitOnceHook) Init(proc *process.Process) { - if ok := func() bool { - h.mu.RLock() - defer h.mu.RUnlock() - - _, ok := h.processes[proc] - return !ok - }(); !ok { - return - } - - if ok := func() bool { - h.mu.Lock() - defer h.mu.Unlock() - - _, ok := h.processes[proc] - if ok { - return false - } - - h.processes[proc] = struct{}{} - go func() { - <-proc.Done() - - h.mu.Lock() - defer h.mu.Unlock() - - delete(h.processes, proc) - }() - - return true - }(); !ok { - return - } - - h.init.Init(proc) -} - -func (h *InitOnceHook) Close() { - h.mu.Lock() - defer h.mu.Unlock() - - for proc := range h.processes { - delete(h.processes, proc) - } -} diff --git a/pkg/port/pipe.go b/pkg/port/pipe.go index 70b2db9a..8b21a9a2 100644 --- a/pkg/port/pipe.go +++ b/pkg/port/pipe.go @@ -6,24 +6,15 @@ import ( "github.com/siyul-park/uniflow/pkg/packet" ) -type ( - // ReadPipe is a Pipe that can be Receive Packet. - ReadPipe struct { - in chan *packet.Packet - out chan *packet.Packet - done chan struct{} - mu sync.RWMutex - } - - // WritePipe is a Pipe that can be Send Packet. - WritePipe struct { - links []*ReadPipe - done chan struct{} - mu sync.RWMutex - } -) +// ReadPipe represents a unidirectional pipe for receiving packets. +type ReadPipe struct { + in chan *packet.Packet + out chan *packet.Packet + done chan struct{} + mu sync.RWMutex +} -// NewReadPipe returns a new ReadPipe. +// NewReadPipe creates a new ReadPipe instance. func NewReadPipe() *ReadPipe { p := &ReadPipe{ in: make(chan *packet.Packet), @@ -70,7 +61,7 @@ func NewReadPipe() *ReadPipe { return p } -// Receive returns a channel that receives Packet. +// Receive returns a channel that receives packets. func (p *ReadPipe) Receive() <-chan *packet.Packet { return p.out } @@ -81,7 +72,7 @@ func (p *ReadPipe) Done() <-chan struct{} { } // Close closes the ReadPipe. -// Packet that are not processed will be discard. +// Unprocessed packets will be discarded. func (p *ReadPipe) Close() { p.mu.Lock() defer p.mu.Unlock() @@ -96,6 +87,7 @@ func (p *ReadPipe) Close() { close(p.in) } +// send sends a packet through the pipe. func (p *ReadPipe) send(pck *packet.Packet) { p.mu.RLock() defer p.mu.RUnlock() @@ -107,7 +99,14 @@ func (p *ReadPipe) send(pck *packet.Packet) { } } -// NewWritePipe returns a new WritePipe. +// WritePipe represents a unidirectional pipe for sending packets. +type WritePipe struct { + links []*ReadPipe + done chan struct{} + mu sync.RWMutex +} + +// NewWritePipe creates a new WritePipe instance. func NewWritePipe() *WritePipe { return &WritePipe{ links: nil, @@ -116,7 +115,7 @@ func NewWritePipe() *WritePipe { } } -// Send a Packet to all linked ReadPipe. +// Send sends a packet to all linked ReadPipe instances. func (p *WritePipe) Send(pck *packet.Packet) { p.mu.Lock() defer p.mu.Unlock() @@ -133,7 +132,7 @@ func (p *WritePipe) Send(pck *packet.Packet) { wg.Wait() } -// Link a ReadPipe to enable communication with each other. +// Link links a ReadPipe to enable communication with each other. func (p *WritePipe) Link(pipe *ReadPipe) { p.mu.Lock() defer p.mu.Unlock() @@ -175,7 +174,7 @@ func (p *WritePipe) Done() <-chan struct{} { } // Close closes the WritePipe. -// Packet that are not processed will be discard. +// Unprocessed packets will be discarded. func (p *WritePipe) Close() { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/port/port.go b/pkg/port/port.go index 651b1d5e..577e2405 100644 --- a/pkg/port/port.go +++ b/pkg/port/port.go @@ -6,16 +6,14 @@ import ( "github.com/siyul-park/uniflow/pkg/process" ) -type ( - // Port is a linking terminal that allows *packet.Packet to be exchanged. - Port struct { - streams map[*process.Process]*Stream - links []*Port - initHooks []InitHook - done chan struct{} - mu sync.RWMutex - } -) +// Port is a linking terminal that allows *packet.Packet to be exchanged. +type Port struct { + streams map[*process.Process]*Stream + links []*Port + initHooks []InitHook + done chan struct{} + mu sync.RWMutex +} // New returns a new Port. func New() *Port { @@ -25,7 +23,7 @@ func New() *Port { } } -// AddInitHook adds a InitHook. +// AddInitHook adds an InitHook to the Port. func (p *Port) AddInitHook(hook InitHook) { p.mu.Lock() defer p.mu.Unlock() @@ -33,7 +31,7 @@ func (p *Port) AddInitHook(hook InitHook) { p.initHooks = append(p.initHooks, hook) } -// Link connects two Port to enable communication with each other. +// Link connects two Ports to enable communication with each other. func (p *Port) Link(port *Port) { p.link(port) port.link(p) @@ -45,7 +43,7 @@ func (p *Port) Unlink(port *Port) { port.unlink(p) } -// Links return length of linked. +// Links returns the number of linked Ports. func (p *Port) Links() int { p.mu.RLock() defer p.mu.RUnlock() @@ -53,8 +51,9 @@ func (p *Port) Links() int { return len(p.links) } -// Open Stream to communicate. For each process, Stream is opened independently. -// When Process is closed, Stream is also closed. Stream Send and Receive Packet to Broadcast to all other Port connected to the Port. +// Open creates or returns an existing Stream for communication with a process. +// The Stream is closed when the associated Process or Port is closed. +// It broadcasts sent and received packets to all other Ports connected to it. func (p *Port) Open(proc *process.Process) *Stream { select { case <-proc.Done(): @@ -133,8 +132,8 @@ func (p *Port) Done() <-chan struct{} { return p.done } -// Close the Port. -// All Stream currently open will also be shut down and any Packet that are not processed will be discard. +// Close closes the Port. +// All Streams currently open will also be shut down, and any unprocessed packets will be discarded. func (p *Port) Close() { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/port/port_test.go b/pkg/port/port_test.go index 34093fdb..ea4aba6c 100644 --- a/pkg/port/port_test.go +++ b/pkg/port/port_test.go @@ -94,7 +94,7 @@ func TestPort_Open(t *testing.T) { proc := process.New() stream := port.Open(proc) - proc.Close() + proc.Exit() select { case <-stream.Done(): @@ -105,7 +105,7 @@ func TestPort_Open(t *testing.T) { t.Run("process closed", func(t *testing.T) { proc := process.New() - proc.Close() + proc.Exit() stream := port.Open(proc) diff --git a/pkg/port/stream.go b/pkg/port/stream.go index bcf9f83e..b9458c88 100644 --- a/pkg/port/stream.go +++ b/pkg/port/stream.go @@ -7,19 +7,17 @@ import ( "github.com/siyul-park/uniflow/pkg/packet" ) -type ( - // Stream is a channel where you can exchange *packet.Packet. - Stream struct { - id ulid.ULID - read *ReadPipe - write *WritePipe - links []*Stream - done chan struct{} - mu sync.RWMutex - } -) +// Stream represents a communication channel for exchanging *packet.Packet. +type Stream struct { + id ulid.ULID + read *ReadPipe + write *WritePipe + links []*Stream + done chan struct{} + mu sync.RWMutex +} -// NewStream returns a new Stream. +// NewStream creates a new Stream instance. func NewStream() *Stream { return &Stream{ id: ulid.Make(), @@ -29,51 +27,48 @@ func NewStream() *Stream { } } -// ID returns the ID. +// ID returns the Stream's ID. func (s *Stream) ID() ulid.ULID { s.mu.RLock() defer s.mu.RUnlock() - return s.id } -// Send sends a Packet to linked Stream. +// Send sends a *packet.Packet to linked Streams. func (s *Stream) Send(pck *packet.Packet) { s.write.Send(pck) } -// Receive receives a Packet from linked Stream. +// Receive returns a channel for receiving *packet.Packet from linked Streams. func (s *Stream) Receive() <-chan *packet.Packet { return s.read.Receive() } -// Link connects two Stream to enable communication with each other. +// Link connects two Streams for communication. func (s *Stream) Link(stream *Stream) { s.link(stream) stream.link(s) } -// Unlink removes the linked Stream from being able to communicate further. +// Unlink disconnects two linked Streams. func (s *Stream) Unlink(stream *Stream) { s.unlink(stream) stream.unlink(s) } -// Links returns length of linked. +// Links returns the number of linked Streams. func (s *Stream) Links() int { s.mu.RLock() defer s.mu.RUnlock() - return len(s.links) } -// Done returns a channel which is closed when the Stream is closed. +// Done returns a channel that's closed when the Stream is closed. func (s *Stream) Done() <-chan struct{} { return s.done } -// Close closes the Stream. -// Shut down and any Packet that are not processed will be discard. +// Close closes the Stream, discarding any unprocessed packets. func (s *Stream) Close() { s.mu.Lock() defer s.mu.Unlock() @@ -89,6 +84,7 @@ func (s *Stream) Close() { s.write.Close() } +// link connects the current Stream with another for communication. func (s *Stream) link(stream *Stream) { if stream == s { return @@ -107,6 +103,7 @@ func (s *Stream) link(stream *Stream) { s.write.Link(stream.read) } +// unlink disconnects the current Stream from another. func (s *Stream) unlink(stream *Stream) { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/process/process.go b/pkg/process/process.go index 2a2bee77..7aadb151 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -6,15 +6,14 @@ import ( "github.com/oklog/ulid/v2" ) -type ( - // Process is a processing unit that isolates data processing with others. - Process struct { - id ulid.ULID - stack *Stack - done chan struct{} - mu sync.RWMutex - } -) +// Process is a processing unit that isolates data processing from others. + +type Process struct { + id ulid.ULID + stack *Stack + done chan struct{} + mu sync.RWMutex +} // New creates a new Process. func New() *Process { @@ -26,7 +25,7 @@ func New() *Process { } } -// ID returns the ID. +// ID returns the ID of the process. func (p *Process) ID() ulid.ULID { p.mu.RLock() defer p.mu.RUnlock() @@ -34,7 +33,7 @@ func (p *Process) ID() ulid.ULID { return p.id } -// Stack returns a Stack +// Stack returns a process's stack. func (p *Process) Stack() *Stack { p.mu.RLock() defer p.mu.RUnlock() @@ -42,13 +41,13 @@ func (p *Process) Stack() *Stack { return p.stack } -// Done returns a channel that is closed when is closed. +// Done returns a channel that is closed when the process is closed. func (p *Process) Done() <-chan struct{} { return p.done } -// Close closes the Process. -func (p *Process) Close() { +// Exit closes the process. +func (p *Process) Exit() { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/process/process_test.go b/pkg/process/process_test.go index 84e1e3aa..7107fe58 100644 --- a/pkg/process/process_test.go +++ b/pkg/process/process_test.go @@ -9,21 +9,21 @@ import ( func TestNew(t *testing.T) { proc := New() - defer proc.Close() + defer proc.Exit() assert.NotNil(t, proc) } func TestProcess_ID(t *testing.T) { proc := New() - defer proc.Close() + defer proc.Exit() assert.NotEqual(t, ulid.ULID{}, proc.ID()) } func TestProcess_Stack(t *testing.T) { proc := New() - defer proc.Close() + defer proc.Exit() assert.NotNil(t, proc.Stack()) } @@ -37,7 +37,7 @@ func TestProcess_Close(t *testing.T) { default: } - proc.Close() + proc.Exit() select { case <-proc.Done(): diff --git a/pkg/process/stack.go b/pkg/process/stack.go index 46b6a280..93238e29 100644 --- a/pkg/process/stack.go +++ b/pkg/process/stack.go @@ -6,19 +6,17 @@ import ( "github.com/oklog/ulid/v2" ) -type ( - // Stack is trace object. - Stack struct { - stems map[ulid.ULID][]ulid.ULID - leaves map[ulid.ULID][]ulid.ULID - stacks map[ulid.ULID][]ulid.ULID - heads map[ulid.ULID][]ulid.ULID - wait sync.RWMutex - mu sync.RWMutex - } -) +// Stack is a data structure that manages relationships between ULIDs in a trace. +type Stack struct { + stems map[ulid.ULID][]ulid.ULID + leaves map[ulid.ULID][]ulid.ULID + stacks map[ulid.ULID][]ulid.ULID + heads map[ulid.ULID][]ulid.ULID + wait sync.RWMutex + mu sync.RWMutex +} -// NewStack returns a new Stack. +// NewStack creates a new Stack instance. func NewStack() *Stack { return &Stack{ stems: make(map[ulid.ULID][]ulid.ULID), @@ -28,7 +26,7 @@ func NewStack() *Stack { } } -// Link adds an relation. +// Link establishes a relationship between two ULIDs, a stem, and a leaf. func (s *Stack) Link(stem, leaf ulid.ULID) { s.mu.Lock() defer s.mu.Unlock() @@ -51,7 +49,7 @@ func (s *Stack) Link(stem, leaf ulid.ULID) { s.leaves[stem] = append(s.leaves[stem], leaf) } -// Unlink deletes an relation. +// Unlink removes a relationship between a stem and a leaf. func (s *Stack) Unlink(stem, leaf ulid.ULID) { s.mu.Lock() defer s.mu.Unlock() @@ -84,7 +82,7 @@ func (s *Stack) Unlink(stem, leaf ulid.ULID) { } } -// Push pushes the value. +// Push adds a value to the stack associated with a key. func (s *Stack) Push(key, value ulid.ULID) { s.mu.Lock() defer s.mu.Unlock() @@ -96,7 +94,7 @@ func (s *Stack) Push(key, value ulid.ULID) { s.wait.RLock() } -// Pop pops the value. +// Pop removes and returns the top value from the stack associated with a key. func (s *Stack) Pop(key, value ulid.ULID) (ulid.ULID, bool) { s.mu.Lock() defer s.mu.Unlock() @@ -169,7 +167,7 @@ func (s *Stack) Pop(key, value ulid.ULID) (ulid.ULID, bool) { return ulid.ULID{}, false } -// Clear removes a link from the child. +// Clear removes links from the child associated with a key. func (s *Stack) Clear(key ulid.ULID) { s.mu.Lock() defer s.mu.Unlock() @@ -219,7 +217,7 @@ func (s *Stack) Clear(key ulid.ULID) { } } -// Len return the number of values. +// Len returns the number of values in the stack associated with a key. func (s *Stack) Len(key ulid.ULID) int { s.mu.RLock() defer s.mu.RUnlock() @@ -263,13 +261,13 @@ func (s *Stack) Len(key ulid.ULID) int { return count } -// Wait blocks until is empty. +// Wait blocks until the stack is empty. func (s *Stack) Wait() { s.wait.Lock() defer s.wait.Unlock() } -// Close closes all resources. +// Close releases all resources associated with the Stack. func (s *Stack) Close() { s.mu.Lock() defer s.mu.Unlock() diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 0da767de..9725d4ec 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -14,60 +14,53 @@ import ( "github.com/siyul-park/uniflow/pkg/symbol" ) -type ( - // Config is a config for for the Runtime. - Config struct { - Namespace string - Hooks *hook.Hook - Scheme *scheme.Scheme - Database database.Database - } +// Config holds the configuration options for the Runtime. +type Config struct { + Namespace string // Namespace is the namespace for the Runtime. + Hooks *hook.Hook // Hooks represent the hooks for the Runtime. + Scheme *scheme.Scheme // Scheme is the scheme for the Runtime. + Database database.Database // Database is the database for the Runtime. +} - // Runtime is an execution environment that runs Flows. - Runtime struct { - namespace string - hooks *hook.Hook - scheme *scheme.Scheme - storage *storage.Storage - table *symbol.Table - loader *loader.Loader - reconciler *loader.Reconciler - } -) +// Runtime represents an execution environment for running Flows. +type Runtime struct { + namespace string + hooks *hook.Hook + scheme *scheme.Scheme + storage *storage.Storage + table *symbol.Table + loader *loader.Loader + reconciler *loader.Reconciler +} -// New returns a new Runtime. +// New creates a new Runtime instance with the specified configuration. func New(ctx context.Context, config Config) (*Runtime, error) { - ns := config.Namespace - hk := config.Hooks - sc := config.Scheme - db := config.Database - - if hk == nil { - hk = hook.New() + if config.Hooks == nil { + config.Hooks = hook.New() } - if sc == nil { - sc = scheme.New() + if config.Scheme == nil { + config.Scheme = scheme.New() } - if db == nil { - db = memdb.New("") + if config.Database == nil { + config.Database = memdb.New("") } st, err := storage.New(ctx, storage.Config{ - Scheme: sc, - Database: db, + Scheme: config.Scheme, + Database: config.Database, }) if err != nil { return nil, err } tb := symbol.NewTable(symbol.TableOptions{ - LoadHooks: []symbol.LoadHook{hk}, - UnloadHooks: []symbol.UnloadHook{hk}, + LoadHooks: []symbol.LoadHook{config.Hooks}, + UnloadHooks: []symbol.UnloadHook{config.Hooks}, }) ld := loader.New(loader.Config{ - Namespace: ns, - Scheme: sc, + Namespace: config.Namespace, + Scheme: config.Scheme, Storage: st, Table: tb, }) @@ -76,9 +69,10 @@ func New(ctx context.Context, config Config) (*Runtime, error) { } var filter *storage.Filter - if ns != "" { - filter = storage.Where[string](scheme.KeyNamespace).EQ(ns) + if config.Namespace != "" { + filter = storage.Where[string](scheme.KeyNamespace).EQ(config.Namespace) } + rc := loader.NewReconciler(loader.ReconcilerConfig{ Storage: st, Loader: ld, @@ -86,9 +80,9 @@ func New(ctx context.Context, config Config) (*Runtime, error) { }) return &Runtime{ - namespace: ns, - hooks: hk, - scheme: sc, + namespace: config.Namespace, + hooks: config.Hooks, + scheme: config.Scheme, storage: st, table: tb, loader: ld, @@ -96,7 +90,7 @@ func New(ctx context.Context, config Config) (*Runtime, error) { }, nil } -// Lookup lookup node.Node in symbol.Table, and if it not exist load it from storage.Storage. +// Lookup searches for a node.Node in the symbol.Table. If not found, it loads it from storage.Storage. func (r *Runtime) Lookup(ctx context.Context, id ulid.ULID) (node.Node, error) { if s, ok := r.table.LookupByID(id); !ok { return r.loader.LoadOne(ctx, id) @@ -105,14 +99,14 @@ func (r *Runtime) Lookup(ctx context.Context, id ulid.ULID) (node.Node, error) { } } -// Free unload node.Node from symbol.Table. +// Free unloads a node.Node from the symbol.Table. func (r *Runtime) Free(_ context.Context, id ulid.ULID) (bool, error) { return r.table.Free(id) } -// Start starts the Runtime. -// Runtime load all scheme.Spec as node.Node from the database.Collection, -// and then keeps node.Node up-to-date and runs by continuously tracking scheme.Spec. +// Start initiates the Runtime. +// It loads all scheme.Specs as node.Nodes from the database.Collection, +// and continuously monitors and runs them by staying up-to-date with scheme.Spec changes. func (r *Runtime) Start(ctx context.Context) error { if err := r.reconciler.Watch(ctx); err != nil { return err @@ -123,10 +117,10 @@ func (r *Runtime) Start(ctx context.Context) error { return r.reconciler.Reconcile(ctx) } -// Close is close the Runtime. +// Close shuts down the Runtime. func (r *Runtime) Close(ctx context.Context) error { if err := r.reconciler.Close(); err != nil { return err } return r.table.Close() -} +} \ No newline at end of file diff --git a/pkg/scheme/builder.go b/pkg/scheme/builder.go index e4a311fa..c0b980a7 100644 --- a/pkg/scheme/builder.go +++ b/pkg/scheme/builder.go @@ -1,16 +1,14 @@ package scheme -type ( - // Builder builds a new Scheme. - Builder []func(*Scheme) error -) +// Builder is a collection of functions to build a new Scheme. +type Builder []func(*Scheme) error -// NewBuilder returns a new SchemeBuilder. +// NewBuilder creates a new SchemeBuilder with the provided functions. func NewBuilder(funcs ...func(*Scheme) error) Builder { return Builder(funcs) } -// AddToScheme adds all registered types to s. +// AddToScheme adds all registered types to the given Scheme. func (b *Builder) AddToScheme(s *Scheme) error { for _, f := range *b { if err := f(s); err != nil { @@ -20,7 +18,7 @@ func (b *Builder) AddToScheme(s *Scheme) error { return nil } -// Register adds one or more Spec. +// Register adds one or more functions to register Spec types. func (b *Builder) Register(funcs ...func(*Scheme) error) { *b = append(*b, funcs...) } diff --git a/pkg/scheme/codec.go b/pkg/scheme/codec.go index 9f44ad9e..952664b4 100644 --- a/pkg/scheme/codec.go +++ b/pkg/scheme/codec.go @@ -15,16 +15,18 @@ type ( CodecFunc func(spec Spec) (node.Node, error) ) +// CodecWithType creates a new CodecFunc for the specified type T. func CodecWithType[T Spec](decode func(spec T) (node.Node, error)) Codec { return CodecFunc(func(spec Spec) (node.Node, error) { - if spec, ok := spec.(T); !ok { - return nil, errors.WithStack(encoding.ErrUnsupportedValue) - } else { - return decode(spec) + if converted, ok := spec.(T); ok { + return decode(converted) } + return nil, errors.WithStack(encoding.ErrUnsupportedValue) }) } +// Decode implements the Decode method for CodecFunc. func (c CodecFunc) Decode(spec Spec) (node.Node, error) { return c(spec) } + diff --git a/pkg/scheme/scheme.go b/pkg/scheme/scheme.go index 9d0e045d..7aeb79f1 100644 --- a/pkg/scheme/scheme.go +++ b/pkg/scheme/scheme.go @@ -9,18 +9,16 @@ import ( "github.com/siyul-park/uniflow/pkg/node" ) -type ( - // Scheme defines methods for decode Spec. - Scheme struct { - types map[string]reflect.Type - codecs map[string]Codec - mu sync.RWMutex - } -) +// Scheme defines a registry for handling decoding of Spec objects. +type Scheme struct { + types map[string]reflect.Type + codecs map[string]Codec + mu sync.RWMutex +} var _ Codec = &Scheme{} -// New returns a new Scheme. +// New creates a new Scheme instance. func New() *Scheme { return &Scheme{ types: make(map[string]reflect.Type), @@ -28,7 +26,7 @@ func New() *Scheme { } } -// AddKnownType adds a new Type and Spec to the Scheme. +// AddKnownType adds a new Spec type to the Scheme, associating it with a kind. func (s *Scheme) AddKnownType(kind string, spec Spec) { s.mu.Lock() defer s.mu.Unlock() @@ -45,7 +43,7 @@ func (s *Scheme) KnownType(kind string) (reflect.Type, bool) { return t, ok } -// AddCodec adds a new Codec to the Scheme. +// AddCodec associates a Codec with a specific kind in the Scheme. func (s *Scheme) AddCodec(kind string, codec Codec) { s.mu.Lock() defer s.mu.Unlock() @@ -53,7 +51,7 @@ func (s *Scheme) AddCodec(kind string, codec Codec) { s.codecs[kind] = codec } -// Codec returns Codec with the given kind. +// Codec returns a Codec associated with the given kind. func (s *Scheme) Codec(kind string) (Codec, bool) { s.mu.RLock() defer s.mu.RUnlock() @@ -62,7 +60,7 @@ func (s *Scheme) Codec(kind string) (Codec, bool) { return c, ok } -// New returns a new Spec with the given kind. +// New creates a new instance of Spec with the given kind. func (s *Scheme) New(kind string) (Spec, bool) { s.mu.RLock() defer s.mu.RUnlock() @@ -107,7 +105,7 @@ func (s *Scheme) Decode(spec Spec) (node.Node, error) { return nil, errors.WithStack(encoding.ErrUnsupportedValue) } -// Kinds returns the kinds of the given Spec. +// Kinds returns the kinds associated with the given Spec. func (s *Scheme) Kinds(spec Spec) []string { s.mu.RLock() defer s.mu.RUnlock() diff --git a/pkg/scheme/spec.go b/pkg/scheme/spec.go index 69fb59f6..b5e6f101 100644 --- a/pkg/scheme/spec.go +++ b/pkg/scheme/spec.go @@ -4,90 +4,95 @@ import ( "github.com/oklog/ulid/v2" ) -type ( - // Spec is a specification that defines how node.Node should be defined and linked. - Spec interface { - // GetID returns the ID. - GetID() ulid.ULID - // SetID set the ID. - SetID(val ulid.ULID) - // GetKind returns the Kind. - GetKind() string - // SetKind set the Kind. - SetKind(val string) - // GetNamespace returns the Namespace. - GetNamespace() string - // SetNamespace set the Namespace. - SetNamespace(val string) - // GetName returns the Name. - GetName() string - // SetName set the Name. - SetName(val string) - // GetLinks returns the Links. - GetLinks() map[string][]PortLocation - // SetLinks set the Links. - SetLinks(val map[string][]PortLocation) - } - - // SpecMeta is metadata that all persisted resources must have, which includes all objects users must create. - SpecMeta struct { - ID ulid.ULID `json:"id,omitempty" yaml:"id,omitempty" map:"id,omitempty"` - Kind string `json:"kind,omitempty" yaml:"kind,omitempty" map:"kind,omitempty"` - Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty" map:"namespace,omitempty"` - Name string `json:"name,omitempty" yaml:"name,omitempty" map:"name,omitempty"` - Links map[string][]PortLocation `json:"links,omitempty" yaml:"links,omitempty" map:"links,omitempty"` - } +// Spec represents the specification defining the attributes and connections of a node. +type Spec interface { + // GetID returns the unique identifier of the node. + GetID() ulid.ULID + // SetID sets the unique identifier of the node. + SetID(val ulid.ULID) + // GetKind returns the category or type of the node. + GetKind() string + // SetKind sets the category or type of the node. + SetKind(val string) + // GetNamespace returns the logical grouping of nodes, allowing for better organization. + GetNamespace() string + // SetNamespace sets the logical grouping of nodes. + SetNamespace(val string) + // GetName returns the human-readable name of the node. + GetName() string + // SetName sets the human-readable name of the node. + SetName(val string) + // GetLinks returns the connections or links between nodes. + GetLinks() map[string][]PortLocation + // SetLinks sets the connections or links between nodes. + SetLinks(val map[string][]PortLocation) +} - // PortLocation is the location of a port in the network. - PortLocation struct { - ID ulid.ULID `json:"id,omitempty" yaml:"id,omitempty" map:"id,omitempty"` - Name string `json:"name,omitempty" yaml:"name,omitempty" map:"name,omitempty"` - Port string `json:"port" yaml:"port" map:"port"` - } -) +// SpecMeta is the metadata that every persisted resource must have, including user-created objects. +type SpecMeta struct { + ID ulid.ULID `json:"id,omitempty" yaml:"id,omitempty" map:"id,omitempty"` + Kind string `json:"kind,omitempty" yaml:"kind,omitempty" map:"kind,omitempty"` + Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty" map:"namespace,omitempty"` + Name string `json:"name,omitempty" yaml:"name,omitempty" map:"name,omitempty"` + Links map[string][]PortLocation `json:"links,omitempty" yaml:"links,omitempty" map:"links,omitempty"` +} -var _ Spec = &SpecMeta{} +// PortLocation represents the location of a port within the network. +type PortLocation struct { + ID ulid.ULID `json:"id,omitempty" yaml:"id,omitempty" map:"id,omitempty"` + Name string `json:"name,omitempty" yaml:"name,omitempty" map:"name,omitempty"` + Port string `json:"port" yaml:"port" map:"port"` +} -const ( - NamespaceDefault = "default" -) +// DefaultNamespace is the default value for logical node grouping. +const DefaultNamespace = "default" +// GetID returns the unique identifier of the SpecMeta. func (m *SpecMeta) GetID() ulid.ULID { return m.ID } +// SetID sets the unique identifier of the SpecMeta. func (m *SpecMeta) SetID(val ulid.ULID) { m.ID = val } +// GetKind returns the category or type of the SpecMeta. func (m *SpecMeta) GetKind() string { return m.Kind } +// SetKind sets the category or type of the SpecMeta. func (m *SpecMeta) SetKind(val string) { m.Kind = val } +// GetNamespace returns the logical grouping of the SpecMeta. func (m *SpecMeta) GetNamespace() string { return m.Namespace } +// SetNamespace sets the logical grouping of the SpecMeta. func (m *SpecMeta) SetNamespace(val string) { m.Namespace = val } +// GetName returns the human-readable name of the SpecMeta. func (m *SpecMeta) GetName() string { return m.Name } +// SetName sets the human-readable name of the SpecMeta. func (m *SpecMeta) SetName(val string) { m.Name = val } +// GetLinks returns the connections or links of the SpecMeta. func (m *SpecMeta) GetLinks() map[string][]PortLocation { return m.Links } +// SetLinks sets the connections or links of the SpecMeta. func (m *SpecMeta) SetLinks(val map[string][]PortLocation) { m.Links = val } diff --git a/pkg/scheme/unstructured.go b/pkg/scheme/unstructured.go index efc8eb3f..a8cdce30 100644 --- a/pkg/scheme/unstructured.go +++ b/pkg/scheme/unstructured.go @@ -7,16 +7,13 @@ import ( "github.com/siyul-park/uniflow/pkg/primitive" ) -type ( - // Unstructured is an Spec that is not marshaled for structuring. - Unstructured struct { - doc *primitive.Map - mu sync.RWMutex - } -) - -var _ Spec = &Unstructured{} +// Unstructured is a data structure that implements the Spec interface and is not marshaled for structuring. +type Unstructured struct { + doc *primitive.Map + mu sync.RWMutex +} +// Key constants for commonly used fields in Unstructured. const ( KeyID = "id" KeyKind = "kind" @@ -25,7 +22,9 @@ const ( KeyLinks = "links" ) -// NewUnstructured returns a new Unstructured. +var _ Spec = &Unstructured{} + +// NewUnstructured returns a new Unstructured instance with an optional primitive.Map. func NewUnstructured(doc *primitive.Map) *Unstructured { if doc == nil { doc = primitive.NewMap() diff --git a/pkg/storage/event.go b/pkg/storage/event.go index 37eda30a..872dafd5 100644 --- a/pkg/storage/event.go +++ b/pkg/storage/event.go @@ -3,7 +3,7 @@ package storage import "github.com/oklog/ulid/v2" type ( - // Event is an event that occurs when an scheme.Spec is changed. + // Event is an event that occurs when a scheme.Spec is changed. Event struct { OP eventOP NodeID ulid.ULID @@ -12,7 +12,10 @@ type ( ) const ( + // EventInsert indicates an event for inserting a scheme.Spec. EventInsert eventOP = iota + // EventUpdate indicates an event for updating a scheme.Spec. EventUpdate + // EventDelete indicates an event for deleting a scheme.Spec. EventDelete ) diff --git a/pkg/storage/filter.go b/pkg/storage/filter.go index a06ce99e..cfadf569 100644 --- a/pkg/storage/filter.go +++ b/pkg/storage/filter.go @@ -5,26 +5,24 @@ import ( "github.com/siyul-park/uniflow/pkg/primitive" ) -type ( - // Filter is a filter for find matched primitive. - Filter struct { - OP database.OP - Key string - Value any - Children []*Filter - } +// Filter is a filter for finding matched document. +type Filter struct { + OP database.OP // Operator for the filter. + Key string // Key specifies the field for the filter. + Value any // Value is the filter value. + Children []*Filter // Children are nested filters for AND and OR operations. +} - filterHelper[T any] struct { - key string - } -) +type filterHelper[T any] struct { + key string +} +// Where creates a new filterHelper with the specified key. func Where[T any](key string) *filterHelper[T] { - return &filterHelper[T]{ - key: key, - } + return &filterHelper[T]{key: key} } +// EQ creates an equality filter. func (fh *filterHelper[T]) EQ(value T) *Filter { return &Filter{ OP: database.EQ, @@ -33,6 +31,7 @@ func (fh *filterHelper[T]) EQ(value T) *Filter { } } +// NE creates a not-equal filter. func (fh *filterHelper[T]) NE(value T) *Filter { return &Filter{ OP: database.NE, @@ -49,6 +48,7 @@ func (fh *filterHelper[T]) LT(value T) *Filter { } } +// LTE creates a less-than-or-equal filter. func (fh *filterHelper[T]) LTE(value T) *Filter { return &Filter{ OP: database.LTE, @@ -57,6 +57,7 @@ func (fh *filterHelper[T]) LTE(value T) *Filter { } } +// GT creates a greater-than filter. func (fh *filterHelper[T]) GT(value T) *Filter { return &Filter{ OP: database.GT, @@ -65,6 +66,7 @@ func (fh *filterHelper[T]) GT(value T) *Filter { } } +// GTE creates a greater-than-or-equal filter. func (fh *filterHelper[T]) GTE(value T) *Filter { return &Filter{ OP: database.GTE, @@ -73,6 +75,7 @@ func (fh *filterHelper[T]) GTE(value T) *Filter { } } +// IN creates a filter for values in a given slice. func (fh *filterHelper[T]) IN(slice ...T) *Filter { value := make([]any, len(slice)) for i, e := range slice { @@ -85,6 +88,7 @@ func (fh *filterHelper[T]) IN(slice ...T) *Filter { } } +// NotIN creates a filter for values not in a given slice. func (fh *filterHelper[T]) NotIN(slice ...T) *Filter { value := make([]any, len(slice)) for i, e := range slice { @@ -97,6 +101,7 @@ func (fh *filterHelper[T]) NotIN(slice ...T) *Filter { } } +// IsNull creates a filter for null values. func (fh *filterHelper[T]) IsNull() *Filter { return &Filter{ OP: database.NULL, @@ -104,6 +109,7 @@ func (fh *filterHelper[T]) IsNull() *Filter { } } +// IsNotNull creates a filter for non-null values. func (fh *filterHelper[T]) IsNotNull() *Filter { return &Filter{ OP: database.NNULL, @@ -111,6 +117,7 @@ func (fh *filterHelper[T]) IsNotNull() *Filter { } } +// And creates a filter that combines multiple filters with a logical AND. func (ft *Filter) And(x ...*Filter) *Filter { var v []*Filter for _, e := range append([]*Filter{ft}, x...) { @@ -125,6 +132,7 @@ func (ft *Filter) And(x ...*Filter) *Filter { } } +// Or creates a filter that combines multiple filters with a logical OR. func (ft *Filter) Or(x ...*Filter) *Filter { var v []*Filter for _, e := range append([]*Filter{ft}, x...) { @@ -139,6 +147,7 @@ func (ft *Filter) Or(x ...*Filter) *Filter { } } +// Encode encodes the filter to a database.Filter. func (ft *Filter) Encode() (*database.Filter, error) { if ft == nil { return nil, nil diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 8f1af900..c97098d9 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -11,96 +11,97 @@ import ( "github.com/siyul-park/uniflow/pkg/scheme" ) -type ( - // Config is a config for Storage. - Config struct { - Scheme *scheme.Scheme - Database database.Database - } +// Config is a configuration struct for Storage. +type Config struct { + Scheme *scheme.Scheme + Database database.Database +} - // Storage is the storage that stores scheme.Spec. - Storage struct { - scheme *scheme.Scheme - collection database.Collection - mu sync.RWMutex - } -) +// Storage is responsible for storing scheme.Spec. +type Storage struct { + scheme *scheme.Scheme + nodes database.Collection + mu sync.RWMutex +} -const ( - CollectionNodes = "nodes" -) +// CollectionNodes is the name of the nodes collection in the storage. +const CollectionNodes = "nodes" -var ( - indexes = []database.IndexModel{ - { - Name: "namespace_name", - Keys: []string{scheme.KeyNamespace, scheme.KeyName}, - Unique: true, - Partial: database.Where(scheme.KeyName).NE(primitive.NewString("")).And(database.Where(scheme.KeyName).IsNotNull()), - }, - } -) +var indexesNode = []database.IndexModel{ + { + Name: "namespace_name", + Keys: []string{scheme.KeyNamespace, scheme.KeyName}, + Unique: true, + Partial: database.Where(scheme.KeyName).NE(primitive.NewString("")).And(database.Where(scheme.KeyName).IsNotNull()), + }, +} -// New returns a new Storage. +// New creates a new Storage instance. func New(ctx context.Context, config Config) (*Storage, error) { scheme := config.Scheme db := config.Database - collection, err := db.Collection(ctx, CollectionNodes) + nodes, err := db.Collection(ctx, CollectionNodes) if err != nil { return nil, err } s := &Storage{ - scheme: scheme, - collection: collection, + scheme: scheme, + nodes: nodes, } - if exists, err := s.collection.Indexes().List(ctx); err != nil { + if err := s.ensureIndexes(ctx); err != nil { return nil, err - } else { - for _, index := range indexes { - index = database.IndexModel{ - Name: index.Name, - Keys: index.Keys, - Unique: index.Unique, - Partial: index.Partial, - } + } + + return s, nil +} + +// ensureIndexes creates required indexes for the storage collection. +func (s *Storage) ensureIndexes(ctx context.Context) error { + existingIndexes, err := s.nodes.Indexes().List(ctx) + if err != nil { + return err + } - var ok bool - for _, i := range exists { - if i.Name == index.Name { - if reflect.DeepEqual(i, index) { - s.collection.Indexes().Drop(ctx, i.Name) - } + for _, index := range indexesNode { + var indexExists bool + + for _, existingIndex := range existingIndexes { + if existingIndex.Name == index.Name { + indexExists = true + if !reflect.DeepEqual(existingIndex, index) { + s.nodes.Indexes().Drop(ctx, existingIndex.Name) break } } - if ok { - continue - } - s.collection.Indexes().Create(ctx, index) + } + + if !indexExists { + s.nodes.Indexes().Create(ctx, index) } } - return s, nil + return nil } -// Watch returns Stream to track changes. +// Watch returns a Stream to track changes based on the provided filter. func (s *Storage) Watch(ctx context.Context, filter *Filter) (*Stream, error) { f, err := filter.Encode() if err != nil { return nil, err } - stream, err := s.collection.Watch(ctx, f) + stream, err := s.nodes.Watch(ctx, f) if err != nil { return nil, err } + return NewStream(stream), nil } -// InsertOne inserts a single scheme.Spec and return ID. +// InsertOne inserts a single scheme.Spec and returns its ID. func (s *Storage) InsertOne(ctx context.Context, spec scheme.Spec) (ulid.ULID, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -110,8 +111,9 @@ func (s *Storage) InsertOne(ctx context.Context, spec scheme.Spec) (ulid.ULID, e if err := unstructured.Marshal(spec); err != nil { return ulid.ULID{}, err } + if unstructured.GetNamespace() == "" { - unstructured.SetNamespace(scheme.NamespaceDefault) + unstructured.SetNamespace(scheme.DefaultNamespace) } if unstructured.GetID() == (ulid.ULID{}) { unstructured.SetID(ulid.Make()) @@ -122,30 +124,36 @@ func (s *Storage) InsertOne(ctx context.Context, spec scheme.Spec) (ulid.ULID, e } var id ulid.ULID - if pk, err := s.collection.InsertOne(ctx, unstructured.Doc()); err != nil { + + pk, err := s.nodes.InsertOne(ctx, unstructured.Doc()) + if err != nil { return ulid.ULID{}, err - } else if err := primitive.Unmarshal(pk, &id); err != nil { - _, _ = s.collection.DeleteOne(ctx, database.Where(scheme.KeyID).EQ(pk)) + } + + if err := primitive.Unmarshal(pk, &id); err != nil { + _, _ = s.nodes.DeleteOne(ctx, database.Where(scheme.KeyID).EQ(pk)) return ulid.ULID{}, err - } else { - return id, nil } + + return id, nil } -// InsertMany inserts multiple scheme.Spec and return IDs. +// InsertMany inserts multiple scheme.Spec instances and returns their IDs. func (s *Storage) InsertMany(ctx context.Context, objs []scheme.Spec) ([]ulid.ULID, error) { s.mu.RLock() defer s.mu.RUnlock() var docs []*primitive.Map + for _, spec := range objs { unstructured := scheme.NewUnstructured(nil) if err := unstructured.Marshal(spec); err != nil { return nil, err } + if unstructured.GetNamespace() == "" { - unstructured.SetNamespace(scheme.NamespaceDefault) + unstructured.SetNamespace(scheme.DefaultNamespace) } if unstructured.GetID() == (ulid.ULID{}) { unstructured.SetID(ulid.Make()) @@ -158,15 +166,18 @@ func (s *Storage) InsertMany(ctx context.Context, objs []scheme.Spec) ([]ulid.UL docs = append(docs, unstructured.Doc()) } - ids := make([]ulid.ULID, 0) - if pks, err := s.collection.InsertMany(ctx, docs); err != nil { + pks, err := s.nodes.InsertMany(ctx, docs) + if err != nil { return nil, err - } else if err := primitive.Unmarshal(primitive.NewSlice(pks...), &ids); err != nil { - _, _ = s.collection.DeleteMany(ctx, database.Where(scheme.KeyID).IN(pks...)) + } + + var ids []ulid.ULID + if err := primitive.Unmarshal(primitive.NewSlice(pks...), &ids); err != nil { + _, _ = s.nodes.DeleteMany(ctx, database.Where(scheme.KeyID).IN(pks...)) return nil, err - } else { - return ids, nil } + + return ids, nil } // UpdateOne updates a single scheme.Spec and returns success or failure. @@ -179,8 +190,9 @@ func (s *Storage) UpdateOne(ctx context.Context, spec scheme.Spec) (bool, error) if err := unstructured.Marshal(spec); err != nil { return false, err } + if unstructured.GetNamespace() == "" { - unstructured.SetNamespace(scheme.NamespaceDefault) + unstructured.SetNamespace(scheme.DefaultNamespace) } if unstructured.GetID() == (ulid.ULID{}) { return false, nil @@ -191,23 +203,25 @@ func (s *Storage) UpdateOne(ctx context.Context, spec scheme.Spec) (bool, error) } filter, _ := Where[ulid.ULID](scheme.KeyID).EQ(unstructured.GetID()).Encode() - return s.collection.UpdateOne(ctx, filter, unstructured.Doc()) + return s.nodes.UpdateOne(ctx, filter, unstructured.Doc()) } -// UpdateMany multiple scheme.Spec and return the number of success. +// UpdateMany updates multiple scheme.Spec instances and returns the number of successes. func (s *Storage) UpdateMany(ctx context.Context, objs []scheme.Spec) (int, error) { s.mu.RLock() defer s.mu.RUnlock() var unstructureds []*scheme.Unstructured + for _, spec := range objs { unstructured := scheme.NewUnstructured(nil) if err := unstructured.Marshal(spec); err != nil { return 0, err } + if unstructured.GetNamespace() == "" { - unstructured.SetNamespace(scheme.NamespaceDefault) + unstructured.SetNamespace(scheme.DefaultNamespace) } if unstructured.GetID() == (ulid.ULID{}) { continue @@ -223,10 +237,10 @@ func (s *Storage) UpdateMany(ctx context.Context, objs []scheme.Spec) (int, erro count := 0 for _, unstructured := range unstructureds { filter, _ := Where[ulid.ULID](scheme.KeyID).EQ(unstructured.GetID()).Encode() - if ok, err := s.collection.UpdateOne(ctx, filter, unstructured.Doc()); err != nil { + if ok, err := s.nodes.UpdateOne(ctx, filter, unstructured.Doc()); err != nil { return count, err } else if ok { - count += 1 + count++ } } @@ -243,10 +257,10 @@ func (s *Storage) DeleteOne(ctx context.Context, filter *Filter) (bool, error) { return false, err } - return s.collection.DeleteOne(ctx, f) + return s.nodes.DeleteOne(ctx, f) } -// DeleteMany deletes multiple scheme.Spec and returns the number of success. +// DeleteMany deletes multiple scheme.Spec instances and returns the number of successes. func (s *Storage) DeleteMany(ctx context.Context, filter *Filter) (int, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -256,10 +270,10 @@ func (s *Storage) DeleteMany(ctx context.Context, filter *Filter) (int, error) { return 0, err } - return s.collection.DeleteMany(ctx, f) + return s.nodes.DeleteMany(ctx, f) } -// FindOne return the single scheme.Spec which is matched by the filter. +// FindOne returns a single scheme.Spec matched by the filter. func (s *Storage) FindOne(ctx context.Context, filter *Filter, options ...*database.FindOptions) (scheme.Spec, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -269,23 +283,26 @@ func (s *Storage) FindOne(ctx context.Context, filter *Filter, options ...*datab return nil, err } - if doc, err := s.collection.FindOne(ctx, f, options...); err != nil { + doc, err := s.nodes.FindOne(ctx, f, options...) + if err != nil { return nil, err - } else if doc != nil { - unstructured := scheme.NewUnstructured(doc) - if spec, ok := s.scheme.New(unstructured.GetKind()); !ok { - return unstructured, nil - } else if err := unstructured.Unmarshal(spec); err != nil { - return nil, err - } else { - return spec, nil - } } - return nil, nil + if doc == nil { + return nil, nil + } + + unstructured := scheme.NewUnstructured(doc) + if spec, ok := s.scheme.New(unstructured.GetKind()); !ok { + return unstructured, nil + } else if err := unstructured.Unmarshal(spec); err != nil { + return nil, err + } else { + return spec, nil + } } -// FindMany returns multiple scheme.Spec which is matched by the filter. +// FindMany returns multiple scheme.Spec instances matched by the filter. func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*database.FindOptions) ([]scheme.Spec, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -295,26 +312,28 @@ func (s *Storage) FindMany(ctx context.Context, filter *Filter, options ...*data return nil, err } - var specs []scheme.Spec - if docs, err := s.collection.FindMany(ctx, f, options...); err != nil { + docs, err := s.nodes.FindMany(ctx, f, options...) + if err != nil { return nil, err - } else { - for _, doc := range docs { - if doc == nil { - continue - } - unstructured := scheme.NewUnstructured(doc) - if spec, ok := s.scheme.New(unstructured.GetKind()); !ok { - specs = append(specs, unstructured) - } else if err := unstructured.Unmarshal(spec); err != nil { - return nil, err - } else { - specs = append(specs, spec) - } + } + + var specs []scheme.Spec + for _, doc := range docs { + if doc == nil { + continue } - return specs, nil + unstructured := scheme.NewUnstructured(doc) + if spec, ok := s.scheme.New(unstructured.GetKind()); !ok { + specs = append(specs, unstructured) + } else if err := unstructured.Unmarshal(spec); err != nil { + return nil, err + } else { + specs = append(specs, spec) + } } + + return specs, nil } func (s *Storage) validate(unstructured *scheme.Unstructured) error { diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 0cc5a508..19d19373 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -31,7 +31,7 @@ func TestStorage_Watch(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } stream, err := st.Watch(context.Background(), nil) @@ -100,12 +100,12 @@ func TestStorage_InsertMany(t *testing.T) { &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, }, &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, }, } @@ -169,12 +169,12 @@ func TestStorage_UpdateMany(t *testing.T) { &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, }, &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, }, } @@ -208,7 +208,7 @@ func TestStorage_DeleteOne(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } ok, err := st.DeleteOne(context.Background(), Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) @@ -241,7 +241,7 @@ func TestStorage_DeleteMany(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } count, err := st.DeleteMany(context.Background(), Where[ulid.ULID](scheme.KeyID).EQ(spec.GetID())) @@ -275,7 +275,7 @@ func TestStorage_FindOne(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } _, _ = st.InsertOne(context.Background(), spec) @@ -305,7 +305,7 @@ func TestStorage_FindOne(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: faker.Word(), } @@ -337,7 +337,7 @@ func TestStorage_FindMany(t *testing.T) { spec := &scheme.SpecMeta{ ID: ulid.Make(), Kind: kind, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, } _, _ = st.InsertOne(context.Background(), spec) diff --git a/pkg/storage/stream.go b/pkg/storage/stream.go index 75dcf480..71fb2399 100644 --- a/pkg/storage/stream.go +++ b/pkg/storage/stream.go @@ -7,10 +7,10 @@ import ( ) type ( - // Stream is a stream to track scheme.Spec is changed. + // Stream is a stream to track scheme.Spec changes. Stream struct { - stream database.Stream - channel chan Event + stream database.Stream + channel chan Event done chan struct{} } ) @@ -59,7 +59,7 @@ func NewStream(stream database.Stream) *Stream { return s } -// Next returns a channel that is received Event. +// Next returns a channel that receives Event. func (s *Stream) Next() <-chan Event { return s.channel } diff --git a/pkg/symbol/loadhook.go b/pkg/symbol/loadhook.go index f0ef0b31..005a1fa5 100644 --- a/pkg/symbol/loadhook.go +++ b/pkg/symbol/loadhook.go @@ -3,16 +3,19 @@ package symbol import "github.com/siyul-park/uniflow/pkg/node" type ( - // LoadHook is a hook that is called node.Node is loaded. + // LoadHook is an interface for hooks that are called when node.Node is loaded. LoadHook interface { + // Load is called when node.Node is loaded. Load(n node.Node) error } + // LoadHookFunc is a function type that implements the LoadHook interface. LoadHookFunc func(n node.Node) error ) var _ LoadHook = LoadHookFunc(func(n node.Node) error { return nil }) +// Load is the implementation of the Load method for LoadHookFunc. func (f LoadHookFunc) Load(n node.Node) error { return f(n) } diff --git a/pkg/symbol/symbol.go b/pkg/symbol/symbol.go index f357954a..efb725f9 100644 --- a/pkg/symbol/symbol.go +++ b/pkg/symbol/symbol.go @@ -7,13 +7,11 @@ import ( "github.com/siyul-park/uniflow/pkg/scheme" ) -type ( - // Symbol represents an object that binds a Node and a Spec. - Symbol struct { - Node node.Node - Spec scheme.Spec - } -) +// Symbol represents an object that binds a Node and a Spec. +type Symbol struct { + Node node.Node + Spec scheme.Spec +} var _ node.Node = (*Symbol)(nil) diff --git a/pkg/symbol/symbol_test.go b/pkg/symbol/symbol_test.go index b4a9f332..075871e9 100644 --- a/pkg/symbol/symbol_test.go +++ b/pkg/symbol/symbol_test.go @@ -16,7 +16,7 @@ func TestSymbol_Getter(t *testing.T) { spec := &scheme.SpecMeta{ ID: n.ID(), Kind: faker.Word(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: faker.UUIDHyphenated(), Links: map[string][]scheme.PortLocation{ node.PortOut: { diff --git a/pkg/symbol/table_test.go b/pkg/symbol/table_test.go index 145242a6..e0bf1b19 100644 --- a/pkg/symbol/table_test.go +++ b/pkg/symbol/table_test.go @@ -25,7 +25,7 @@ func TestTable_Insert(t *testing.T) { spec1 := &scheme.SpecMeta{ ID: n1.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -37,7 +37,7 @@ func TestTable_Insert(t *testing.T) { } spec2 := &scheme.SpecMeta{ ID: n2.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -49,7 +49,7 @@ func TestTable_Insert(t *testing.T) { } spec3 := &scheme.SpecMeta{ ID: n3.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -103,7 +103,7 @@ func TestTable_Insert(t *testing.T) { spec1 := &scheme.SpecMeta{ ID: id, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -115,7 +115,7 @@ func TestTable_Insert(t *testing.T) { } spec2 := &scheme.SpecMeta{ ID: id, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -127,7 +127,7 @@ func TestTable_Insert(t *testing.T) { } spec3 := &scheme.SpecMeta{ ID: n3.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -139,7 +139,7 @@ func TestTable_Insert(t *testing.T) { } spec4 := &scheme.SpecMeta{ ID: n4.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -188,17 +188,17 @@ func TestTable_Insert(t *testing.T) { spec1 := &scheme.SpecMeta{ ID: n1.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: faker.UUIDHyphenated(), } spec2 := &scheme.SpecMeta{ ID: n2.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: faker.UUIDHyphenated(), } spec3 := &scheme.SpecMeta{ ID: n3.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: faker.UUIDHyphenated(), } @@ -271,22 +271,22 @@ func TestTable_Insert(t *testing.T) { spec1 := &scheme.SpecMeta{ ID: id, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: name, } spec2 := &scheme.SpecMeta{ ID: id, - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: name, } spec3 := &scheme.SpecMeta{ ID: n3.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: faker.UUIDHyphenated(), } spec4 := &scheme.SpecMeta{ ID: n4.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: faker.UUIDHyphenated(), } @@ -361,7 +361,7 @@ func TestTable_Free(t *testing.T) { spec1 := &scheme.SpecMeta{ ID: n1.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -373,7 +373,7 @@ func TestTable_Free(t *testing.T) { } spec2 := &scheme.SpecMeta{ ID: n2.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -385,7 +385,7 @@ func TestTable_Free(t *testing.T) { } spec3 := &scheme.SpecMeta{ ID: n3.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Links: map[string][]scheme.PortLocation{ node.PortOut: { { @@ -452,7 +452,7 @@ func TestTable_LookupByName(t *testing.T) { defer n.Close() spec := &scheme.SpecMeta{ ID: n.ID(), - Namespace: scheme.NamespaceDefault, + Namespace: scheme.DefaultNamespace, Name: faker.Word(), } sym := &Symbol{Node: n, Spec: spec} diff --git a/pkg/symbol/unloadhook.go b/pkg/symbol/unloadhook.go index 70e50769..2dc61295 100644 --- a/pkg/symbol/unloadhook.go +++ b/pkg/symbol/unloadhook.go @@ -3,16 +3,19 @@ package symbol import "github.com/siyul-park/uniflow/pkg/node" type ( - // UnloadHook is a hook that is called node.Node is unloaded. + // UnloadHook is an interface for hooks that are called when node.Node is unloaded. UnloadHook interface { + // Unload is called when node.Node is unloaded. Unload(n node.Node) error } + // UnloadHookFunc is a function type that implements the UnloadHook interface. UnloadHookFunc func(n node.Node) error ) var _ UnloadHook = UnloadHookFunc(func(n node.Node) error { return nil }) +// Unload is the implementation of the Unload method for UnloadHookFunc. func (f UnloadHookFunc) Unload(n node.Node) error { return f(n) }