From 6e49f07244ddf5c1b598295d12a2464fb63574bf Mon Sep 17 00:00:00 2001 From: Carl Montanari Date: Wed, 20 Mar 2024 18:40:08 -0700 Subject: [PATCH] feat: better link update handling for vxlan --- controllers/topology/deployment.go | 21 ++- controllers/topology/reconciler.go | 1 - .../allocate-tunnel-ids/updating-tunnels.json | 54 ++++++ controllers/topology/tunnels.go | 5 +- controllers/topology/tunnels_test.go | 134 +++++++++++++-- go.sum | 2 - launcher/connectivity/slurpeeth.go | 60 +------ launcher/connectivity/vxlan.go | 156 +++++++++++++++++- launcher/connectivity/watch.go | 68 ++++++++ 9 files changed, 413 insertions(+), 88 deletions(-) create mode 100755 controllers/topology/test-fixtures/golden/tunnels/allocate-tunnel-ids/updating-tunnels.json create mode 100644 launcher/connectivity/watch.go diff --git a/controllers/topology/deployment.go b/controllers/topology/deployment.go index d68acc9d..ef52ff89 100644 --- a/controllers/topology/deployment.go +++ b/controllers/topology/deployment.go @@ -1023,24 +1023,19 @@ func (r *DeploymentReconciler) Conforms( //nolint: gocyclo // rendered sub-topologies) and updates the reconcile data NodesNeedingReboot set with each node // that needs restarting due to configuration changes. func (r *DeploymentReconciler) DetermineNodesNeedingRestart( - owningTopology *clabernetesapisv1alpha1.Topology, reconcileData *ReconcileData, ) { - for nodeName, nodeConfig := range reconcileData.ResolvedConfigs { + for nodeName := range reconcileData.ResolvedConfigs { _, nodeExistedBefore := reconcileData.PreviousConfigs[nodeName] if !nodeExistedBefore { continue } - if owningTopology.Spec.Connectivity == clabernetesconstants.ConnectivitySlurpeeth { - determineNodeNeedsRestartSlurpeeth(reconcileData, nodeName) - } else if !reflect.DeepEqual(nodeConfig, reconcileData.PreviousConfigs[nodeName]) { - reconcileData.NodesNeedingReboot.Add(nodeName) - } + determineNodeNeedsRestart(reconcileData, nodeName) } } -func determineNodeNeedsRestartSlurpeeth( +func determineNodeNeedsRestart( reconcileData *ReconcileData, nodeName string, ) { @@ -1089,6 +1084,14 @@ func determineNodeNeedsRestartSlurpeeth( return } + if len(previousConfig.Topology.Links) != len(currentConfig.Topology.Links) { + // dont bother checking links since they cant be same/same, node needs rebooted to restart + // clab bits + reconcileData.NodesNeedingReboot.Add(nodeName) + + return + } + // we know (because we set this) that topology will never be nil and links will always be slices // that are len 2... so we are a little risky here but its probably ok :) for idx := range previousConfig.Topology.Links { @@ -1096,7 +1099,7 @@ func determineNodeNeedsRestartSlurpeeth( currentASide := currentConfig.Topology.Links[idx].Endpoints[0] if previousASide == currentASide { - // as long as "a" side is the same, slurpeeth will auto update itself since launcher is + // as long as "a" side is the same, things will auto update itself since launcher is // watching the connectivity cr continue } diff --git a/controllers/topology/reconciler.go b/controllers/topology/reconciler.go index 86700784..cbb5dfc9 100644 --- a/controllers/topology/reconciler.go +++ b/controllers/topology/reconciler.go @@ -673,7 +673,6 @@ func (r *Reconciler) reconcileDeploymentsHandleRestarts( r.Log.Debug("determining nodes needing restart") r.DeploymentReconciler.DetermineNodesNeedingRestart( - owningTopology, reconcileData, ) diff --git a/controllers/topology/test-fixtures/golden/tunnels/allocate-tunnel-ids/updating-tunnels.json b/controllers/topology/test-fixtures/golden/tunnels/allocate-tunnel-ids/updating-tunnels.json new file mode 100755 index 00000000..9214fa86 --- /dev/null +++ b/controllers/topology/test-fixtures/golden/tunnels/allocate-tunnel-ids/updating-tunnels.json @@ -0,0 +1,54 @@ +{ + "srl1": [ + { + "tunnelID": 1, + "destination": "topo-1-srl2.clabernetes.svc.cluster.local", + "localNode": "srl1", + "localInterface": "e1-1", + "remoteNode": "srl2", + "remoteInterface": "e1-3" + }, + { + "tunnelID": 2, + "destination": "topo-1-srl2.clabernetes.svc.cluster.local", + "localNode": "srl1", + "localInterface": "e1-2", + "remoteNode": "srl2", + "remoteInterface": "e1-1" + }, + { + "tunnelID": 3, + "destination": "topo-1-srl2.clabernetes.svc.cluster.local", + "localNode": "srl1", + "localInterface": "e1-3", + "remoteNode": "srl2", + "remoteInterface": "e1-2" + } + ], + "srl2": [ + { + "tunnelID": 1, + "destination": "topo-1-srl1.clabernetes.svc.cluster.local", + "localNode": "srl2", + "localInterface": "e1-3", + "remoteNode": "srl1", + "remoteInterface": "e1-1" + }, + { + "tunnelID": 2, + "destination": "topo-1-srl1.clabernetes.svc.cluster.local", + "localNode": "srl2", + "localInterface": "e1-1", + "remoteNode": "srl1", + "remoteInterface": "e1-2" + }, + { + "tunnelID": 3, + "destination": "topo-1-srl1.clabernetes.svc.cluster.local", + "localNode": "srl2", + "localInterface": "e1-2", + "remoteNode": "srl1", + "remoteInterface": "e1-3" + } + ] +} \ No newline at end of file diff --git a/controllers/topology/tunnels.go b/controllers/topology/tunnels.go index 70aa9635..f715c507 100644 --- a/controllers/topology/tunnels.go +++ b/controllers/topology/tunnels.go @@ -10,7 +10,7 @@ import ( // given status object by iterating over the freshly processed tunnels (as processed during a // reconciliation) and assigning any tunnels in the status without a vnid the next valid vnid. func AllocateTunnelIDs( - statusTunnels map[string][]*clabernetesapisv1alpha1.PointToPointTunnel, + previousTunnels map[string][]*clabernetesapisv1alpha1.PointToPointTunnel, processedTunnels map[string][]*clabernetesapisv1alpha1.PointToPointTunnel, ) { // we want to allocate ids deterministically, so lets iterate over the maps in *order* by @@ -33,7 +33,7 @@ func AllocateTunnelIDs( allocatedTunnelIDs := make(map[int]bool) for nodeName, nodeTunnels := range processedTunnels { - existingNodeTunnels, ok := statusTunnels[nodeName] + existingNodeTunnels, ok := previousTunnels[nodeName] if !ok { continue } @@ -41,6 +41,7 @@ func AllocateTunnelIDs( for _, newTunnel := range nodeTunnels { for _, existingTunnel := range existingNodeTunnels { if newTunnel.LocalInterface == existingTunnel.LocalInterface && + newTunnel.RemoteInterface == existingTunnel.RemoteInterface && newTunnel.RemoteNode == existingTunnel.RemoteNode { newTunnel.TunnelID = existingTunnel.TunnelID diff --git a/controllers/topology/tunnels_test.go b/controllers/topology/tunnels_test.go index 8f8c8202..1a584010 100644 --- a/controllers/topology/tunnels_test.go +++ b/controllers/topology/tunnels_test.go @@ -3,7 +3,6 @@ package topology_test import ( "encoding/json" "fmt" - "reflect" "testing" clabernetesapisv1alpha1 "github.com/srl-labs/clabernetes/apis/v1alpha1" @@ -20,12 +19,12 @@ const testAllocateTunnelIDsTestName = "tunnels/allocate-tunnel-ids" func TestAllocateTunnelIds(t *testing.T) { cases := []struct { name string - statusTunnels map[string][]*clabernetesapisv1alpha1.PointToPointTunnel + previousTunnels map[string][]*clabernetesapisv1alpha1.PointToPointTunnel processedTunnels map[string][]*clabernetesapisv1alpha1.PointToPointTunnel }{ { - name: "simple", - statusTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{}, + name: "simple", + previousTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{}, processedTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ "srl1": { { @@ -51,7 +50,7 @@ func TestAllocateTunnelIds(t *testing.T) { }, { name: "simple-existing-status", - statusTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ + previousTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ "srl1": { { TunnelID: 0, @@ -98,7 +97,7 @@ func TestAllocateTunnelIds(t *testing.T) { }, { name: "simple-already-allocated-ids", - statusTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ + previousTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ "srl1": { { TunnelID: 1, @@ -145,7 +144,7 @@ func TestAllocateTunnelIds(t *testing.T) { }, { name: "simple-weirdly-allocated-ids", - statusTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ + previousTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ "srl1": { { TunnelID: 0, @@ -191,8 +190,8 @@ func TestAllocateTunnelIds(t *testing.T) { }, }, { - name: "meshy-links", - statusTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{}, + name: "meshy-links", + previousTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{}, processedTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ "srl1": { { @@ -284,6 +283,117 @@ func TestAllocateTunnelIds(t *testing.T) { }, }, }, + { + name: "updating-tunnels", + previousTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ + "srl1": { + { + TunnelID: 1, + LocalNode: "srl1", + Destination: "topo-1-srl2.clabernetes.svc.cluster.local", + RemoteNode: "srl2", + LocalInterface: "e1-1", + RemoteInterface: "e1-1", + }, + { + TunnelID: 2, + LocalNode: "srl1", + Destination: "topo-1-srl2.clabernetes.svc.cluster.local", + RemoteNode: "srl2", + LocalInterface: "e1-2", + RemoteInterface: "e1-2", + }, + { + TunnelID: 3, + LocalNode: "srl1", + Destination: "topo-1-srl2.clabernetes.svc.cluster.local", + RemoteNode: "srl2", + LocalInterface: "e1-3", + RemoteInterface: "e1-3", + }, + }, + "srl2": { + { + TunnelID: 1, + LocalNode: "srl2", + Destination: "topo-1-srl1.clabernetes.svc.cluster.local", + RemoteNode: "srl1", + LocalInterface: "e1-1", + RemoteInterface: "e1-1", + }, + { + TunnelID: 2, + LocalNode: "srl2", + Destination: "topo-1-srl1.clabernetes.svc.cluster.local", + RemoteNode: "srl1", + LocalInterface: "e1-2", + RemoteInterface: "e1-2", + }, + { + TunnelID: 3, + LocalNode: "srl2", + Destination: "topo-1-srl1.clabernetes.svc.cluster.local", + RemoteNode: "srl1", + LocalInterface: "e1-3", + RemoteInterface: "e1-3", + }, + }, + }, + processedTunnels: map[string][]*clabernetesapisv1alpha1.PointToPointTunnel{ + "srl1": { + { + TunnelID: 0, + LocalNode: "srl1", + Destination: "topo-1-srl2.clabernetes.svc.cluster.local", + RemoteNode: "srl2", + LocalInterface: "e1-1", + RemoteInterface: "e1-3", + }, + { + TunnelID: 0, + LocalNode: "srl1", + Destination: "topo-1-srl2.clabernetes.svc.cluster.local", + RemoteNode: "srl2", + LocalInterface: "e1-2", + RemoteInterface: "e1-1", + }, + { + TunnelID: 0, + LocalNode: "srl1", + Destination: "topo-1-srl2.clabernetes.svc.cluster.local", + RemoteNode: "srl2", + LocalInterface: "e1-3", + RemoteInterface: "e1-2", + }, + }, + "srl2": { + { + TunnelID: 0, + LocalNode: "srl2", + Destination: "topo-1-srl1.clabernetes.svc.cluster.local", + RemoteNode: "srl1", + LocalInterface: "e1-3", + RemoteInterface: "e1-1", + }, + { + TunnelID: 0, + LocalNode: "srl2", + Destination: "topo-1-srl1.clabernetes.svc.cluster.local", + RemoteNode: "srl1", + LocalInterface: "e1-1", + RemoteInterface: "e1-2", + }, + { + TunnelID: 0, + LocalNode: "srl2", + Destination: "topo-1-srl1.clabernetes.svc.cluster.local", + RemoteNode: "srl1", + LocalInterface: "e1-2", + RemoteInterface: "e1-3", + }, + }, + }, + }, } for _, testCase := range cases { @@ -291,7 +401,7 @@ func TestAllocateTunnelIds(t *testing.T) { testCase.name, func(t *testing.T) { clabernetescontrollerstopology.AllocateTunnelIDs( - testCase.statusTunnels, + testCase.previousTunnels, testCase.processedTunnels, ) @@ -326,9 +436,7 @@ func TestAllocateTunnelIds(t *testing.T) { t.Fatal(err) } - if !reflect.DeepEqual(got, want) { - clabernetestesthelper.FailOutput(t, got, want) - } + clabernetestesthelper.MarshaledEqual(t, got, want) }, ) } diff --git a/go.sum b/go.sum index cb1d8861..98af13a6 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/carlmontanari/difflibgo v0.0.0-20231101235608-20f26fe20f37 h1:1cclCPimXRdSnaB1TaRf0N6cTyqqBxHa5K/CUpegsWY= -github.com/carlmontanari/difflibgo v0.0.0-20231101235608-20f26fe20f37/go.mod h1:+3MuSIeC3qmdSesR12cTLeb47R/Vvo+bHdB6hC5HShk= github.com/carlmontanari/difflibgo v0.0.0-20240227210139-93685b1c22ae h1:h4sxL/AXg3FRPf+sT2Y4daEQQE/UAkNAM3U0t4Cgha8= github.com/carlmontanari/difflibgo v0.0.0-20240227210139-93685b1c22ae/go.mod h1:+3MuSIeC3qmdSesR12cTLeb47R/Vvo+bHdB6hC5HShk= github.com/carlmontanari/slurpeeth v0.0.0-20240209224827-246fa87e31f3 h1:+XFqJFGVs1EVP5GajTTDB91tTKG+mF0jlN1U1jT90tI= diff --git a/launcher/connectivity/slurpeeth.go b/launcher/connectivity/slurpeeth.go index e8c13c17..d5159142 100644 --- a/launcher/connectivity/slurpeeth.go +++ b/launcher/connectivity/slurpeeth.go @@ -12,8 +12,6 @@ import ( clabernetesapisv1alpha1 "github.com/srl-labs/clabernetes/apis/v1alpha1" clabernetesconstants "github.com/srl-labs/clabernetes/constants" "gopkg.in/yaml.v3" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - apimachinerywatch "k8s.io/apimachinery/pkg/watch" ) const ( @@ -85,62 +83,20 @@ func (m *slurpeethManager) Run() { } }() + m.logger.Debug("initial slurpeeth tunnel creation complete") + m.logger.Debug("start connectivity custom resource watch...") - go m.watchConnectivity() + go watchConnectivity( + m.ctx, + m.logger, + m.clabernetesClient, + m.renderSlurpeethConfig, + ) m.logger.Debug("slurpeeth connectivity setup complete") } -func (m *slurpeethManager) watchConnectivity() { - nodeName := os.Getenv(clabernetesconstants.LauncherNodeNameEnv) - - listOptions := metav1.ListOptions{ - FieldSelector: fmt.Sprintf("metadata.name=%s", os.Getenv( - clabernetesconstants.LauncherTopologyNameEnv, - )), - Watch: true, - } - - watch, err := m.clabernetesClient.ClabernetesV1alpha1(). - Connectivities(os.Getenv(clabernetesconstants.PodNamespaceEnv)). - Watch(m.ctx, listOptions) - if err != nil { - m.logger.Fatalf("failed watching clabernetes connectivity, err: %s", err) - } - - for event := range watch.ResultChan() { - switch event.Type { //nolint:exhaustive - case apimachinerywatch.Modified: - m.logger.Info("processing connectivity modification event") - - tunnelsCR, ok := event.Object.(*clabernetesapisv1alpha1.Connectivity) - if !ok { - m.logger.Warn( - "failed casting event object to connectivity custom resource," + - " this is probably a bug", - ) - - continue - } - - nodeTunnels, ok := tunnelsCR.Spec.PointToPointTunnels[nodeName] - if !ok { - m.logger.Warnf( - "no tunnels found for node %q, continuing but things may be broken", - nodeName, - ) - } - - m.renderSlurpeethConfig(nodeTunnels) - default: - m.logger.Warnf( - "connectivity resource had %s event occur, ignoring...", event.Type, - ) - } - } -} - func (m *slurpeethManager) renderSlurpeethConfig( tunnels []*clabernetesapisv1alpha1.PointToPointTunnel, ) { diff --git a/launcher/connectivity/vxlan.go b/launcher/connectivity/vxlan.go index 8c1c4961..51c95e20 100644 --- a/launcher/connectivity/vxlan.go +++ b/launcher/connectivity/vxlan.go @@ -4,9 +4,11 @@ import ( "fmt" "net" "os/exec" + "reflect" "strconv" "time" + clabernetesapisv1alpha1 "github.com/srl-labs/clabernetes/apis/v1alpha1" clabernetesconstants "github.com/srl-labs/clabernetes/constants" claberneteserrors "github.com/srl-labs/clabernetes/errors" ) @@ -18,15 +20,18 @@ const ( type vxlanManager struct { *common + currentTunnels map[string]*clabernetesapisv1alpha1.PointToPointTunnel } func (m *vxlanManager) Run() { + m.currentTunnels = make(map[string]*clabernetesapisv1alpha1.PointToPointTunnel) + m.logger.Info( "connectivity mode is 'vxlan', setting up any required tunnels...", ) for _, tunnel := range m.initialTunnels { - err := m.runContainerlabVxlanTools( + err := m.runContainerlabVxlanToolsCreate( tunnel.LocalNode, tunnel.LocalInterface, tunnel.Destination, @@ -40,15 +45,27 @@ func (m *vxlanManager) Run() { err, ) } + + // we store them in a nice little map by local interface name so they're easy to + // reconcile on connectivity cr updates + m.currentTunnels[tunnel.LocalInterface] = tunnel } - m.logger.Debug("vxlan tunnel creation complete") + m.logger.Debug("initial vxlan tunnel creation complete") + + m.logger.Debug("start connectivity custom resource watch...") + + go watchConnectivity( + m.ctx, + m.logger, + m.clabernetesClient, + m.updateVxlanTunnels, + ) + + m.logger.Debug("slurpeeth connectivity setup complete") } -func (m *vxlanManager) runContainerlabVxlanTools( - localNodeName, cntLink, vxlanRemote string, - vxlanID int, -) error { +func (m *vxlanManager) resolveVXLANService(vxlanRemote string) (string, error) { var resolvedVxlanRemotes []net.IP var err error @@ -69,20 +86,30 @@ func (m *vxlanManager) runContainerlabVxlanTools( continue } - return err + return "", err } break } if len(resolvedVxlanRemotes) != 1 { - return fmt.Errorf( + return "", fmt.Errorf( "%w: did not get exactly one ip resolved for remote vxlan endpoint", claberneteserrors.ErrConnectivity, ) } - resolvedVxlanRemote := resolvedVxlanRemotes[0].String() + return resolvedVxlanRemotes[0].String(), nil +} + +func (m *vxlanManager) runContainerlabVxlanToolsCreate( + localNodeName, cntLink, vxlanRemote string, + vxlanID int, +) error { + resolvedVxlanRemote, err := m.resolveVXLANService(vxlanRemote) + if err != nil { + return err + } m.logger.Debugf("resolved remote vxlan tunnel service address as '%s'", resolvedVxlanRemote) @@ -115,3 +142,114 @@ func (m *vxlanManager) runContainerlabVxlanTools( return nil } + +func (m *vxlanManager) runContainerlabVxlanToolsDelete( + localNodeName, cntLink string, +) error { + cmd := exec.Command( //nolint:gosec + "containerlab", + "tools", + "vxlan", + "delete", + "--prefix", + fmt.Sprintf("vx-%s-%s", localNodeName, cntLink), + ) + + m.logger.Debugf( + "using following args for vxlan tunnel deletion (via containerlab) '%s'", cmd.Args, + ) + + cmd.Stdout = m.logger + cmd.Stderr = m.logger + + err := cmd.Run() + if err != nil { + return err + } + + return nil +} + +func (m *vxlanManager) updateVxlanTunnels( + tunnels []*clabernetesapisv1alpha1.PointToPointTunnel, +) { + // start with deleting extraneous tunnels... + for _, existingTunnel := range m.currentTunnels { + var found bool + + for _, tunnel := range tunnels { + if tunnel.LocalInterface == existingTunnel.LocalInterface { + found = true + + break + } + } + + if found { + // the existing tunnel (or rather its local interface) is represented in the "new" + // tunnels, nothing to do here + continue + } + + err := m.runContainerlabVxlanToolsDelete( + existingTunnel.LocalNode, existingTunnel.LocalInterface, + ) + if err != nil { + m.logger.Fatalf( + "failed deleting extraneous tunnel to remote node '%s' for local interface '%s'"+ + ", error: %s", + existingTunnel.RemoteNode, + existingTunnel.LocalInterface, + err, + ) + } + } + + tunnelsToReCreate := make([]*clabernetesapisv1alpha1.PointToPointTunnel, 0) + + for _, tunnel := range tunnels { + existingTunnel, ok := m.currentTunnels[tunnel.LocalInterface] + if ok && reflect.DeepEqual(existingTunnel, tunnel) { + // we've already got a tunnel setup for this interface, so we gotta check to see if our + // previously setup destination is the same -- if "yes" we can skip doing anything to + // this one. + continue + } + + if ok { + // tunnel for this interface exists but isnt the same as our desired setup, delete the + // old tunnel before we create the new one + err := m.runContainerlabVxlanToolsDelete( + tunnel.LocalNode, tunnel.LocalInterface, + ) + if err != nil { + m.logger.Fatalf( + "failed deleting existing tunnel to remote node '%s' for local interface '%s'"+ + " before re-configuring, error: %s", + tunnel.RemoteNode, + tunnel.LocalInterface, + err, + ) + } + } + + tunnelsToReCreate = append(tunnelsToReCreate, tunnel) + } + + for _, tunnel := range tunnelsToReCreate { + err := m.runContainerlabVxlanToolsCreate( + tunnel.LocalNode, + tunnel.LocalInterface, + tunnel.Destination, + tunnel.TunnelID, + ) + if err != nil { + m.logger.Fatalf( + "failed setting up tunnel to remote node '%s' for local interface '%s', error: %s", + tunnel.RemoteNode, + tunnel.LocalInterface, + err, + ) + } + } +} diff --git a/launcher/connectivity/watch.go b/launcher/connectivity/watch.go new file mode 100644 index 00000000..a15d9a00 --- /dev/null +++ b/launcher/connectivity/watch.go @@ -0,0 +1,68 @@ +package connectivity + +import ( + "context" + "fmt" + "os" + + clabernetesapisv1alpha1 "github.com/srl-labs/clabernetes/apis/v1alpha1" + clabernetesconstants "github.com/srl-labs/clabernetes/constants" + clabernetesgeneratedclientset "github.com/srl-labs/clabernetes/generated/clientset" + claberneteslogging "github.com/srl-labs/clabernetes/logging" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachinerywatch "k8s.io/apimachinery/pkg/watch" +) + +func watchConnectivity( + ctx context.Context, + logger claberneteslogging.Instance, + clabernetesClient *clabernetesgeneratedclientset.Clientset, + handleUpdate func(nodeTunnels []*clabernetesapisv1alpha1.PointToPointTunnel), +) { + nodeName := os.Getenv(clabernetesconstants.LauncherNodeNameEnv) + + listOptions := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", os.Getenv( + clabernetesconstants.LauncherTopologyNameEnv, + )), + Watch: true, + } + + watch, err := clabernetesClient.ClabernetesV1alpha1(). + Connectivities(os.Getenv(clabernetesconstants.PodNamespaceEnv)). + Watch(ctx, listOptions) + if err != nil { + logger.Fatalf("failed watching clabernetes connectivity, err: %s", err) + } + + for event := range watch.ResultChan() { + switch event.Type { //nolint:exhaustive + case apimachinerywatch.Modified: + logger.Info("processing connectivity modification event") + + tunnelsCR, ok := event.Object.(*clabernetesapisv1alpha1.Connectivity) + if !ok { + logger.Warn( + "failed casting event object to connectivity custom resource," + + " this is probably a bug", + ) + + continue + } + + nodeTunnels, ok := tunnelsCR.Spec.PointToPointTunnels[nodeName] + if !ok { + logger.Warnf( + "no tunnels found for node %q, continuing but things may be broken", + nodeName, + ) + } + + handleUpdate(nodeTunnels) + default: + logger.Warnf( + "connectivity resource had %s event occur, ignoring...", event.Type, + ) + } + } +}