diff --git a/aenv/src/aenv/client/scheduler_client.py b/aenv/src/aenv/client/scheduler_client.py
index d3652d5..50cd798 100644
--- a/aenv/src/aenv/client/scheduler_client.py
+++ b/aenv/src/aenv/client/scheduler_client.py
@@ -543,11 +543,13 @@ async def list_env_services(
         try:
             api_response = APIResponse(**response.json())
 
-            if api_response.success and api_response.data:
+            # Fix: Check success explicitly, allow empty list as valid data
+            if api_response.success:
                 if isinstance(api_response.data, list):
                     from aenv.core.models import EnvService
                     return [EnvService(**item) for item in api_response.data]
 
+                # Return empty list if data is None or not a list
                 return []
             else:
                 error_msg = api_response.get_error_message()
diff --git a/controller/Dockerfile b/controller/Dockerfile
index b6cd5de..f27edc5 100644
--- a/controller/Dockerfile
+++ b/controller/Dockerfile
@@ -37,7 +37,10 @@
 COPY api-service ./api-service
 
 # Build
 WORKDIR /workspace/controller
-RUN go build -v -a -o controller ./cmd
+# Set build args for cross-compilation
+ARG TARGETOS=linux
+ARG TARGETARCH=amd64
+RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -v -a -ldflags="-w -s" -o controller ./cmd
 
 WORKDIR /workspace
diff --git a/controller/cmd/main.go b/controller/cmd/main.go
index 8f9de33..74bd0ea 100644
--- a/controller/cmd/main.go
+++ b/controller/cmd/main.go
@@ -27,9 +27,12 @@ import (
 
 	aenvhubserver "controller/pkg/aenvhub_http_server"
 
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/client-go/kubernetes"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
 	"k8s.io/klog"
-	"sigs.k8s.io/controller-runtime/pkg/client/config"
+	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
 )
@@ -39,9 +42,10 @@ const (
 )
 
 var (
-	defaultNamespace string
-	logDir           string
-	serverPort       int
+	defaultNamespace     string
+	logDir               string
+	serverPort           int
+	enableLeaderElection bool
 
 	controllerManager manager.Manager
 )
@@ -62,14 +66,23 @@ func StartHttpServer() {
 
 	klog.Infof("starting AENV http server...")
 
-	// AENV Pod Manager
-	aenvPodManager, err := aenvhubserver.NewAEnvPodHandler()
+	// Create a shared clientset from manager's config
+	// All handlers will share the same clientset and rate limiter
+	klog.Infof("🔗 Creating shared Kubernetes clientset for all handlers...")
+	sharedClientset, err := kubernetes.NewForConfig(controllerManager.GetConfig())
+	if err != nil {
+		klog.Fatalf("failed to create shared Kubernetes clientset, err is %v", err)
+	}
+	klog.Infof("✅ Shared clientset created with QPS=%.0f Burst=%d (shared rate limiter active)", controllerManager.GetConfig().QPS, controllerManager.GetConfig().Burst)
+
+	// AENV Pod Manager - use shared clientset
+	aenvPodManager, err := aenvhubserver.NewAEnvPodHandlerWithClientset(sharedClientset)
 	if err != nil {
 		klog.Fatalf("failed to create AENV Pod manager, err is %v", err)
 	}
 
-	// AENV Service Manager
-	aenvServiceManager, err := aenvhubserver.NewAEnvServiceHandler()
+	// AENV Service Manager - use shared clientset
+	aenvServiceManager, err := aenvhubserver.NewAEnvServiceHandlerWithClientset(sharedClientset)
 	if err != nil {
 		klog.Fatalf("failed to create AENV Service manager, err is %v", err)
 	}
@@ -104,7 +117,6 @@ func SetUpController() {
 		qps   int
 		burst int
 
-		enableLeaderElection bool
 		leaderDuration, leaderRenewDuration, leaderRetryPeriodDuation string
 	)
 	flag.StringVar(&metricsAddr, "metrics-addr", ":8088", "The address the metric endpoint binds to.")
@@ -113,8 +125,8 @@ func SetUpController() {
 	flag.StringVar(&leaderDuration, "leader-elect-lease-duration", "65s", "leader election lease duration")
 	flag.StringVar(&leaderRenewDuration, "leader-elect-renew-deadline", "60s", "leader election renew deadline")
 	flag.StringVar(&leaderRetryPeriodDuation, "leader-elect-retry-period", "2s", "leader election retry period")
-	flag.IntVar(&qps, "qps", 50, "QPS for kubernetes clientset config.")
-	flag.IntVar(&burst, "burst", 100, "Burst for kubernetes clienset config.")
+	flag.IntVar(&qps, "qps", 20, "QPS for kubernetes clientset config.")
+	flag.IntVar(&burst, "burst", 40, "Burst for kubernetes clienset config.")
 
 	flag.Parse()
 
@@ -143,7 +155,7 @@ func SetUpController() {
 	// Get a config to talk to the apiserver
 	klog.Infof("setting up client for manager")
-	cfg, err := config.GetConfig()
+	cfg, err := rest.InClusterConfig()
 	if err != nil {
 		klog.Errorf("unable to set up client config, err is %v", err)
 		os.Exit(1)
 	}
@@ -153,6 +165,24 @@
 	cfg.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
 	cfg.UserAgent = "aenv-controller"
 
+	// LOG: Confirm rate limiting configuration
+	klog.Infof("🔧 API Rate Limiting configured: QPS=%.0f, Burst=%d (fix/controller branch changes applied)", cfg.QPS, cfg.Burst)
+
+	// Ensure APIPath is set for discovery client
+	if cfg.APIPath == "" {
+		cfg.APIPath = "/api"
+	}
+
+	// Create a lazy REST mapper to avoid expensive discovery on startup
+	// Critical for clusters with 300+ CRDs to prevent "too many requests" errors
+	klog.Infof("🚀 Creating lazy REST mapper to avoid expensive CRD discovery...")
+	lazyMapper, err := apiutil.NewDynamicRESTMapper(cfg, apiutil.WithLazyDiscovery)
+	if err != nil {
+		klog.Errorf("unable to create lazy REST mapper, err is %v", err)
+		os.Exit(1)
+	}
+	klog.Infof("✅ Lazy REST mapper created successfully")
+
 	// Create a new Cmd to provide shared dependencies and start components
 	klog.Infof("setting up manager")
 	controllerManager, err = manager.New(cfg, manager.Options{
@@ -163,6 +193,12 @@
 		LeaseDuration: &leaseTime,
 		RenewDeadline: &leaseRenewTime,
 		RetryPeriod:   &leaderRetryPeriodTIme,
+		// Use lazy mapper to avoid upfront discovery of all 300+ CRDs
+		MapperProvider: func(c *rest.Config) (meta.RESTMapper, error) {
+			return lazyMapper, nil
+		},
+		// Limit manager to watch only specific namespace
+		Namespace: defaultNamespace,
 	})
 
 	if err != nil {
@@ -206,7 +242,11 @@ func AddReadiness(mgr manager.Manager) {
 
 		<-mgr.Elected() // When closed, it means leader has been acquired
 		isLeader.Store(true)
-		klog.Infof("This controller is now the leader")
+		if enableLeaderElection {
+			klog.Infof("This controller is now the leader")
+		} else {
+			klog.Infof("Leader election disabled, starting HTTP server")
+		}
 		StartHttpServer()
 	}()
 
diff --git a/controller/pkg/aenvhub_http_server/aenv_pod_cache.go b/controller/pkg/aenvhub_http_server/aenv_pod_cache.go
index 5ba6497..c744b39 100644
--- a/controller/pkg/aenvhub_http_server/aenv_pod_cache.go
+++ b/controller/pkg/aenvhub_http_server/aenv_pod_cache.go
@@ -22,9 +22,7 @@ import (
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog"
@@ -42,34 +40,59 @@ func NewAEnvPodCache(clientset kubernetes.Interface, namespace string) *AEnvPodCache {
 
 	klog.Infof("Pod cache initialization starts (namespace: %s)", namespace)
 
-	factory := informers.NewFilteredSharedInformerFactory(
-		clientset,
-		5*time.Minute,
+	// Create a specific pod lister/watcher instead of SharedInformerFactory
+	// to avoid creating informers for all resource types
+	klog.Infof("🎯 Using optimized ListWatcher (avoiding SharedInformerFactory for all resource types)")
+	listWatcher := cache.NewListWatchFromClient(
+		clientset.CoreV1().RESTClient(),
+		"pods",
 		namespace,
-		func(options *metav1.ListOptions) {
-			options.FieldSelector = fields.Everything().String()
-		},
+		fields.Everything(),
 	)
-	podInformer := factory.Core().V1().Pods().Informer()
+
+	// Create indexer and informer manually
+	indexer, informer := cache.NewIndexerInformer(
+		listWatcher,
+		&corev1.Pod{},
+		30*time.Minute, // Resync period
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				pod := obj.(*corev1.Pod)
+				klog.V(4).Infof("Pod added: %s/%s", pod.Namespace, pod.Name)
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				pod := newObj.(*corev1.Pod)
+				klog.V(4).Infof("Pod updated: %s/%s", pod.Namespace, pod.Name)
+			},
+			DeleteFunc: func(obj interface{}) {
+				pod := obj.(*corev1.Pod)
+				klog.V(4).Infof("Pod deleted: %s/%s", pod.Namespace, pod.Name)
+			},
+		},
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+	)
 
 	stopCh := make(chan struct{})
 
 	podCache := &AEnvPodCache{
-		cache:    podInformer.GetIndexer(),
-		informer: podInformer,
+		cache:    indexer,
+		informer: informer,
 		stopCh:   stopCh,
 	}
 
-	// Start cache synchronization
-	go podInformer.Run(stopCh)
+	// Start cache synchronization in background
+	go informer.Run(stopCh)
 
-	// Wait for cache synchronization to complete
-	if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
-		klog.Fatalf("failed to wait for cache sync!")
-	}
+	// Start async sync watcher
+	go func() {
+		klog.Infof("Waiting for pod cache sync (namespace: %s)...", namespace)
+		if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
+			klog.Errorf("failed to wait for pod cache sync in namespace %s", namespace)
+			return
+		}
+		klog.Infof("Pod cache sync completed (namespace: %s), number of pods: %d", namespace, len(podCache.cache.ListKeys()))
+	}()
 
-	klog.Infof("Pod cache initialization finished (namespace: %s), number of pods is %d", namespace, len(podCache.cache.ListKeys()))
 	return podCache
 }
diff --git a/controller/pkg/aenvhub_http_server/aenv_pod_handler.go b/controller/pkg/aenvhub_http_server/aenv_pod_handler.go
index eff8cef..ddb7830 100644
--- a/controller/pkg/aenvhub_http_server/aenv_pod_handler.go
+++ b/controller/pkg/aenvhub_http_server/aenv_pod_handler.go
@@ -62,16 +62,27 @@ func NewAEnvPodHandler() (*AEnvPodHandler, error) {
 		}
 	}
 
-	// Set useragent
+	// Set useragent and rate limits
+	// Use kubectl-like UserAgent to avoid potential per-client rate limiting
+	// Use conservative QPS/Burst to avoid "too many requests" in large clusters
 	config.UserAgent = "aenv-controller"
-	config.QPS = 1000
-	config.Burst = 1000
+	config.QPS = 20
+	config.Burst = 40
 
+	return NewAEnvPodHandlerWithConfig(config)
+}
+
+// NewAEnvPodHandlerWithConfig creates new PodHandler with provided config
+func NewAEnvPodHandlerWithConfig(config *rest.Config) (*AEnvPodHandler, error) {
 	clientset, err := kubernetes.NewForConfig(config)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create k8s clientset, err is %v", err)
 	}
+	return NewAEnvPodHandlerWithClientset(clientset)
+}
 
+// NewAEnvPodHandlerWithClientset creates new PodHandler with provided clientset
+func NewAEnvPodHandlerWithClientset(clientset kubernetes.Interface) (*AEnvPodHandler, error) {
 	podHandler := &AEnvPodHandler{
 		clientset: clientset,
 	}
diff --git a/controller/pkg/aenvhub_http_server/aenv_service_handler.go b/controller/pkg/aenvhub_http_server/aenv_service_handler.go
index 0843dd4..243a18f 100644
--- a/controller/pkg/aenvhub_http_server/aenv_service_handler.go
+++ b/controller/pkg/aenvhub_http_server/aenv_service_handler.go
@@ -60,14 +60,23 @@ func NewAEnvServiceHandler() (*AEnvServiceHandler, error) {
 		}
 	}
 
 	config.UserAgent = "aenv-controller"
-	config.QPS = 1000
-	config.Burst = 1000
+	config.QPS = 20
+	config.Burst = 40
 
+	return NewAEnvServiceHandlerWithConfig(config)
+}
+
+// NewAEnvServiceHandlerWithConfig creates new ServiceHandler with provided config
+func NewAEnvServiceHandlerWithConfig(config *rest.Config) (*AEnvServiceHandler, error) {
 	clientset, err := kubernetes.NewForConfig(config)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create k8s clientset, err is %v", err)
 	}
+	return NewAEnvServiceHandlerWithClientset(clientset)
+}
 
+// NewAEnvServiceHandlerWithClientset creates new ServiceHandler with provided clientset
+func NewAEnvServiceHandlerWithClientset(clientset kubernetes.Interface) (*AEnvServiceHandler, error) {
 	serviceHandler := &AEnvServiceHandler{
 		clientset: clientset,
 	}
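Reviewer note, not part of the patch: the main.go change threads one clientset into both handlers so that all AENV HTTP traffic draws from a single QPS=20/Burst=40 budget instead of each handler building its own client with its own limiter. Below is a minimal sketch of that wiring, assuming only the constructors this diff adds (NewAEnvPodHandlerWithClientset, NewAEnvServiceHandlerWithClientset); the standalone main() and its error handling are illustrative, not code from the repository.

```go
package main

import (
	aenvhubserver "controller/pkg/aenvhub_http_server"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/klog"
)

func main() {
	// One rest.Config, one clientset, and therefore one shared client-side
	// rate limiter for every handler that receives this clientset.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatalf("failed to load in-cluster config: %v", err)
	}
	// Conservative limits; they apply per clientset, so reusing the clientset
	// is what keeps the whole controller under one QPS/Burst budget.
	cfg.QPS = 20
	cfg.Burst = 40

	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("failed to create clientset: %v", err)
	}

	// Both handlers reuse the same clientset, hence the same rate limiter.
	podHandler, err := aenvhubserver.NewAEnvPodHandlerWithClientset(clientset)
	if err != nil {
		klog.Fatalf("failed to create pod handler: %v", err)
	}
	serviceHandler, err := aenvhubserver.NewAEnvServiceHandlerWithClientset(clientset)
	if err != nil {
		klog.Fatalf("failed to create service handler: %v", err)
	}
	klog.Infof("handlers ready: %T, %T", podHandler, serviceHandler)
}
```

In the patch itself the config comes from controllerManager.GetConfig() rather than rest.InClusterConfig(), so the handlers inherit whatever QPS/Burst the manager was started with via the -qps and -burst flags.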
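Reviewer note, not part of the patch: because WaitForCacheSync in aenv_pod_cache.go now runs in a background goroutine, the indexer can be queried before its initial list has completed. A hedged sketch of a guarded read path follows, assuming the cache/informer fields shown in the hunk above, the file's existing imports (corev1, plus fmt), and that the informer field holds the cache.Controller returned by NewIndexerInformer; the GetPod method itself is hypothetical and not something this diff adds.

```go
// Hypothetical accessor that would live alongside AEnvPodCache in the same package.
func (c *AEnvPodCache) GetPod(namespace, name string) (*corev1.Pod, error) {
	// With async sync, HasSynced may still be false shortly after startup;
	// callers could fall back to a direct API GET during that window.
	if !c.informer.HasSynced() {
		return nil, fmt.Errorf("pod cache not synced yet (namespace: %s)", namespace)
	}
	// Indexer keys are namespace/name, matching cache.MetaNamespaceIndexFunc.
	obj, exists, err := c.cache.GetByKey(namespace + "/" + name)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, fmt.Errorf("pod %s/%s not found in cache", namespace, name)
	}
	return obj.(*corev1.Pod), nil
}
```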