Skip to content

Commit

Permalink
include an array of available work commands in service announcements
Browse files Browse the repository at this point in the history
related: #21
  • Loading branch information
ryanpetrello committed Feb 10, 2021
1 parent 03f02ea commit efe8162
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 19 deletions.
55 changes: 39 additions & 16 deletions pkg/netceptor/netceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"
)

// defaultMTU is the largest message sendable over the Netecptor network
// defaultMTU is the largest message sendable over the Netceptor network
const defaultMTU = 16384

// defaultRouteUpdateTime is the interval at which regular route updates will be sent
Expand Down Expand Up @@ -88,6 +88,7 @@ type Netceptor struct {
maxForwardingHops byte
maxConnectionIdleTime time.Duration
allowedPeers []string
workCommands []string
epoch uint64
sequence uint64
connLock *sync.RWMutex
Expand Down Expand Up @@ -199,11 +200,12 @@ const (

// ServiceAdvertisement is the data associated with a service advertisement
type ServiceAdvertisement struct {
NodeID string
Service string
Time time.Time
ConnType byte
Tags map[string]string
NodeID string
Service string
Time time.Time
ConnType byte
Tags map[string]string
WorkCommands []string
}

// serviceAdvertisementFull is the whole message from the network
Expand Down Expand Up @@ -497,11 +499,12 @@ func (s *Netceptor) addLocalServiceAdvertisement(service string, connType byte,
s.serviceAdsReceived[s.nodeID] = n
}
n[service] = &ServiceAdvertisement{
NodeID: s.nodeID,
Service: service,
Time: time.Now(),
ConnType: connType,
Tags: tags,
NodeID: s.nodeID,
Service: service,
Time: time.Now(),
ConnType: connType,
Tags: tags,
WorkCommands: s.workCommands,
}
s.sendServiceAdsChan <- 0
}
Expand Down Expand Up @@ -555,11 +558,12 @@ func (s *Netceptor) sendServiceAds() {
for sn := range s.listenerRegistry {
if s.listenerRegistry[sn].advertise {
sa := ServiceAdvertisement{
NodeID: s.nodeID,
Service: sn,
Time: time.Now(),
ConnType: s.listenerRegistry[sn].connType,
Tags: s.listenerRegistry[sn].adTags,
NodeID: s.nodeID,
Service: sn,
Time: time.Now(),
ConnType: s.listenerRegistry[sn].connType,
Tags: s.listenerRegistry[sn].adTags,
WorkCommands: s.workCommands,
}
ads = append(ads, sa)
}
Expand Down Expand Up @@ -730,6 +734,25 @@ func (s *Netceptor) GetServerTLSConfig(name string) (*tls.Config, error) {
return sc.Clone(), nil
}

// AddWorkCommand records a work command so it can be included in service announcements
func (s *Netceptor) AddWorkCommand(command string) error {
if command == "" {
return fmt.Errorf("must provide a name")
}
s.serviceAdsLock.Lock()
defer s.serviceAdsLock.Unlock()
if n, ok := s.serviceAdsReceived[s.NodeID()]; ok {
// if it's the local node, just update the local service advertisement
// structs directly
for _, ad := range n {
ad.WorkCommands = append(ad.WorkCommands, command)
}
} else {
s.workCommands = append(s.workCommands, command)
}
return nil
}

// SetServerTLSConfig stores a server TLS config by name
func (s *Netceptor) SetServerTLSConfig(name string, config *tls.Config) error {
if name == "" {
Expand Down
3 changes: 3 additions & 0 deletions pkg/workceptor/workceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc
w.workTypes[typeName] = &workType{
newWorkerFunc: newWorkerFunc,
}
if typeName != "remote" { // all workceptors get a remote command by default
w.nc.AddWorkCommand(typeName)
}
w.workTypesLock.Unlock()

// Check if any unknown units have now become known
Expand Down
14 changes: 11 additions & 3 deletions receptorctl/receptorctl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def status(ctx):
ads = status.pop('Advertisements', None)
if ads:
print()
print(f"{'Node':<{longest_node}} Service Type Last Seen Tags")
print(f"{'Node':<{longest_node}} Service Type Last Seen Tags Work Types")
for ad in ads:
time = dateutil.parser.parse(ad['Time'])
if ad['ConnType'] == 0:
Expand All @@ -113,8 +113,16 @@ def status(ctx):
conn_type = 'Stream'
elif ad['ConnType'] == 2:
conn_type = 'StreamTLS'
print(f"{ad['NodeID']:<{longest_node}} {ad['Service']:<9} {conn_type:<10} {time:%Y-%m-%d %H:%M:%S} ", end="")
pprint(ad['Tags'])
print(
f"{ad['NodeID']:<{longest_node}} {ad['Service']:<9} {conn_type:<10} {time:%Y-%m-%d %H:%M:%S} {(ad['Tags'] or '-'):<16}",
end=""
)
commands = ad['WorkCommands']
if commands:
commands = ', '.join(commands)
else:
commands = '-'
print(commands)

if status:
print("Additional data returned from Receptor:")
Expand Down
36 changes: 36 additions & 0 deletions tests/functional/lib/mesh/climesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,39 @@ func (m *CLIMesh) CheckConnections() bool {
return false
}

// CheckAdvertisements returns true if the advertisements are recorded in
// a manner consistent with the work-commands defined for the mesh
func (m *CLIMesh) CheckAdvertisements() bool {
statusList, err := m.Status()
if err != nil {
return false
}
for _, status := range statusList {
actual := map[string][]string{}
for _, ad := range status.Advertisements {
if len(ad.WorkCommands) > 0 {
actual[ad.NodeID] = ad.WorkCommands
}
}
expected := map[string][]string{}
for node := range m.MeshDefinition.Nodes {
for _, attr := range m.MeshDefinition.Nodes[node].Nodedef {
attrMap := attr.(map[interface{}]interface{})
for _, cmd := range []string{"work-command", "work-kubernetes", "work-python"} {
if v, ok := attrMap[cmd]; ok {
v, _ := v.(map[interface{}]interface{})
expected[node] = append(expected[node], v["workType"].(string))
}
}
}
}
if reflect.DeepEqual(actual, expected) {
return true
}
}
return false
}

// CheckKnownConnectionCosts returns true if every node has the same view of the connections in the mesh
func (m *CLIMesh) CheckKnownConnectionCosts() bool {
meshStatus, err := m.Status()
Expand Down Expand Up @@ -536,6 +569,9 @@ func (m *CLIMesh) WaitForReady(ctx context.Context) error {
if !utils.CheckUntilTimeout(ctx, sleepInterval, m.CheckRoutes) {
return errors.New("Timed out while waiting for routes to converge")
}
if !utils.CheckUntilTimeout(ctx, sleepInterval, m.CheckAdvertisements) {
return errors.New("Timed out while waiting for Advertisements")
}
return nil
}

Expand Down

0 comments on commit efe8162

Please sign in to comment.