From 856db0a03873ce4f7320246f34ca49dbb08b1b47 Mon Sep 17 00:00:00 2001 From: siyual-park Date: Mon, 18 Nov 2024 20:35:25 +0900 Subject: [PATCH] feat: support default spec --- README.md | 4 - README_kr.md | 4 - cmd/pkg/uniflow/main.go | 17 +++ docs/architecture.md | 4 - docs/architecture_kr.md | 4 - examples/httpproxy.yaml | 4 - examples/ping.yaml | 4 - examples/system.yaml | 4 - examples/wsproxy.yaml | 4 - ext/pkg/control/block.go | 16 +-- ext/pkg/control/block_test.go | 4 +- ext/pkg/control/builder.go | 65 ++++------ ext/pkg/io/builder.go | 26 ++-- ext/pkg/network/builder.go | 35 ++--- ext/pkg/system/builder.go | 19 ++- pkg/chart/chart.go | 11 +- pkg/chart/linker.go | 9 ++ pkg/process/local.go | 5 +- pkg/runtime/runtime.go | 2 +- pkg/scheme/register.go | 12 ++ pkg/scheme/register_test.go | 27 ++++ pkg/scheme/scheme.go | 233 ++++++++++++++++++++++++++++++++-- pkg/scheme/scheme_test.go | 58 ++++++++- pkg/spec/spec.go | 89 ------------- pkg/spec/spec_test.go | 51 -------- pkg/symbol/loader.go | 8 +- 26 files changed, 443 insertions(+), 276 deletions(-) create mode 100644 pkg/scheme/register_test.go diff --git a/README.md b/README.md index 186cd593..3e8d7b8e 100644 --- a/README.md +++ b/README.md @@ -45,14 +45,10 @@ Try a basic HTTP request handler using [ping.yaml](./examples/ping.yaml): - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/README_kr.md b/README_kr.md index 1fabeb30..f831134a 100644 --- a/README_kr.md +++ b/README_kr.md @@ -45,14 +45,10 @@ make build - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/cmd/pkg/uniflow/main.go b/cmd/pkg/uniflow/main.go index 5578dd62..eb53fc0b 100644 --- a/cmd/pkg/uniflow/main.go +++ b/cmd/pkg/uniflow/main.go @@ -151,6 +151,23 @@ func main() { schemeBuilder.Register(network.AddToScheme()) schemeBuilder.Register(system.AddToScheme(nativeTable)) + schemeBuilder.Register(scheme.AddKnownValues(map[string]spec.Spec{ + network.KindListener: &spec.Unstructured{ + Meta: spec.Meta{ + Env: map[string][]spec.Value{ + "PORT": { + { + Data: "{{ .PORT }}", + }, + }, + }, + }, + Fields: map[string]any{ + "port": "{{ .PORT }}", + }, + }, + })) + hookBuilder.Register(control.AddToHook()) hookBuilder.Register(network.AddToHook()) diff --git a/docs/architecture.md b/docs/architecture.md index 53807f20..83d3cc2f 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -38,7 +38,6 @@ Users can update node specifications by using a Command-Line Interface (CLI) or - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router @@ -46,9 +45,6 @@ Users can update node specifications by using a Command-Line Interface (CLI) or error: - name: catch port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/docs/architecture_kr.md b/docs/architecture_kr.md index 8821c9ca..e985296a 100644 --- a/docs/architecture_kr.md +++ b/docs/architecture_kr.md @@ -36,7 +36,6 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router @@ -44,9 +43,6 @@ error: - name: catch port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/examples/httpproxy.yaml b/examples/httpproxy.yaml index ce24c70f..584e7f93 100644 --- a/examples/httpproxy.yaml +++ b/examples/httpproxy.yaml @@ -1,14 +1,10 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: proxy port: in - env: - PORT: - data: '{{ .PORT }}' - kind: proxy name: proxy diff --git a/examples/ping.yaml b/examples/ping.yaml index 506ddb35..36e1ae81 100644 --- a/examples/ping.yaml +++ b/examples/ping.yaml @@ -1,14 +1,10 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/examples/system.yaml b/examples/system.yaml index 36599633..f72df155 100644 --- a/examples/system.yaml +++ b/examples/system.yaml @@ -1,7 +1,6 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router @@ -9,9 +8,6 @@ error: - name: catch port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/examples/wsproxy.yaml b/examples/wsproxy.yaml index 72e0b465..5884f1ef 100644 --- a/examples/wsproxy.yaml +++ b/examples/wsproxy.yaml @@ -10,14 +10,10 @@ - kind: listener name: listener protocol: http - port: '{{ .PORT }}' ports: out: - name: router port: in - env: - PORT: - data: '{{ .PORT }}' - kind: router name: router diff --git a/ext/pkg/control/block.go b/ext/pkg/control/block.go index 1b8407e9..71de94f0 100644 --- a/ext/pkg/control/block.go +++ b/ext/pkg/control/block.go @@ -13,7 +13,7 @@ import ( // BlockNodeSpec defines the specification for creating a BlockNode. type BlockNodeSpec struct { spec.Meta `map:",inline"` - Specs []*spec.Unstructured `map:"specs"` + Specs []spec.Spec `map:"specs"` } // BlockNode systematically manages complex data processing flows and executes multiple sub-nodes sequentially. @@ -32,17 +32,17 @@ var _ node.Node = (*BlockNode)(nil) // NewBlockNodeCodec creates a new codec for BlockNodeSpec. func NewBlockNodeCodec(s *scheme.Scheme) scheme.Codec { - return scheme.CodecWithType(func(spec *BlockNodeSpec) (node.Node, error) { - symbols := make([]*symbol.Symbol, 0, len(spec.Specs)) - for _, sp := range spec.Specs { - decoded, err := s.Decode(sp) + return scheme.CodecWithType(func(sp *BlockNodeSpec) (node.Node, error) { + symbols := make([]*symbol.Symbol, 0, len(sp.Specs)) + for _, sp := range sp.Specs { + sp, err := s.Decode(sp) if err != nil { for _, n := range symbols { n.Close() } - return nil, err } - n, err := s.Compile(decoded) + + n, err := s.Compile(sp) if err != nil { for _, n := range symbols { n.Close() @@ -50,7 +50,7 @@ func NewBlockNodeCodec(s *scheme.Scheme) scheme.Codec { return nil, err } symbols = append(symbols, &symbol.Symbol{ - Spec: decoded, + Spec: sp, Node: n, }) } diff --git a/ext/pkg/control/block_test.go b/ext/pkg/control/block_test.go index 6cca679c..6ee9b071 100644 --- a/ext/pkg/control/block_test.go +++ b/ext/pkg/control/block_test.go @@ -31,8 +31,8 @@ func TestBlockNodeCodec_Decode(t *testing.T) { codec := NewBlockNodeCodec(s) spec := &BlockNodeSpec{ - Specs: []*spec.Unstructured{ - { + Specs: []spec.Spec{ + &spec.Unstructured{ Meta: spec.Meta{ ID: uuid.Must(uuid.NewV7()), Kind: kind, diff --git a/ext/pkg/control/builder.go b/ext/pkg/control/builder.go index 91cb9298..70d5873a 100644 --- a/ext/pkg/control/builder.go +++ b/ext/pkg/control/builder.go @@ -4,6 +4,7 @@ import ( "github.com/siyul-park/uniflow/ext/pkg/language" "github.com/siyul-park/uniflow/pkg/hook" "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" "github.com/siyul-park/uniflow/pkg/symbol" ) @@ -40,47 +41,31 @@ func AddToScheme(module *language.Module, lang string) scheme.Register { return err } - s.AddKnownType(KindBlock, &BlockNodeSpec{}) - s.AddCodec(KindBlock, NewBlockNodeCodec(s)) - - s.AddKnownType(KindPipe, &PipeNodeSpec{}) - s.AddCodec(KindPipe, NewPipeNodeCodec()) - - s.AddKnownType(KindFork, &ForkNodeSpec{}) - s.AddCodec(KindFork, NewForkNodeCodec()) - - s.AddKnownType(KindIf, &IfNodeSpec{}) - s.AddCodec(KindIf, NewIfNodeCodec(expr)) - - s.AddKnownType(KindLoop, &LoopNodeSpec{}) - s.AddCodec(KindLoop, NewLoopNodeCodec()) - - s.AddKnownType(KindMerge, &MergeNodeSpec{}) - s.AddCodec(KindMerge, NewMergeNodeCodec()) - - s.AddKnownType(KindNOP, &NOPNodeSpec{}) - s.AddCodec(KindNOP, NewNOPNodeCodec()) - - s.AddKnownType(KindReduce, &ReduceNodeSpec{}) - s.AddCodec(KindReduce, NewReduceNodeCodec(expr)) - - s.AddKnownType(KindRetry, &RetryNodeSpec{}) - s.AddCodec(KindRetry, NewRetryNodeCodec()) - - s.AddKnownType(KindSession, &SessionNodeSpec{}) - s.AddCodec(KindSession, NewSessionNodeCodec()) - - s.AddKnownType(KindSnippet, &SnippetNodeSpec{}) - s.AddCodec(KindSnippet, NewSnippetNodeCodec(module)) - - s.AddKnownType(KindSplit, &SplitNodeSpec{}) - s.AddCodec(KindSplit, NewSplitNodeCodec()) - - s.AddKnownType(KindSwitch, &SwitchNodeSpec{}) - s.AddCodec(KindSwitch, NewSwitchNodeCodec(expr)) + definitions := []struct { + kind string + codec scheme.Codec + spec spec.Spec + }{ + {KindBlock, NewBlockNodeCodec(s), &BlockNodeSpec{}}, + {KindPipe, NewPipeNodeCodec(), &PipeNodeSpec{}}, + {KindFork, NewForkNodeCodec(), &ForkNodeSpec{}}, + {KindIf, NewIfNodeCodec(expr), &IfNodeSpec{}}, + {KindLoop, NewLoopNodeCodec(), &LoopNodeSpec{}}, + {KindMerge, NewMergeNodeCodec(), &MergeNodeSpec{}}, + {KindNOP, NewNOPNodeCodec(), &NOPNodeSpec{}}, + {KindReduce, NewReduceNodeCodec(expr), &ReduceNodeSpec{}}, + {KindRetry, NewRetryNodeCodec(), &RetryNodeSpec{}}, + {KindSession, NewSessionNodeCodec(), &SessionNodeSpec{}}, + {KindSnippet, NewSnippetNodeCodec(module), &SnippetNodeSpec{}}, + {KindSplit, NewSplitNodeCodec(), &SplitNodeSpec{}}, + {KindSwitch, NewSwitchNodeCodec(expr), &SwitchNodeSpec{}}, + {KindWait, NewWaitNodeCodec(), &WaitNodeSpec{}}, + } - s.AddKnownType(KindWait, &WaitNodeSpec{}) - s.AddCodec(KindWait, NewWaitNodeCodec()) + for _, def := range definitions { + s.AddKnownType(def.kind, def.spec) + s.AddCodec(def.kind, def.codec) + } return nil }) diff --git a/ext/pkg/io/builder.go b/ext/pkg/io/builder.go index fe20d9c7..a89088f7 100644 --- a/ext/pkg/io/builder.go +++ b/ext/pkg/io/builder.go @@ -1,19 +1,29 @@ package io -import "github.com/siyul-park/uniflow/pkg/scheme" +import ( + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" +) // AddToScheme returns a function that adds node types and codecs to the provided spec. func AddToScheme(fs FileSystem) scheme.Register { return scheme.RegisterFunc(func(s *scheme.Scheme) error { - s.AddKnownType(KindSQL, &SQLNodeSpec{}) - s.AddCodec(KindSQL, NewSQLNodeCodec()) + definitions := []struct { + kind string + codec scheme.Codec + spec spec.Spec + }{ + {KindSQL, NewSQLNodeCodec(), &SQLNodeSpec{}}, + {KindPrint, NewPrintNodeCodec(fs), &PrintNodeSpec{}}, + {KindScan, NewScanNodeCodec(fs), &ScanNodeSpec{}}, + } - s.AddKnownType(KindPrint, &PrintNodeSpec{}) - s.AddCodec(KindPrint, NewPrintNodeCodec(fs)) - - s.AddKnownType(KindScan, &ScanNodeSpec{}) - s.AddCodec(KindScan, NewScanNodeCodec(fs)) + for _, def := range definitions { + s.AddKnownType(def.kind, def.spec) + s.AddCodec(def.kind, def.codec) + } return nil }) + } diff --git a/ext/pkg/network/builder.go b/ext/pkg/network/builder.go index 1e0c9414..97001730 100644 --- a/ext/pkg/network/builder.go +++ b/ext/pkg/network/builder.go @@ -3,6 +3,7 @@ package network import ( "github.com/siyul-park/uniflow/pkg/hook" "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" "github.com/siyul-park/uniflow/pkg/symbol" ) @@ -30,23 +31,23 @@ func AddToHook() hook.Register { // AddToScheme returns a function that adds node types and codecs to the provided spec. func AddToScheme() scheme.Register { return scheme.RegisterFunc(func(s *scheme.Scheme) error { - s.AddKnownType(KindHTTP, &HTTPNodeSpec{}) - s.AddCodec(KindHTTP, NewHTTPNodeCodec()) - - s.AddKnownType(KindListener, &ListenNodeSpec{}) - s.AddCodec(KindListener, NewListenNodeCodec()) - - s.AddKnownType(KindProxy, &ProxyNodeSpec{}) - s.AddCodec(KindProxy, NewProxyNodeCodec()) - - s.AddKnownType(KindRouter, &RouteNodeSpec{}) - s.AddCodec(KindRouter, NewRouteNodeCodec()) - - s.AddKnownType(KindWebSocket, &WebSocketNodeSpec{}) - s.AddCodec(KindWebSocket, NewWebSocketNodeCodec()) - - s.AddKnownType(KindGateway, &GatewayNodeSpec{}) - s.AddCodec(KindGateway, NewGatewayNodeCodec()) + definitions := []struct { + kind string + codec scheme.Codec + spec spec.Spec + }{ + {KindHTTP, NewHTTPNodeCodec(), &HTTPNodeSpec{}}, + {KindListener, NewListenNodeCodec(), &ListenNodeSpec{}}, + {KindProxy, NewProxyNodeCodec(), &ProxyNodeSpec{}}, + {KindRouter, NewRouteNodeCodec(), &RouteNodeSpec{}}, + {KindWebSocket, NewWebSocketNodeCodec(), &WebSocketNodeSpec{}}, + {KindGateway, NewGatewayNodeCodec(), &GatewayNodeSpec{}}, + } + + for _, def := range definitions { + s.AddKnownType(def.kind, def.spec) + s.AddCodec(def.kind, def.codec) + } return nil }) diff --git a/ext/pkg/system/builder.go b/ext/pkg/system/builder.go index 15415e22..0489b767 100644 --- a/ext/pkg/system/builder.go +++ b/ext/pkg/system/builder.go @@ -1,12 +1,25 @@ package system -import "github.com/siyul-park/uniflow/pkg/scheme" +import ( + "github.com/siyul-park/uniflow/pkg/scheme" + "github.com/siyul-park/uniflow/pkg/spec" +) // AddToScheme returns a function that adds node types and codecs to the provided spec. func AddToScheme(table *NativeTable) scheme.Register { return scheme.RegisterFunc(func(s *scheme.Scheme) error { - s.AddKnownType(KindNative, &NativeNodeSpec{}) - s.AddCodec(KindNative, NewNativeNodeCodec(table)) + definitions := []struct { + kind string + codec scheme.Codec + spec spec.Spec + }{ + {KindNative, NewNativeNodeCodec(table), &NativeNodeSpec{}}, + } + + for _, def := range definitions { + s.AddKnownType(def.kind, def.spec) + s.AddCodec(def.kind, def.codec) + } return nil }) diff --git a/pkg/chart/chart.go b/pkg/chart/chart.go index 2a93741e..fbac6830 100644 --- a/pkg/chart/chart.go +++ b/pkg/chart/chart.go @@ -139,7 +139,7 @@ func (c *Chart) Build(sp spec.Spec) ([]spec.Spec, error) { env := map[string][]spec.Value{} for key, vals := range c.Env { for _, val := range vals { - if val.IsIdentified() { + if !val.IsIdentified() { v, err := template.Execute(val.Value, data) if err != nil { return nil, err @@ -162,14 +162,11 @@ func (c *Chart) Build(sp spec.Spec) ([]spec.Spec, error) { return nil, err } - unstructured.SetEnv(env) - - bind, err := spec.Bind(unstructured) - if err != nil { - return nil, err + if len(env) > 0 { + unstructured.SetEnv(env) } - specs = append(specs, bind) + specs = append(specs, unstructured) } return specs, nil } diff --git a/pkg/chart/linker.go b/pkg/chart/linker.go index 819ca0aa..8df0ef52 100644 --- a/pkg/chart/linker.go +++ b/pkg/chart/linker.go @@ -58,6 +58,15 @@ func (l *Linker) Link(chrt *Chart) error { symbols := make([]*symbol.Symbol, 0, len(specs)) for _, sp := range specs { + if build, err := l.scheme.Decode(sp); err != nil { + for _, sb := range symbols { + sb.Close() + } + return nil, err + } else { + sp = build + } + n, err := l.scheme.Compile(sp) if err != nil { for _, sb := range symbols { diff --git a/pkg/process/local.go b/pkg/process/local.go index bce6a323..186b0306 100644 --- a/pkg/process/local.go +++ b/pkg/process/local.go @@ -22,10 +22,11 @@ func (l *Local[T]) AddStoreHook(proc *Process, hook StoreHook[T]) bool { l.mu.Lock() defer l.mu.Unlock() - if _, ok := l.data[proc]; ok { + if val, ok := l.data[proc]; ok { l.mu.Unlock() defer l.mu.Lock() - hook.Store(l.data[proc]) + + hook.Store(val) return true } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index cf91e907..8a2890a5 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -212,7 +212,7 @@ func (r *Runtime) Reconcile(ctx context.Context) error { var specs []spec.Spec for _, id := range r.symbolTable.Keys() { sb := r.symbolTable.Lookup(id) - if sb != nil && spec.IsBound(sb.Spec, secrets...) { + if sb != nil && r.scheme.IsBound(sb.Spec, secrets...) { specs = append(specs, sb.Spec) } } diff --git a/pkg/scheme/register.go b/pkg/scheme/register.go index 206fb136..47121a01 100644 --- a/pkg/scheme/register.go +++ b/pkg/scheme/register.go @@ -1,5 +1,7 @@ package scheme +import "github.com/siyul-park/uniflow/pkg/spec" + // Register defines an interface for registering types with a Scheme. type Register interface { // AddToScheme adds types to the given Scheme. @@ -12,6 +14,16 @@ type register struct { var _ Register = (*register)(nil) +// AddKnownValues registers the provided specs into the scheme as known values. +func AddKnownValues(specs map[string]spec.Spec) Register { + return RegisterFunc(func(s *Scheme) error { + for kind, sp := range specs { + s.AddKnownValue(kind, sp) + } + return nil + }) +} + // RegisterFunc creates a new Register from the provided function. func RegisterFunc(addToScheme func(*Scheme) error) Register { return ®ister{addToScheme: addToScheme} diff --git a/pkg/scheme/register_test.go b/pkg/scheme/register_test.go new file mode 100644 index 00000000..a7fd3041 --- /dev/null +++ b/pkg/scheme/register_test.go @@ -0,0 +1,27 @@ +package scheme + +import ( + "github.com/go-faker/faker/v4" + "github.com/gofrs/uuid" + "github.com/siyul-park/uniflow/pkg/spec" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestAddKnownValues(t *testing.T) { + s := New() + kind := faker.UUIDHyphenated() + + meta := &spec.Meta{ + Kind: kind, + ID: uuid.Must(uuid.NewV7()), + } + + register := AddKnownValues(map[string]spec.Spec{ + kind: meta, + }) + + err := register.AddToScheme(s) + assert.NoError(t, err) + assert.NotNil(t, s.KnownValue(kind)) +} diff --git a/pkg/scheme/scheme.go b/pkg/scheme/scheme.go index 3233f7c5..f2c6351d 100644 --- a/pkg/scheme/scheme.go +++ b/pkg/scheme/scheme.go @@ -2,6 +2,9 @@ package scheme import ( "github.com/gofrs/uuid" + "github.com/siyul-park/uniflow/pkg/resource" + "github.com/siyul-park/uniflow/pkg/secret" + "github.com/siyul-park/uniflow/pkg/template" "reflect" "slices" "sync" @@ -16,6 +19,7 @@ import ( // Scheme manages type information and decodes spec implementations into node objects within a workflow environment. type Scheme struct { types map[string]reflect.Type + values map[string]reflect.Value codecs map[string]Codec mu sync.RWMutex } @@ -26,6 +30,7 @@ var _ Codec = (*Scheme)(nil) func New() *Scheme { return &Scheme{ types: make(map[string]reflect.Type), + values: make(map[string]reflect.Value), codecs: make(map[string]Codec), } } @@ -52,6 +57,9 @@ func (s *Scheme) AddKnownType(kind string, sp spec.Spec) bool { s.mu.Lock() defer s.mu.Unlock() + if sp == nil { + return false + } if _, ok := s.types[kind]; ok { return false } @@ -79,6 +87,47 @@ func (s *Scheme) KnownType(kind string) reflect.Type { return s.types[kind] } +// AddKnownValue associates a default value with a kind and returns true if successful. +func (s *Scheme) AddKnownValue(kind string, sp spec.Spec) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if sp == nil { + return false + } + if _, ok := s.values[kind]; ok { + return false + } + s.values[kind] = reflect.ValueOf(sp) + return true +} + +// RemoveKnownValue removes the default value associated with a kind. +func (s *Scheme) RemoveKnownValue(kind string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.values[kind]; ok { + delete(s.values, kind) + return true + } + return false +} + +// KnownValue retrieves a structured spec based on the default value associated with the kind of the input spec. +func (s *Scheme) KnownValue(kind string) spec.Spec { + s.mu.RLock() + defer s.mu.RUnlock() + + value, ok := s.values[kind] + if !ok { + return nil + } + + sp, _ := s.clone(value).Interface().(spec.Spec) + return sp +} + // AddCodec associates a codec with a specific kind and returns true if successful. func (s *Scheme) AddCodec(kind string, codec Codec) bool { s.mu.Lock() @@ -111,7 +160,47 @@ func (s *Scheme) Codec(kind string) Codec { return s.codecs[kind] } -// Compile decodes the given spec into node using the associated codec. +// IsBound checks if the spec is bound to any of the provided secrets. +func (s *Scheme) IsBound(sp spec.Spec, secrets ...*secret.Secret) bool { + for _, values := range sp.GetEnv() { + for _, val := range values { + examples := make([]*secret.Secret, 0, 2) + if val.ID != uuid.Nil { + examples = append(examples, &secret.Secret{ID: val.ID}) + } + if val.Name != "" { + examples = append(examples, &secret.Secret{Namespace: sp.GetNamespace(), Name: val.Name}) + } + + for _, scrt := range secrets { + if len(resource.Match(scrt, examples...)) > 0 { + return true + } + } + } + } + return false +} + +// Decode resolves default values, binds secrets, and decodes the spec, returning the final processed spec or an error. +func (s *Scheme) Decode(sp spec.Spec, secrets ...*secret.Secret) (spec.Spec, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + sp, err := s.apply(sp) + if err != nil { + return nil, err + } + + sp, err = s.bind(sp, secrets...) + if err != nil { + return nil, err + } + + return s.decode(sp) +} + +// Compile decodes the given spec into a node using the associated codec. func (s *Scheme) Compile(sp spec.Spec) (node.Node, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -123,32 +212,106 @@ func (s *Scheme) Compile(sp spec.Spec) (node.Node, error) { return codec.Compile(sp) } -// Decode converts the provided spec.Spec into a structured representation using reflection and encoding utilities. -func (s *Scheme) Decode(sp spec.Spec) (spec.Spec, error) { - s.mu.RLock() - defer s.mu.RUnlock() +func (s *Scheme) apply(sp spec.Spec) (spec.Spec, error) { + value, ok := s.values[sp.GetKind()] + if !ok { + return sp, nil + } + structured, ok := s.clone(value).Interface().(spec.Spec) + if !ok { + return sp, nil + } + + if doc, err := types.Marshal(sp); err != nil { + return nil, err + } else if err := types.Unmarshal(doc, structured); err != nil { + return nil, err + } + return structured, nil +} + +func (s *Scheme) bind(sp spec.Spec, secrets ...*secret.Secret) (spec.Spec, error) { doc, err := types.Marshal(sp) if err != nil { return nil, err } + unstructured := &spec.Unstructured{} + if err := types.Unmarshal(doc, unstructured); err != nil { + return nil, err + } + + env := map[string]any{} + for key, values := range unstructured.GetEnv() { + for i, val := range values { + examples := make([]*secret.Secret, 0, 2) + if val.ID != uuid.Nil { + examples = append(examples, &secret.Secret{ID: val.ID}) + } + if val.Name != "" { + examples = append(examples, &secret.Secret{Namespace: sp.GetNamespace(), Name: val.Name}) + } + + var scrt *secret.Secret + for _, s := range secrets { + if (!s.IsIdentified() && !val.IsIdentified()) || len(resource.Match(s, examples...)) > 0 { + scrt = s + break + } + } + + if scrt != nil { + v, err := template.Execute(val.Data, scrt.Data) + if err != nil { + return nil, err + } + + val.ID = scrt.GetID() + val.Name = scrt.GetName() + val.Data = v + values[i] = val + } + + if !val.IsIdentified() || scrt != nil { + env[key] = val.Data + } + } + + if _, ok := env[key]; !ok { + return nil, errors.WithStack(encoding.ErrUnsupportedValue) + } + } + + if len(env) > 0 { + if fields, err := template.Execute(unstructured.Fields, env); err != nil { + return nil, err + } else { + unstructured.Fields = fields.(map[string]any) + } + } + return unstructured, nil +} + +func (s *Scheme) decode(sp spec.Spec) (spec.Spec, error) { typ, ok := s.types[sp.GetKind()] if !ok { return sp, nil } - val := reflect.New(typ).Elem() - if val.Kind() == reflect.Pointer { - val.Set(reflect.New(typ.Elem())) + value := reflect.New(typ).Elem() + if value.Kind() == reflect.Pointer { + value.Set(reflect.New(typ.Elem())) } - structured, ok := val.Interface().(spec.Spec) + structured, ok := value.Interface().(spec.Spec) if !ok { return sp, nil } - if err := types.Unmarshal(doc, structured); err != nil { + if doc, err := types.Marshal(sp); err != nil { + return nil, err + } else if err := types.Unmarshal(doc, structured); err != nil { return nil, err } @@ -157,3 +320,53 @@ func (s *Scheme) Decode(sp spec.Spec) (spec.Spec, error) { } return structured, nil } + +func (s *Scheme) clone(value reflect.Value) reflect.Value { + if !value.IsValid() { + return reflect.Zero(value.Type()) + } + + switch value.Kind() { + case reflect.Pointer: + if value.IsNil() { + return reflect.Zero(value.Type()) + } + ptr := reflect.New(value.Type().Elem()) + ptr.Elem().Set(s.clone(value.Elem())) + return ptr + case reflect.Struct: + clone := reflect.New(value.Type()).Elem() + for i := 0; i < value.NumField(); i++ { + if value.Field(i).CanSet() { + clone.Field(i).Set(s.clone(value.Field(i))) + } + } + return clone + case reflect.Slice: + if value.IsNil() { + return reflect.Zero(value.Type()) + } + clone := reflect.MakeSlice(value.Type(), value.Len(), value.Cap()) + for i := 0; i < value.Len(); i++ { + clone.Index(i).Set(s.clone(value.Index(i))) + } + return clone + case reflect.Map: + if value.IsNil() { + return reflect.Zero(value.Type()) + } + clone := reflect.MakeMapWithSize(value.Type(), value.Len()) + for _, key := range value.MapKeys() { + clone.SetMapIndex(key, s.clone(value.MapIndex(key))) + } + return clone + case reflect.Array: + clone := reflect.New(value.Type()).Elem() + for i := 0; i < value.Len(); i++ { + clone.Index(i).Set(s.clone(value.Index(i))) + } + return clone + default: + return reflect.ValueOf(value.Interface()) + } +} diff --git a/pkg/scheme/scheme_test.go b/pkg/scheme/scheme_test.go index edc4460e..0b27da54 100644 --- a/pkg/scheme/scheme_test.go +++ b/pkg/scheme/scheme_test.go @@ -1,6 +1,7 @@ package scheme import ( + "github.com/siyul-park/uniflow/pkg/secret" "testing" "github.com/go-faker/faker/v4" @@ -44,6 +45,29 @@ func TestScheme_KnownType(t *testing.T) { assert.False(t, ok) } +func TestScheme_KnownValue(t *testing.T) { + s := New() + kind := faker.UUIDHyphenated() + + meta := &spec.Meta{ + Kind: kind, + ID: uuid.Must(uuid.NewV7()), + } + ok := s.AddKnownValue(kind, meta) + assert.True(t, ok) + + ok = s.AddKnownValue(kind, meta) + assert.False(t, ok) + assert.NotNil(t, s.KnownValue(kind)) + + ok = s.RemoveKnownValue(kind) + assert.True(t, ok) + assert.Nil(t, s.KnownValue(kind)) + + ok = s.RemoveKnownValue(kind) + assert.False(t, ok) +} + func TestScheme_Codec(t *testing.T) { s := New() kind := faker.UUIDHyphenated() @@ -67,12 +91,43 @@ func TestScheme_Codec(t *testing.T) { assert.False(t, ok) } +func TestScheme_IsBound(t *testing.T) { + s := New() + + sec1 := &secret.Secret{ + ID: uuid.Must(uuid.NewV7()), + } + sec2 := &secret.Secret{ + ID: uuid.Must(uuid.NewV7()), + } + + meta := &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: faker.UUIDHyphenated(), + Env: map[string][]spec.Value{ + "FOO": { + { + ID: sec1.ID, + Data: "foo", + }, + }, + }, + } + + assert.True(t, s.IsBound(meta, sec1)) + assert.False(t, s.IsBound(meta, sec2)) +} + func TestScheme_Decode(t *testing.T) { s := New() kind := faker.UUIDHyphenated() s.AddKnownType(kind, &spec.Meta{}) + s.AddKnownValue(kind, &spec.Meta{}) + sec := &secret.Secret{ + ID: uuid.Must(uuid.NewV7()), + } meta := &spec.Unstructured{ Meta: spec.Meta{ ID: uuid.Must(uuid.NewV7()), @@ -80,6 +135,7 @@ func TestScheme_Decode(t *testing.T) { Env: map[string][]spec.Value{ "FOO": { { + ID: sec.ID, Data: "foo", }, }, @@ -90,7 +146,7 @@ func TestScheme_Decode(t *testing.T) { }, } - structured, err := s.Decode(meta) + structured, err := s.Decode(meta, sec) assert.NoError(t, err) assert.Equal(t, meta.GetID(), structured.GetID()) assert.IsType(t, &spec.Meta{}, structured) diff --git a/pkg/spec/spec.go b/pkg/spec/spec.go index ae38591a..98d5fae9 100644 --- a/pkg/spec/spec.go +++ b/pkg/spec/spec.go @@ -2,12 +2,7 @@ package spec import ( "github.com/gofrs/uuid" - "github.com/pkg/errors" - "github.com/siyul-park/uniflow/pkg/encoding" "github.com/siyul-park/uniflow/pkg/resource" - "github.com/siyul-park/uniflow/pkg/secret" - "github.com/siyul-park/uniflow/pkg/template" - "github.com/siyul-park/uniflow/pkg/types" ) // Spec defines the behavior and connections of each node. @@ -88,90 +83,6 @@ func New() Spec { return &Meta{} } -// IsBound checks if the spec is bound to any of the provided secrets. -func IsBound(sp Spec, secrets ...*secret.Secret) bool { - for _, vals := range sp.GetEnv() { - for _, val := range vals { - examples := make([]*secret.Secret, 0, 2) - if val.ID != uuid.Nil { - examples = append(examples, &secret.Secret{ID: val.ID}) - } - if val.Name != "" { - examples = append(examples, &secret.Secret{Namespace: sp.GetNamespace(), Name: val.Name}) - } - - for _, scrt := range secrets { - if len(resource.Match(scrt, examples...)) > 0 { - return true - } - } - } - } - return false -} - -// Bind processes the environment variables in the spec using the provided secrets. -func Bind(sp Spec, secrets ...*secret.Secret) (Spec, error) { - doc, err := types.Marshal(sp) - if err != nil { - return nil, err - } - - unstructured := &Unstructured{} - if err := types.Unmarshal(doc, unstructured); err != nil { - return nil, err - } - - env := map[string]any{} - for key, vals := range unstructured.GetEnv() { - for i, val := range vals { - example := &secret.Secret{ - ID: val.ID, - Namespace: unstructured.GetNamespace(), - Name: val.Name, - } - - var scrt *secret.Secret - for _, s := range secrets { - if (!s.IsIdentified() && !val.IsIdentified()) || len(resource.Match(s, example)) > 0 { - scrt = s - break - } - } - - if scrt != nil { - v, err := template.Execute(val.Data, scrt.Data) - if err != nil { - return nil, err - } - - val.ID = scrt.GetID() - val.Name = scrt.GetName() - val.Data = v - vals[i] = val - } - - if !val.IsIdentified() || scrt != nil { - env[key] = val.Data - } - } - - if _, ok := env[key]; !ok { - return nil, errors.WithStack(encoding.ErrUnsupportedValue) - } - } - - if len(env) > 0 { - fields, err := template.Execute(unstructured.Fields, env) - if err != nil { - return nil, err - } - unstructured.Fields = fields.(map[string]any) - } - - return unstructured, nil -} - // GetID returns the node's unique identifier. func (m *Meta) GetID() uuid.UUID { return m.ID diff --git a/pkg/spec/spec_test.go b/pkg/spec/spec_test.go index 9f668b04..d71111c4 100644 --- a/pkg/spec/spec_test.go +++ b/pkg/spec/spec_test.go @@ -5,60 +5,9 @@ import ( "github.com/go-faker/faker/v4" "github.com/gofrs/uuid" - "github.com/siyul-park/uniflow/pkg/secret" "github.com/stretchr/testify/assert" ) -func TestIsBound(t *testing.T) { - sec1 := &secret.Secret{ - ID: uuid.Must(uuid.NewV7()), - } - sec2 := &secret.Secret{ - ID: uuid.Must(uuid.NewV7()), - } - - meta := &Meta{ - ID: uuid.Must(uuid.NewV7()), - Kind: faker.UUIDHyphenated(), - Env: map[string][]Value{ - "FOO": { - { - ID: sec1.ID, - Data: "foo", - }, - }, - }, - } - - assert.True(t, IsBound(meta, sec1)) - assert.False(t, IsBound(meta, sec2)) -} - -func TestBind(t *testing.T) { - scrt := &secret.Secret{ - ID: uuid.Must(uuid.NewV7()), - Data: "foo", - } - - meta := &Meta{ - ID: uuid.Must(uuid.NewV7()), - Kind: faker.UUIDHyphenated(), - Env: map[string][]Value{ - "FOO": { - { - ID: scrt.ID, - Data: "{{ . }}", - }, - }, - }, - } - - bind, err := Bind(meta, scrt) - assert.NoError(t, err) - assert.Equal(t, "foo", bind.GetEnv()["FOO"][0].Data) - assert.True(t, IsBound(bind, scrt)) -} - func TestMeta_Get(t *testing.T) { meta := &Meta{ ID: uuid.Must(uuid.NewV7()), diff --git a/pkg/symbol/loader.go b/pkg/symbol/loader.go index e57c8472..cc73624e 100644 --- a/pkg/symbol/loader.go +++ b/pkg/symbol/loader.go @@ -80,12 +80,10 @@ func (l *Loader) Load(ctx context.Context, specs ...spec.Spec) error { var symbols []*Symbol var errs []error for _, sp := range specs { - if bind, err := spec.Bind(sp, secrets...); err != nil { + if build, err := l.scheme.Decode(sp, secrets...); err != nil { errs = append(errs, err) - } else if decode, err := l.scheme.Decode(bind); err != nil { - errs = append(errs, err) - } else if decode != nil { - sp = decode + } else { + sp = build } sb := l.table.Lookup(sp.GetID())