naming: expand short named variables
Problem: many of the variables abbreviated as pg are hard to understand
Solution: write out podGroup or groupName explicitly.
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
vsoch committed May 3, 2024
1 parent 50ad162 commit 2aee785
Showing 8 changed files with 226 additions and 211 deletions.
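Before the file-by-file diffs, here is a minimal, self-contained sketch of the naming convention the commit adopts: pgMgr becomes podGroupManager, pg becomes podGroup, and pgName becomes groupName. The types below are toy stand-ins for illustration only, not code from the repository.

```go
package main

import "fmt"

// PodGroup is a toy stand-in for v1alpha1.PodGroup, used only to illustrate naming.
type PodGroup struct {
	Name      string
	MinMember int32
}

// PodGroupManager is a toy stand-in for the manager in pkg/fluence/core/core.go.
type PodGroupManager struct {
	groups map[string]*PodGroup
}

// GetPodGroup shows the convention after this commit: the receiver is
// podGroupManager (formerly pgMgr), the key is groupName (formerly pgName),
// and the result is podGroup (formerly pg).
func (podGroupManager *PodGroupManager) GetPodGroup(groupName string) (string, *PodGroup) {
	if groupName == "" {
		return "", nil
	}
	podGroup, found := podGroupManager.groups[groupName]
	if !found {
		return groupName, nil
	}
	return groupName, podGroup
}

func main() {
	podGroupManager := &PodGroupManager{groups: map[string]*PodGroup{
		"default/my-group": {Name: "my-group", MinMember: 2},
	}}
	groupName, podGroup := podGroupManager.GetPodGroup("default/my-group")
	fmt.Println(groupName, podGroup.MinMember)
}
```

The same pattern repeats throughout the core.go diff below.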
2 changes: 1 addition & 1 deletion README.md
@@ -503,7 +503,7 @@ I was having trouble developing this easily because it's a lot of steps to build
The last step ensures we use the images we loaded! You can basically just do:

```bash
/bin/bash ./hack/quick-build.sh
/bin/bash ./hack/quick-build-kind.sh
```

This sped up my development time immensely. If you want to manually do the steps, see that script for instructions.
36 changes: 36 additions & 0 deletions hack/quick-build-kind.sh
@@ -0,0 +1,36 @@
#!/bin/bash

# Before running this, you should:
# 1. create the kind cluster (needs more than one node, fluence does not schedule to the control plane)
# 2. Install cert-manager
# 3. Customize the script to point to your registry if you intend to push

REGISTRY="${1:-ghcr.io/vsoch}"
HERE=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
ROOT=$(dirname ${HERE})

# Go to the script directory
cd ${ROOT}

# These build each of the images. The sidecar is separate from the other two in src/
make REGISTRY=${REGISTRY} SCHEDULER_IMAGE=fluence SIDECAR_IMAGE=fluence-sidecar CONTROLLER_IMAGE=fluence-controller

# This is what it might look like to push
# docker push ghcr.io/vsoch/fluence-sidecar && docker push ghcr.io/vsoch/fluence-controller && docker push ghcr.io/vsoch/fluence:latest

# We load into kind so we don't need to push/pull and use up internet data ;)
kind load docker-image ${REGISTRY}/fluence-sidecar:latest
kind load docker-image ${REGISTRY}/fluence-controller:latest
kind load docker-image ${REGISTRY}/fluence:latest

# And then install using the charts. The pull policy ensures we use the loaded ones
cd ${ROOT}/upstream/manifests/install/charts
helm uninstall fluence || true
helm install \
--set scheduler.image=${REGISTRY}/fluence:latest \
--set scheduler.sidecarPullPolicy=Never \
--set scheduler.pullPolicy=Never \
--set controller.pullPolicy=Never \
--set controller.image=${REGISTRY}/fluence-controller:latest \
--set scheduler.sidecarimage=${REGISTRY}/fluence-sidecar:latest \
fluence as-a-second-scheduler/
22 changes: 1 addition & 21 deletions hack/quick-build.sh
@@ -13,24 +13,4 @@ ROOT=$(dirname ${HERE})
cd ${ROOT}

# These build each of the images. The sidecar is separate from the other two in src/
make REGISTRY=${REGISTRY} SCHEDULER_IMAGE=fluence SIDECAR_IMAGE=fluence-sidecar CONTROLLER_IMAGE=fluence-controller

# This is what it might look like to push
# docker push ghcr.io/vsoch/fluence-sidecar && docker push ghcr.io/vsoch/fluence-controller && docker push ghcr.io/vsoch/fluence:latest

# We load into kind so we don't need to push/pull and use up internet data ;)
kind load docker-image ${REGISTRY}/fluence-sidecar:latest
kind load docker-image ${REGISTRY}/fluence-controller:latest
kind load docker-image ${REGISTRY}/fluence:latest

# And then install using the charts. The pull policy ensures we use the loaded ones
cd ${ROOT}/upstream/manifests/install/charts
helm uninstall fluence || true
helm install \
--set scheduler.image=${REGISTRY}/fluence:latest \
--set scheduler.sidecarPullPolicy=Never \
--set scheduler.pullPolicy=Never \
--set controller.pullPolicy=Never \
--set controller.image=${REGISTRY}/fluence-controller:latest \
--set scheduler.sidecarimage=${REGISTRY}/fluence-sidecar:latest \
fluence as-a-second-scheduler/
make REGISTRY=${REGISTRY} SCHEDULER_IMAGE=fluence SIDECAR_IMAGE=fluence-sidecar CONTROLLER_IMAGE=fluence-controller
143 changes: 71 additions & 72 deletions sig-scheduler-plugins/pkg/fluence/core/core.go
@@ -95,10 +95,10 @@ type PodGroupManager struct {
// scheduleTimeout is the default timeout for podgroup scheduling.
// If podgroup's scheduleTimeoutSeconds is set, it will be used.
scheduleTimeout *time.Duration
// permittedPG stores the podgroup name which has passed the pre resource check.
permittedPG *gochache.Cache
// backedOffPG stores the podgroup name which failed scheduling recently.
backedOffPG *gochache.Cache
// permittedpodGroup stores the podgroup name which has passed the pre resource check.
permittedpodGroup *gochache.Cache
// backedOffpodGroup stores the podgroup name which failed scheduling recently.
backedOffpodGroup *gochache.Cache
// podLister is pod lister
podLister listerv1.PodLister

@@ -122,32 +122,32 @@ func NewPodGroupManager(
podInformer informerv1.PodInformer,
log *logger.DebugLogger,
) *PodGroupManager {
pgMgr := &PodGroupManager{
podGroupManager := &PodGroupManager{
client: client,
snapshotSharedLister: snapshotSharedLister,
scheduleTimeout: scheduleTimeout,
podLister: podInformer.Lister(),
permittedPG: gochache.New(3*time.Second, 3*time.Second),
backedOffPG: gochache.New(10*time.Second, 10*time.Second),
permittedpodGroup: gochache.New(3*time.Second, 3*time.Second),
backedOffpodGroup: gochache.New(10*time.Second, 10*time.Second),
groupToJobId: map[string]uint64{},
podToNode: map[string]string{},
log: log,
}
return pgMgr
return podGroupManager
}

func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) {
func (podGroupManager *PodGroupManager) BackoffPodGroup(groupName string, backoff time.Duration) {
if backoff == time.Duration(0) {
return
}
pgMgr.backedOffPG.Add(pgName, nil, backoff)
podGroupManager.backedOffpodGroup.Add(groupName, nil, backoff)
}

// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod
// in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) {
pgName := util.GetPodGroupLabel(pod)
if pgName == "" {
func (podGroupManager *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) {
groupName := util.GetPodGroupLabel(pod)
if groupName == "" {
return
}

@@ -158,11 +158,11 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
return
}

pods, err := pgMgr.podLister.Pods(pod.Namespace).List(
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}),
pods, err := podGroupManager.podLister.Pods(pod.Namespace).List(
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: groupName}),
)
if err != nil {
klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName)
klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", groupName)
return
}

@@ -188,40 +188,39 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
}

// GetStatuses string (of all pods) to show for debugging purposes
func (pgMgr *PodGroupManager) GetStatuses(
func (podGroupManager *PodGroupManager) GetStatuses(
pods []*corev1.Pod,
pod *corev1.Pod,
) string {
statuses := ""

// We need to distinguish 0 from the default and not finding anything
for _, p := range pods {
statuses += " " + fmt.Sprintf("%s", p.Status.Phase)
for _, pod := range pods {
statuses += " " + fmt.Sprintf("%s", pod.Status.Phase)
}
return statuses
}

// GetPodNode is a quick lookup to see if we have a node
func (pgMgr *PodGroupManager) GetPodNode(pod *corev1.Pod) string {
node, _ := pgMgr.podToNode[pod.Name]
func (podGroupManager *PodGroupManager) GetPodNode(pod *corev1.Pod) string {
node, _ := podGroupManager.podToNode[pod.Name]
return node
}

// Permit permits a pod to run; if minMember is matched, it sends a signal to the channel.
func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) Status {
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pgFullName == "" {
func (podGroupManager *PodGroupManager) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) Status {
groupName, podGroup := podGroupManager.GetPodGroup(ctx, pod)
if groupName == "" {
return PodGroupNotSpecified
}
if pg == nil {
if podGroup == nil {
// A Pod with a podGroup name but without a PodGroup found is denied.
return PodGroupNotFound
}

assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
assigned := podGroupManager.CalculateAssignedPods(podGroup.Name, podGroup.Namespace)
// The number of pods that have been assigned nodes is calculated from the snapshot.
// The current pod is not included in the snapshot during the current scheduling cycle.
if int32(assigned)+1 >= pg.Spec.MinMember {
if int32(assigned)+1 >= podGroup.Spec.MinMember {
return Success
}

@@ -244,24 +243,24 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle
// 1. it belongs to a podgroup that was recently denied or
// 2. the total number of pods in the podgroup is less than the minimum number of pods
// that is required to be scheduled.
func (pgMgr *PodGroupManager) PreFilter(
func (podGroupManager *PodGroupManager) PreFilter(
ctx context.Context,
pod *corev1.Pod,
state *framework.CycleState,
) error {

pgMgr.log.Info("[PodGroup PreFilter] pod %s", klog.KObj(pod))
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pg == nil {
podGroupManager.log.Info("[PodGroup PreFilter] pod %s", klog.KObj(pod))
groupName, podGroup := podGroupManager.GetPodGroup(ctx, pod)
if podGroup == nil {
return nil
}

_, exist := pgMgr.backedOffPG.Get(pgFullName)
_, exist := podGroupManager.backedOffpodGroup.Get(groupName)
if exist {
return fmt.Errorf("podGroup %v failed recently", pgFullName)
return fmt.Errorf("podGroup %v failed recently", groupName)
}

pods, err := pgMgr.podLister.Pods(pod.Namespace).List(
pods, err := podGroupManager.podLister.Pods(pod.Namespace).List(
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}),
)
if err != nil {
@@ -271,18 +270,18 @@ func (pgMgr *PodGroupManager) PreFilter(
// Only allow scheduling the first in the group so the others come after

// Get statuses to show for debugging
statuses := pgMgr.GetStatuses(pods, pod)
statuses := podGroupManager.GetStatuses(pods)

// This shows us the number of pods we have in the set and their states
pgMgr.log.Info("[PodGroup PreFilter] group: %s pods: %s MinMember: %d Size: %d", pgFullName, statuses, pg.Spec.MinMember, len(pods))
if len(pods) < int(pg.Spec.MinMember) {
podGroupManager.log.Info("[PodGroup PreFilter] group: %s pods: %s MinMember: %d Size: %d", groupName, statuses, podGroup.Spec.MinMember, len(pods))
if len(pods) < int(podGroup.Spec.MinMember) {
return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+
"current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember)
"current pods number: %v, minMember of group: %v", pod.Name, len(pods), podGroup.Spec.MinMember)
}

// TODO we likely can take advantage of these resources or other custom
// attributes we add. For now ignore and calculate based on pod needs (above)
// if pg.Spec.MinResources == nil {
// if podGroup.Spec.MinResources == nil {
// fmt.Printf("Fluence Min resources are null, skipping PreFilter")
// return nil
// }
@@ -291,28 +290,28 @@ func (pgMgr *PodGroupManager) PreFilter(
// TODO(cwdsuzhou): This resource check may not always pre-catch unschedulable pod group.
// It only tries to PreFilter resource constraints so even if a PodGroup passed here,
// it may not necessarily pass Filter due to other constraints such as affinity/taints.
_, ok := pgMgr.permittedPG.Get(pgFullName)
_, ok := podGroupManager.permittedpodGroup.Get(groupName)
if ok {
pgMgr.log.Info("[PodGroup PreFilter] Pod Group %s is already admitted", pgFullName)
podGroupManager.log.Info("[PodGroup PreFilter] Pod Group %s is already admitted", groupName)
return nil
}

// TODO: right now we ask Fluxion for a podspec based on ONE pod, but
// TODO: right now we ask Fluxion for a podspec based on ONE representative pod, but
// we have the whole group! We can handle different pod needs now :)
repPod := pods[0]
nodes, err := pgMgr.AskFlux(ctx, *repPod, pg, pgFullName)
nodes, err := podGroupManager.AskFlux(ctx, *repPod, podGroup, groupName)
if err != nil {
pgMgr.log.Info("[PodGroup PreFilter] Fluxion returned an error %s, not schedulable", err.Error())
podGroupManager.log.Info("[PodGroup PreFilter] Fluxion returned an error %s, not schedulable", err.Error())
return err
}
pgMgr.log.Info("Node Selected %s (pod group %s)", nodes, pgFullName)
podGroupManager.log.Info("Node Selected %s (pod group %s)", nodes, groupName)

// For some reason fluxion gave us the wrong size?
if len(nodes) != len(pods) {
pgMgr.log.Warning("[PodGroup PreFilter] group %s needs %d nodes but Fluxion returned the wrong number nodes %d.", pgFullName, len(pods), len(nodes))
pgMgr.mutex.Lock()
pgMgr.cancelFluxJob(pgFullName, repPod)
pgMgr.mutex.Unlock()
podGroupManager.log.Warning("[PodGroup PreFilter] group %s needs %d nodes but Fluxion returned the wrong number nodes %d.", groupName, len(pods), len(nodes))
podGroupManager.mutex.Lock()
podGroupManager.cancelFluxJob(groupName, repPod)
podGroupManager.mutex.Unlock()
}

// Create a fluxState (CycleState) with all nodes - this is used to retrieve
@@ -324,32 +323,32 @@ func (pgMgr *PodGroupManager) PreFilter(
stateData := FluxStateData{NodeName: node}
state.Write(framework.StateKey(pod.Name), &stateData)
// Also save to the podToNode lookup
pgMgr.mutex.Lock()
pgMgr.podToNode[pod.Name] = node
pgMgr.mutex.Unlock()
podGroupManager.mutex.Lock()
podGroupManager.podToNode[pod.Name] = node
podGroupManager.mutex.Unlock()
}
pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout)
podGroupManager.permittedpodGroup.Add(groupName, groupName, *podGroupManager.scheduleTimeout)
return nil
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
pgName := util.GetPodGroupLabel(pod)
if len(pgName) == 0 {
func (podGroupManager *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
groupName := util.GetPodGroupLabel(pod)
if len(groupName) == 0 {
return ts
}
var pg v1alpha1.PodGroup
if err := pgMgr.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
var podGroup v1alpha1.PodGroup
if err := podGroupManager.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: groupName}, &podGroup); err != nil {
return ts
}
return pg.CreationTimestamp.Time
return podGroup.CreationTimestamp.Time
}

// CalculateAssignedPods returns the number of pods that have been assigned nodes: assumed or bound.
func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int {
nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List()
func (podGroupManager *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int {
nodeInfos, err := podGroupManager.snapshotSharedLister.NodeInfos().List()
if err != nil {
pgMgr.log.Error("Cannot get nodeInfos from frameworkHandle: %s", err)
podGroupManager.log.Error("Cannot get nodeInfos from frameworkHandle: %s", err)
return 0
}
var count int
@@ -365,21 +364,21 @@ func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace stri
}

// DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter.
func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) {
pgMgr.permittedPG.Delete(pgFullName)
func (podGroupManager *PodGroupManager) DeletePermittedPodGroup(groupName string) {
podGroupManager.permittedpodGroup.Delete(groupName)
}

// GetPodGroup returns the PodGroup that a Pod belongs to in cache.
func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) (string, *v1alpha1.PodGroup) {
pgName := util.GetPodGroupLabel(pod)
if len(pgName) == 0 {
func (podGroupManager *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) (string, *v1alpha1.PodGroup) {
groupName := util.GetPodGroupLabel(pod)
if len(groupName) == 0 {
return "", nil
}
var pg v1alpha1.PodGroup
if err := pgMgr.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), nil
var podGroup v1alpha1.PodGroup
if err := podGroupManager.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: groupName}, &podGroup); err != nil {
return fmt.Sprintf("%v/%v", pod.Namespace, groupName), nil
}
return fmt.Sprintf("%v/%v", pod.Namespace, pgName), &pg
return fmt.Sprintf("%v/%v", pod.Namespace, groupName), &podGroup
}

// GetNamespacedName returns the namespaced name.
(Diffs for the remaining changed files are not rendered here.)
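The permittedpodGroup and backedOffpodGroup fields renamed in core.go are TTL caches. Judging from the gochache.New(expiration, cleanupInterval) calls, the package behind that alias appears to be github.com/patrickmn/go-cache; that is an assumption here, and the snippet below is only a sketch of how those time-boxed entries behave, not code from the repository.

```go
package main

import (
	"fmt"
	"time"

	gocache "github.com/patrickmn/go-cache" // assumed to be the package behind the gochache alias
)

func main() {
	// Mirrors the constructor arguments in NewPodGroupManager: default TTLs of
	// 3s (permitted groups) and 10s (backed-off groups). Add can override the
	// TTL per entry, as PreFilter does with scheduleTimeout and BackoffPodGroup
	// does with its backoff argument.
	permittedpodGroup := gocache.New(3*time.Second, 3*time.Second)
	backedOffpodGroup := gocache.New(10*time.Second, 10*time.Second)

	groupName := "default/my-group"

	// PreFilter records a group that passed the pre resource check ...
	permittedpodGroup.Add(groupName, groupName, 3*time.Second)
	// ... and BackoffPodGroup records a group that failed scheduling recently.
	backedOffpodGroup.Add(groupName, nil, 10*time.Second)

	_, permitted := permittedpodGroup.Get(groupName)
	_, backedOff := backedOffpodGroup.Get(groupName)
	fmt.Println("permitted:", permitted, "backed off:", backedOff) // both true right now

	time.Sleep(4 * time.Second)
	_, permitted = permittedpodGroup.Get(groupName)
	_, backedOff = backedOffpodGroup.Get(groupName)
	fmt.Println("permitted:", permitted, "backed off:", backedOff) // permitted entry expired, backoff has not
}
```

In PreFilter, a hit in backedOffpodGroup rejects the group with a "failed recently" error until the entry expires, while a hit in permittedpodGroup lets the group skip another Fluxion match for the scheduleTimeout window.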
