From d8e67fa6520d3b4a107e6bd36f7a2ebc34457f05 Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 17 Apr 2024 00:54:02 -0600 Subject: [PATCH 1/2] test: adding permit to allow for sibling pod scheduling Problem: the submit of the first index works for more controlled lengths (e.g., lammps takes a while) but was having issues with really quick jobs. Solution: try restoring the queue that allows for enabling siblings pods so any group can be scheduled. Signed-off-by: vsoch --- examples/pod-group-jobs/job1.yaml | 59 +++++++ examples/pod-group-jobs/job2.yaml | 59 +++++++ .../pkg/fluence/core/core.go | 161 +++++++++++++++--- sig-scheduler-plugins/pkg/fluence/fluence.go | 139 ++++++++++++++- .../pkg/fluence/group/group.go | 18 ++ sig-scheduler-plugins/pkg/logger/logger.go | 4 +- 6 files changed, 406 insertions(+), 34 deletions(-) create mode 100644 examples/pod-group-jobs/job1.yaml create mode 100644 examples/pod-group-jobs/job2.yaml diff --git a/examples/pod-group-jobs/job1.yaml b/examples/pod-group-jobs/job1.yaml new file mode 100644 index 0000000..e0ebba0 --- /dev/null +++ b/examples/pod-group-jobs/job1.yaml @@ -0,0 +1,59 @@ +apiVersion: v1 +kind: Service +metadata: + name: s0 +spec: + clusterIP: None + selector: + job-name: job-0 +--- +apiVersion: batch/v1 +kind: Job +metadata: + # name will be derived based on iteration + name: job-0 +spec: + completions: 4 + parallelism: 4 + completionMode: Indexed + template: + metadata: + labels: + app: job-0 + spec: + subdomain: s0 + schedulerName: fluence + restartPolicy: Never + containers: + - name: example-workload + image: bash:latest + resources: + limits: + cpu: "3" + requests: + cpu: "3" + command: + - bash + - -c + - | + if [ $JOB_COMPLETION_INDEX -ne "0" ] + then + sleep infinity + fi + echo "START: $(date +%s)" + for i in 0 1 2 3 + do + gotStatus="-1" + wantStatus="0" + while [ $gotStatus -ne $wantStatus ] + do + ping -c 1 job-0-${i}.s0 > /dev/null 2>&1 + gotStatus=$? + if [ $gotStatus -ne $wantStatus ]; then + echo "Failed to ping pod job-0-${i}.s0, retrying in 1 second..." + sleep 1 + fi + done + echo "Successfully pinged pod: job-0-${i}.s0" + done + echo "DONE: $(date +%s)" \ No newline at end of file diff --git a/examples/pod-group-jobs/job2.yaml b/examples/pod-group-jobs/job2.yaml new file mode 100644 index 0000000..c39820b --- /dev/null +++ b/examples/pod-group-jobs/job2.yaml @@ -0,0 +1,59 @@ +apiVersion: v1 +kind: Service +metadata: + name: s1 +spec: + clusterIP: None + selector: + job-name: job-1 +--- +apiVersion: batch/v1 +kind: Job +metadata: + # name will be derived based on iteration + name: job-1 +spec: + completions: 4 + parallelism: 4 + completionMode: Indexed + template: + metadata: + labels: + app: job-1 + spec: + subdomain: s1 + schedulerName: fluence + restartPolicy: Never + containers: + - name: example-workload + image: bash:latest + resources: + limits: + cpu: "3" + requests: + cpu: "3" + command: + - bash + - -c + - | + if [ $JOB_COMPLETION_INDEX -ne "0" ] + then + sleep infinity + fi + echo "START: $(date +%s)" + for i in 0 1 2 3 + do + gotStatus="-1" + wantStatus="0" + while [ $gotStatus -ne $wantStatus ] + do + ping -c 1 job-0-${i}.s1 > /dev/null 2>&1 + gotStatus=$? + if [ $gotStatus -ne $wantStatus ]; then + echo "Failed to ping pod job-0-${i}.s1, retrying in 1 second..." + sleep 1 + fi + done + echo "Successfully pinged pod: job-0-${i}.s1" + done + echo "DONE: $(date +%s)" \ No newline at end of file diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go index 1e75814..ea300ce 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/core.go +++ b/sig-scheduler-plugins/pkg/fluence/core/core.go @@ -39,11 +39,33 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/util" ) +type Status string + +const ( + // PodGroupNotSpecified denotes no PodGroup is specified in the Pod spec. + PodGroupNotSpecified Status = "PodGroup not specified" + // PodGroupNotFound denotes the specified PodGroup in the Pod spec is + // not found in API server. + PodGroupNotFound Status = "PodGroup not found" + Success Status = "Success" + Wait Status = "Wait" + + permitStateKey = "PermitFluence" +) + // TODO should eventually store group name here to reassociate on reload type FluxStateData struct { NodeName string } +type PermitState struct { + Activate bool +} + +func (s *PermitState) Clone() framework.StateData { + return &PermitState{Activate: s.Activate} +} + func (s *FluxStateData) Clone() framework.StateData { clone := &FluxStateData{ NodeName: s.NodeName, @@ -58,6 +80,10 @@ type Manager interface { GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup) GetCreationTimestamp(*corev1.Pod, time.Time) time.Time DeletePermittedPodGroup(string) + Permit(context.Context, *framework.CycleState, *corev1.Pod) Status + CalculateAssignedPods(string, string) int + ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) + BackoffPodGroup(string, time.Duration) } // PodGroupManager defines the scheduling operation called @@ -110,26 +136,69 @@ func NewPodGroupManager( return pgMgr } +func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) { + if backoff == time.Duration(0) { + return + } + pgMgr.backedOffPG.Add(pgName, nil, backoff) +} + +// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod +// in the given state, with a reserved key "kubernetes.io/pods-to-activate". +func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) { + pgName := util.GetPodGroupLabel(pod) + if pgName == "" { + return + } + + // Only proceed if it's explicitly requested to activate sibling pods. + if c, err := state.Read(permitStateKey); err != nil { + return + } else if s, ok := c.(*PermitState); !ok || !s.Activate { + return + } + + pods, err := pgMgr.podLister.Pods(pod.Namespace).List( + labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}), + ) + if err != nil { + klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName) + return + } + + for i := range pods { + if pods[i].UID == pod.UID { + pods = append(pods[:i], pods[i+1:]...) + break + } + } + + if len(pods) != 0 { + if c, err := state.Read(framework.PodsToActivateKey); err == nil { + if s, ok := c.(*framework.PodsToActivate); ok { + s.Lock() + for _, pod := range pods { + namespacedName := GetNamespacedName(pod) + s.Map[namespacedName] = pod + } + s.Unlock() + } + } + } +} + // GetStatuses string (of all pods) to show for debugging purposes -// Since we loop here, we also determine if the first pod is the one -// we are considering -func (pgMgr *PodGroupManager) GetStatusesAndIndex( +func (pgMgr *PodGroupManager) GetStatuses( pods []*corev1.Pod, pod *corev1.Pod, -) (string, bool, int) { +) string { statuses := "" // We need to distinguish 0 from the default and not finding anything - foundIndex := false - index := 0 - for i, p := range pods { - if p.Name == pod.Name { - foundIndex = true - index = i - } + for _, p := range pods { statuses += " " + fmt.Sprintf("%s", p.Status.Phase) } - return statuses, foundIndex, index + return statuses } // GetPodNode is a quick lookup to see if we have a node @@ -138,6 +207,39 @@ func (pgMgr *PodGroupManager) GetPodNode(pod *corev1.Pod) string { return node } +// Permit permits a pod to run, if the minMember match, it would send a signal to chan. +func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) Status { + pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) + if pgFullName == "" { + return PodGroupNotSpecified + } + if pg == nil { + // A Pod with a podGroup name but without a PodGroup found is denied. + return PodGroupNotFound + } + + assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace) + // The number of pods that have been assigned nodes is calculated from the snapshot. + // The current pod in not included in the snapshot during the current scheduling cycle. + if int32(assigned)+1 >= pg.Spec.MinMember { + return Success + } + + if assigned == 0 { + // Given we've reached Permit(), it's mean all PreFilter checks (minMember & minResource) + // already pass through, so if assigned == 0, it could be due to: + // - minResource get satisfied + // - new pods added + // In either case, we should and only should use this 0-th pod to trigger activating + // its siblings. + // It'd be in-efficient if we trigger activating siblings unconditionally. + // See https://github.com/kubernetes-sigs/scheduler-plugins/issues/682 + state.Write(permitStateKey, &PermitState{Activate: true}) + } + + return Wait +} + // PreFilter filters out a pod if // 1. it belongs to a podgroup that was recently denied or // 2. the total number of pods in the podgroup is less than the minimum number of pods @@ -169,7 +271,7 @@ func (pgMgr *PodGroupManager) PreFilter( // Only allow scheduling the first in the group so the others come after // Get statuses to show for debugging - statuses, found, idx := pgMgr.GetStatusesAndIndex(pods, pod) + statuses := pgMgr.GetStatuses(pods, pod) // This shows us the number of pods we have in the set and their states pgMgr.log.Info("[PodGroup PreFilter] group: %s pods: %s MinMember: %d Size: %d", pgFullName, statuses, pg.Spec.MinMember, len(pods)) @@ -178,18 +280,6 @@ func (pgMgr *PodGroupManager) PreFilter( "current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember) } - if !found { - return fmt.Errorf("pod %s was not found in group - this should not happen", pod.Name) - } - - // We only will AskFlux for the first pod - // This makes an assumption that the order listed is the order in the queue, I'm not - // sure that is true in practice. This is the one case with retry. This design - // probably needs thinking and work. - if idx != 0 { - return fmt.Errorf("pod %s is not first in the list, will wait to schedule", pod.Name) - } - // TODO we likely can take advantage of these resources or other custom // attributes we add. For now ignore and calculate based on pod needs (above) // if pg.Spec.MinResources == nil { @@ -233,7 +323,9 @@ func (pgMgr *PodGroupManager) PreFilter( stateData := FluxStateData{NodeName: node} state.Write(framework.StateKey(pod.Name), &stateData) // Also save to the podToNode lookup + pgMgr.mutex.Lock() pgMgr.podToNode[pod.Name] = node + pgMgr.mutex.Unlock() } pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout) return nil @@ -252,6 +344,25 @@ func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time return pg.CreationTimestamp.Time } +// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. +func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int { + nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List() + if err != nil { + pgMgr.log.Error("Cannot get nodeInfos from frameworkHandle: %s", err) + return 0 + } + var count int + for _, nodeInfo := range nodeInfos { + for _, podInfo := range nodeInfo.Pods { + pod := podInfo.Pod + if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" { + count++ + } + } + } + return count +} + // DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter. func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) { pgMgr.permittedPG.Delete(pgFullName) diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go index 84f3e95..099d2f3 100644 --- a/sig-scheduler-plugins/pkg/fluence/fluence.go +++ b/sig-scheduler-plugins/pkg/fluence/fluence.go @@ -22,8 +22,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - klog "k8s.io/klog/v2" "sigs.k8s.io/scheduler-plugins/pkg/logger" @@ -33,12 +33,12 @@ import ( "k8s.io/client-go/tools/cache" fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group" + flabel "sigs.k8s.io/scheduler-plugins/pkg/fluence/labels" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/kubernetes/pkg/scheduler/framework" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/scheduler-plugins/apis/config" "sigs.k8s.io/scheduler-plugins/apis/scheduling" "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" fcore "sigs.k8s.io/scheduler-plugins/pkg/fluence/core" @@ -52,6 +52,7 @@ type Fluence struct { frameworkHandler framework.Handle pgMgr fcore.Manager scheduleTimeout *time.Duration + pgBackoff *time.Duration log *logger.DebugLogger } @@ -59,6 +60,15 @@ var ( _ framework.QueueSortPlugin = &Fluence{} _ framework.PreFilterPlugin = &Fluence{} _ framework.FilterPlugin = &Fluence{} + + _ framework.PostFilterPlugin = &Fluence{} + _ framework.PermitPlugin = &Fluence{} + _ framework.ReservePlugin = &Fluence{} + + _ framework.EnqueueExtensions = &Fluence{} + + permitWaitingTimeSeconds int64 = 60 + podGroupBackoffSeconds int64 = 0 ) const ( @@ -69,14 +79,12 @@ const ( // Initialize and return a new Fluence Custom Scheduler Plugin func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - // Keep these empty for now, use defaults - args := config.CoschedulingArgs{} ctx := context.TODO() // Make fluence his own little logger! // This can eventually be a flag, but just going to set for now // It shall be a very chonky file. Oh lawd he comin! - l := logger.NewDebugLogger(logger.LevelError, "/tmp/fluence.log") + l := logger.NewDebugLogger(logger.LevelDebug, "/tmp/fluence.log") scheme := runtime.NewScheme() _ = clientscheme.AddToScheme(scheme) @@ -93,7 +101,7 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) fluxPodsInformer.AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) // PermitWaitingTimeSeconds is the waiting timeout in seconds. - scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second + scheduleTimeDuration := time.Duration(permitWaitingTimeSeconds) * time.Second pgMgr := fcore.NewPodGroupManager( client, handle.SnapshotSharedLister(), @@ -110,11 +118,13 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) }) go fluxPodsInformer.Run(ctx.Done()) + backoffSeconds := time.Duration(podGroupBackoffSeconds) * time.Second plugin := &Fluence{ frameworkHandler: handle, pgMgr: pgMgr, scheduleTimeout: &scheduleTimeDuration, log: l, + pgBackoff: &backoffSeconds, } // TODO this is not supported yet @@ -219,16 +229,131 @@ func (f *Fluence) PreFilter( node := f.pgMgr.GetPodNode(pod) f.mutex.Unlock() if node != "" { + f.log.Info("[Fluence PreFilter] assigned pod %s to node %s\n", pod.Name, node) result := framework.PreFilterResult{NodeNames: sets.New(node)} return &result, framework.NewStatus(framework.Success, "") } + f.log.Info("[Fluence PreFilter] pod %s does not have a node assigned\n", pod.Name) + // This will populate the node name into the pod group manager err := f.pgMgr.PreFilter(ctx, pod, state) if err != nil { - f.log.Error("[Fluence PreFilter] failed pod %s: %s", klog.KObj(pod), err.Error()) + f.log.Error("[Fluence PreFilter] failed pod %s: %s", pod.Name, err.Error()) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } node = f.pgMgr.GetPodNode(pod) result := framework.PreFilterResult{NodeNames: sets.New(node)} return &result, framework.NewStatus(framework.Success, "") } + +// PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter. +func (f *Fluence) PostFilter( + ctx context.Context, + state *framework.CycleState, + pod *corev1.Pod, + filteredNodeStatusMap framework.NodeToStatusMap, +) (*framework.PostFilterResult, *framework.Status) { + + pgName, pg := f.pgMgr.GetPodGroup(ctx, pod) + if pg == nil { + f.log.Info("Pod does not belong to any group, pod %s", pod.Name) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, "can not find pod group") + } + + // This explicitly checks nodes, and we can skip scheduling another pod if we already + // have the minimum. For fluence since we expect an exact size this likely is not needed + assigned := f.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace) + if assigned >= int(pg.Spec.MinMember) { + f.log.Info("Assigned pods podGroup %s is assigned %s", pgName, assigned) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) + } + + // Took out percentage chcek here, doesn't make sense to me. + + // It's based on an implicit assumption: if the nth Pod failed, + // it's inferrable other Pods belonging to the same PodGroup would be very likely to fail. + f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if waitingPod.GetPod().Namespace == pod.Namespace && flabel.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { + f.log.Info("PostFilter rejects the pod for podGroup %s and pod %s", pgName, waitingPod.GetPod().Name) + waitingPod.Reject(f.Name(), "optimistic rejection in PostFilter") + } + }) + + if f.pgBackoff != nil { + pods, err := f.frameworkHandler.SharedInformerFactory().Core().V1().Pods().Lister().Pods(pod.Namespace).List( + labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: flabel.GetPodGroupLabel(pod)}), + ) + if err == nil && len(pods) >= int(pg.Spec.MinMember) { + f.pgMgr.BackoffPodGroup(pgName, *f.pgBackoff) + } + } + + f.pgMgr.DeletePermittedPodGroup(pgName) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, + fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name)) +} + +// Permit is the functions invoked by the framework at "Permit" extension point. +func (f *Fluence) Permit( + ctx context.Context, + state *framework.CycleState, + pod *corev1.Pod, + nodeName string, +) (*framework.Status, time.Duration) { + + f.log.Info("Checking permit for pod %s to node %s", pod.Name, nodeName) + waitTime := *f.scheduleTimeout + s := f.pgMgr.Permit(ctx, state, pod) + var retStatus *framework.Status + switch s { + case fcore.PodGroupNotSpecified: + f.log.Info("Checking permit for pod %s to node %s: PodGroupNotSpecified", pod.Name, nodeName) + return framework.NewStatus(framework.Success, ""), 0 + case fcore.PodGroupNotFound: + f.log.Info("Checking permit for pod %s to node %s: PodGroupNotFound", pod.Name, nodeName) + return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0 + case fcore.Wait: + f.log.Info("Pod %s is waiting to be scheduled to node %s", pod.Name, nodeName) + _, pg := f.pgMgr.GetPodGroup(ctx, pod) + if wait := fgroup.GetWaitTimeDuration(pg, f.scheduleTimeout); wait != 0 { + waitTime = wait + } + retStatus = framework.NewStatus(framework.Wait) + + // We will also request to move the sibling pods back to activeQ. + f.pgMgr.ActivateSiblings(pod, state) + case fcore.Success: + pgFullName := flabel.GetPodGroupFullName(pod) + f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if flabel.GetPodGroupFullName(waitingPod.GetPod()) == pgFullName { + f.log.Info("Permit allows pod %s", waitingPod.GetPod().Name) + waitingPod.Allow(f.Name()) + } + }) + f.log.Info("Permit allows pod %s", pod.Name) + retStatus = framework.NewStatus(framework.Success) + waitTime = 0 + } + + return retStatus, waitTime +} + +// Reserve is the functions invoked by the framework at "reserve" extension point. +func (f *Fluence) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { + return nil +} + +// Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out. +func (f *Fluence) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { + pgName, pg := f.pgMgr.GetPodGroup(ctx, pod) + if pg == nil { + return + } + f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if waitingPod.GetPod().Namespace == pod.Namespace && flabel.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { + f.log.Info("Unreserve rejects pod %s in group %s", waitingPod.GetPod().Name, pgName) + waitingPod.Reject(f.Name(), "rejection in Unreserve") + } + }) + f.pgMgr.DeletePermittedPodGroup(pgName) +} diff --git a/sig-scheduler-plugins/pkg/fluence/group/group.go b/sig-scheduler-plugins/pkg/fluence/group/group.go index 0ee0831..dd039e3 100644 --- a/sig-scheduler-plugins/pkg/fluence/group/group.go +++ b/sig-scheduler-plugins/pkg/fluence/group/group.go @@ -2,6 +2,7 @@ package group import ( "fmt" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -11,6 +12,9 @@ import ( sched "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" ) +// DefaultWaitTime is 60s if ScheduleTimeoutSeconds is not specified. +const DefaultWaitTime = 60 * time.Second + // CreateFakeGroup wraps an arbitrary pod in a fake group for fluence to schedule // This happens only in PreFilter so we already sorted func CreateFakeGroup(pod *corev1.Pod) *sched.PodGroup { @@ -44,3 +48,17 @@ func GetCreationTimestamp(groupName string, pg *sched.PodGroup, podInfo *framewo klog.Errorf(" [Fluence] Pod group %s time IsZero, we should not have reached here", groupName) return metav1.NewMicroTime(*podInfo.InitialAttemptTimestamp) } + +// GetWaitTimeDuration returns a wait timeout based on the following precedences: +// 1. spec.scheduleTimeoutSeconds of the given pg, if specified +// 2. given scheduleTimeout, if not nil +// 3. fall back to DefaultWaitTime +func GetWaitTimeDuration(pg *sched.PodGroup, scheduleTimeout *time.Duration) time.Duration { + if pg != nil && pg.Spec.ScheduleTimeoutSeconds != nil { + return time.Duration(*pg.Spec.ScheduleTimeoutSeconds) * time.Second + } + if scheduleTimeout != nil && *scheduleTimeout != 0 { + return *scheduleTimeout + } + return DefaultWaitTime +} diff --git a/sig-scheduler-plugins/pkg/logger/logger.go b/sig-scheduler-plugins/pkg/logger/logger.go index 053021a..d1e238e 100644 --- a/sig-scheduler-plugins/pkg/logger/logger.go +++ b/sig-scheduler-plugins/pkg/logger/logger.go @@ -79,8 +79,8 @@ func (l *DebugLogger) log(level int, prefix string, message ...any) error { rest := message[1:] // msg := fmt.Sprintf(message...) - fmt.Printf("Compariing level %d >= %d\n", level, l.level) - if level >= l.level { + fmt.Printf("Compariing level %d <= %d\n", level, l.level) + if level <= l.level { logger.Printf(prolog, rest...) } return l.Stop() From ef0ed50b1bcfefc30285024cff1c538f66ad62e2 Mon Sep 17 00:00:00 2001 From: vsoch Date: Fri, 19 Apr 2024 23:53:08 -0600 Subject: [PATCH 2/2] go: update to 1.21 Problem: we need to update to a newer go to keep up with the sig-scheduler upstream, and also the rainbow scheduler integration. Solution: upgrade to 1.21. This also required some refactor of the main.go and fluence due to changes in function signatures. This is a test to see if tests are passing - the fluxion-go bindings used here are from a branch (not merged yet) that can be used for the PR this one is going into, and before merging that final one we should merge and release the bindings more properly. Signed-off-by: vsoch --- sig-scheduler-plugins/cmd/controller/app/server.go | 8 +++++--- sig-scheduler-plugins/pkg/fluence/core/core.go | 1 + sig-scheduler-plugins/pkg/fluence/fluence.go | 5 +++-- src/build/scheduler/Dockerfile | 4 ++-- src/fluence/go.mod | 4 ++-- src/fluence/go.sum | 2 ++ 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/sig-scheduler-plugins/cmd/controller/app/server.go b/sig-scheduler-plugins/cmd/controller/app/server.go index d42c0f4..aae8625 100644 --- a/sig-scheduler-plugins/cmd/controller/app/server.go +++ b/sig-scheduler-plugins/cmd/controller/app/server.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" api "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "sigs.k8s.io/scheduler-plugins/pkg/controllers" ) @@ -50,9 +51,10 @@ func Run(s *ServerRunOptions) error { // Controller Runtime Controllers ctrl.SetLogger(klogr.New()) mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - Scheme: scheme, - MetricsBindAddress: s.MetricsAddr, - Port: 9443, + Scheme: scheme, + Metrics: metricsserver.Options{ + BindAddress: s.MetricsAddr, + }, HealthProbeBindAddress: s.ProbeAddr, LeaderElection: s.EnableLeaderElection, LeaderElectionID: "sched-plugins-controllers", diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go index ea300ce..7f1e052 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/core.go +++ b/sig-scheduler-plugins/pkg/fluence/core/core.go @@ -293,6 +293,7 @@ func (pgMgr *PodGroupManager) PreFilter( // it may not necessarily pass Filter due to other constraints such as affinity/taints. _, ok := pgMgr.permittedPG.Get(pgFullName) if ok { + pgMgr.log.Info("[PodGroup PreFilter] Pod Group %s is already admitted", pgFullName) return nil } diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go index 099d2f3..fe113d6 100644 --- a/sig-scheduler-plugins/pkg/fluence/fluence.go +++ b/sig-scheduler-plugins/pkg/fluence/fluence.go @@ -67,7 +67,8 @@ var ( _ framework.EnqueueExtensions = &Fluence{} - permitWaitingTimeSeconds int64 = 60 + // Set to be the same as coscheduling + permitWaitingTimeSeconds int64 = 300 podGroupBackoffSeconds int64 = 0 ) @@ -77,7 +78,7 @@ const ( ) // Initialize and return a new Fluence Custom Scheduler Plugin -func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { ctx := context.TODO() diff --git a/src/build/scheduler/Dockerfile b/src/build/scheduler/Dockerfile index 67bd5ce..2a8892c 100644 --- a/src/build/scheduler/Dockerfile +++ b/src/build/scheduler/Dockerfile @@ -2,11 +2,11 @@ FROM fluxrm/flux-sched:jammy USER root ENV DEBIAN_FRONTEND=noninteractive -ENV GO_VERSION=1.19.10 +ENV GO_VERSION=1.21.9 RUN apt-get update && apt-get clean -y && apt -y autoremove -# Install go 19.10 +# Install go RUN wget https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz && tar -xvf go${GO_VERSION}.linux-amd64.tar.gz && \ mv go /usr/local && rm go${GO_VERSION}.linux-amd64.tar.gz diff --git a/src/fluence/go.mod b/src/fluence/go.mod index 5c57652..01fc126 100644 --- a/src/fluence/go.mod +++ b/src/fluence/go.mod @@ -1,9 +1,9 @@ module github.com/flux-framework/flux-k8s/flux-plugin/fluence -go 1.19 +go 1.21 require ( - github.com/flux-framework/fluxion-go v0.32.0 + github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2 google.golang.org/grpc v1.38.0 google.golang.org/protobuf v1.26.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/src/fluence/go.sum b/src/fluence/go.sum index 5700215..534497d 100644 --- a/src/fluence/go.sum +++ b/src/fluence/go.sum @@ -100,6 +100,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flux-framework/fluxion-go v0.32.0 h1:NY6Y1mlTTTZhHD+CmAsDsdNTxUsAFDQoORpMZj8NFLI= github.com/flux-framework/fluxion-go v0.32.0/go.mod h1:ZI3QxSvUfgJE2Snur/SntJmVfpMjr6D4ICVmdqJ9fkQ= +github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2 h1:Yz/vVX0XfB2q51ZLh2p8YI5vphvv0rZF4PqtKPscvsY= +github.com/flux-framework/fluxion-go v0.32.1-0.20240420052153-909523c84ca2/go.mod h1:jA5+kOSLxchFzixzYEvMAGjkXB5yszO/HxUwdhX/5/U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=