Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion aenv/src/aenv/client/scheduler_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,11 +543,13 @@ async def list_env_services(

try:
api_response = APIResponse(**response.json())
if api_response.success and api_response.data:
# Fix: Check success explicitly, allow empty list as valid data
if api_response.success:
if isinstance(api_response.data, list):
from aenv.core.models import EnvService

return [EnvService(**item) for item in api_response.data]
# Return empty list if data is None or not a list
return []
else:
error_msg = api_response.get_error_message()
Expand Down
5 changes: 4 additions & 1 deletion controller/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ COPY api-service ./api-service

# Build
WORKDIR /workspace/controller
RUN go build -v -a -o controller ./cmd
# Set build args for cross-compilation
ARG TARGETOS=linux
ARG TARGETARCH=amd64
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -v -a -ldflags="-w -s" -o controller ./cmd

WORKDIR /workspace

Expand Down
66 changes: 53 additions & 13 deletions controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import (

aenvhubserver "controller/pkg/aenvhub_http_server"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)
Expand All @@ -39,9 +42,10 @@ const (
)

var (
defaultNamespace string
logDir string
serverPort int
defaultNamespace string
logDir string
serverPort int
enableLeaderElection bool

controllerManager manager.Manager
)
Expand All @@ -62,14 +66,23 @@ func StartHttpServer() {

klog.Infof("starting AENV http server...")

// AENV Pod Manager
aenvPodManager, err := aenvhubserver.NewAEnvPodHandler()
// Create a shared clientset from manager's config
// All handlers will share the same clientset and rate limiter
klog.Infof("🔗 Creating shared Kubernetes clientset for all handlers...")
sharedClientset, err := kubernetes.NewForConfig(controllerManager.GetConfig())
if err != nil {
klog.Fatalf("failed to create shared Kubernetes clientset, err is %v", err)
}
klog.Infof("✅ Shared clientset created with QPS=%.0f Burst=%d (shared rate limiter active)", controllerManager.GetConfig().QPS, controllerManager.GetConfig().Burst)

// AENV Pod Manager - use shared clientset
aenvPodManager, err := aenvhubserver.NewAEnvPodHandlerWithClientset(sharedClientset)
if err != nil {
klog.Fatalf("failed to create AENV Pod manager, err is %v", err)
}

// AENV Service Manager
aenvServiceManager, err := aenvhubserver.NewAEnvServiceHandler()
// AENV Service Manager - use shared clientset
aenvServiceManager, err := aenvhubserver.NewAEnvServiceHandlerWithClientset(sharedClientset)
if err != nil {
klog.Fatalf("failed to create AENV Service manager, err is %v", err)
}
Expand Down Expand Up @@ -104,7 +117,6 @@ func SetUpController() {
qps int
burst int

enableLeaderElection bool
leaderDuration, leaderRenewDuration, leaderRetryPeriodDuation string
)
flag.StringVar(&metricsAddr, "metrics-addr", ":8088", "The address the metric endpoint binds to.")
Expand All @@ -113,8 +125,8 @@ func SetUpController() {
flag.StringVar(&leaderDuration, "leader-elect-lease-duration", "65s", "leader election lease duration")
flag.StringVar(&leaderRenewDuration, "leader-elect-renew-deadline", "60s", "leader election renew deadline")
flag.StringVar(&leaderRetryPeriodDuation, "leader-elect-retry-period", "2s", "leader election retry period")
flag.IntVar(&qps, "qps", 50, "QPS for kubernetes clientset config.")
flag.IntVar(&burst, "burst", 100, "Burst for kubernetes clienset config.")
flag.IntVar(&qps, "qps", 20, "QPS for kubernetes clientset config.")
flag.IntVar(&burst, "burst", 40, "Burst for kubernetes clienset config.")

flag.Parse()

Expand Down Expand Up @@ -143,7 +155,7 @@ func SetUpController() {

// Get a config to talk to the apiserver
klog.Infof("setting up client for manager")
cfg, err := config.GetConfig()
cfg, err := rest.InClusterConfig()
if err != nil {
klog.Errorf("unable to set up client config, err is %v", err)
os.Exit(1)
Expand All @@ -153,6 +165,24 @@ func SetUpController() {
cfg.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
cfg.UserAgent = "aenv-controller"

// LOG: Confirm rate limiting configuration
klog.Infof("🔧 API Rate Limiting configured: QPS=%.0f, Burst=%d (fix/controller branch changes applied)", cfg.QPS, cfg.Burst)

// Ensure APIPath is set for discovery client
if cfg.APIPath == "" {
cfg.APIPath = "/api"
}

// Create a lazy REST mapper to avoid expensive discovery on startup
// Critical for clusters with 300+ CRDs to prevent "too many requests" errors
klog.Infof("🚀 Creating lazy REST mapper to avoid expensive CRD discovery...")
lazyMapper, err := apiutil.NewDynamicRESTMapper(cfg, apiutil.WithLazyDiscovery)
if err != nil {
klog.Errorf("unable to create lazy REST mapper, err is %v", err)
os.Exit(1)
}
klog.Infof("✅ Lazy REST mapper created successfully")

// Create a new Cmd to provide shared dependencies and start components
klog.Infof("setting up manager")
controllerManager, err = manager.New(cfg, manager.Options{
Expand All @@ -163,6 +193,12 @@ func SetUpController() {
LeaseDuration: &leaseTime,
RenewDeadline: &leaseRenewTime,
RetryPeriod: &leaderRetryPeriodTIme,
// Use lazy mapper to avoid upfront discovery of all 300+ CRDs
MapperProvider: func(c *rest.Config) (meta.RESTMapper, error) {
return lazyMapper, nil
},
// Limit manager to watch only specific namespace
Namespace: defaultNamespace,
})

if err != nil {
Expand Down Expand Up @@ -206,7 +242,11 @@ func AddReadiness(mgr manager.Manager) {
<-mgr.Elected() // When closed, it means leader has been acquired
isLeader.Store(true)

klog.Infof("This controller is now the leader")
if enableLeaderElection {
klog.Infof("This controller is now the leader")
} else {
klog.Infof("Leader election disabled, starting HTTP server")
}

StartHttpServer()
}()
Expand Down
59 changes: 41 additions & 18 deletions controller/pkg/aenvhub_http_server/aenv_pod_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
Expand All @@ -42,34 +40,59 @@ func NewAEnvPodCache(clientset kubernetes.Interface, namespace string) *AEnvPodC

klog.Infof("Pod cache initialization starts (namespace: %s)", namespace)

factory := informers.NewFilteredSharedInformerFactory(
clientset,
5*time.Minute,
// Create a specific pod lister/watcher instead of SharedInformerFactory
// to avoid creating informers for all resource types
klog.Infof("🎯 Using optimized ListWatcher (avoiding SharedInformerFactory for all resource types)")
listWatcher := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
"pods",
namespace,
func(options *metav1.ListOptions) {
options.FieldSelector = fields.Everything().String()
},
fields.Everything(),
)

podInformer := factory.Core().V1().Pods().Informer()
// Create indexer and informer manually
indexer, informer := cache.NewIndexerInformer(
listWatcher,
&corev1.Pod{},
30*time.Minute, // Resync period
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
klog.V(4).Infof("Pod added: %s/%s", pod.Namespace, pod.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pod := newObj.(*corev1.Pod)
klog.V(4).Infof("Pod updated: %s/%s", pod.Namespace, pod.Name)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
klog.V(4).Infof("Pod deleted: %s/%s", pod.Namespace, pod.Name)
},
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

stopCh := make(chan struct{})

podCache := &AEnvPodCache{
cache: podInformer.GetIndexer(),
informer: podInformer,
cache: indexer,
informer: informer,
stopCh: stopCh,
}

// Start cache synchronization
go podInformer.Run(stopCh)
// Start cache synchronization in background
go informer.Run(stopCh)

// Wait for cache synchronization to complete
if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
klog.Fatalf("failed to wait for cache sync!")
}
// Start async sync watcher
go func() {
klog.Infof("Waiting for pod cache sync (namespace: %s)...", namespace)
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
klog.Errorf("failed to wait for pod cache sync in namespace %s", namespace)
return
}
klog.Infof("Pod cache sync completed (namespace: %s), number of pods: %d", namespace, len(podCache.cache.ListKeys()))
}()
Comment on lines +86 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Changing the cache synchronization to be asynchronous is a great improvement for startup performance and resilience. However, this introduces a time window where the cache is not yet synced, and list operations might return incomplete data. It would be beneficial to expose the sync status, for example by adding an IsSynced() bool method to AEnvPodCache, so that callers like listPod can handle this state gracefully (e.g., by returning a '503 Service Unavailable' if the cache is not ready).


klog.Infof("Pod cache initialization finished (namespace: %s), number of pods is %d", namespace, len(podCache.cache.ListKeys()))
return podCache
}

Expand Down
17 changes: 14 additions & 3 deletions controller/pkg/aenvhub_http_server/aenv_pod_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,27 @@ func NewAEnvPodHandler() (*AEnvPodHandler, error) {
}
}

// Set useragent
// Set useragent and rate limits
// Use kubectl-like UserAgent to avoid potential per-client rate limiting
// Use conservative QPS/Burst to avoid "too many requests" in large clusters
config.UserAgent = "aenv-controller"
config.QPS = 1000
config.Burst = 1000
config.QPS = 20
config.Burst = 40

return NewAEnvPodHandlerWithConfig(config)
}

// NewAEnvPodHandlerWithConfig creates a new AEnvPodHandler from the provided
// REST config. It builds a Kubernetes clientset from the config and delegates
// to NewAEnvPodHandlerWithClientset, so the handler shares whatever QPS/Burst
// rate limiting the caller configured on the config.
//
// Returns an error if the clientset cannot be constructed from the config.
func NewAEnvPodHandlerWithConfig(config *rest.Config) (*AEnvPodHandler, error) {
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		// Wrap with %w so callers can unwrap via errors.Is/errors.As.
		return nil, fmt.Errorf("failed to create k8s clientset: %w", err)
	}
	return NewAEnvPodHandlerWithClientset(clientset)
}

// NewAEnvPodHandlerWithClientset creates new PodHandler with provided clientset
func NewAEnvPodHandlerWithClientset(clientset kubernetes.Interface) (*AEnvPodHandler, error) {
podHandler := &AEnvPodHandler{
clientset: clientset,
}
Expand Down
13 changes: 11 additions & 2 deletions controller/pkg/aenvhub_http_server/aenv_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,23 @@ func NewAEnvServiceHandler() (*AEnvServiceHandler, error) {
}

config.UserAgent = "aenv-controller"
config.QPS = 1000
config.Burst = 1000
config.QPS = 20
config.Burst = 40

return NewAEnvServiceHandlerWithConfig(config)
}

// NewAEnvServiceHandlerWithConfig creates a new AEnvServiceHandler from the
// provided REST config. It builds a Kubernetes clientset from the config and
// delegates to NewAEnvServiceHandlerWithClientset, so the handler shares
// whatever QPS/Burst rate limiting the caller configured on the config.
//
// Returns an error if the clientset cannot be constructed from the config.
func NewAEnvServiceHandlerWithConfig(config *rest.Config) (*AEnvServiceHandler, error) {
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		// Wrap with %w so callers can unwrap via errors.Is/errors.As.
		return nil, fmt.Errorf("failed to create k8s clientset: %w", err)
	}
	return NewAEnvServiceHandlerWithClientset(clientset)
}

// NewAEnvServiceHandlerWithClientset creates new ServiceHandler with provided clientset
func NewAEnvServiceHandlerWithClientset(clientset kubernetes.Interface) (*AEnvServiceHandler, error) {
serviceHandler := &AEnvServiceHandler{
clientset: clientset,
}
Expand Down
Loading