diff --git a/pkg/netceptor/netceptor.go b/pkg/netceptor/netceptor.go index d666d434a..9d3dffe7b 100644 --- a/pkg/netceptor/netceptor.go +++ b/pkg/netceptor/netceptor.go @@ -23,7 +23,7 @@ import ( "time" ) -// defaultMTU is the largest message sendable over the Netecptor network +// defaultMTU is the largest message sendable over the Netceptor network const defaultMTU = 16384 // defaultRouteUpdateTime is the interval at which regular route updates will be sent @@ -88,6 +88,7 @@ type Netceptor struct { maxForwardingHops byte maxConnectionIdleTime time.Duration allowedPeers []string + workCommands []string epoch uint64 sequence uint64 connLock *sync.RWMutex @@ -199,11 +200,12 @@ const ( // ServiceAdvertisement is the data associated with a service advertisement type ServiceAdvertisement struct { - NodeID string - Service string - Time time.Time - ConnType byte - Tags map[string]string + NodeID string + Service string + Time time.Time + ConnType byte + Tags map[string]string + WorkCommands []string } // serviceAdvertisementFull is the whole message from the network @@ -497,11 +499,12 @@ func (s *Netceptor) addLocalServiceAdvertisement(service string, connType byte, s.serviceAdsReceived[s.nodeID] = n } n[service] = &ServiceAdvertisement{ - NodeID: s.nodeID, - Service: service, - Time: time.Now(), - ConnType: connType, - Tags: tags, + NodeID: s.nodeID, + Service: service, + Time: time.Now(), + ConnType: connType, + Tags: tags, + WorkCommands: s.workCommands, } s.sendServiceAdsChan <- 0 } @@ -555,11 +558,12 @@ func (s *Netceptor) sendServiceAds() { for sn := range s.listenerRegistry { if s.listenerRegistry[sn].advertise { sa := ServiceAdvertisement{ - NodeID: s.nodeID, - Service: sn, - Time: time.Now(), - ConnType: s.listenerRegistry[sn].connType, - Tags: s.listenerRegistry[sn].adTags, + NodeID: s.nodeID, + Service: sn, + Time: time.Now(), + ConnType: s.listenerRegistry[sn].connType, + Tags: s.listenerRegistry[sn].adTags, + WorkCommands: s.workCommands, } ads = append(ads, sa) } @@ -730,6 +734,25 @@ func (s *Netceptor) GetServerTLSConfig(name string) (*tls.Config, error) { return sc.Clone(), nil } +// AddWorkCommand records a work command so it can be included in service announcements +func (s *Netceptor) AddWorkCommand(command string) error { + if command == "" { + return fmt.Errorf("must provide a name") + } + s.serviceAdsLock.Lock() + defer s.serviceAdsLock.Unlock() + if n, ok := s.serviceAdsReceived[s.NodeID()]; ok { + // if it's the local node, just update the local service advertisement + // structs directly + for _, ad := range n { + ad.WorkCommands = append(ad.WorkCommands, command) + } + } else { + s.workCommands = append(s.workCommands, command) + } + return nil +} + // SetServerTLSConfig stores a server TLS config by name func (s *Netceptor) SetServerTLSConfig(name string, config *tls.Config) error { if name == "" { diff --git a/pkg/workceptor/workceptor.go b/pkg/workceptor/workceptor.go index e12ec50ce..798a134e2 100644 --- a/pkg/workceptor/workceptor.go +++ b/pkg/workceptor/workceptor.go @@ -92,6 +92,9 @@ func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc w.workTypes[typeName] = &workType{ newWorkerFunc: newWorkerFunc, } + if typeName != "remote" { // all workceptors get a remote command by default + w.nc.AddWorkCommand(typeName) + } w.workTypesLock.Unlock() // Check if any unknown units have now become known diff --git a/receptorctl/receptorctl/cli.py b/receptorctl/receptorctl/cli.py index 0a6e60432..06d26d5b1 100644 --- a/receptorctl/receptorctl/cli.py +++ b/receptorctl/receptorctl/cli.py @@ -104,7 +104,7 @@ def status(ctx): ads = status.pop('Advertisements', None) if ads: print() - print(f"{'Node':<{longest_node}} Service Type Last Seen Tags") + print(f"{'Node':<{longest_node}} Service Type Last Seen Tags Work Types") for ad in ads: time = dateutil.parser.parse(ad['Time']) if ad['ConnType'] == 0: @@ -113,8 +113,16 @@ def status(ctx): conn_type = 'Stream' elif ad['ConnType'] == 2: conn_type = 'StreamTLS' - print(f"{ad['NodeID']:<{longest_node}} {ad['Service']:<9} {conn_type:<10} {time:%Y-%m-%d %H:%M:%S} ", end="") - pprint(ad['Tags']) + print( + f"{ad['NodeID']:<{longest_node}} {ad['Service']:<9} {conn_type:<10} {time:%Y-%m-%d %H:%M:%S} {(ad['Tags'] or '-'):<16}", + end="" + ) + commands = ad['WorkCommands'] + if commands: + commands = ', '.join(commands) + else: + commands = '-' + print(commands) if status: print("Additional data returned from Receptor:") diff --git a/tests/functional/lib/mesh/climesh.go b/tests/functional/lib/mesh/climesh.go index 3ec3ff0af..cb873344f 100644 --- a/tests/functional/lib/mesh/climesh.go +++ b/tests/functional/lib/mesh/climesh.go @@ -467,6 +467,39 @@ func (m *CLIMesh) CheckConnections() bool { return false } +// CheckAdvertisements returns true if the advertisements are recorded in +// a manner consistent with the work-commands defined for the mesh +func (m *CLIMesh) CheckAdvertisements() bool { + statusList, err := m.Status() + if err != nil { + return false + } + for _, status := range statusList { + actual := map[string][]string{} + for _, ad := range status.Advertisements { + if len(ad.WorkCommands) > 0 { + actual[ad.NodeID] = ad.WorkCommands + } + } + expected := map[string][]string{} + for node := range m.MeshDefinition.Nodes { + for _, attr := range m.MeshDefinition.Nodes[node].Nodedef { + attrMap := attr.(map[interface{}]interface{}) + for _, cmd := range []string{"work-command", "work-kubernetes", "work-python"} { + if v, ok := attrMap[cmd]; ok { + v, _ := v.(map[interface{}]interface{}) + expected[node] = append(expected[node], v["workType"].(string)) + } + } + } + } + if reflect.DeepEqual(actual, expected) { + return true + } + } + return false +} + // CheckKnownConnectionCosts returns true if every node has the same view of the connections in the mesh func (m *CLIMesh) CheckKnownConnectionCosts() bool { meshStatus, err := m.Status() @@ -536,6 +569,9 @@ func (m *CLIMesh) WaitForReady(ctx context.Context) error { if !utils.CheckUntilTimeout(ctx, sleepInterval, m.CheckRoutes) { return errors.New("Timed out while waiting for routes to converge") } + if !utils.CheckUntilTimeout(ctx, sleepInterval, m.CheckAdvertisements) { + return errors.New("Timed out while waiting for Advertisements") + } return nil }