Commit 40dcd2c

refactor: testing idea to wrap coscheduling
This is the "skeleton" of a new idea to wrap coscheduling, adding in the logic for fluence only where it is needed, likely in the PodGroup (in the new fluence/core/core that wraps the same in coscheduling). This is just a skeleton because we are deploying the sidecar with the wrapped scheduling and absolutely no logic ported over to AskFlux. I think I have a sense of where to put this, but wanted to save this vanilla/skeleton state in case we need to go back to it.

Note that it did not work to have fluence inherit the functions from coscheduler, so I opted for a strategy of adding it as a helper field, and then just using it when necessary.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent 71156c5 commit 40dcd2c
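The fluence plugin code itself is not part of this diff, so the following is only a rough, self-contained sketch of the "helper field" strategy the commit message describes: hold the coscheduling logic in a field and delegate to it, overriding behavior only where fluence needs to differ. The interface and type names below are illustrative stand-ins, not the real scheduler-plugins API.

```go
// Sketch only: composition ("helper field") instead of inheritance,
// as described in the commit message. All names here are stand-ins.
package main

import "fmt"

// podGroupScheduler is a stand-in for the coscheduling behavior fluence wraps.
type podGroupScheduler interface {
	PreFilter(podName string) error
}

// coscheduling is a stand-in for the upstream coscheduling implementation.
type coscheduling struct{}

func (c *coscheduling) PreFilter(podName string) error {
	fmt.Println("coscheduling PreFilter:", podName)
	return nil
}

// fluence keeps the coscheduler as a helper field rather than inheriting from it.
type fluence struct {
	coscheduler podGroupScheduler
}

func (f *fluence) PreFilter(podName string) error {
	// Fluence-specific logic (eventually the AskFlux call) would be layered in
	// here; for now the skeleton simply defers to the wrapped coscheduler.
	return f.coscheduler.PreFilter(podName)
}

func main() {
	f := &fluence{coscheduler: &coscheduling{}}
	_ = f.PreFilter("lammps-0")
}
```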

File tree: 14 files changed, +201 -596 lines changed

README.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -6,6 +6,12 @@ Fluence enables HPC-grade pod scheduling in Kubernetes via the [Kubernetes Sched
 
 **Important** Fluence does not currently support use in conjunction with the kube-scheduler. Pods must all be scheduled by Fluence, and *you should not use both schedulers in the same cluster*.
 
+## TODO
+
+- Need to list pods, get state, and if is completed, cancel the job id.
+- Keep track of state of all pods in group, when all of pods are completed, then issue cancel.
+- Calculate on the fly - on the update event we want to loop through pods, if ALL completed, then delete the podid for fluence.
+
 ## Getting started
 
 For instructions on how to start Fluence on a K8s cluster, see [examples](examples/). Documentation and instructions for reproducing our CANOPIE-2022 paper (citation below) can be found in the [canopie22-artifacts branch](https://github.com/flux-framework/flux-k8s/tree/canopie22-artifacts).
```
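As a rough illustration of the TODO items added above (track the pods in a group and cancel the Flux job id only once every pod has completed), one possible shape using client-go is sketched below. The fluence.group label, the jobID parameter, and the Canceller interface are hypothetical stand-ins, not the actual fluence implementation.

```go
// Illustrative sketch only: cancel a Flux job id once every pod in the group
// has completed. Label names and the Canceller interface are hypothetical.
package cleanup

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// Canceller is a stand-in for whatever client ultimately cancels the Flux job.
type Canceller interface {
	Cancel(jobID string) error
}

// maybeCancelGroup lists the pods in a group and, only if every pod has
// finished (Succeeded or Failed), cancels the associated Flux job id.
func maybeCancelGroup(ctx context.Context, clientset kubernetes.Interface, flux Canceller, namespace, group, jobID string) error {
	pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: "fluence.group=" + group, // hypothetical group label
	})
	if err != nil {
		return err
	}
	for _, pod := range pods.Items {
		if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
			// At least one pod is still pending or running: do not cancel yet.
			return nil
		}
	}
	// Every pod in the group has completed, so the job id can be cleaned up.
	return flux.Cancel(jobID)
}
```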

examples/pod-group/lammps/lammps2.yaml

Lines changed: 2 additions & 2 deletions
```diff
@@ -14,6 +14,6 @@ spec:
 command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
 resources:
 limits:
-cpu: 2
+cpu: 10
 requests:
-cpu: 2
+cpu: 10
```

examples/pod-group/lammps/lammps4-2.yaml

Lines changed: 2 additions & 2 deletions
```diff
@@ -17,6 +17,6 @@ spec:
 command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
 resources:
 limits:
-cpu: 2
+cpu: 10
 requests:
-cpu: 2
+cpu: 10
```

examples/pod-group/lammps/lammps4-3.yaml

Lines changed: 2 additions & 2 deletions
```diff
@@ -17,6 +17,6 @@ spec:
 command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
 resources:
 limits:
-cpu: 2
+cpu: 10
 requests:
-cpu: 2
+cpu: 10
```

examples/pod-group/lammps/lammps4.yaml

Lines changed: 2 additions & 2 deletions
```diff
@@ -18,6 +18,6 @@ spec:
 command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
 resources:
 limits:
-cpu: 2
+cpu: 10
 requests:
-cpu: 2
+cpu: 10
```

examples/pod-group/lammps/lammps5.yaml

Lines changed: 2 additions & 2 deletions
```diff
@@ -17,6 +17,6 @@ spec:
 command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
 resources:
 limits:
-cpu: 2
+cpu: 10
 requests:
-cpu: 2
+cpu: 10
```

examples/pod-group/lammps/lammps6.yaml

Lines changed: 2 additions & 2 deletions
```diff
@@ -17,6 +17,6 @@ spec:
 command: lmp -v x 1 -v y 1 -v z 1 -in in.reaxc.hns -nocite
 resources:
 limits:
-cpu: 2
+cpu: 10
 requests:
-cpu: 2
+cpu: 10
```

examples/test_example/fluence-sized-job.yaml

Lines changed: 1 addition & 1 deletion
```diff
@@ -11,6 +11,6 @@ spec:
 containers:
 - name: fluence-job
 image: busybox
-command: [echo, potato]
+command: [sleep, "20"]
 restartPolicy: Never
 backoffLimit: 4
```

sig-scheduler-plugins/cmd/scheduler/main.go

Lines changed: 2 additions & 3 deletions
```diff
@@ -26,6 +26,7 @@ import (
 
 "sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
 "sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
+"sigs.k8s.io/scheduler-plugins/pkg/fluence"
 "sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead"
 "sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort"
 "sigs.k8s.io/scheduler-plugins/pkg/noderesources"
@@ -36,7 +37,7 @@ import (
 "sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing"
 "sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment"
 "sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking"
-"sigs.k8s.io/scheduler-plugins/pkg/fluence"
+
 // Ensure scheme package is initialized.
 _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
 )
@@ -56,8 +57,6 @@ func main() {
 app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New),
 app.WithPlugin(targetloadpacking.Name, targetloadpacking.New),
 app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New),
-// Sample plugins below.
-// app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
 app.WithPlugin(podstate.Name, podstate.New),
 app.WithPlugin(qos.Name, qos.New),
 app.WithPlugin(fluence.Name, fluence.New),
```

sig-scheduler-plugins/pkg/controllers/podgroup_controller.go

Lines changed: 21 additions & 15 deletions
```diff
@@ -82,6 +82,7 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 log.Error(err, fmt.Sprintf("Unable to retrieve pod group %s", req.NamespacedName))
 return ctrl.Result{}, err
 }
+log.Info("REFERENCES", "Reconciler", pg.ObjectMeta.OwnerReferences)
 
 // Grab all statuses (and groups of them) we are interested in
 schedulingOrPending := (pg.Status.Phase == schedv1alpha1.PodGroupScheduling || pg.Status.Phase == schedv1alpha1.PodGroupPending)
@@ -175,6 +176,7 @@ func (r *PodGroupReconciler) updateStatus(
 pods []v1.Pod,
 ) (ctrl.Result, error) {
 
+log := log.FromContext(ctx)
 patch := client.MergeFrom(pg.DeepCopy())
 
 switch pg.Status.Phase {
@@ -186,24 +188,24 @@
 }
 
 case schedv1alpha1.PodGroupPending:
+result, err := r.updateOwnerReferences(ctx, pg, pods)
+if result.Requeue || err != nil {
+return result, err
+}
 if len(pods) >= int(pg.Spec.MinMember) {
 pg.Status.Phase = schedv1alpha1.PodGroupScheduling
-result, err := r.updateOwnerReferences(ctx, pg, pods)
-if result.Requeue || err != nil {
-return result, err
-}
 }
 default:
 
-// Get updated counts of running, succeeded, and failed pods
-running, succeeded, failed := getCurrentPodStats(pods)
-
 // If for some reason we weren't pending and now have fewer than min required, flip back to pending
 if len(pods) < int(pg.Spec.MinMember) {
 pg.Status.Phase = schedv1alpha1.PodGroupPending
 break
 }
 
+// Get updated counts of running, succeeded, and failed pods
+running, succeeded, failed := getCurrentPodStats(pods)
+
 // A pod with succeeded + running STILL less than the minimum required is scheduling
 if succeeded+running < pg.Spec.MinMember {
 pg.Status.Phase = schedv1alpha1.PodGroupScheduling
@@ -232,16 +234,16 @@
 }
 
 // Apply the patch to update, or delete if finished
-// TODO would be better if owner references took here, so delete on owner deletion
-// TODO deletion is not currently handled for Deployment, ReplicaSet, StatefulSet
-// as they are expected to persist. You can delete / lose and bring up again
 var err error
 if pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed {
-err = r.Delete(ctx, pg)
-} else {
-r.Status().Update(ctx, pg)
-err = r.Patch(ctx, pg, patch)
+log.Info("PodGroup", "Status", "Finished", "Owners", pg.OwnerReferences)
+
+// Update but don't requeue
+_, err := r.updateOwnerReferences(ctx, pg, pods)
+return ctrl.Result{}, err
 }
+r.Status().Update(ctx, pg)
+err = r.Patch(ctx, pg, patch)
 return ctrl.Result{Requeue: true}, err
 }
 
@@ -366,21 +368,25 @@ func (r *PodGroupReconciler) updateOwnerReferences(
 return result, nil
 }
 
-// Collect owner references for pod group
+// Collect current owner references for pod group,
+// We want to ensure we add unique ones across the pod
 owners := []metav1.OwnerReference{}
 var refs []string
 for _, ownerRef := range pod.OwnerReferences {
 refs = append(refs, fmt.Sprintf("%s/%s", pod.Namespace, ownerRef.Name))
 owners = append(owners, ownerRef)
 }
+
 patch := client.MergeFrom(pg.DeepCopy())
 if len(refs) != 0 {
 sort.Strings(refs)
 pg.Status.OccupiedBy = strings.Join(refs, ",")
 }
+// If we have owners, collapose into list
 if len(owners) > 0 {
 pg.ObjectMeta.OwnerReferences = owners
 }
+
 // Apply the patch to update the size
 r.Status().Update(ctx, pg)
 err := r.Patch(ctx, pg, patch)
```
sig-scheduler-plugins/pkg/fluence/README.md

Lines changed: 0 additions & 29 deletions
This file was deleted.
