diff --git a/go/master/main.go b/go/master/main.go index 751b43368..b6904e050 100644 --- a/go/master/main.go +++ b/go/master/main.go @@ -17,12 +17,10 @@ import ( "flag" "strconv" - logger "github.com/sirupsen/logrus" - "k8s.io/client-go/kubernetes" - master "github.com/intelligent-machine-learning/dlrover/go/master/pkg" - "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubernetes" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" "github.com/intelligent-machine-learning/dlrover/go/master/pkg/server" + logger "github.com/sirupsen/logrus" ) func main() { @@ -41,10 +39,10 @@ func main() { // Listen and serve on defined port logger.Infof("The master starts with namespece %s, jobName %s, port %d", namespace, jobName, port) - var k8sClient *kubernetes.K8sClient if k8sScheduling { - k8sClient = kubernetes.NewK8sClient(namespace, jobName) + // Use incluster mode without kubeconfig. + kubeutils.NewGlobalK8sClient("", namespace) } - master := master.NewJobMaster(namespace, jobName, k8sClient) + master := master.NewJobMaster(namespace, jobName) master.Run() } diff --git a/go/master/master_suite_test.go b/go/master/master_suite_test.go new file mode 100644 index 000000000..2046daa0d --- /dev/null +++ b/go/master/master_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main_test + +import ( + "testing" + + . 
"github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestMaster(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Master Suite") +} diff --git a/go/master/pkg/batchscheduler/batchscheduler_suite_test.go b/go/master/pkg/batchscheduler/batchscheduler_suite_test.go new file mode 100644 index 000000000..ac73d66e7 --- /dev/null +++ b/go/master/pkg/batchscheduler/batchscheduler_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestBatchscheduler(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Batchscheduler Suite") +} diff --git a/go/master/pkg/batchscheduler/elastic.go b/go/master/pkg/batchscheduler/elastic.go new file mode 100644 index 000000000..fbd6d37d4 --- /dev/null +++ b/go/master/pkg/batchscheduler/elastic.go @@ -0,0 +1,62 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler + +import ( + "context" + + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" +) + +// ElasticScheduler launches pods without waiting for all resouces of pod are ready +type ElasticScheduler struct { + KubeScheduler + SchedulerName string +} + +// NewElasticScheduler creates an elastic scheduler. +func NewElasticScheduler() *ElasticScheduler { + return &ElasticScheduler{ + KubeScheduler: KubeScheduler{ + toCreatePods: common.NewQueue(), + }, + SchedulerName: "elastic", + } +} + +// Start starts a routine to launch Pods. 
+func (scheduler *ElasticScheduler) Start(ctx context.Context, jobContext *common.JobContext) { + go scheduler.LoopToLaunchPods(ctx) +} + +// DoScheduling creates/updates/deletes pods +func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan) { + for replicaType, spec := range plan.ReplicaSpecs { + for i := int32(0); i < spec.Replicas; i++ { + replicaConfig := &kubeutils.ReplicaConfig{ + Type: string(replicaType), + ID: i, + Number: spec.Replicas, + Rank: i, + } + podConfig := &kubeutils.PodConfig{ + Replica: replicaConfig, + TemplateSpec: spec.Template.DeepCopy(), + } + pod := kubeutils.BuildPod(jobContext, podConfig) + scheduler.toCreatePods.PushBack(pod) + } + } +} diff --git a/go/master/pkg/batchscheduler/elastic_internal_test.go b/go/master/pkg/batchscheduler/elastic_internal_test.go new file mode 100644 index 000000000..6c8202bd8 --- /dev/null +++ b/go/master/pkg/batchscheduler/elastic_internal_test.go @@ -0,0 +1,62 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler + +import ( + "fmt" + + commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" +) + +var _ = Describe("Elastic", func() { + It("Do scheduling to launch pods.", func() { + jobContext := &common.JobContext{ + NameSpace: "dlrover", + Name: "train-demo", + MasterHost: "127.0.0.1", + MasterPort: 12345, + } + + container := corev1.Container{ + Name: "main", + Image: "python:3.12.8", + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/bin/bash", "-c", "echo 0"}, + } + replicas := make(map[commonv1.ReplicaType]*commonv1.ReplicaSpec) + replicas["worker"] = &commonv1.ReplicaSpec{ + Replicas: 3, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{container}, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + } + schedulingPlan := &SchedulingPlan{ReplicaSpecs: replicas} + scheduler := NewElasticScheduler() + scheduler.DoScheduling(jobContext, schedulingPlan) + Expect(scheduler.toCreatePods.Len()).To(Equal(3)) + for i := 0; i < 3; i++ { + pod := scheduler.toCreatePods.PopFront().(*corev1.Pod) + expectPodName := fmt.Sprintf("train-demo-worker-%d", i) + Expect(pod.ObjectMeta.Name).To(Equal(expectPodName)) + } + }) +}) diff --git a/go/master/pkg/batchscheduler/plan.go b/go/master/pkg/batchscheduler/plan.go new file mode 100644 index 000000000..87143e224 --- /dev/null +++ b/go/master/pkg/batchscheduler/plan.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package batchscheduler diff --git a/go/master/pkg/batchscheduler/scheduler.go b/go/master/pkg/batchscheduler/scheduler.go new file mode 100644 index 000000000..6d26f5d8f --- /dev/null +++ b/go/master/pkg/batchscheduler/scheduler.go @@ -0,0 +1,89 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler + +import ( + "context" + "time" + + elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" + commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils" + logger "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" +) + +// BatchScheduler creates/updates/deletes the batch pods of an elastic job. +type BatchScheduler interface { + Start(ctx context.Context, jobContext *common.JobContext) + DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan) +} + +// SchedulingPlan is the scheduling plan to notify the scheduler CURD pods. +type SchedulingPlan struct { + // ReplicaSpecs is a map which contains the replica specification to create Pods. + ReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec + + // CreatedPods are Pods to be created. 
+ CreatedPods []*kubeutils.PodConfig + + // RemovedPods are Pods to be removed + RemovedPods []*kubeutils.PodConfig + + // OwnerJob specifies a job to scale. + OwnerJob *elasticjob.ElasticJob +} + +// NewBatchScheduler creates a batch scheduler according to the scheduler name. +func NewBatchScheduler(schedulerName string) BatchScheduler { + if schedulerName == "elastic" || schedulerName == "" { + scheduler := NewElasticScheduler() + return scheduler + } + return nil +} + +// KubeScheduler is the base scheduler to create/update/remove pods. +type KubeScheduler struct { + toCreatePods *common.Queue +} + +// LoopToLaunchPods launches pods from the pod queue. +func (scheduler *KubeScheduler) LoopToLaunchPods(ctx context.Context) { + for { + select { + case <-ctx.Done(): + logger.Infof("The loop to launch Pod exists.") + default: + for scheduler.toCreatePods.Len() > 0 { + pod := scheduler.toCreatePods.PopFront().(*corev1.Pod) + err := kubeutils.GlobalK8sClient.CreatePod(ctx, pod) + if errors.IsAlreadyExists(err) { + logger.Warnf("The pod %s already exists.", pod.ObjectMeta.Name) + } else if errors.IsTooManyRequests(err) || errors.IsTimeout(err) || errors.IsServerTimeout(err) { + logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err) + // Retry to create pod due to timeout. + scheduler.toCreatePods.PushFront(pod) + time.Sleep(5 * time.Second) + } else { + logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err) + panic(err.Error()) + } + } + } + time.Sleep(1 * time.Second) + } +} diff --git a/go/master/pkg/batchscheduler/volcano.go b/go/master/pkg/batchscheduler/volcano.go new file mode 100644 index 000000000..87143e224 --- /dev/null +++ b/go/master/pkg/batchscheduler/volcano.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchscheduler diff --git a/go/master/pkg/common/common_suite_test.go b/go/master/pkg/common/common_suite_test.go new file mode 100644 index 000000000..04d1cee41 --- /dev/null +++ b/go/master/pkg/common/common_suite_test.go @@ -0,0 +1,26 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCommon(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Common Suite") +} diff --git a/go/master/pkg/common/context.go b/go/master/pkg/common/context.go new file mode 100644 index 000000000..dfb61ffbd --- /dev/null +++ b/go/master/pkg/common/context.go @@ -0,0 +1,86 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
// masterServicePort is the port probed on the job master service.
// NOTE(review): 215000 is larger than the highest TCP port (65535), so the
// probe in NewJobContext presumably never succeeds and the fallback address
// is always used - confirm the intended port number.
const masterServicePort = 215000

// JobContext stores the elastic job context.
type JobContext struct {
	// NameSpace is the kubernetes namespace where the job runs.
	NameSpace string
	// Name is the name of an elastic job.
	Name string
	// MasterHost is the host of the master service.
	MasterHost string
	// MasterPort is the port of the master service.
	MasterPort int
}

// NewJobContext creates the context of the job `name` in `namespace`.
//
// The master address defaults to the service host
// "elasticjob-<name>-dlrover-master" on masterServicePort. When that address
// does not accept a TCP connection, the host is read from the MY_POD_IP
// environment variable and a free local port is picked instead. The function
// panics if no free port can be obtained.
func NewJobContext(namespace string, name string) *JobContext {
	jobCtx := &JobContext{
		NameSpace:  namespace,
		Name:       name,
		MasterHost: fmt.Sprintf("elasticjob-%s-dlrover-master", name),
		MasterPort: masterServicePort,
	}
	if checkAddressReachable(jobCtx.MasterHost, jobCtx.MasterPort) {
		return jobCtx
	}

	// The service address is not reachable: fall back to the pod's own IP.
	jobCtx.MasterHost = os.Getenv("MY_POD_IP")
	fallbackPort, err := getFreePort()
	if err != nil {
		panic(err.Error())
	}
	jobCtx.MasterPort = fallbackPort
	return jobCtx
}

// checkAddressReachable reports whether a TCP connection to host:port can be
// opened within one second. The probe connection is closed before returning.
func checkAddressReachable(host string, port int) bool {
	target := net.JoinHostPort(host, strconv.Itoa(port))
	probe, dialErr := net.DialTimeout("tcp", target, time.Second)
	if dialErr != nil {
		return false
	}
	if probe != nil {
		defer probe.Close()
	}
	return true
}

// getFreePort asks the operating system for an unused TCP port on localhost
// by listening on port 0, and returns the port that was assigned. The
// listener is closed again before the function returns.
func getFreePort() (int, error) {
	loopback, err := net.ResolveTCPAddr("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}

	listener, err := net.ListenTCP("tcp", loopback)
	if err != nil {
		return 0, err
	}
	defer listener.Close()

	assigned := listener.Addr().(*net.TCPAddr)
	return assigned.Port, nil
}
b/go/master/pkg/common/context_internal_test.go new file mode 100644 index 000000000..7671fe4ca --- /dev/null +++ b/go/master/pkg/common/context_internal_test.go @@ -0,0 +1,31 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "os" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Context", func() { + It("New Job Context", func() { + os.Setenv("MY_POD_IP", "127.0.0.1") + jobContext := NewJobContext("dlrover", "train-demo") + Expect(jobContext.MasterHost).To(Equal("127.0.0.1")) + Expect(jobContext.MasterPort > 0).To(BeTrue()) + }) + +}) diff --git a/go/master/pkg/common/queue.go b/go/master/pkg/common/queue.go new file mode 100644 index 000000000..706271297 --- /dev/null +++ b/go/master/pkg/common/queue.go @@ -0,0 +1,74 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// Queue is a thread-safe double-ended queue. Every method takes the internal
// mutex, so a Queue may be shared between goroutines. It must be created
// with NewQueue and must not be copied after first use.
type Queue struct {
	lock sync.Mutex
	data *list.List
}

// NewQueue creates an empty Queue instance.
func NewQueue() *Queue {
	return &Queue{data: list.New()}
}

// PushFront pushes an element at the head of the queue.
func (q *Queue) PushFront(v interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.data.PushFront(v)
}

// PushBack pushes an element at the back of the queue.
func (q *Queue) PushBack(v interface{}) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.data.PushBack(v)
}

// PopFront gets the front element and removes it from the queue.
// It returns nil when the queue is empty.
func (q *Queue) PopFront() interface{} {
	q.lock.Lock()
	defer q.lock.Unlock()
	front := q.data.Front()
	if front == nil {
		// Empty queue: Front returns nil, which must not be dereferenced.
		return nil
	}
	return q.data.Remove(front)
}

// PopBack gets the back element and removes it from the queue.
// It returns nil when the queue is empty.
func (q *Queue) PopBack() interface{} {
	q.lock.Lock()
	defer q.lock.Unlock()
	back := q.data.Back()
	if back == nil {
		// Empty queue: Back returns nil, which must not be dereferenced.
		return nil
	}
	return q.data.Remove(back)
}

// Len gets the number of elements in the queue.
func (q *Queue) Len() int {
	q.lock.Lock()
	defer q.lock.Unlock()
	return q.data.Len()
}
+// See the License for the specific language governing permissions and +// limitations under the License. + +package common_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" +) + +var _ = Describe("Queue", func() { + It("Test Queue", func() { + queue := common.NewQueue() + queue.PushBack(2) + queue.PushFront(1) + queue.PushBack(3) + Expect(queue.Len()).To(Equal(3)) + front := queue.PopFront().(int) + Expect(front).To(Equal(1)) + back := queue.PopBack().(int) + Expect(back).To(Equal(3)) + }) + +}) diff --git a/go/master/pkg/jobmanager/manager.go b/go/master/pkg/jobmanager/manager.go new file mode 100644 index 000000000..d5569db28 --- /dev/null +++ b/go/master/pkg/jobmanager/manager.go @@ -0,0 +1,34 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmanager + +import ( + "context" + + elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" +) + +// JobManager is the interface to manager job lifecycle. +type JobManager interface { + Start(ctx context.Context, jobContext *common.JobContext) +} + +// NewJobManager creates a job manager. 
+func NewJobManager(elasticJob *elasticjobv1.ElasticJob) JobManager { + if elasticJob.Spec.DistributionStrategy == "pytorch" { + return NewPyTorchJobManager(elasticJob) + } + return nil +} diff --git a/go/master/pkg/jobmanager/pytorch.go b/go/master/pkg/jobmanager/pytorch.go new file mode 100644 index 000000000..240bc7432 --- /dev/null +++ b/go/master/pkg/jobmanager/pytorch.go @@ -0,0 +1,46 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmanager + +import ( + "context" + + elasticjobv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/batchscheduler" + "github.com/intelligent-machine-learning/dlrover/go/master/pkg/common" +) + +// PyTorchJobManager is the lifecycle manager of a PyTorch distributed training job. +type PyTorchJobManager struct { + replicaSchedulers map[string]batchscheduler.BatchScheduler +} + +// NewPyTorchJobManager creates PyTorch distributed training job manager. 
+func NewPyTorchJobManager(elasticJob *elasticjobv1.ElasticJob) *PyTorchJobManager { + schedulers := make(map[string]batchscheduler.BatchScheduler) + for replicaType, spec := range elasticJob.Spec.ReplicaSpecs { + scheduler := batchscheduler.NewBatchScheduler(spec.BatchScheduler) + schedulers[string(replicaType)] = scheduler + } + return &PyTorchJobManager{ + replicaSchedulers: schedulers, + } +} + +// Start starts the modules of the job manager. +func (jobManager *PyTorchJobManager) Start(ctx context.Context, jobContext *common.JobContext) { + for _, scheduler := range jobManager.replicaSchedulers { + scheduler.Start(ctx, jobContext) + } +} diff --git a/go/master/pkg/jobmanager/ray.go b/go/master/pkg/jobmanager/ray.go new file mode 100644 index 000000000..69e790164 --- /dev/null +++ b/go/master/pkg/jobmanager/ray.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmanager diff --git a/go/master/pkg/jobmanager/tensorflow.go b/go/master/pkg/jobmanager/tensorflow.go new file mode 100644 index 000000000..69e790164 --- /dev/null +++ b/go/master/pkg/jobmanager/tensorflow.go @@ -0,0 +1,14 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmanager diff --git a/go/master/pkg/kubernetes/client.go b/go/master/pkg/kubeutils/client.go similarity index 71% rename from go/master/pkg/kubernetes/client.go rename to go/master/pkg/kubeutils/client.go index d943eccd0..db64d2681 100644 --- a/go/master/pkg/kubernetes/client.go +++ b/go/master/pkg/kubeutils/client.go @@ -11,12 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes +package kubeutils import ( "context" logger "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -26,16 +27,32 @@ import ( "k8s.io/client-go/tools/clientcmd" ) +// GlobalK8sClient is the global client to access a k8s cluster. +var GlobalK8sClient *K8sClient + // K8sClient contains the instance to access a k8s cluster. type K8sClient struct { + namespace string config *rest.Config clientset *k8sApi.Clientset dynamicClient *dynamic.DynamicClient } +// GetGroupVersionResource :- gets GroupVersionResource for dynamic client +func GetGroupVersionResource(group, version, resource string) schema.GroupVersionResource { + return schema.GroupVersionResource{Group: group, Version: version, Resource: resource} +} + +// NewGlobalK8sClient initialize the global k8s client. 
+func NewGlobalK8sClient(kubeConfigPath string, namespace string) { + GlobalK8sClient = NewK8sClient(kubeConfigPath, namespace) +} + // NewK8sClient creates a k8s client instance. -func NewK8sClient(kubeConfigPath string) *K8sClient { - client := &K8sClient{} +func NewK8sClient(kubeConfigPath string, namespace string) *K8sClient { + client := &K8sClient{ + namespace: namespace, + } // creates the in-cluster config if kubeConfigPath == "" { @@ -68,22 +85,25 @@ func NewK8sClient(kubeConfigPath string) *K8sClient { } // GetCustomResourceInstance gets a custom resource instance from a k8s cluster. -func (client *K8sClient) GetCustomResourceInstance( - namespace string, name string, gvr schema.GroupVersionResource) ( +func (client *K8sClient) GetCustomResourceInstance(name string, gvr schema.GroupVersionResource) ( *unstructured.Unstructured, error, ) { // Unstructured utd, err := client.dynamicClient. Resource(gvr). - Namespace(namespace). - Get(context.TODO(), name, metav1.GetOptions{}) + Namespace(client.namespace). + Get(context.Background(), name, metav1.GetOptions{}) if err != nil { logger.Infof("fail to get %s %s", gvr.String(), name) } return utd, err } -// GetGroupVersionResource :- gets GroupVersionResource for dynamic client -func GetGroupVersionResource(group, version, resource string) schema.GroupVersionResource { - return schema.GroupVersionResource{Group: group, Version: version, Resource: resource} +// CreatePod creates a Pod instance in the cluster +func (client *K8sClient) CreatePod(ctx context.Context, pod *corev1.Pod) error { + _, err := client.clientset. + CoreV1(). + Pods(client.namespace). 
+ Create(ctx, pod, metav1.CreateOptions{}) + return err } diff --git a/go/master/pkg/kubernetes/elasticjob.go b/go/master/pkg/kubeutils/elasticjob.go similarity index 86% rename from go/master/pkg/kubernetes/elasticjob.go rename to go/master/pkg/kubeutils/elasticjob.go index 16d597b58..1539834d1 100644 --- a/go/master/pkg/kubernetes/elasticjob.go +++ b/go/master/pkg/kubeutils/elasticjob.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes +package kubeutils import ( elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1" @@ -27,10 +27,9 @@ const ( ) // GetElasticJobInstance gets an elasticjob instance. -func GetElasticJobInstance(client *K8sClient, namespace string, jobName string) *elasticjob.ElasticJob { - +func GetElasticJobInstance(jobName string) *elasticjob.ElasticJob { gvr := GetGroupVersionResource(GROUP, VERSION, "elasticjobs") - utd, err := client.GetCustomResourceInstance(namespace, jobName, gvr) + utd, err := GlobalK8sClient.GetCustomResourceInstance(jobName, gvr) if err != nil { return nil } diff --git a/go/master/pkg/kubernetes/elasticjob_internal_test.go b/go/master/pkg/kubeutils/elasticjob_internal_test.go similarity index 87% rename from go/master/pkg/kubernetes/elasticjob_internal_test.go rename to go/master/pkg/kubeutils/elasticjob_internal_test.go index 6bdec0934..4da2b5d09 100644 --- a/go/master/pkg/kubernetes/elasticjob_internal_test.go +++ b/go/master/pkg/kubeutils/elasticjob_internal_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package kubernetes +package kubeutils import ( "os" @@ -24,8 +24,8 @@ var _ = Describe("Elasticjob", func() { It("Get an elasticjob instance", func() { kubeConfigPath := os.Getenv("KUBENETES_CLUSTER_CONFIG") if kubeConfigPath != "" { - k8sClient := NewK8sClient(kubeConfigPath) - job := GetElasticJobInstance(k8sClient, "dlrover", "torch-mnist") + NewGlobalK8sClient(kubeConfigPath, "dlrover") + job := GetElasticJobInstance("torch-mnist") Expect(job.Name).To(Equal("torch-minst")) } }) diff --git a/go/master/pkg/kubernetes/kubernetes_suite_test.go b/go/master/pkg/kubeutils/kubernetes_suite_test.go similarity index 97% rename from go/master/pkg/kubernetes/kubernetes_suite_test.go rename to go/master/pkg/kubeutils/kubernetes_suite_test.go index 261143afc..d9c0002b8 100644 --- a/go/master/pkg/kubernetes/kubernetes_suite_test.go +++ b/go/master/pkg/kubeutils/kubernetes_suite_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes +package kubeutils import ( "testing" diff --git a/go/master/pkg/kubeutils/pod.go b/go/master/pkg/kubeutils/pod.go new file mode 100644 index 000000000..b307ee3cf --- /dev/null +++ b/go/master/pkg/kubeutils/pod.go @@ -0,0 +1,157 @@ +// Copyright 2025 The DLRover Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package kubeutils
+
+import (
+	"fmt"
+
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+const (
+	envMasterAddr  = "DLROVER_MASTER_ADDR"
+	envPodName     = "MY_POD_NAME"
+	envPodIP       = "MY_POD_IP"
+	envHostIP      = "MY_HOST_IP"
+	envReplicaType = "REPLICA_TYPE"
+	envReplicaID   = "REPLICA_ID"
+	envReplicaRank = "REPLICA_RANK"
+	envReplicaNum  = "REPLICA_NUM"
+
+	labelJobKey         = "elasticjob.dlrover/name"
+	labelReplicaTypeKey = "elasticjob.dlrover/replica-type"
+	labelReplicaIDKey   = "elasticjob.dlrover/replica-id"
+	labelReplicaRankKey = "elasticjob.dlrover/rank"
+)
+
+// ReplicaConfig contains the replica specification.
+type ReplicaConfig struct {
+	Type string
+	ID   int32
+	// Number is the total number of the replicas.
+	Number int32
+	// Rank is the rank of the pod in the replicas.
+	Rank int32
+}
+
+// PodConfig contains the replica config and pod template spec.
+type PodConfig struct {
+	Replica      *ReplicaConfig
+	TemplateSpec *corev1.PodTemplateSpec
+}
+
+// BuildPod builds the corev1.Pod of one replica from the pod template.
+// The template metadata and spec are deep-copied, so the labels and env
+// variables added here never leak back into podConfig.TemplateSpec.
+// The first container of the template is treated as the main container and
+// receives the env variables; the template must declare at least one.
+func BuildPod(jobContext *common.JobContext, podConfig *PodConfig) *corev1.Pod {
+	podName := fmt.Sprintf("%s-%s-%d", jobContext.Name, podConfig.Replica.Type, podConfig.Replica.ID)
+	pod := &corev1.Pod{
+		TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Pod"},
+		ObjectMeta: *podConfig.TemplateSpec.ObjectMeta.DeepCopy(),
+		Spec:       *podConfig.TemplateSpec.Spec.DeepCopy(),
+	}
+	// Set pod name and namespace.
+	pod.ObjectMeta.Name = podName
+	pod.ObjectMeta.Namespace = jobContext.NameSpace
+
+	if pod.ObjectMeta.Labels == nil {
+		pod.ObjectMeta.Labels = make(map[string]string)
+	}
+
+	// Insert Replica specifications into the pod labels.
+	pod.ObjectMeta.Labels[labelJobKey] = jobContext.Name
+	pod.ObjectMeta.Labels[labelReplicaTypeKey] = podConfig.Replica.Type
+	pod.ObjectMeta.Labels[labelReplicaIDKey] = fmt.Sprintf("%d", podConfig.Replica.ID)
+	pod.ObjectMeta.Labels[labelReplicaRankKey] = fmt.Sprintf("%d", podConfig.Replica.Rank)
+
+	mainContainer := &pod.Spec.Containers[0]
+	insertJobMasterAddrEnv(mainContainer, jobContext.MasterHost, jobContext.MasterPort)
+	insertPodMetaEnv(mainContainer)
+	insertReplicaEnv(mainContainer, podConfig.Replica)
+
+	return pod
+}
+
+// insertJobMasterAddrEnv appends the "host:port" address of the job master.
+func insertJobMasterAddrEnv(container *corev1.Container, host string, port int) {
+	jobMasterServiceEnv := corev1.EnvVar{
+		Name:  envMasterAddr,
+		Value: fmt.Sprintf("%s:%d", host, port),
+	}
+	container.Env = append(container.Env, jobMasterServiceEnv)
+}
+
+// insertPodMetaEnv appends env variables that resolve the pod name, the pod IP
+// and the host IP from the fields of the pod through field references.
+func insertPodMetaEnv(container *corev1.Container) {
+	podNameEnv := corev1.EnvVar{
+		Name: envPodName,
+		ValueFrom: &corev1.EnvVarSource{
+			FieldRef: &corev1.ObjectFieldSelector{
+				APIVersion: "v1",
+				FieldPath:  "metadata.name",
+			},
+		},
+	}
+	container.Env = append(container.Env, podNameEnv)
+
+	podIPEnv := corev1.EnvVar{
+		Name: envPodIP,
+		ValueFrom: &corev1.EnvVarSource{
+			FieldRef: &corev1.ObjectFieldSelector{
+				APIVersion: "v1",
+				FieldPath:  "status.podIP",
+			},
+		},
+	}
+	container.Env = append(container.Env, podIPEnv)
+
+	hostIPEnv := corev1.EnvVar{
+		Name: envHostIP,
+		ValueFrom: &corev1.EnvVarSource{
+			FieldRef: &corev1.ObjectFieldSelector{
+				APIVersion: "v1",
+				FieldPath:  "status.hostIP",
+			},
+		},
+	}
+	container.Env = append(container.Env, hostIPEnv)
+}
+
+// insertReplicaEnv appends the type, ID, rank and total number of the replica.
+func insertReplicaEnv(container *corev1.Container, replicaConfig *ReplicaConfig) {
+	replicaTypeEnv := corev1.EnvVar{
+		Name:  envReplicaType,
+		Value: replicaConfig.Type,
+	}
+	replicaIDEnv := corev1.EnvVar{
+		Name:  envReplicaID,
+		Value: fmt.Sprintf("%d", replicaConfig.ID),
+	}
+	// The rank goes to REPLICA_RANK and the total replica number to REPLICA_NUM.
+	replicaRankEnv := corev1.EnvVar{
+		Name:  envReplicaRank,
+		Value: fmt.Sprintf("%d", replicaConfig.Rank),
+	}
+	replicaNumEnv := corev1.EnvVar{
+		Name:  envReplicaNum,
+		Value: fmt.Sprintf("%d", replicaConfig.Number),
+	}
+
+	container.Env = append(container.Env, replicaTypeEnv, replicaIDEnv, replicaRankEnv, replicaNumEnv)
+}
diff --git a/go/master/pkg/kubeutils/pod_internal_test.go b/go/master/pkg/kubeutils/pod_internal_test.go
new file mode 100644
index 000000000..9cc5e8e4e
--- /dev/null
+++ b/go/master/pkg/kubeutils/pod_internal_test.go
@@ -0,0 +1,96 @@
+// Copyright 2025 The DLRover Authors. All rights reserved.
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubeutils
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	kubeerrors "k8s.io/apimachinery/pkg/api/errors"
+)
+
+var _ = Describe("Pod", func() {
+	It("Create a Pod", func() {
+		jobContext := &common.JobContext{
+			NameSpace:  "dlrover",
+			Name:       "train-demo",
+			MasterHost: "127.0.0.1",
+			MasterPort: 12345,
+		}
+		container := corev1.Container{
+			Name:            "main",
+			Image:           "python:3.12.8",
+			ImagePullPolicy: corev1.PullIfNotPresent,
+			Command:         []string{"/bin/bash", "-c", "echo 0"},
+		}
+		podConfig := &PodConfig{
+			Replica: &ReplicaConfig{
+				Type:   "worker",
+				ID:     0,
+				Number: 8,
+				Rank:   0,
+			},
+			TemplateSpec: &corev1.PodTemplateSpec{
+				Spec: corev1.PodSpec{
+					Containers:    []corev1.Container{container},
+					RestartPolicy: corev1.RestartPolicyNever,
+				},
+			},
+		}
+		pod := BuildPod(jobContext, podConfig)
+		Expect(pod.ObjectMeta.Name).To(Equal("train-demo-worker-0"))
+		Expect(pod.ObjectMeta.Namespace).To(Equal("dlrover"))
+		jobName, ok := pod.ObjectMeta.Labels[labelJobKey]
+		Expect(ok).To(BeTrue())
+		Expect(jobName).To(Equal("train-demo"))
+		replicaType, ok := pod.ObjectMeta.Labels[labelReplicaTypeKey]
+		Expect(ok).To(BeTrue())
+		Expect(replicaType).To(Equal("worker"))
+
+		// REPLICA_NUM and REPLICA_RANK must carry the replica number and rank.
+		envs := make(map[string]string)
+		for _, env := range pod.Spec.Containers[0].Env {
+			envs[env.Name] = env.Value
+		}
+		Expect(envs[envReplicaType]).To(Equal("worker"))
+		Expect(envs[envReplicaNum]).To(Equal("8"))
+		Expect(envs[envReplicaRank]).To(Equal("0"))
+		// BuildPod must not modify the pod template it was built from.
+		Expect(podConfig.TemplateSpec.Spec.Containers[0].Env).To(BeEmpty())
+
+		configPath := os.Getenv("KUBERNETES_CONFIG_PATH")
+		if _, err := os.Stat(configPath); errors.Is(err, os.ErrNotExist) {
+			Skip(fmt.Sprintf("The config file %s is not exist.", configPath))
+		}
+
+		k8sClient := NewK8sClient(configPath, "dlrover")
+		pod.ObjectMeta.Namespace = "no-namspace"
+		err := k8sClient.CreatePod(context.Background(), pod)
+		Expect(kubeerrors.IsBadRequest(err)).To(BeTrue())
+
+		pod.ObjectMeta.Namespace = "dlrover"
+		err = k8sClient.CreatePod(context.Background(), pod)
+		Expect(kubeerrors.IsAlreadyExists(err)).To(BeTrue())
+
+		pod.ObjectMeta.Name = ""
+		err = k8sClient.CreatePod(context.Background(), pod)
+		Expect(kubeerrors.IsInvalid(err)).To(BeTrue())
+	})
+})
diff --git a/go/master/pkg/master.go b/go/master/pkg/master.go
index 7baf4c2b5..651026ed0 100644
--- a/go/master/pkg/master.go
+++
b/go/master/pkg/master.go
@@ -14,37 +14,41 @@
 package master
 
 import (
+	"context"
 	"time"
 
-	elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
-	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubernetes"
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/jobmanager"
+	"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
 	logger "github.com/sirupsen/logrus"
 )
 
 // JobMaster is the master of an elasticjob.
 type JobMaster struct {
-	Namespace string
-	JobName   string
-	K8sClient *kubernetes.K8sClient
-	Job       *elasticjob.ElasticJob
+	jobContext *common.JobContext
+	jobManager jobmanager.JobManager
 }
 
 // NewJobMaster creates the master for an elasticjob.
-func NewJobMaster(namespace string, jobName string, k8sClient *kubernetes.K8sClient) *JobMaster {
-	master := &JobMaster{
-		Namespace: namespace,
-		JobName:   jobName,
-	}
-	if k8sClient != nil {
-		job := kubernetes.GetElasticJobInstance(k8sClient, namespace, jobName)
-		master.K8sClient = k8sClient
-		master.Job = job
+func NewJobMaster(namespace string, jobName string) *JobMaster {
+	master := &JobMaster{}
+	// The job manager is only created when the global Kubernetes client is set.
+	if kubeutils.GlobalK8sClient != nil {
+		elasticjob := kubeutils.GetElasticJobInstance(jobName)
+		master.jobManager = jobmanager.NewJobManager(elasticjob)
 	}
+	master.jobContext = common.NewJobContext(namespace, jobName)
 	logger.Infof("create a master of job %s.", jobName)
 	return master
 }
 
 // Run starts the master instance.
 func (master *JobMaster) Run() {
+	ctx, cancel := context.WithCancel(context.Background())
+	// Register cancel right away so the context is cancelled on every exit path.
+	defer cancel()
+	if master.jobManager != nil {
+		master.jobManager.Start(ctx, master.jobContext)
+	}
 	time.Sleep(10 * time.Hour)
 }
diff --git a/go/master/pkg/master_internal_test.go b/go/master/pkg/master_internal_test.go
index d651f513a..573ddc91d 100644
--- a/go/master/pkg/master_internal_test.go
+++ b/go/master/pkg/master_internal_test.go
@@ -20,8 +20,8 @@ import (
 
 var _ = Describe("Master", func() {
 	It("Create a master", func() {
-		master := NewJobMaster("dlrover", "test-master", nil)
-		Expect(master.Namespace).To(Equal("dlrover"))
-		Expect(master.JobName).To(Equal("test-master"))
+		master := NewJobMaster("dlrover", "test-master")
+		Expect(master.jobContext.NameSpace).To(Equal("dlrover"))
+		Expect(master.jobContext.Name).To(Equal("test-master"))
 	})
 })