Skip to content

Commit

Permalink
refactor: divide ports to inbound and outbound
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Dec 18, 2024
1 parent dee5558 commit 21b13be
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 54 deletions.
5 changes: 3 additions & 2 deletions docs/key_concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ specs:
name: sql
driver: sqlite3
source: file::{{ .FILENAME }}:?cache=shared
ports:
inbound:
in:
- name: sql
port: in
port: in
outbound:
out:
- name: sql
port: out
Expand Down
3 changes: 2 additions & 1 deletion docs/key_concepts_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ specs:
name: sql
driver: sqlite3
source: file::{{ .FILENAME }}:?cache=shared
ports:
inbound:
in:
- name: sql
port: in
outbound:
out:
- name: sql
port: out
Expand Down
46 changes: 25 additions & 21 deletions pkg/chart/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,14 @@ import (

// Chart defines the structure that combines multiple nodes into a cluster node.
type Chart struct {
// Unique identifier of the chart.
ID uuid.UUID `json:"id,omitempty" bson:"_id,omitempty" yaml:"id,omitempty" map:"id,omitempty"`
// Logical grouping or environment.
Namespace string `json:"namespace,omitempty" bson:"namespace,omitempty" yaml:"namespace,omitempty" map:"namespace,omitempty"`
// Name of the chart or cluster node.
Name string `json:"name,omitempty" bson:"name,omitempty" yaml:"name,omitempty" map:"name,omitempty"`
// Additional metadata.
Annotations map[string]string `json:"annotations,omitempty" bson:"annotations,omitempty" yaml:"annotations,omitempty" map:"annotations,omitempty"`
// Specifications that define the nodes and their configurations within the chart.
Specs []spec.Spec `json:"specs,omitempty" bson:"specs,omitempty" yaml:"specs,omitempty" map:"specs,omitempty"`
// Node connections within the chart.
Ports map[string][]Port `json:"ports,omitempty" bson:"ports,omitempty" yaml:"ports,omitempty" map:"ports,omitempty"`
// Sensitive configuration data or secrets.
Env map[string][]Value `json:"env,omitempty" bson:"env,omitempty" yaml:"env,omitempty" map:"env,omitempty"`
ID uuid.UUID `json:"id,omitempty" bson:"_id,omitempty" yaml:"id,omitempty" map:"id,omitempty"`
Namespace string `json:"namespace,omitempty" bson:"namespace,omitempty" yaml:"namespace,omitempty" map:"namespace,omitempty"`
Name string `json:"name,omitempty" bson:"name,omitempty" yaml:"name,omitempty" map:"name,omitempty"`
Annotations map[string]string `json:"annotations,omitempty" bson:"annotations,omitempty" yaml:"annotations,omitempty" map:"annotations,omitempty"`
Specs []spec.Spec `json:"specs,omitempty" bson:"specs,omitempty" yaml:"specs,omitempty" map:"specs,omitempty"`
Inbound map[string][]Port `json:"inbound,omitempty" bson:"inbound,omitempty" yaml:"inbound,omitempty" map:"inbound,omitempty"`
Outbound map[string][]Port `json:"outbound,omitempty" bson:"outbound,omitempty" yaml:"outbound,omitempty" map:"outbound,omitempty"`
Env map[string][]Value `json:"env,omitempty" bson:"env,omitempty" yaml:"env,omitempty" map:"env,omitempty"`
}

// Port represents a connection point for a node.
Expand Down Expand Up @@ -56,7 +50,7 @@ const (
KeyName = "name"
KeyAnnotations = "annotations"
KetSpecs = "specs"
KeyPorts = "ports"
KeyPorts = "inbound"
KeyEnv = "env"
)

Expand Down Expand Up @@ -216,14 +210,24 @@ func (c *Chart) SetSpecs(val []spec.Spec) {
c.Specs = val
}

// GetPorts returns the chart's ports.
func (c *Chart) GetPorts() map[string][]Port {
return c.Ports
// GetInbound returns the chart's inbound.
func (c *Chart) GetInbound() map[string][]Port {
return c.Inbound
}

// SetInbound sets the chart's inbound.
func (c *Chart) SetInbound(val map[string][]Port) {
c.Inbound = val
}

// GetOutbound returns the chart's outbound.
func (c *Chart) GetOutbound() map[string][]Port {
return c.Outbound
}

// SetPorts sets the chart's ports.
func (c *Chart) SetPorts(val map[string][]Port) {
c.Ports = val
// SetOutbound sets the chart's outbound.
func (c *Chart) SetOutbound(val map[string][]Port) {
c.Outbound = val
}

// GetEnv returns the chart's environment data.
Expand Down
13 changes: 8 additions & 5 deletions pkg/chart/chart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,16 @@ func TestChart_Get(t *testing.T) {
Name: faker.UUIDHyphenated(),
},
},
Ports: map[string][]Port{"out": {{Name: faker.Word(), Port: "in"}}},
Env: map[string][]Value{"env1": {{Name: "secret1", Value: "value1"}}},
Inbound: map[string][]Port{"out": {{Name: faker.Word(), Port: "in"}}},
Env: map[string][]Value{"env1": {{Name: "secret1", Value: "value1"}}},
}

assert.Equal(t, chrt.ID, chrt.GetID())
assert.Equal(t, chrt.Namespace, chrt.GetNamespace())
assert.Equal(t, chrt.Name, chrt.GetName())
assert.Equal(t, chrt.Annotations, chrt.GetAnnotations())
assert.Equal(t, chrt.Specs, chrt.GetSpecs())
assert.Equal(t, chrt.Ports, chrt.GetPorts())
assert.Equal(t, chrt.Inbound, chrt.GetInbound())
assert.Equal(t, chrt.Env, chrt.GetEnv())
}

Expand Down Expand Up @@ -167,8 +167,11 @@ func TestChart_Set(t *testing.T) {
chrt.SetSpecs(specs)
assert.Equal(t, specs, chrt.GetSpecs())

chrt.SetPorts(ports)
assert.Equal(t, ports, chrt.GetPorts())
chrt.SetInbound(ports)
assert.Equal(t, ports, chrt.GetInbound())

chrt.SetOutbound(ports)
assert.Equal(t, ports, chrt.GetOutbound())

chrt.SetEnv(env)
assert.Equal(t, env, chrt.GetEnv())
Expand Down
65 changes: 55 additions & 10 deletions pkg/chart/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chart

import (
"github.com/gofrs/uuid"
"sync"

"github.com/siyul-park/uniflow/pkg/node"
Expand Down Expand Up @@ -33,20 +34,30 @@ func NewClusterNode(table *symbol.Table) *ClusterNode {
}

// Inbound sets up an input port and links it to the provided port.
func (n *ClusterNode) Inbound(name string, prt *port.InPort) {
func (n *ClusterNode) Inbound(source string, id uuid.UUID, target string) bool {
n.mu.Lock()
defer n.mu.Unlock()

inPort, ok1 := n.inPorts[name]
sb := n.table.Lookup(id)
if sb == nil {
return false
}

prt := sb.In(target)
if prt == nil {
return false
}

inPort, ok1 := n.inPorts[source]
if !ok1 {
inPort = port.NewIn()
n.inPorts[name] = inPort
n.inPorts[source] = inPort
}

outPort, ok2 := n._outPorts[name]
outPort, ok2 := n._outPorts[source]
if !ok2 {
outPort = port.NewOut()
n._outPorts[name] = outPort
n._outPorts[source] = outPort
}

if !ok1 {
Expand All @@ -57,23 +68,34 @@ func (n *ClusterNode) Inbound(name string, prt *port.InPort) {
}

outPort.Link(prt)
return true
}

// Outbound sets up an output port and links it to the provided port.
func (n *ClusterNode) Outbound(name string, prt *port.OutPort) {
func (n *ClusterNode) Outbound(source string, id uuid.UUID, target string) bool {
n.mu.Lock()
defer n.mu.Unlock()

inPort, ok1 := n._inPorts[name]
sb := n.table.Lookup(id)
if sb == nil {
return false
}

prt := sb.Out(target)
if prt == nil {
return false
}

inPort, ok1 := n._inPorts[source]
if !ok1 {
inPort = port.NewIn()
n._inPorts[name] = inPort
n._inPorts[source] = inPort
}

outPort, ok2 := n.outPorts[name]
outPort, ok2 := n.outPorts[source]
if !ok2 {
outPort = port.NewOut()
n.outPorts[name] = outPort
n.outPorts[source] = outPort
}

if !ok1 {
Expand All @@ -84,6 +106,29 @@ func (n *ClusterNode) Outbound(name string, prt *port.OutPort) {
}

prt.Link(inPort)
return true
}

// Symbols retrieves all symbols from the table.
func (n *ClusterNode) Symbols() []*symbol.Symbol {
n.mu.RLock()
defer n.mu.RUnlock()

var symbols []*symbol.Symbol
for _, key := range n.table.Keys() {
if sym := n.table.Lookup(key); sym != nil {
symbols = append(symbols, sym)
}
}
return symbols
}

// Symbol retrieves a specific symbol by UUID.
func (n *ClusterNode) Symbol(id uuid.UUID) *symbol.Symbol {
n.mu.RLock()
defer n.mu.RUnlock()

return n.table.Lookup(id)
}

// In returns the input port by name.
Expand Down
48 changes: 43 additions & 5 deletions pkg/chart/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestClusterNode_Inbound(t *testing.T) {
n := NewClusterNode(tb)
defer n.Close()

n.Inbound(node.PortIn, sb.In(node.PortIn))
n.Inbound(node.PortIn, sb.ID(), node.PortIn)
assert.NotNil(t, n.In(node.PortIn))
}

Expand All @@ -57,11 +57,49 @@ func TestClusterNode_Outbound(t *testing.T) {
n := NewClusterNode(tb)
defer n.Close()

n.Outbound(node.PortOut, sb.Out(node.PortOut))
n.Outbound(node.PortOut, sb.ID(), node.PortOut)
assert.NotNil(t, n.Out(node.PortOut))
}

func NewClusterNode_SendAndReceive(t *testing.T) {
func TestClusterNode_Symbols(t *testing.T) {
tb := symbol.NewTable()

sb := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
},
Node: node.NewOneToOneNode(nil),
}
tb.Insert(sb)

n := NewClusterNode(tb)
defer n.Close()

symbols := n.Symbols()
assert.Len(t, symbols, 1)
assert.Equal(t, sb, symbols[0])
}

func TestClusterNode_Symbol(t *testing.T) {
tb := symbol.NewTable()

sb := &symbol.Symbol{
Spec: &spec.Meta{
ID: uuid.Must(uuid.NewV7()),
Kind: faker.Word(),
},
Node: node.NewOneToOneNode(nil),
}
tb.Insert(sb)

n := NewClusterNode(tb)
defer n.Close()

assert.Equal(t, sb, n.Symbol(sb.ID()))
}

func TestClusterNode_SendAndReceive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

Expand All @@ -81,8 +119,8 @@ func NewClusterNode_SendAndReceive(t *testing.T) {
n := NewClusterNode(tb)
defer n.Close()

n.Inbound(node.PortIn, sb.In(node.PortIn))
n.Outbound(node.PortOut, sb.Out(node.PortOut))
n.Inbound(node.PortIn, sb.ID(), node.PortIn)
n.Outbound(node.PortOut, sb.ID(), node.PortOut)

in := port.NewOut()
in.Link(n.In(node.PortIn))
Expand Down
25 changes: 16 additions & 9 deletions pkg/chart/linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,36 @@ func (l *Linker) Link(chrt *Chart) error {

for _, sb := range symbols {
if err := table.Insert(sb); err != nil {
table.Close()
_ = table.Close()
for _, sb := range symbols {
sb.Close()
_ = sb.Close()
}
return nil, err
}
}

n := NewClusterNode(table)
for name, ports := range chrt.GetPorts() {

for name, ports := range chrt.GetInbound() {
for _, port := range ports {
for _, sb := range symbols {
if port.Name == "" || sb.ID() == port.ID || sb.Name() == port.Name {
if in := sb.In(port.Port); in != nil {
n.Inbound(name, in)
}
if out := sb.Out(port.Port); out != nil {
n.Outbound(name, out)
}
n.Inbound(name, sb.ID(), port.Port)
}
}
}
}

for name, ports := range chrt.GetOutbound() {
for _, port := range ports {
for _, sb := range symbols {
if port.Name == "" || sb.ID() == port.ID || sb.Name() == port.Name {
n.Outbound(name, sb.ID(), port.Port)
}
}
}
}

return n, nil
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/chart/linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestLinker_Load(t *testing.T) {
},
},
},
Ports: map[string][]Port{
Inbound: map[string][]Port{
node.PortIn: {
{
Name: "dummy",
Expand Down

0 comments on commit 21b13be

Please sign in to comment.