diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 7730df7d85..7b92b9b8a5 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -32,6 +32,7 @@ import ( flag "github.com/spf13/pflag" crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + informers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -43,8 +44,10 @@ import ( frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned" "k8s.io/ingress-gce/pkg/instancegroups" "k8s.io/ingress-gce/pkg/l4lb" + multiprojectgce "k8s.io/ingress-gce/pkg/multiproject/gce" multiprojectstart "k8s.io/ingress-gce/pkg/multiproject/start" "k8s.io/ingress-gce/pkg/network" + providerconfigclient "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned" "k8s.io/ingress-gce/pkg/psc" "k8s.io/ingress-gce/pkg/serviceattachment" serviceattachmentclient "k8s.io/ingress-gce/pkg/serviceattachment/client/clientset/versioned" @@ -243,17 +246,28 @@ func main() { rootLogger.Info("Multi-project mode is enabled, starting project-syncer") runWithWg(func() { + gceCreator, err := multiprojectgce.NewDefaultGCECreator(rootLogger) + if err != nil { + klog.Fatalf("Failed to create GCE creator: %v", err) + } + providerConfigClient, err := providerconfigclient.NewForConfig(kubeConfig) + if err != nil { + klog.Fatalf("Failed to create ProviderConfig client: %v", err) + } + informersFactory := informers.NewSharedInformerFactory(kubeClient, flags.F.ResyncPeriod) if flags.F.LeaderElection.LeaderElect { err := multiprojectstart.StartWithLeaderElection( context.Background(), leaderElectKubeClient, hostname, - kubeConfig, rootLogger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, + providerConfigClient, + informersFactory, + gceCreator, namer, stopCh, ) @@ -262,12 +276,14 @@ func main() { } } else { multiprojectstart.Start( - kubeConfig, rootLogger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, + providerConfigClient, + informersFactory, + gceCreator, namer, stopCh, ) diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index e3ef7dedaf..f747d14d79 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -332,7 +332,7 @@ L7 load balancing. CSV values accepted. 
Example: -node-port-ranges=80,8080,400-5 flag.BoolVar(&F.EnableMultiProjectMode, "enable-multi-project-mode", false, "Enable running in multi-project mode.") flag.BoolVar(&F.EnableL4ILBMixedProtocol, "enable-l4ilb-mixed-protocol", false, "Enable support for mixed protocol L4 internal load balancers.") flag.BoolVar(&F.EnableL4NetLBMixedProtocol, "enable-l4netlb-mixed-protocol", false, "Enable support for mixed protocol L4 external load balancers.") - flag.StringVar(&F.ProviderConfigNameLabelKey, "provider-config-name-label-key", "", "The label key for provider-config name, which is used to identify the provider-config of objects in multi-project mode.") + flag.StringVar(&F.ProviderConfigNameLabelKey, "provider-config-name-label-key", "cloud.gke.io/provider-config-name", "The label key for provider-config name, which is used to identify the provider-config of objects in multi-project mode.") flag.BoolVar(&F.EnableIPV6OnlyNEG, "enable-ipv6-only-neg", false, "Enable support for IPV6 Only NEG's.") } diff --git a/pkg/multiproject/controller/controller.go b/pkg/multiproject/controller/controller.go index fc58efd128..c1b11352b1 100644 --- a/pkg/multiproject/controller/controller.go +++ b/pkg/multiproject/controller/controller.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "math/rand" + "runtime/debug" "time" "k8s.io/apimachinery/pkg/util/wait" @@ -55,8 +56,14 @@ func NewProviderConfigController(manager ProviderConfigControllerManager, provid providerConfigInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { pcc.providerConfigQueue.Enqueue(obj) }, - UpdateFunc: func(old, cur interface{}) { pcc.providerConfigQueue.Enqueue(cur) }, + AddFunc: func(obj interface{}) { + pcc.logger.V(4).Info("Enqueue add event", "object", obj) + pcc.providerConfigQueue.Enqueue(obj) + }, + UpdateFunc: func(old, cur interface{}) { + pcc.logger.V(4).Info("Enqueue update event", "old", old, "new", cur) + pcc.providerConfigQueue.Enqueue(cur) + }, }) pcc.logger.Info("ProviderConfig controller created") @@ -70,6 +77,7 @@ func (pcc *ProviderConfigController) Run() { ctx, cancel := context.WithCancel(context.Background()) go func() { <-pcc.stopCh + pcc.logger.Info("Stop channel closed, cancelling context") cancel() }() @@ -98,7 +106,8 @@ func (pcc *ProviderConfigController) syncWrapper(key string) error { defer func() { if r := recover(); r != nil { - svcLogger.Error(fmt.Errorf("panic in ProviderConfig sync worker goroutine: %v", r), "Recovered from panic") + stack := string(debug.Stack()) + svcLogger.Error(fmt.Errorf("panic in ProviderConfig sync worker goroutine: %v, stack: %s", r, stack), "Recovered from panic") } }() err := pcc.sync(key, svcLogger) @@ -131,12 +140,12 @@ func (pcc *ProviderConfigController) sync(key string, logger klog.Logger) error return nil } - logger.V(2).Info("Syncing providerConfig", "providerConfig", pc) + logger.Info("Syncing providerConfig", "providerConfig", pc) err = pcc.manager.StartControllersForProviderConfig(pc) if err != nil { return fmt.Errorf("failed to start controllers for providerConfig %v: %w", pc, err) } - logger.V(2).Info("Successfully synced providerConfig", "providerConfig", pc) + logger.Info("Successfully synced providerConfig", "providerConfig", pc) return nil } diff --git a/pkg/multiproject/gce/fake.go b/pkg/multiproject/gce/fake.go new file mode 100644 index 0000000000..b781a5300e --- /dev/null +++ b/pkg/multiproject/gce/fake.go @@ -0,0 +1,104 @@ +package gce + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" 
+ "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + compute "google.golang.org/api/compute/v1" + cloudgce "k8s.io/cloud-provider-gcp/providers/gce" + v1 "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" + "k8s.io/ingress-gce/pkg/test" + "k8s.io/klog/v2" +) + +type GCEFake struct { + defaultTestClusterValues cloudgce.TestClusterValues + clientsForProviderConfigs map[string]*cloudgce.Cloud +} + +func NewGCEFake() *GCEFake { + return &GCEFake{ + defaultTestClusterValues: cloudgce.DefaultTestClusterValues(), + clientsForProviderConfigs: make(map[string]*cloudgce.Cloud), + } +} + +func providerConfigKey(providerConfig *v1.ProviderConfig) string { + return fmt.Sprintf("%s/%s", providerConfig.Namespace, providerConfig.Name) +} + +// GCEForProviderConfig returns a new Fake GCE client for the given provider config. +// It stores the client in the GCEFake and returns it if the same provider config is requested again. +func (g *GCEFake) GCEForProviderConfig(providerConfig *v1.ProviderConfig, logger klog.Logger) (*cloudgce.Cloud, error) { + pcKey := providerConfigKey(providerConfig) + if g.clientsForProviderConfigs[pcKey] != nil { + return g.clientsForProviderConfigs[pcKey], nil + } + + // Copy the default test cluster values + updatedConfig := g.defaultTestClusterValues + updatedConfig.ProjectID = providerConfig.Spec.ProjectID + updatedConfig.NetworkURL = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/global/networks/%s", providerConfig.Spec.ProjectID, providerConfig.Spec.NetworkConfig.Network) + updatedConfig.SubnetworkURL = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", providerConfig.Spec.ProjectID, updatedConfig.Region, providerConfig.Spec.NetworkConfig.DefaultSubnetwork) + logger.Info("Creating GCEFake for provider config", "providerConfig", providerConfig.Name, "updatedConfig", updatedConfig) + fakeCloud := cloudgce.NewFakeGCECloud(updatedConfig) + _, err := createNetwork(fakeCloud, providerConfig.Spec.NetworkConfig.Network) + if err != nil { + return nil, err + } + _, err = createSubnetwork(fakeCloud, providerConfig.Spec.NetworkConfig.DefaultSubnetwork, providerConfig.Spec.NetworkConfig.Network) + if err != nil { + return nil, err + } + if err := createAndInsertNodes(fakeCloud, []string{"test-node-1"}, updatedConfig.ZoneName); err != nil { + return nil, err + } + + g.clientsForProviderConfigs[pcKey] = fakeCloud + return fakeCloud, nil +} + +func createAndInsertNodes(cloud *cloudgce.Cloud, nodeNames []string, zone string) error { + if _, err := test.CreateAndInsertNodes(cloud, nodeNames, zone); err != nil { + return err + } + return nil +} + +func createNetwork(c *cloudgce.Cloud, networkName string) (*compute.Network, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + + key := meta.GlobalKey(networkName) + err := c.Compute().Networks().Insert(ctx, key, &compute.Network{Name: networkName}) + if err != nil { + return nil, err + } + + network, err := c.Compute().Networks().Get(ctx, key) + if err != nil { + return nil, err + } + return network, nil +} + +func createSubnetwork(c *cloudgce.Cloud, subnetworkName string, networkName string) (*compute.Subnetwork, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + + key := meta.GlobalKey(subnetworkName) + err := c.Compute().Subnetworks().Insert(ctx, key, &compute.Subnetwork{Name: subnetworkName, Network: networkName}) + if err != nil { + return nil, err + } + + subnetwork, err := c.Compute().Subnetworks().Get(ctx, key) + if err 
!= nil { + return nil, err + } + return subnetwork, nil +} + +// assert that the GCEFake implements the GCECreator interface +var _ GCECreator = &GCEFake{} diff --git a/pkg/multiproject/gce/fake_test.go b/pkg/multiproject/gce/fake_test.go new file mode 100644 index 0000000000..fd5a8944dd --- /dev/null +++ b/pkg/multiproject/gce/fake_test.go @@ -0,0 +1,38 @@ +package gce + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" + "k8s.io/klog/v2" +) + +func TestNewGCEForProviderConfig(t *testing.T) { + fake := NewGCEFake() + + providerConfig := &v1.ProviderConfig{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-config", + }, + Spec: v1.ProviderConfigSpec{ + ProjectID: "custom-project-id", + NetworkConfig: &v1.NetworkConfig{ + Network: "custom-network", + DefaultSubnetwork: "custom-subnetwork", + }, + }, + } + + logger := klog.TODO() + cloud, err := fake.GCEForProviderConfig(providerConfig, logger) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if cloud == nil { + t.Fatal("expected cloud instance, got nil") + } + if cloud.ProjectID() != providerConfig.Spec.ProjectID { + t.Errorf("expected project id %q, got %q", providerConfig.Spec.ProjectID, cloud.ProjectID()) + } +} diff --git a/pkg/multiproject/gce/gce.go b/pkg/multiproject/gce/gce.go index e419f80bef..ccfa68cbc0 100644 --- a/pkg/multiproject/gce/gce.go +++ b/pkg/multiproject/gce/gce.go @@ -21,11 +21,29 @@ func init() { ini.PrettySection = true } -// NewGCEForProviderConfig returns a new GCE client for the given project. +type GCECreator interface { + GCEForProviderConfig(providerConfig *v1.ProviderConfig, logger klog.Logger) (*cloudgce.Cloud, error) +} + +type DefaultGCECreator struct { + defaultConfigFileString string +} + +func NewDefaultGCECreator(logger klog.Logger) (*DefaultGCECreator, error) { + defaultGCEConfig, err := app.GCEConfString(logger) + if err != nil { + return nil, fmt.Errorf("error getting default cluster GCE config: %v", err) + } + return &DefaultGCECreator{ + defaultConfigFileString: defaultGCEConfig, + }, nil +} + +// GCEForProviderConfig returns a new GCE client for the given project. // If providerConfig is nil, it returns the default cloud associated with the cluster's project. // It modifies the default configuration when a providerConfig is provided. 
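+// A minimal usage sketch (creator, pc and logger are illustrative names, not part of this file):
+//
+//	creator, err := NewDefaultGCECreator(logger)
+//	if err != nil { /* handle error */ }
+//	cloud, err := creator.GCEForProviderConfig(pc, logger)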
-func NewGCEForProviderConfig(defaultConfigContent string, providerConfig *v1.ProviderConfig, logger klog.Logger) (*cloudgce.Cloud, error) { - modifiedConfigContent, err := generateConfigForProviderConfig(defaultConfigContent, providerConfig) +func (g *DefaultGCECreator) GCEForProviderConfig(providerConfig *v1.ProviderConfig, logger klog.Logger) (*cloudgce.Cloud, error) { + modifiedConfigContent, err := generateConfigForProviderConfig(g.defaultConfigFileString, providerConfig) if err != nil { return nil, fmt.Errorf("failed to modify config content: %v", err) } diff --git a/pkg/multiproject/manager/manager.go b/pkg/multiproject/manager/manager.go index 8e461e7851..3c648a21fd 100644 --- a/pkg/multiproject/manager/manager.go +++ b/pkg/multiproject/manager/manager.go @@ -11,6 +11,7 @@ import ( "k8s.io/client-go/kubernetes" providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" "k8s.io/ingress-gce/pkg/multiproject/finalizer" + "k8s.io/ingress-gce/pkg/multiproject/gce" "k8s.io/ingress-gce/pkg/multiproject/neg" "k8s.io/ingress-gce/pkg/neg/syncers/labels" providerconfigclient "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned" @@ -35,7 +36,7 @@ type ProviderConfigControllersManager struct { clusterNamer *namer.Namer l4Namer *namer.L4Namer lpConfig labels.PodLabelPropagationConfig - defaultCloudConfig string + gceCreator gce.GCECreator globalStopCh <-chan struct{} } @@ -53,7 +54,7 @@ func NewProviderConfigControllerManager( clusterNamer *namer.Namer, l4Namer *namer.L4Namer, lpConfig labels.PodLabelPropagationConfig, - defaultCloudConfig string, + gceCreator gce.GCECreator, globalStopCh <-chan struct{}, logger klog.Logger, ) *ProviderConfigControllersManager { @@ -69,7 +70,7 @@ func NewProviderConfigControllerManager( clusterNamer: clusterNamer, l4Namer: l4Namer, lpConfig: lpConfig, - defaultCloudConfig: defaultCloudConfig, + gceCreator: gceCreator, globalStopCh: globalStopCh, } } @@ -83,6 +84,8 @@ func (pccm *ProviderConfigControllersManager) StartControllersForProviderConfig( defer pccm.mu.Unlock() pcKey := providerConfigKey(pc) + + pccm.logger.Info("Starting controllers for provider config", "providerConfigId", pcKey) if _, exists := pccm.controllers[pcKey]; exists { pccm.logger.Info("Controllers for provider config already exist, skipping start", "providerConfigId", pcKey) return nil @@ -93,6 +96,11 @@ func (pccm *ProviderConfigControllersManager) StartControllersForProviderConfig( return fmt.Errorf("failed to ensure NEG cleanup finalizer for project %s: %v", pcKey, err) } + cloud, err := pccm.gceCreator.GCEForProviderConfig(pc, pccm.logger) + if err != nil { + return fmt.Errorf("failed to create GCE client for provider config %+v: %v", pc, err) + } + negControllerStopCh, err := neg.StartNEGController( pccm.informersFactory, pccm.kubeClient, @@ -104,7 +112,7 @@ func (pccm *ProviderConfigControllersManager) StartControllersForProviderConfig( pccm.clusterNamer, pccm.l4Namer, pccm.lpConfig, - pccm.defaultCloudConfig, + cloud, pccm.globalStopCh, pccm.logger, pc, diff --git a/pkg/multiproject/neg/neg.go b/pkg/multiproject/neg/neg.go index 95255bcae1..2cb2e111e2 100644 --- a/pkg/multiproject/neg/neg.go +++ b/pkg/multiproject/neg/neg.go @@ -15,7 +15,6 @@ import ( providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/multiproject/filteredinformer" - multiprojectgce "k8s.io/ingress-gce/pkg/multiproject/gce" "k8s.io/ingress-gce/pkg/neg" "k8s.io/ingress-gce/pkg/neg/syncers/labels" negtypes 
"k8s.io/ingress-gce/pkg/neg/types" @@ -43,7 +42,7 @@ func StartNEGController( clusterNamer *namer.Namer, l4Namer *namer.L4Namer, lpConfig labels.PodLabelPropagationConfig, - defaultCloudConfig string, + cloud *gce.Cloud, globalStopCh <-chan struct{}, logger klog.Logger, providerConfig *providerconfig.ProviderConfig, @@ -51,11 +50,6 @@ func StartNEGController( providerConfigName := providerConfig.Name logger.V(2).Info("Initializing NEG controller", "providerConfig", providerConfigName) - cloud, err := multiprojectgce.NewGCEForProviderConfig(defaultCloudConfig, providerConfig, logger) - if err != nil { - return nil, fmt.Errorf("failed to create GCE client for provider config %+v: %v", providerConfig, err) - } - // The ProviderConfig-specific stop channel. We close this in StopControllersForProviderConfig. providerConfigStopCh := make(chan struct{}) @@ -145,9 +139,10 @@ func initializeInformers( informersFactory.Discovery().V1().EndpointSlices().Informer(), providerConfigName, ) - err := endpointSliceInformer.AddIndexers(map[string]cache.IndexFunc{ - endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc, - }) + // Even though we created separate "provider-config-filtered" informer, informers from the same + // factory will share indexers. That's why we need to add the indexer only if it's not present. + // This basically means we will only add indexer to the first provider config's informer. + err := addIndexerIfNotPresent(endpointSliceInformer.GetIndexer(), endpointslices.EndpointSlicesByServiceIndex, endpointslices.EndpointSlicesByServiceFunc) if err != nil { return nil, nil, fmt.Errorf("failed to add indexers to endpointSliceInformer: %v", err) } @@ -229,6 +224,16 @@ func initializeInformers( return informers, hasSynced, nil } +// addIndexerIfNotPresent adds an indexer to the indexer if it's not present. +// This is needed because informers from the same factory will share indexers. 
+func addIndexerIfNotPresent(indexer cache.Indexer, indexName string, indexFunc cache.IndexFunc) error { + indexers := indexer.GetIndexers() + if _, ok := indexers[indexName]; ok { + return nil + } + return indexer.AddIndexers(cache.Indexers{indexName: indexFunc}) +} + func createNEGController( kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface, diff --git a/pkg/multiproject/start/start.go b/pkg/multiproject/start/start.go index 8863a2692f..fd7b533b88 100644 --- a/pkg/multiproject/start/start.go +++ b/pkg/multiproject/start/start.go @@ -10,13 +10,12 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/ingress-gce/cmd/glbc/app" "k8s.io/ingress-gce/pkg/flags" _ "k8s.io/ingress-gce/pkg/klog" pccontroller "k8s.io/ingress-gce/pkg/multiproject/controller" + "k8s.io/ingress-gce/pkg/multiproject/gce" "k8s.io/ingress-gce/pkg/multiproject/manager" "k8s.io/ingress-gce/pkg/neg/syncers/labels" providerconfigclient "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned" @@ -34,18 +33,20 @@ func StartWithLeaderElection( ctx context.Context, leaderElectKubeClient kubernetes.Interface, hostname string, - kubeConfig *rest.Config, logger klog.Logger, kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface, kubeSystemUID types.UID, eventRecorderKubeClient kubernetes.Interface, + providerConfigClient providerconfigclient.Interface, + informersFactory informers.SharedInformerFactory, + gceCreator gce.GCECreator, rootNamer *namer.Namer, stopCh <-chan struct{}, ) error { recordersManager := recorders.NewManager(eventRecorderKubeClient, logger) - leConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, hostname, recordersManager, kubeConfig, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, rootNamer, stopCh) + leConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, hostname, recordersManager, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, providerConfigClient, informersFactory, gceCreator, rootNamer, stopCh) if err != nil { return err } @@ -60,12 +61,14 @@ func makeLeaderElectionConfig( leaderElectKubeClient kubernetes.Interface, hostname string, recordersManager *recorders.Manager, - kubeConfig *rest.Config, logger klog.Logger, kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface, kubeSystemUID types.UID, eventRecorderKubeClient kubernetes.Interface, + providerConfigClient providerconfigclient.Interface, + informersFactory informers.SharedInformerFactory, + gceCreator gce.GCECreator, rootNamer *namer.Namer, stopCh <-chan struct{}, ) (*leaderelection.LeaderElectionConfig, error) { @@ -93,7 +96,7 @@ func makeLeaderElectionConfig( RetryPeriod: flags.F.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(context.Context) { - Start(kubeConfig, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, rootNamer, stopCh) + Start(logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, providerConfigClient, informersFactory, gceCreator, rootNamer, stopCh) }, OnStoppedLeading: func() { logger.Info("Stop running multi-project leader election") @@ -103,22 +106,19 @@ func makeLeaderElectionConfig( } // Start starts the ProviderConfig controller. -// It builds required clients, context and starts the controller. 
+// It builds required context and starts the controller. func Start( - kubeConfig *rest.Config, logger klog.Logger, kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface, kubeSystemUID types.UID, eventRecorderKubeClient kubernetes.Interface, + providerConfigClient providerconfigclient.Interface, + informersFactory informers.SharedInformerFactory, + gceCreator gce.GCECreator, rootNamer *namer.Namer, stopCh <-chan struct{}, ) { - providerConfigClient, err := providerconfigclient.NewForConfig(kubeConfig) - if err != nil { - klog.Fatalf("Failed to create ProviderConfig client: %v", err) - } - lpConfig := labels.PodLabelPropagationConfig{} if flags.F.EnableNEGLabelPropagation { lpConfigEnvVar := os.Getenv("LABEL_PROPAGATION_CONFIG") @@ -127,13 +127,6 @@ func Start( } } - defaultGCEConfig, err := app.GCEConfString(logger) - if err != nil { - klog.Fatalf("Error getting default cluster GCE config: %v", err) - } - - informersFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, flags.F.ResyncPeriod) - providerConfigInformer := providerconfiginformers.NewSharedInformerFactory(providerConfigClient, flags.F.ResyncPeriod).Cloud().V1().ProviderConfigs().Informer() go providerConfigInformer.Run(stopCh) @@ -147,7 +140,7 @@ func Start( rootNamer, namer.NewL4Namer(string(kubeSystemUID), rootNamer), lpConfig, - defaultGCEConfig, + gceCreator, stopCh, logger, ) diff --git a/pkg/multiproject/start/start_test.go b/pkg/multiproject/start/start_test.go new file mode 100644 index 0000000000..486443bc96 --- /dev/null +++ b/pkg/multiproject/start/start_test.go @@ -0,0 +1,597 @@ +package start + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + cloudgce "k8s.io/cloud-provider-gcp/providers/gce" + "k8s.io/ingress-gce/pkg/annotations" + providerconfigv1 "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" + "k8s.io/ingress-gce/pkg/flags" + multiprojectgce "k8s.io/ingress-gce/pkg/multiproject/gce" + "k8s.io/ingress-gce/pkg/multiproject/testutil" + negtypes "k8s.io/ingress-gce/pkg/neg/types" + pcclientfake "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned/fake" + svcnegfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" + "k8s.io/ingress-gce/pkg/utils/namer" + klog "k8s.io/klog/v2" +) + +const ( + negAnnVal = `{"exposed_ports":{"80":{}}}` + testNamedPort = "named-Port" + managedByEPSControllerValue = "endpointslice-controller.k8s.io" +) + +// TestMain adjusts global test settings. It sets the verbosity for klog, etc. +func TestMain(m *testing.M) { + flag.Parse() + + // Set klog verbosity, for example to 5 for debugging output + fs := flag.NewFlagSet("mock-flags", flag.PanicOnError) + klog.InitFlags(fs) + _ = fs.Set("v", "5") + + os.Exit(m.Run()) +} + +// TestStartProviderConfigIntegration creates ProviderConfig, Services inside, +// and verifies that the actual NEG is created and the Service is updated with the NEG status. 
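+// The test wires Start() with fake clients (kube, ProviderConfig, SvcNeg) and a GCEFake, then
+// creates the ProviderConfigs, waits for the NEG cleanup finalizer, creates Services carrying the
+// NEG annotation, and finally validates the NEG status annotation, the SvcNEG objects and the
+// NEGs in the fake cloud.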
+func TestStartProviderConfigIntegration(t *testing.T) { + flags.Register() + flags.F.ProviderConfigNameLabelKey = "cloud.gke.io/provider-config-name" + + providerConfigName1 := "test-pc1" + providerConfigName2 := "test-pc2" + + testCases := []struct { + desc string + providerConfigs []*providerconfigv1.ProviderConfig + services []*corev1.Service + }{ + { + desc: "Single ProviderConfig, Single Service", + providerConfigs: []*providerconfigv1.ProviderConfig{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: providerConfigName1, + Namespace: providerConfigName1, + }, + Spec: providerconfigv1.ProviderConfigSpec{ + ProjectID: "my-project", + ProjectNumber: 12345, + NetworkConfig: &providerconfigv1.NetworkConfig{ + Network: "my-network", + DefaultSubnetwork: "my-subnetwork", + }, + }, + }, + }, + services: []*corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svc", + Namespace: providerConfigName1, + Labels: map[string]string{ + "cloud.gke.io/provider-config-name": providerConfigName1, + "app": "testapp", + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"app": "testapp"}, + Ports: []corev1.ServicePort{ + {Port: 80}, + }, + }, + }, + }, + }, + { + desc: "Multiple ProviderConfigs, Multiple Services referencing each", + providerConfigs: []*providerconfigv1.ProviderConfig{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: providerConfigName1, + Namespace: providerConfigName1, + }, + Spec: providerconfigv1.ProviderConfigSpec{ + ProjectID: "project-1", + ProjectNumber: 1111, + NetworkConfig: &providerconfigv1.NetworkConfig{ + Network: "my-network-1", + DefaultSubnetwork: "my-subnetwork-1", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: providerConfigName2, + Namespace: providerConfigName2, + }, + Spec: providerconfigv1.ProviderConfigSpec{ + ProjectID: "project-2", + ProjectNumber: 2222, + NetworkConfig: &providerconfigv1.NetworkConfig{ + Network: "my-network-2", + DefaultSubnetwork: "my-subnetwork-2", + }, + }, + }, + }, + services: []*corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1-1", + Namespace: providerConfigName1, + Labels: map[string]string{ + "cloud.gke.io/provider-config-name": providerConfigName1, + "app": "svc1-app", + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"app": "svc1-app"}, + Ports: []corev1.ServicePort{ + {Port: 80}, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "svc2-2", + Namespace: providerConfigName2, + Labels: map[string]string{ + "cloud.gke.io/provider-config-name": providerConfigName2, + "app": "svc2-app", + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{"app": "svc2-app"}, + Ports: []corev1.ServicePort{ + {Port: 80}, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + flags.F.ResyncPeriod = 10000 * time.Second + + // Build fake clients. + kubeClient := fake.NewSimpleClientset() + pcClient := pcclientfake.NewSimpleClientset() + svcNegClient := svcnegfake.NewSimpleClientset() + + // This simulates the automatic labeling that the real environment does. + // ProviderConfig name label is set to the namespace of the object. 
+ testutil.EmulateProviderConfigLabelingWebhook(svcNegClient.Tracker(), &svcNegClient.Fake, "servicenetworkendpointgroups") + + logger := klog.TODO() + gceCreator := multiprojectgce.NewGCEFake() + informersFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, flags.F.ResyncPeriod) + + rootNamer := namer.NewNamer("test-clusteruid", "", logger) + kubeSystemUID := types.UID("test-kube-system-uid") + + ctx, cancel := context.WithCancel(context.Background()) + stopCh := make(chan struct{}) + defer func() { + cancel() + close(stopCh) + }() + + // Start the multi-project code in the background. + go func() { + Start( + logger, + kubeClient, + svcNegClient, + kubeSystemUID, + kubeClient, // eventRecorderKubeClient can be the same as main client + pcClient, + informersFactory, + gceCreator, + rootNamer, + stopCh, + ) + }() + + // Create the test's ProviderConfigs. + for _, pc := range tc.providerConfigs { + addressPrefix := "10.100" + if pc.Name == providerConfigName2 { + addressPrefix = "20.100" + } + populateFakeNodeInformer( + kubeClient, + informersFactory.Core().V1().Nodes().Informer(), + pc.Name, + addressPrefix, + ) + + if _, err := pcClient.CloudV1().ProviderConfigs(pc.Namespace).Create(ctx, pc, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create ProviderConfig %q: %v", pc.Name, err) + } + } + + // Confirm that all ProviderConfigs have the finalizer. + for _, pc := range tc.providerConfigs { + if err := waitForProviderConfigFinalizer(ctx, t, pcClient, pc); err != nil { + t.Errorf("Timed out waiting for ProviderConfig %q finalizer: %v", pc.Name, err) + } + } + + // Create the test's Services, including the NEG annotation. + for _, svc := range tc.services { + if svc.Annotations == nil { + svc.Annotations = map[string]string{} + } + svc.Annotations[annotations.NEGAnnotationKey] = negAnnVal + + if _, err := kubeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Service %q: %v", svc.Name, err) + } + + // Populate endpoint slices in the fake informer. + addressPrefix := "10.100" + if svc.Labels[flags.F.ProviderConfigNameLabelKey] == providerConfigName2 { + addressPrefix = "20.100" + } + populateFakeEndpointSlices( + informersFactory.Discovery().V1().EndpointSlices().Informer(), + svc.Name, + svc.Labels[flags.F.ProviderConfigNameLabelKey], + addressPrefix, + ) + } + + // Validate each service against the corresponding ProviderConfig. + // (In the second test case, we rely on the index i matching the PC array.) + for i, svc := range tc.services { + pc := tc.providerConfigs[i] + validateService(ctx, t, kubeClient, svcNegClient, gceCreator, svc, pc) + } + }) + } +} + +// validateService checks the final states of the Service, SvcNEG, and the GCE NEG. 
+func validateService( + ctx context.Context, + t *testing.T, + kubeClient kubernetes.Interface, + svcNegClient *svcnegfake.Clientset, + gceCreator *multiprojectgce.GCEFake, + svc *corev1.Service, + pc *providerconfigv1.ProviderConfig, +) { + t.Helper() + + negNames, err := checkNEGStatus(ctx, t, kubeClient, svc, []string{"80"}) + if err != nil { + t.Errorf("NEG status annotation on Service %q is not in the expected state: %v", svc.Name, err) + return + } + + for _, negName := range negNames { + if err := checkSvcNEG(ctx, t, svcNegClient, svc, negName); err != nil { + t.Errorf("Svc NEG on Service %q is not in the expected state: %v", svc.Name, err) + } + + gce, err := gceCreator.GCEForProviderConfig(pc, klog.TODO()) + if err != nil { + t.Errorf("Failed to get GCE for ProviderConfig %q: %v", pc.Name, err) + continue + } + if err := verifyCloudNEG(t, gce, negName); err != nil { + t.Errorf("NEG %q on Service %q is not in the expected state: %v", negName, svc.Name, err) + } + } +} + +// checkNEGStatus polls until the NEG Status annotation is set on the Service and validates it. +func checkNEGStatus( + ctx context.Context, + t *testing.T, + kubeClient kubernetes.Interface, + svc *corev1.Service, + expectSvcPorts []string, +) ([]string, error) { + t.Helper() + + var latestSvc *corev1.Service + if err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + var errSvc error + latestSvc, errSvc = kubeClient.CoreV1().Services(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{}) + if errSvc != nil { + return false, errSvc + } + val, ok := latestSvc.Annotations[annotations.NEGStatusKey] + return ok && val != "", nil + }); err != nil { + return nil, fmt.Errorf("timed out waiting for NEG status on service %q: %v", svc.Name, err) + } + + annotation := latestSvc.Annotations[annotations.NEGStatusKey] + negStatus, err := annotations.ParseNegStatus(annotation) + if err != nil { + return nil, fmt.Errorf("invalid neg status annotation %q on service %s/%s: %v", + annotation, latestSvc.Namespace, latestSvc.Name, err) + } + + expectedPorts := sets.NewString(expectSvcPorts...) + existingPorts := sets.NewString() + for port := range negStatus.NetworkEndpointGroups { + existingPorts.Insert(port) + } + if !expectedPorts.Equal(existingPorts) { + return nil, fmt.Errorf( + "service %s/%s annotation mismatch: got ports %q, want %q", + svc.Namespace, svc.Name, existingPorts.List(), expectedPorts.List(), + ) + } + + var negNames []string + for _, neg := range negStatus.NetworkEndpointGroups { + negNames = append(negNames, neg) + } + return negNames, nil +} + +// checkSvcNEG polls until the SvcNEG resource is created for a given Service/NEG name. +func checkSvcNEG( + ctx context.Context, + t *testing.T, + svcNegClient *svcnegfake.Clientset, + svc *corev1.Service, + negName string, +) error { + t.Helper() + + return wait.PollImmediate(time.Second, 30*time.Second, func() (bool, error) { + negCheck, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).Get(ctx, negName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + } + t.Logf("Svc NEG found: %s/%s", negCheck.Namespace, negCheck.Name) + return true, nil + }) +} + +// waitForProviderConfigFinalizer ensures a ProviderConfig has the expected finalizer. 
+func waitForProviderConfigFinalizer( + ctx context.Context, + t *testing.T, + pcClient *pcclientfake.Clientset, + pc *providerconfigv1.ProviderConfig, +) error { + t.Helper() + + return wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + pcCheck, err := pcClient.CloudV1().ProviderConfigs(pc.Namespace).Get(ctx, pc.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + for _, f := range pcCheck.Finalizers { + if f == "multiproject.networking.gke.io/neg-cleanup" { + t.Logf("ProviderConfig %q has expected finalizer", pc.Name) + return true, nil + } + } + return false, nil + }) +} + +// verifyCloudNEG checks that the NEG exists in the GCE fake for the default region (us-central1). +func verifyCloudNEG( + t *testing.T, + gceCloud *cloudgce.Cloud, + negName string, +) error { + t.Helper() + + neg, err := gceCloud.GetNetworkEndpointGroup(negName, "us-central1") + if err != nil { + return fmt.Errorf("failed to get NEG %q from cloud: %v", negName, err) + } + t.Logf("Verified cloud NEG: %s", neg.Name) + return nil +} + +// populateFakeNodeInformer creates and indexes a few fake Node objects to simulate existing cluster nodes. +func populateFakeNodeInformer( + client *fake.Clientset, + nodeInformer cache.SharedIndexInformer, + pcName string, + addressPrefix string, +) { + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", negtypes.TestInstance1, pcName), + Labels: map[string]string{ + flags.F.ProviderConfigNameLabelKey: pcName, + }, + }, + Spec: corev1.NodeSpec{ + ProviderID: fmt.Sprintf("gce://foo-project/us-central1/%s", negtypes.TestInstance1), + PodCIDR: fmt.Sprintf("%s.1.0/24", addressPrefix), + PodCIDRs: []string{fmt.Sprintf("%s.1.0/24", addressPrefix)}, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + {Type: corev1.NodeInternalIP, Address: "1.2.3.1"}, + }, + Conditions: []corev1.NodeCondition{ + {Type: corev1.NodeReady, Status: corev1.ConditionTrue}, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", negtypes.TestInstance2, pcName), + Labels: map[string]string{ + flags.F.ProviderConfigNameLabelKey: pcName, + }, + }, + Spec: corev1.NodeSpec{ + ProviderID: fmt.Sprintf("gce://foo-project/us-central1/%s", negtypes.TestInstance2), + PodCIDR: fmt.Sprintf("%s.2.0/24", addressPrefix), + PodCIDRs: []string{fmt.Sprintf("%s.2.0/24", addressPrefix)}, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + {Type: corev1.NodeInternalIP, Address: "1.2.3.2"}, + }, + Conditions: []corev1.NodeCondition{ + {Type: corev1.NodeReady, Status: corev1.ConditionTrue}, + }, + }, + }, + } + + for _, node := range nodes { + if _, err := client.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}); err != nil { + klog.Warningf("Failed to create node %q: %v", node.Name, err) + continue + } + if err := nodeInformer.GetIndexer().Add(node); err != nil { + klog.Warningf("Failed to add node %q to informer: %v", node.Name, err) + } + } +} + +// populateFakeEndpointSlices indexes a set of fake EndpointSlices to simulate the real endpoints in the cluster. 
+func populateFakeEndpointSlices( + endpointSliceInformer cache.SharedIndexInformer, + serviceName, providerConfigName, addressPrefix string, +) { + endpointSlices := getTestEndpointSlices(serviceName, providerConfigName, addressPrefix) + for _, es := range endpointSlices { + if err := endpointSliceInformer.GetIndexer().Add(es); err != nil { + klog.Warningf("Failed to add endpoint slice %q: %v", es.Name, err) + } + } +} + +func getTestEndpointSlices( + serviceName, providerConfigName, addressPrefix string, +) []*discovery.EndpointSlice { + instance1 := fmt.Sprintf("%s-%s", negtypes.TestInstance1, providerConfigName) + instance2 := fmt.Sprintf("%s-%s", negtypes.TestInstance2, providerConfigName) + notReady := false + emptyNamedPort := "" + namedPort := testNamedPort + port80 := int32(80) + port81 := int32(81) + protocolTCP := corev1.ProtocolTCP + + return []*discovery.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName + "-1", + Namespace: providerConfigName, + Labels: map[string]string{ + discovery.LabelServiceName: serviceName, + discovery.LabelManagedBy: managedByEPSControllerValue, + flags.F.ProviderConfigNameLabelKey: providerConfigName, + }, + }, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{fmt.Sprintf("%s.1.1", addressPrefix)}, + NodeName: &instance1, + TargetRef: &corev1.ObjectReference{ + Namespace: providerConfigName, + Name: "pod1", + }, + }, + { + Addresses: []string{fmt.Sprintf("%s.1.2", addressPrefix)}, + NodeName: &instance1, + TargetRef: &corev1.ObjectReference{ + Namespace: providerConfigName, + Name: "pod2", + }, + }, + { + Addresses: []string{fmt.Sprintf("%s.2.1", addressPrefix)}, + NodeName: &instance2, + TargetRef: &corev1.ObjectReference{ + Namespace: providerConfigName, + Name: "pod3", + }, + }, + { + Addresses: []string{fmt.Sprintf("%s.1.3", addressPrefix)}, + NodeName: &instance1, + TargetRef: &corev1.ObjectReference{Namespace: providerConfigName, Name: "pod5"}, + Conditions: discovery.EndpointConditions{Ready: ¬Ready}, + }, + { + Addresses: []string{fmt.Sprintf("%s.1.4", addressPrefix)}, + NodeName: &instance1, + TargetRef: &corev1.ObjectReference{Namespace: providerConfigName, Name: "pod6"}, + Conditions: discovery.EndpointConditions{Ready: ¬Ready}, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &emptyNamedPort, + Port: &port80, + Protocol: &protocolTCP, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName + "-2", + Namespace: providerConfigName, + Labels: map[string]string{ + discovery.LabelServiceName: serviceName, + discovery.LabelManagedBy: managedByEPSControllerValue, + flags.F.ProviderConfigNameLabelKey: providerConfigName, + }, + }, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{fmt.Sprintf("%s.2.2", addressPrefix)}, + NodeName: &instance2, + TargetRef: &corev1.ObjectReference{ + Namespace: providerConfigName, + Name: "pod7", + }, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &namedPort, + Port: &port81, + Protocol: &protocolTCP, + }, + }, + }, + } +} diff --git a/pkg/multiproject/testutil/providerconfigwebhook.go b/pkg/multiproject/testutil/providerconfigwebhook.go new file mode 100644 index 0000000000..b7ce01c746 --- /dev/null +++ b/pkg/multiproject/testutil/providerconfigwebhook.go @@ -0,0 +1,40 @@ +package testutil + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/testing" + "k8s.io/ingress-gce/pkg/flags" + klog 
"k8s.io/klog/v2" +) + +type FakeTracker interface { + Add(obj runtime.Object) error +} + +// EmulateProviderConfigLabelingWebhook is a helper function that emulates the behaviour +// of the providerconfig webhook. +// It will set the providerconfig name label on the object, on creation. +// This is needed, as in unit/integration tests, when we create objects, we expect +// the providerconfig name label to be set. +// +// Important: this function sets ProviderConfig name label to the namespace of the object. +// However, in the real world, multiple namespaces can have the same providerconfig name. +// +// The function takes a fake client and the name of the CRD. +func EmulateProviderConfigLabelingWebhook(tracker FakeTracker, fake *testing.Fake, crName string) { + fake.PrependReactor("create", crName, func(action testing.Action) (handled bool, ret runtime.Object, err error) { + createAction := action.(testing.CreateAction) + obj := createAction.GetObject() + pc := obj.(metav1.Object) + pc.GetLabels()[flags.F.ProviderConfigNameLabelKey] = pc.GetNamespace() + + err = tracker.Add(obj) + if err != nil { + klog.Errorf("Failed to add object to tracker: %v", err) + } + + klog.Infof("EmulateProviderConfigWebhook: %s/%s", pc.GetNamespace(), pc.GetName()) + return true, obj, nil + }) +}