diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index cdb524739a6f..0e63a3c85011 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -72,6 +72,10 @@ type Autoscaler interface { RunOnce(currentTime time.Time) errors.AutoscalerError // ExitCleanUp is a clean-up performed just before process termination. ExitCleanUp() + // LastScaleUpTime is a time of the last scale up + LastScaleUpTime() time.Time + // LastScaleUpTime is a time of the last scale down + LastScaleDownDeleteTime() time.Time } // NewAutoscaler creates an autoscaler of an appropriate type according to the parameters diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 66e128dbee19..5e67ab814822 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -215,6 +215,16 @@ func NewStaticAutoscaler( } } +// LastScaleUpTime returns last scale up time +func (a *StaticAutoscaler) LastScaleUpTime() time.Time { + return a.lastScaleUpTime +} + +// LastScaleDownDeleteTime returns the last successful scale down time +func (a *StaticAutoscaler) LastScaleDownDeleteTime() time.Time { + return a.lastScaleDownDeleteTime +} + // Start starts components running in background. func (a *StaticAutoscaler) Start() error { a.clusterStateRegistry.Start() diff --git a/cluster-autoscaler/loop/run.go b/cluster-autoscaler/loop/run.go new file mode 100644 index 000000000000..ae7605f85a3f --- /dev/null +++ b/cluster-autoscaler/loop/run.go @@ -0,0 +1,44 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +import ( + "time" + + "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" +) + +type autoscaler interface { + // RunOnce represents an iteration in the control-loop of CA. + RunOnce(currentTime time.Time) errors.AutoscalerError +} + +// RunAutoscalerOnce triggers a single autoscaling iteration. +func RunAutoscalerOnce(autoscaler autoscaler, healthCheck *metrics.HealthCheck, loopStart time.Time) { + metrics.UpdateLastTime(metrics.Main, loopStart) + healthCheck.UpdateLastActivity(loopStart) + + err := autoscaler.RunOnce(loopStart) + if err != nil && err.Type() != errors.TransientError { + metrics.RegisterError(err) + } else { + healthCheck.UpdateLastSuccessfulRun(time.Now()) + } + + metrics.UpdateDurationFromStart(metrics.Main, loopStart) +} diff --git a/cluster-autoscaler/loop/trigger.go b/cluster-autoscaler/loop/trigger.go new file mode 100644 index 000000000000..52ef962ba1b0 --- /dev/null +++ b/cluster-autoscaler/loop/trigger.go @@ -0,0 +1,144 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +import ( + "context" + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/autoscaler/cluster-autoscaler/metrics" + kube_client "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + podv1 "k8s.io/kubernetes/pkg/api/v1/pod" +) + +const maxPodChangeAge = 10 * time.Second + +var ( + podsResource = "pods" + unschedulablePodSelector = fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + + string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed)) +) + +// scalingTimesGetter exposes recent autoscaler activity +type scalingTimesGetter interface { + LastScaleUpTime() time.Time + LastScaleDownDeleteTime() time.Time +} + +// LoopTrigger object implements criteria used to start new autoscaling iteration +type LoopTrigger struct { + podObserver *UnschedulablePodObserver + scanInterval time.Duration + scalingTimesGetter scalingTimesGetter +} + +// NewLoopTrigger creates a LoopTrigger object +func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger { + return &LoopTrigger{ + podObserver: podObserver, + scanInterval: scanInterval, + scalingTimesGetter: scalingTimesGetter, + } +} + +// Wait waits for the next autoscaling iteration +func (t *LoopTrigger) Wait(lastRun time.Time) { + sleepStart := time.Now() + defer metrics.UpdateDurationFromStart(metrics.LoopWait, sleepStart) + + // To improve scale-up throughput, Cluster Autoscaler starts new iteration + // immediately if the previous one was productive. + if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) || + !t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) { + select { + case <-t.podObserver.unschedulablePodChan: + klog.Info("Autoscaler loop triggered by unschedulable pod appearing") + default: + klog.Infof("Autoscaler loop triggered immediately after a productive iteration") + } + return + } + + // Unschedulable pod triggers autoscaling immediately. + select { + case <-time.After(t.scanInterval): + klog.Infof("Autoscaler loop triggered by a %v timer", t.scanInterval) + case <-t.podObserver.unschedulablePodChan: + klog.Info("Autoscaler loop triggered by unschedulable pod appearing") + } +} + +// UnschedulablePodObserver triggers a new loop if there are new unschedulable pods +type UnschedulablePodObserver struct { + unschedulablePodChan <-chan any +} + +// StartPodObserver creates an informer and starts a goroutine watching for newly added +// or updated pods. Each time a new unschedulable pod appears or a change causes a pod to become +// unschedulable, a message is sent to the UnschedulablePodObserver's channel. +func StartPodObserver(ctx context.Context, kubeClient kube_client.Interface) *UnschedulablePodObserver { + podChan := make(chan any, 1) + listWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), podsResource, apiv1.NamespaceAll, unschedulablePodSelector) + informer := cache.NewSharedInformer(listWatch, &apiv1.Pod{}, time.Hour) + addEventHandlerFunc := func(obj any) { + if isRecentUnschedulablePod(obj) { + klog.V(5).Infof(" filterPodChanUntilClose emits signal") + select { + case podChan <- struct{}{}: + default: + } + } + } + updateEventHandlerFunc := func(old any, newOjb any) { addEventHandlerFunc(newOjb) } + _, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: addEventHandlerFunc, + UpdateFunc: updateEventHandlerFunc, + }) + go informer.Run(ctx.Done()) + return &UnschedulablePodObserver{ + unschedulablePodChan: podChan, + } +} + +// isRecentUnschedulablePod checks if the object is an unschedulable pod observed recently. +func isRecentUnschedulablePod(obj any) bool { + pod, ok := obj.(*apiv1.Pod) + if !ok { + return false + } + if pod.Status.Phase == apiv1.PodSucceeded || pod.Status.Phase == apiv1.PodFailed { + return false + } + if pod.Spec.NodeName != "" { + return false + } + _, scheduledCondition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) + if scheduledCondition == nil { + return false + } + if scheduledCondition.Status != apiv1.ConditionFalse || scheduledCondition.Reason != "Unschedulable" { + return false + } + if scheduledCondition.LastTransitionTime.Time.Add(maxPodChangeAge).Before(time.Now()) { + return false + } + return true +} diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 92555ac6acdc..f3bd86653855 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -31,6 +31,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/loop" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -62,7 +63,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" "k8s.io/autoscaler/cluster-autoscaler/simulator/options" - "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" "k8s.io/autoscaler/cluster-autoscaler/utils/units" @@ -258,6 +258,7 @@ var ( "Priority evictor reuses the concepts of drain logic in kubelet(https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2712-pod-priority-based-graceful-node-shutdown#migration-from-the-node-graceful-shutdown-feature)."+ "Eg. flag usage: '10000:20,1000:100,0:60'") provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.") + frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed") ) func isFlagPassed(name string) bool { @@ -591,23 +592,21 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho } // Autoscale ad infinitum. - for { - select { - case <-time.After(*scanInterval): - { - loopStart := time.Now() - metrics.UpdateLastTime(metrics.Main, loopStart) - healthCheck.UpdateLastActivity(loopStart) - - err := autoscaler.RunOnce(loopStart) - if err != nil && err.Type() != errors.TransientError { - metrics.RegisterError(err) - } else { - healthCheck.UpdateLastSuccessfulRun(time.Now()) - } - - metrics.UpdateDurationFromStart(metrics.Main, loopStart) - } + context, cancel := ctx.WithCancel(ctx.Background()) + defer cancel() + if *frequentLoopsEnabled { + podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts)) + trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval) + lastRun := time.Now() + for { + trigger.Wait(lastRun) + lastRun = time.Now() + loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun) + } + } else { + for { + time.Sleep(*scanInterval) + loop.RunAutoscalerOnce(autoscaler, healthCheck, time.Now()) } } } diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go index 1809a7edffab..52526d5bd419 100644 --- a/cluster-autoscaler/metrics/metrics.go +++ b/cluster-autoscaler/metrics/metrics.go @@ -114,6 +114,7 @@ const ( Poll FunctionLabel = "poll" Reconfigure FunctionLabel = "reconfigure" Autoscaling FunctionLabel = "autoscaling" + LoopWait FunctionLabel = "loopWait" ) var (