diff --git a/docs/key_concepts.md b/docs/key_concepts.md index 23cb7cf3..eb0ccf70 100644 --- a/docs/key_concepts.md +++ b/docs/key_concepts.md @@ -71,10 +71,11 @@ specs: name: sql driver: sqlite3 source: file::{{ .FILENAME }}:?cache=shared -ports: +inbound: in: - name: sql - port: in + port: in +outbound: out: - name: sql port: out diff --git a/docs/key_concepts_kr.md b/docs/key_concepts_kr.md index 8001f0be..ba4d0a6d 100644 --- a/docs/key_concepts_kr.md +++ b/docs/key_concepts_kr.md @@ -71,10 +71,11 @@ specs: name: sql driver: sqlite3 source: file::{{ .FILENAME }}:?cache=shared -ports: +inbound: in: - name: sql port: in +outbound: out: - name: sql port: out diff --git a/pkg/chart/chart.go b/pkg/chart/chart.go index 597dca9a..e0daa6ca 100644 --- a/pkg/chart/chart.go +++ b/pkg/chart/chart.go @@ -13,20 +13,14 @@ import ( // Chart defines the structure that combines multiple nodes into a cluster node. type Chart struct { - // Unique identifier of the chart. - ID uuid.UUID `json:"id,omitempty" bson:"_id,omitempty" yaml:"id,omitempty" map:"id,omitempty"` - // Logical grouping or environment. - Namespace string `json:"namespace,omitempty" bson:"namespace,omitempty" yaml:"namespace,omitempty" map:"namespace,omitempty"` - // Name of the chart or cluster node. - Name string `json:"name,omitempty" bson:"name,omitempty" yaml:"name,omitempty" map:"name,omitempty"` - // Additional metadata. - Annotations map[string]string `json:"annotations,omitempty" bson:"annotations,omitempty" yaml:"annotations,omitempty" map:"annotations,omitempty"` - // Specifications that define the nodes and their configurations within the chart. - Specs []spec.Spec `json:"specs,omitempty" bson:"specs,omitempty" yaml:"specs,omitempty" map:"specs,omitempty"` - // Node connections within the chart. - Ports map[string][]Port `json:"ports,omitempty" bson:"ports,omitempty" yaml:"ports,omitempty" map:"ports,omitempty"` - // Sensitive configuration data or secrets. - Env map[string][]Value `json:"env,omitempty" bson:"env,omitempty" yaml:"env,omitempty" map:"env,omitempty"` + ID uuid.UUID `json:"id,omitempty" bson:"_id,omitempty" yaml:"id,omitempty" map:"id,omitempty"` + Namespace string `json:"namespace,omitempty" bson:"namespace,omitempty" yaml:"namespace,omitempty" map:"namespace,omitempty"` + Name string `json:"name,omitempty" bson:"name,omitempty" yaml:"name,omitempty" map:"name,omitempty"` + Annotations map[string]string `json:"annotations,omitempty" bson:"annotations,omitempty" yaml:"annotations,omitempty" map:"annotations,omitempty"` + Specs []spec.Spec `json:"specs,omitempty" bson:"specs,omitempty" yaml:"specs,omitempty" map:"specs,omitempty"` + Inbound map[string][]Port `json:"inbound,omitempty" bson:"inbound,omitempty" yaml:"inbound,omitempty" map:"inbound,omitempty"` + Outbound map[string][]Port `json:"outbound,omitempty" bson:"outbound,omitempty" yaml:"outbound,omitempty" map:"outbound,omitempty"` + Env map[string][]Value `json:"env,omitempty" bson:"env,omitempty" yaml:"env,omitempty" map:"env,omitempty"` } // Port represents a connection point for a node. @@ -56,7 +50,7 @@ const ( KeyName = "name" KeyAnnotations = "annotations" KetSpecs = "specs" - KeyPorts = "ports" + KeyPorts = "inbound" KeyEnv = "env" ) @@ -216,14 +210,24 @@ func (c *Chart) SetSpecs(val []spec.Spec) { c.Specs = val } -// GetPorts returns the chart's ports. -func (c *Chart) GetPorts() map[string][]Port { - return c.Ports +// GetInbound returns the chart's inbound. +func (c *Chart) GetInbound() map[string][]Port { + return c.Inbound +} + +// SetInbound sets the chart's inbound. +func (c *Chart) SetInbound(val map[string][]Port) { + c.Inbound = val +} + +// GetOutbound returns the chart's outbound. +func (c *Chart) GetOutbound() map[string][]Port { + return c.Outbound } -// SetPorts sets the chart's ports. -func (c *Chart) SetPorts(val map[string][]Port) { - c.Ports = val +// SetOutbound sets the chart's outbound. +func (c *Chart) SetOutbound(val map[string][]Port) { + c.Outbound = val } // GetEnv returns the chart's environment data. diff --git a/pkg/chart/chart_test.go b/pkg/chart/chart_test.go index 03a13638..730118a5 100644 --- a/pkg/chart/chart_test.go +++ b/pkg/chart/chart_test.go @@ -106,8 +106,8 @@ func TestChart_Get(t *testing.T) { Name: faker.UUIDHyphenated(), }, }, - Ports: map[string][]Port{"out": {{Name: faker.Word(), Port: "in"}}}, - Env: map[string][]Value{"env1": {{Name: "secret1", Value: "value1"}}}, + Inbound: map[string][]Port{"out": {{Name: faker.Word(), Port: "in"}}}, + Env: map[string][]Value{"env1": {{Name: "secret1", Value: "value1"}}}, } assert.Equal(t, chrt.ID, chrt.GetID()) @@ -115,7 +115,7 @@ func TestChart_Get(t *testing.T) { assert.Equal(t, chrt.Name, chrt.GetName()) assert.Equal(t, chrt.Annotations, chrt.GetAnnotations()) assert.Equal(t, chrt.Specs, chrt.GetSpecs()) - assert.Equal(t, chrt.Ports, chrt.GetPorts()) + assert.Equal(t, chrt.Inbound, chrt.GetInbound()) assert.Equal(t, chrt.Env, chrt.GetEnv()) } @@ -167,8 +167,11 @@ func TestChart_Set(t *testing.T) { chrt.SetSpecs(specs) assert.Equal(t, specs, chrt.GetSpecs()) - chrt.SetPorts(ports) - assert.Equal(t, ports, chrt.GetPorts()) + chrt.SetInbound(ports) + assert.Equal(t, ports, chrt.GetInbound()) + + chrt.SetOutbound(ports) + assert.Equal(t, ports, chrt.GetOutbound()) chrt.SetEnv(env) assert.Equal(t, env, chrt.GetEnv()) diff --git a/pkg/chart/cluster.go b/pkg/chart/cluster.go index e5cf7c4b..b33d214e 100644 --- a/pkg/chart/cluster.go +++ b/pkg/chart/cluster.go @@ -1,6 +1,7 @@ package chart import ( + "github.com/gofrs/uuid" "sync" "github.com/siyul-park/uniflow/pkg/node" @@ -33,20 +34,30 @@ func NewClusterNode(table *symbol.Table) *ClusterNode { } // Inbound sets up an input port and links it to the provided port. -func (n *ClusterNode) Inbound(name string, prt *port.InPort) { +func (n *ClusterNode) Inbound(source string, id uuid.UUID, target string) bool { n.mu.Lock() defer n.mu.Unlock() - inPort, ok1 := n.inPorts[name] + sb := n.table.Lookup(id) + if sb == nil { + return false + } + + prt := sb.In(target) + if prt == nil { + return false + } + + inPort, ok1 := n.inPorts[source] if !ok1 { inPort = port.NewIn() - n.inPorts[name] = inPort + n.inPorts[source] = inPort } - outPort, ok2 := n._outPorts[name] + outPort, ok2 := n._outPorts[source] if !ok2 { outPort = port.NewOut() - n._outPorts[name] = outPort + n._outPorts[source] = outPort } if !ok1 { @@ -57,23 +68,34 @@ func (n *ClusterNode) Inbound(name string, prt *port.InPort) { } outPort.Link(prt) + return true } // Outbound sets up an output port and links it to the provided port. -func (n *ClusterNode) Outbound(name string, prt *port.OutPort) { +func (n *ClusterNode) Outbound(source string, id uuid.UUID, target string) bool { n.mu.Lock() defer n.mu.Unlock() - inPort, ok1 := n._inPorts[name] + sb := n.table.Lookup(id) + if sb == nil { + return false + } + + prt := sb.Out(target) + if prt == nil { + return false + } + + inPort, ok1 := n._inPorts[source] if !ok1 { inPort = port.NewIn() - n._inPorts[name] = inPort + n._inPorts[source] = inPort } - outPort, ok2 := n.outPorts[name] + outPort, ok2 := n.outPorts[source] if !ok2 { outPort = port.NewOut() - n.outPorts[name] = outPort + n.outPorts[source] = outPort } if !ok1 { @@ -84,6 +106,29 @@ func (n *ClusterNode) Outbound(name string, prt *port.OutPort) { } prt.Link(inPort) + return true +} + +// Symbols retrieves all symbols from the table. +func (n *ClusterNode) Symbols() []*symbol.Symbol { + n.mu.RLock() + defer n.mu.RUnlock() + + var symbols []*symbol.Symbol + for _, key := range n.table.Keys() { + if sym := n.table.Lookup(key); sym != nil { + symbols = append(symbols, sym) + } + } + return symbols +} + +// Symbol retrieves a specific symbol by UUID. +func (n *ClusterNode) Symbol(id uuid.UUID) *symbol.Symbol { + n.mu.RLock() + defer n.mu.RUnlock() + + return n.table.Lookup(id) } // In returns the input port by name. diff --git a/pkg/chart/cluster_test.go b/pkg/chart/cluster_test.go index cad7c3fa..267f80f1 100644 --- a/pkg/chart/cluster_test.go +++ b/pkg/chart/cluster_test.go @@ -38,7 +38,7 @@ func TestClusterNode_Inbound(t *testing.T) { n := NewClusterNode(tb) defer n.Close() - n.Inbound(node.PortIn, sb.In(node.PortIn)) + n.Inbound(node.PortIn, sb.ID(), node.PortIn) assert.NotNil(t, n.In(node.PortIn)) } @@ -57,11 +57,49 @@ func TestClusterNode_Outbound(t *testing.T) { n := NewClusterNode(tb) defer n.Close() - n.Outbound(node.PortOut, sb.Out(node.PortOut)) + n.Outbound(node.PortOut, sb.ID(), node.PortOut) assert.NotNil(t, n.Out(node.PortOut)) } -func NewClusterNode_SendAndReceive(t *testing.T) { +func TestClusterNode_Symbols(t *testing.T) { + tb := symbol.NewTable() + + sb := &symbol.Symbol{ + Spec: &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: faker.Word(), + }, + Node: node.NewOneToOneNode(nil), + } + tb.Insert(sb) + + n := NewClusterNode(tb) + defer n.Close() + + symbols := n.Symbols() + assert.Len(t, symbols, 1) + assert.Equal(t, sb, symbols[0]) +} + +func TestClusterNode_Symbol(t *testing.T) { + tb := symbol.NewTable() + + sb := &symbol.Symbol{ + Spec: &spec.Meta{ + ID: uuid.Must(uuid.NewV7()), + Kind: faker.Word(), + }, + Node: node.NewOneToOneNode(nil), + } + tb.Insert(sb) + + n := NewClusterNode(tb) + defer n.Close() + + assert.Equal(t, sb, n.Symbol(sb.ID())) +} + +func TestClusterNode_SendAndReceive(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second) defer cancel() @@ -81,8 +119,8 @@ func NewClusterNode_SendAndReceive(t *testing.T) { n := NewClusterNode(tb) defer n.Close() - n.Inbound(node.PortIn, sb.In(node.PortIn)) - n.Outbound(node.PortOut, sb.Out(node.PortOut)) + n.Inbound(node.PortIn, sb.ID(), node.PortIn) + n.Outbound(node.PortOut, sb.ID(), node.PortOut) in := port.NewOut() in.Link(n.In(node.PortIn)) diff --git a/pkg/chart/linker.go b/pkg/chart/linker.go index 300adfe6..96a70ae5 100644 --- a/pkg/chart/linker.go +++ b/pkg/chart/linker.go @@ -99,29 +99,36 @@ func (l *Linker) Link(chrt *Chart) error { for _, sb := range symbols { if err := table.Insert(sb); err != nil { - table.Close() + _ = table.Close() for _, sb := range symbols { - sb.Close() + _ = sb.Close() } return nil, err } } n := NewClusterNode(table) - for name, ports := range chrt.GetPorts() { + + for name, ports := range chrt.GetInbound() { for _, port := range ports { for _, sb := range symbols { if port.Name == "" || sb.ID() == port.ID || sb.Name() == port.Name { - if in := sb.In(port.Port); in != nil { - n.Inbound(name, in) - } - if out := sb.Out(port.Port); out != nil { - n.Outbound(name, out) - } + n.Inbound(name, sb.ID(), port.Port) } } } } + + for name, ports := range chrt.GetOutbound() { + for _, port := range ports { + for _, sb := range symbols { + if port.Name == "" || sb.ID() == port.ID || sb.Name() == port.Name { + n.Outbound(name, sb.ID(), port.Port) + } + } + } + } + return n, nil }) diff --git a/pkg/chart/linker_test.go b/pkg/chart/linker_test.go index 5d2ac1a2..e00dc4fd 100644 --- a/pkg/chart/linker_test.go +++ b/pkg/chart/linker_test.go @@ -50,7 +50,7 @@ func TestLinker_Load(t *testing.T) { }, }, }, - Ports: map[string][]Port{ + Inbound: map[string][]Port{ node.PortIn: { { Name: "dummy",