Skip to content

Commit

Permalink
feat: support default spec
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 19, 2024
1 parent dec83fc commit 6f46ae6
Show file tree
Hide file tree
Showing 25 changed files with 442 additions and 270 deletions.
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ Try a basic HTTP request handler using [ping.yaml](./examples/ping.yaml):
out:
- name: router
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
3 changes: 0 additions & 3 deletions README_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ make build
out:
- name: router
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
3 changes: 0 additions & 3 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ Users can update node specifications by using a Command-Line Interface (CLI) or
error:
- name: catch
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
3 changes: 0 additions & 3 deletions docs/architecture_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@
error:
- name: catch
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
3 changes: 0 additions & 3 deletions examples/httpproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
out:
- name: proxy
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: proxy
name: proxy
Expand Down
3 changes: 0 additions & 3 deletions examples/ping.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
out:
- name: router
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
3 changes: 0 additions & 3 deletions examples/system.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
error:
- name: catch
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
3 changes: 0 additions & 3 deletions examples/wsproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
out:
- name: router
port: in
env:
PORT:
data: '{{ .PORT }}'

- kind: router
name: router
Expand Down
15 changes: 8 additions & 7 deletions ext/pkg/control/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// BlockNodeSpec defines the specification for creating a BlockNode.
type BlockNodeSpec struct {
spec.Meta `map:",inline"`
Specs []*spec.Unstructured `map:"specs"`
Specs []spec.Spec `map:"specs"`
}

// BlockNode systematically manages complex data processing flows and executes multiple sub-nodes sequentially.
Expand All @@ -32,25 +32,26 @@ var _ node.Node = (*BlockNode)(nil)

// NewBlockNodeCodec creates a new codec for BlockNodeSpec.
func NewBlockNodeCodec(s *scheme.Scheme) scheme.Codec {
return scheme.CodecWithType(func(spec *BlockNodeSpec) (node.Node, error) {
symbols := make([]*symbol.Symbol, 0, len(spec.Specs))
for _, sp := range spec.Specs {
decoded, err := s.Decode(sp)
return scheme.CodecWithType(func(sp *BlockNodeSpec) (node.Node, error) {
symbols := make([]*symbol.Symbol, 0, len(sp.Specs))
for _, sp := range sp.Specs {
sp, err := s.Decode(sp)
if err != nil {
for _, n := range symbols {
n.Close()
}
return nil, err
}
n, err := s.Compile(decoded)

n, err := s.Compile(sp)
if err != nil {
for _, n := range symbols {
n.Close()
}
return nil, err
}
symbols = append(symbols, &symbol.Symbol{
Spec: decoded,
Spec: sp,
Node: n,
})
}
Expand Down
4 changes: 2 additions & 2 deletions ext/pkg/control/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestBlockNodeCodec_Decode(t *testing.T) {
codec := NewBlockNodeCodec(s)

spec := &BlockNodeSpec{
Specs: []*spec.Unstructured{
{
Specs: []spec.Spec{
&spec.Unstructured{
Meta: spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: kind,
Expand Down
65 changes: 25 additions & 40 deletions ext/pkg/control/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/siyul-park/uniflow/ext/pkg/language"
"github.com/siyul-park/uniflow/pkg/hook"
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
"github.com/siyul-park/uniflow/pkg/symbol"
)

Expand Down Expand Up @@ -40,47 +41,31 @@ func AddToScheme(module *language.Module, lang string) scheme.Register {
return err
}

s.AddKnownType(KindBlock, &BlockNodeSpec{})
s.AddCodec(KindBlock, NewBlockNodeCodec(s))

s.AddKnownType(KindPipe, &PipeNodeSpec{})
s.AddCodec(KindPipe, NewPipeNodeCodec())

s.AddKnownType(KindFork, &ForkNodeSpec{})
s.AddCodec(KindFork, NewForkNodeCodec())

s.AddKnownType(KindIf, &IfNodeSpec{})
s.AddCodec(KindIf, NewIfNodeCodec(expr))

s.AddKnownType(KindLoop, &LoopNodeSpec{})
s.AddCodec(KindLoop, NewLoopNodeCodec())

s.AddKnownType(KindMerge, &MergeNodeSpec{})
s.AddCodec(KindMerge, NewMergeNodeCodec())

s.AddKnownType(KindNOP, &NOPNodeSpec{})
s.AddCodec(KindNOP, NewNOPNodeCodec())

s.AddKnownType(KindReduce, &ReduceNodeSpec{})
s.AddCodec(KindReduce, NewReduceNodeCodec(expr))

s.AddKnownType(KindRetry, &RetryNodeSpec{})
s.AddCodec(KindRetry, NewRetryNodeCodec())

s.AddKnownType(KindSession, &SessionNodeSpec{})
s.AddCodec(KindSession, NewSessionNodeCodec())

s.AddKnownType(KindSnippet, &SnippetNodeSpec{})
s.AddCodec(KindSnippet, NewSnippetNodeCodec(module))

s.AddKnownType(KindSplit, &SplitNodeSpec{})
s.AddCodec(KindSplit, NewSplitNodeCodec())

s.AddKnownType(KindSwitch, &SwitchNodeSpec{})
s.AddCodec(KindSwitch, NewSwitchNodeCodec(expr))
definitions := []struct {
kind string
codec scheme.Codec
spec spec.Spec
}{
{KindBlock, NewBlockNodeCodec(s), &BlockNodeSpec{}},
{KindPipe, NewPipeNodeCodec(), &PipeNodeSpec{}},
{KindFork, NewForkNodeCodec(), &ForkNodeSpec{}},
{KindIf, NewIfNodeCodec(expr), &IfNodeSpec{}},
{KindLoop, NewLoopNodeCodec(), &LoopNodeSpec{}},
{KindMerge, NewMergeNodeCodec(), &MergeNodeSpec{}},
{KindNOP, NewNOPNodeCodec(), &NOPNodeSpec{}},
{KindReduce, NewReduceNodeCodec(expr), &ReduceNodeSpec{}},
{KindRetry, NewRetryNodeCodec(), &RetryNodeSpec{}},
{KindSession, NewSessionNodeCodec(), &SessionNodeSpec{}},
{KindSnippet, NewSnippetNodeCodec(module), &SnippetNodeSpec{}},
{KindSplit, NewSplitNodeCodec(), &SplitNodeSpec{}},
{KindSwitch, NewSwitchNodeCodec(expr), &SwitchNodeSpec{}},
{KindWait, NewWaitNodeCodec(), &WaitNodeSpec{}},
}

s.AddKnownType(KindWait, &WaitNodeSpec{})
s.AddCodec(KindWait, NewWaitNodeCodec())
for _, def := range definitions {
s.AddKnownType(def.kind, def.spec)
s.AddCodec(def.kind, def.codec)
}

return nil
})
Expand Down
26 changes: 18 additions & 8 deletions ext/pkg/io/builder.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
package io

import "github.com/siyul-park/uniflow/pkg/scheme"
import (
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
)

// AddToScheme returns a function that adds node types and codecs to the provided spec.
func AddToScheme(fs FileSystem) scheme.Register {
return scheme.RegisterFunc(func(s *scheme.Scheme) error {
s.AddKnownType(KindSQL, &SQLNodeSpec{})
s.AddCodec(KindSQL, NewSQLNodeCodec())
definitions := []struct {
kind string
codec scheme.Codec
spec spec.Spec
}{
{KindSQL, NewSQLNodeCodec(), &SQLNodeSpec{}},
{KindPrint, NewPrintNodeCodec(fs), &PrintNodeSpec{}},
{KindScan, NewScanNodeCodec(fs), &ScanNodeSpec{}},
}

s.AddKnownType(KindPrint, &PrintNodeSpec{})
s.AddCodec(KindPrint, NewPrintNodeCodec(fs))

s.AddKnownType(KindScan, &ScanNodeSpec{})
s.AddCodec(KindScan, NewScanNodeCodec(fs))
for _, def := range definitions {
s.AddKnownType(def.kind, def.spec)
s.AddCodec(def.kind, def.codec)
}

return nil
})

}
35 changes: 18 additions & 17 deletions ext/pkg/network/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"github.com/siyul-park/uniflow/pkg/hook"
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
"github.com/siyul-park/uniflow/pkg/symbol"
)

Expand Down Expand Up @@ -30,23 +31,23 @@ func AddToHook() hook.Register {
// AddToScheme returns a function that adds node types and codecs to the provided spec.
func AddToScheme() scheme.Register {
return scheme.RegisterFunc(func(s *scheme.Scheme) error {
s.AddKnownType(KindHTTP, &HTTPNodeSpec{})
s.AddCodec(KindHTTP, NewHTTPNodeCodec())

s.AddKnownType(KindListener, &ListenNodeSpec{})
s.AddCodec(KindListener, NewListenNodeCodec())

s.AddKnownType(KindProxy, &ProxyNodeSpec{})
s.AddCodec(KindProxy, NewProxyNodeCodec())

s.AddKnownType(KindRouter, &RouteNodeSpec{})
s.AddCodec(KindRouter, NewRouteNodeCodec())

s.AddKnownType(KindWebSocket, &WebSocketNodeSpec{})
s.AddCodec(KindWebSocket, NewWebSocketNodeCodec())

s.AddKnownType(KindGateway, &GatewayNodeSpec{})
s.AddCodec(KindGateway, NewGatewayNodeCodec())
definitions := []struct {
kind string
codec scheme.Codec
spec spec.Spec
}{
{KindHTTP, NewHTTPNodeCodec(), &HTTPNodeSpec{}},
{KindListener, NewListenNodeCodec(), &ListenNodeSpec{}},
{KindProxy, NewProxyNodeCodec(), &ProxyNodeSpec{}},
{KindRouter, NewRouteNodeCodec(), &RouteNodeSpec{}},
{KindWebSocket, NewWebSocketNodeCodec(), &WebSocketNodeSpec{}},
{KindGateway, NewGatewayNodeCodec(), &GatewayNodeSpec{}},
}

for _, def := range definitions {
s.AddKnownType(def.kind, def.spec)
s.AddCodec(def.kind, def.codec)
}

return nil
})
Expand Down
19 changes: 16 additions & 3 deletions ext/pkg/system/builder.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
package system

import "github.com/siyul-park/uniflow/pkg/scheme"
import (
"github.com/siyul-park/uniflow/pkg/scheme"
"github.com/siyul-park/uniflow/pkg/spec"
)

// AddToScheme returns a function that adds node types and codecs to the provided spec.
func AddToScheme(table *NativeTable) scheme.Register {
return scheme.RegisterFunc(func(s *scheme.Scheme) error {
s.AddKnownType(KindNative, &NativeNodeSpec{})
s.AddCodec(KindNative, NewNativeNodeCodec(table))
definitions := []struct {
kind string
codec scheme.Codec
spec spec.Spec
}{
{KindNative, NewNativeNodeCodec(table), &NativeNodeSpec{}},
}

for _, def := range definitions {
s.AddKnownType(def.kind, def.spec)
s.AddCodec(def.kind, def.codec)
}

return nil
})
Expand Down
11 changes: 4 additions & 7 deletions pkg/chart/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *Chart) Build(sp spec.Spec) ([]spec.Spec, error) {
env := map[string][]spec.Value{}
for key, vals := range c.Env {
for _, val := range vals {
if val.IsIdentified() {
if !val.IsIdentified() {
v, err := template.Execute(val.Value, data)
if err != nil {
return nil, err
Expand All @@ -162,14 +162,11 @@ func (c *Chart) Build(sp spec.Spec) ([]spec.Spec, error) {
return nil, err
}

unstructured.SetEnv(env)

bind, err := spec.Bind(unstructured)
if err != nil {
return nil, err
if len(env) > 0 {
unstructured.SetEnv(env)
}

specs = append(specs, bind)
specs = append(specs, unstructured)
}
return specs, nil
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/chart/linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ func (l *Linker) Link(chrt *Chart) error {

symbols := make([]*symbol.Symbol, 0, len(specs))
for _, sp := range specs {
if build, err := l.scheme.Decode(sp); err != nil {
for _, sb := range symbols {
sb.Close()
}
return nil, err
} else {
sp = build
}

n, err := l.scheme.Compile(sp)
if err != nil {
for _, sb := range symbols {
Expand Down
5 changes: 3 additions & 2 deletions pkg/process/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ func (l *Local[T]) AddStoreHook(proc *Process, hook StoreHook[T]) bool {
l.mu.Lock()
defer l.mu.Unlock()

if _, ok := l.data[proc]; ok {
if val, ok := l.data[proc]; ok {
l.mu.Unlock()
defer l.mu.Lock()
hook.Store(l.data[proc])

hook.Store(val)
return true
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (r *Runtime) Reconcile(ctx context.Context) error {
var specs []spec.Spec
for _, id := range r.symbolTable.Keys() {
sb := r.symbolTable.Lookup(id)
if sb != nil && spec.IsBound(sb.Spec, secrets...) {
if sb != nil && r.scheme.IsBound(sb.Spec, secrets...) {
specs = append(specs, sb.Spec)
}
}
Expand Down
Loading

0 comments on commit 6f46ae6

Please sign in to comment.