diff --git a/go/master/pkg/batchscheduler/elastic.go b/go/master/pkg/batchscheduler/elastic.go index fbd6d37d4..25365a864 100644 --- a/go/master/pkg/batchscheduler/elastic.go +++ b/go/master/pkg/batchscheduler/elastic.go @@ -18,6 +18,8 @@ import ( "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" + logger "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/errors" ) // ElasticScheduler launches pods without waiting for all resouces of pod are ready @@ -55,8 +57,20 @@ func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, p Replica: replicaConfig, TemplateSpec: spec.Template.DeepCopy(), } - pod := kubeutils.BuildPod(jobContext, podConfig) + pod := kubeutils.BuildPod(jobContext, podConfig, plan.OwnerJob) scheduler.toCreatePods.PushBack(pod) } } + for _, podConfig := range plan.CreatedPods { + pod := kubeutils.BuildPod(jobContext, podConfig, plan.OwnerJob) + scheduler.toCreatePods.PushBack(pod) + } + for _, name := range plan.RemovedPods { + err := kubeutils.GlobalK8sClient.RemovePod(name) + if errors.IsNotFound(err) { + logger.Infof("The Pod %s has been removed", name) + } else { + logger.Warnf("Fail to remove the pod %s, err = %v", name, err) + } + } } diff --git a/go/master/pkg/batchscheduler/scheduler.go b/go/master/pkg/batchscheduler/scheduler.go index 6d26f5d8f..5230888d7 100644 --- a/go/master/pkg/batchscheduler/scheduler.go +++ b/go/master/pkg/batchscheduler/scheduler.go @@ -17,7 +17,7 @@ import ( "context" "time" - elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" + elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" @@ -41,10 +41,15 @@ type SchedulingPlan struct { CreatedPods []*kubeutils.PodConfig // RemovedPods are Pods to be removed - RemovedPods []*kubeutils.PodConfig + RemovedPods []string // OwnerJob specifies a job to scale. - OwnerJob *elasticjob.ElasticJob + OwnerJob *elasticjobv1.ElasticJob +} + +// KubeScheduler is the base scheduler to create/update/remove pods. +type KubeScheduler struct { + toCreatePods *common.Queue } // NewBatchScheduler creates a batch scheduler according to the scheduler name. @@ -56,11 +61,6 @@ func NewBatchScheduler(schedulerName string) BatchScheduler { return nil } -// KubeScheduler is the base scheduler to create/update/remove pods. -type KubeScheduler struct { - toCreatePods *common.Queue -} - // LoopToLaunchPods launches pods from the pod queue. func (scheduler *KubeScheduler) LoopToLaunchPods(ctx context.Context) { for { diff --git a/go/master/pkg/kubeutils/client.go b/go/master/pkg/kubeutils/client.go index db64d2681..353d6db07 100644 --- a/go/master/pkg/kubeutils/client.go +++ b/go/master/pkg/kubeutils/client.go @@ -25,6 +25,7 @@ import ( k8sApi "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/utils/ptr" ) // GlobalK8sClient is the global client to access a k8s cluster. @@ -107,3 +108,13 @@ func (client *K8sClient) CreatePod(ctx context.Context, pod *corev1.Pod) error { Create(ctx, pod, metav1.CreateOptions{}) return err } + +// RemovePod removes a Pod instance in the cluster +func (client *K8sClient) RemovePod(name string) error { + err := client.clientset.CoreV1().Pods(client.namespace).Delete( + context.Background(), + name, + metav1.DeleteOptions{GracePeriodSeconds: ptr.To(int64(0))}, + ) + return err +} diff --git a/go/master/pkg/kubeutils/pod.go b/go/master/pkg/kubeutils/pod.go index b307ee3cf..a8bba7f64 100644 --- a/go/master/pkg/kubeutils/pod.go +++ b/go/master/pkg/kubeutils/pod.go @@ -16,6 +16,7 @@ package kubeutils import ( "fmt" + elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -54,7 +55,7 @@ type PodConfig struct { } // BuildPod builds a corev1.Pod. -func BuildPod(jobContext *common.JobContext, podConfig *PodConfig) *corev1.Pod { +func BuildPod(jobContext *common.JobContext, podConfig *PodConfig, ownerJob *elasticjobv1.ElasticJob) *corev1.Pod { podName := fmt.Sprintf("%s-%s-%d", jobContext.Name, podConfig.Replica.Type, podConfig.Replica.ID) pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ @@ -67,6 +68,9 @@ func BuildPod(jobContext *common.JobContext, podConfig *PodConfig) *corev1.Pod { // Set pod name and namespace. pod.ObjectMeta.Name = podName pod.ObjectMeta.Namespace = jobContext.NameSpace + pod.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ + *metav1.NewControllerRef(ownerJob, elasticjobv1.SchemeGroupVersionKind), + } if pod.ObjectMeta.Labels == nil { pod.ObjectMeta.Labels = make(map[string]string)