Skip to content

Commit

Permalink
Set Pod owner reference as the ElasticJob.
Browse files Browse the repository at this point in the history
  • Loading branch information
workingloong committed Feb 3, 2025
1 parent 6740f60 commit 2290ada
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
16 changes: 15 additions & 1 deletion go/master/pkg/batchscheduler/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
logger "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
)

// ElasticScheduler launches pods without waiting for all resources of the pods to be ready
Expand Down Expand Up @@ -55,8 +57,20 @@ func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, p
Replica: replicaConfig,
TemplateSpec: spec.Template.DeepCopy(),
}
pod := kubeutils.BuildPod(jobContext, podConfig)
pod := kubeutils.BuildPod(jobContext, podConfig, plan.OwnerJob)
scheduler.toCreatePods.PushBack(pod)
}
}
for _, podConfig := range plan.CreatedPods {
pod := kubeutils.BuildPod(jobContext, podConfig, plan.OwnerJob)
scheduler.toCreatePods.PushBack(pod)
}
// Remove the Pods that the plan marks for deletion. Removal is best-effort:
// a failure is only logged and the loop continues with the next Pod.
for _, name := range plan.RemovedPods {
	err := kubeutils.GlobalK8sClient.RemovePod(name)
	switch {
	case err == nil:
		// The delete request succeeded; errors.IsNotFound(nil) is false, so
		// without this case a success would be reported as a failure below.
	case errors.IsNotFound(err):
		// The Pod is already gone, which is the desired end state.
		logger.Infof("The Pod %s has been removed", name)
	default:
		logger.Warnf("Fail to remove the pod %s, err = %v", name, err)
	}
}
}
16 changes: 8 additions & 8 deletions go/master/pkg/batchscheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"context"
"time"

elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
Expand All @@ -41,10 +41,15 @@ type SchedulingPlan struct {
CreatedPods []*kubeutils.PodConfig

// RemovedPods are Pods to be removed
RemovedPods []*kubeutils.PodConfig
RemovedPods []string

// OwnerJob specifies a job to scale.
OwnerJob *elasticjob.ElasticJob
OwnerJob *elasticjobv1.ElasticJob
}

// KubeScheduler is the base scheduler to create/update/remove pods.
type KubeScheduler struct {
// toCreatePods queues the Pods that have been built but not yet created.
// ElasticScheduler.DoScheduling pushes onto it with PushBack.
// NOTE(review): presumably drained by LoopToLaunchPods — confirm.
toCreatePods *common.Queue
}

// NewBatchScheduler creates a batch scheduler according to the scheduler name.
Expand All @@ -56,11 +61,6 @@ func NewBatchScheduler(schedulerName string) BatchScheduler {
return nil
}

// KubeScheduler is the base scheduler to create/update/remove pods.
type KubeScheduler struct {
// toCreatePods queues the Pods that have been built but not yet created.
// ElasticScheduler.DoScheduling pushes onto it with PushBack.
// NOTE(review): presumably drained by LoopToLaunchPods — confirm.
toCreatePods *common.Queue
}

// LoopToLaunchPods launches pods from the pod queue.
func (scheduler *KubeScheduler) LoopToLaunchPods(ctx context.Context) {
for {
Expand Down
11 changes: 11 additions & 0 deletions go/master/pkg/kubeutils/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
k8sApi "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/ptr"
)

// GlobalK8sClient is the global client to access a k8s cluster.
Expand Down Expand Up @@ -107,3 +108,13 @@ func (client *K8sClient) CreatePod(ctx context.Context, pod *corev1.Pod) error {
Create(ctx, pod, metav1.CreateOptions{})
return err
}

// RemovePod deletes the Pod with the given name from the client's namespace.
// The delete request carries a grace period of zero seconds and is issued
// with a background context. The error from the API call is returned as is.
func (client *K8sClient) RemovePod(name string) error {
	deleteOpts := metav1.DeleteOptions{
		GracePeriodSeconds: ptr.To(int64(0)),
	}
	podAPI := client.clientset.CoreV1().Pods(client.namespace)
	return podAPI.Delete(context.Background(), name, deleteOpts)
}
6 changes: 5 additions & 1 deletion go/master/pkg/kubeutils/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kubeutils
import (
"fmt"

elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -54,7 +55,7 @@ type PodConfig struct {
}

// BuildPod builds a corev1.Pod.
func BuildPod(jobContext *common.JobContext, podConfig *PodConfig) *corev1.Pod {
func BuildPod(jobContext *common.JobContext, podConfig *PodConfig, ownerJob *elasticjobv1.ElasticJob) *corev1.Pod {
podName := fmt.Sprintf("%s-%s-%d", jobContext.Name, podConfig.Replica.Type, podConfig.Replica.ID)
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Expand All @@ -67,6 +68,9 @@ func BuildPod(jobContext *common.JobContext, podConfig *PodConfig) *corev1.Pod {
// Set pod name and namespace.
pod.ObjectMeta.Name = podName
pod.ObjectMeta.Namespace = jobContext.NameSpace
pod.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
*metav1.NewControllerRef(ownerJob, elasticjobv1.SchemeGroupVersionKind),
}

if pod.ObjectMeta.Labels == nil {
pod.ObjectMeta.Labels = make(map[string]string)
Expand Down

0 comments on commit 2290ada

Please sign in to comment.