Skip to content

Commit

Permalink
Merge branch 'master' into add-try-catch-protection-for-initialize-wo…
Browse files Browse the repository at this point in the history
…rkers
  • Loading branch information
BalaBalaYi authored Feb 6, 2025
2 parents 14cbcce + 6740f60 commit 48e27df
Show file tree
Hide file tree
Showing 25 changed files with 958 additions and 43 deletions.
12 changes: 5 additions & 7 deletions go/master/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ import (
"flag"
"strconv"

logger "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"

master "github.com/intelligent-machine-learning/dlrover/go/master/pkg"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubernetes"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/server"
logger "github.com/sirupsen/logrus"
)

func main() {
Expand All @@ -41,10 +39,10 @@ func main() {

// Listen and serve on defined port
logger.Infof("The master starts with namespece %s, jobName %s, port %d", namespace, jobName, port)
var k8sClient *kubernetes.K8sClient
if k8sScheduling {
k8sClient = kubernetes.NewK8sClient(namespace, jobName)
// Use incluster mode without kubeconfig.
kubeutils.NewGlobalK8sClient("", namespace)
}
master := master.NewJobMaster(namespace, jobName, k8sClient)
master := master.NewJobMaster(namespace, jobName)
master.Run()
}
26 changes: 26 additions & 0 deletions go/master/master_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestMaster(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Master Suite")
}
26 changes: 26 additions & 0 deletions go/master/pkg/batchscheduler/batchscheduler_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestBatchscheduler(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Batchscheduler Suite")
}
62 changes: 62 additions & 0 deletions go/master/pkg/batchscheduler/elastic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler

import (
"context"

"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
)

// ElasticScheduler launches pods without waiting for all resouces of pod are ready
type ElasticScheduler struct {
KubeScheduler
SchedulerName string
}

// NewElasticScheduler creates an elastic scheduler.
func NewElasticScheduler() *ElasticScheduler {
return &ElasticScheduler{
KubeScheduler: KubeScheduler{
toCreatePods: common.NewQueue(),
},
SchedulerName: "elastic",
}
}

// Start starts a routine to launch Pods.
func (scheduler *ElasticScheduler) Start(ctx context.Context, jobContext *common.JobContext) {
go scheduler.LoopToLaunchPods(ctx)
}

// DoScheduling creates/updates/deletes pods
func (scheduler *ElasticScheduler) DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan) {
for replicaType, spec := range plan.ReplicaSpecs {
for i := int32(0); i < spec.Replicas; i++ {
replicaConfig := &kubeutils.ReplicaConfig{
Type: string(replicaType),
ID: i,
Number: spec.Replicas,
Rank: i,
}
podConfig := &kubeutils.PodConfig{
Replica: replicaConfig,
TemplateSpec: spec.Template.DeepCopy(),
}
pod := kubeutils.BuildPod(jobContext, podConfig)
scheduler.toCreatePods.PushBack(pod)
}
}
}
62 changes: 62 additions & 0 deletions go/master/pkg/batchscheduler/elastic_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler

import (
"fmt"

commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
)

var _ = Describe("Elastic", func() {
It("Do scheduling to launch pods.", func() {
jobContext := &common.JobContext{
NameSpace: "dlrover",
Name: "train-demo",
MasterHost: "127.0.0.1",
MasterPort: 12345,
}

container := corev1.Container{
Name: "main",
Image: "python:3.12.8",
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"/bin/bash", "-c", "echo 0"},
}
replicas := make(map[commonv1.ReplicaType]*commonv1.ReplicaSpec)
replicas["worker"] = &commonv1.ReplicaSpec{
Replicas: 3,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{container},
RestartPolicy: corev1.RestartPolicyNever,
},
},
}
schedulingPlan := &SchedulingPlan{ReplicaSpecs: replicas}
scheduler := NewElasticScheduler()
scheduler.DoScheduling(jobContext, schedulingPlan)
Expect(scheduler.toCreatePods.Len()).To(Equal(3))
for i := 0; i < 3; i++ {
pod := scheduler.toCreatePods.PopFront().(*corev1.Pod)
expectPodName := fmt.Sprintf("train-demo-worker-%d", i)
Expect(pod.ObjectMeta.Name).To(Equal(expectPodName))
}
})
})
14 changes: 14 additions & 0 deletions go/master/pkg/batchscheduler/plan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler
89 changes: 89 additions & 0 deletions go/master/pkg/batchscheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler

import (
"context"
"time"

elasticjob "github.com/intelligent-machine-learning/dlrover/go/elasticjob/api/v1alpha1"
commonv1 "github.com/intelligent-machine-learning/dlrover/go/elasticjob/pkg/common/api/v1"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/common"
"github.com/intelligent-machine-learning/dlrover/go/master/pkg/kubeutils"
logger "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
)

// BatchScheduler creates/updates/deletes the batch pods of an elastic job.
type BatchScheduler interface {
Start(ctx context.Context, jobContext *common.JobContext)
DoScheduling(jobContext *common.JobContext, plan *SchedulingPlan)
}

// SchedulingPlan is the scheduling plan to notify the scheduler CURD pods.
type SchedulingPlan struct {
// ReplicaSpecs is a map which contains the replica specification to create Pods.
ReplicaSpecs map[commonv1.ReplicaType]*commonv1.ReplicaSpec

// CreatedPods are Pods to be created.
CreatedPods []*kubeutils.PodConfig

// RemovedPods are Pods to be removed
RemovedPods []*kubeutils.PodConfig

// OwnerJob specifies a job to scale.
OwnerJob *elasticjob.ElasticJob
}

// NewBatchScheduler creates a batch scheduler according to the scheduler name.
func NewBatchScheduler(schedulerName string) BatchScheduler {
if schedulerName == "elastic" || schedulerName == "" {
scheduler := NewElasticScheduler()
return scheduler
}
return nil
}

// KubeScheduler is the base scheduler to create/update/remove pods.
type KubeScheduler struct {
toCreatePods *common.Queue
}

// LoopToLaunchPods launches pods from the pod queue.
func (scheduler *KubeScheduler) LoopToLaunchPods(ctx context.Context) {
for {
select {
case <-ctx.Done():
logger.Infof("The loop to launch Pod exists.")
default:
for scheduler.toCreatePods.Len() > 0 {
pod := scheduler.toCreatePods.PopFront().(*corev1.Pod)
err := kubeutils.GlobalK8sClient.CreatePod(ctx, pod)
if errors.IsAlreadyExists(err) {
logger.Warnf("The pod %s already exists.", pod.ObjectMeta.Name)
} else if errors.IsTooManyRequests(err) || errors.IsTimeout(err) || errors.IsServerTimeout(err) {
logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err)
// Retry to create pod due to timeout.
scheduler.toCreatePods.PushFront(pod)
time.Sleep(5 * time.Second)
} else {
logger.Warnf("Fail to create pod %s with err: %v", pod.ObjectMeta.Name, err)
panic(err.Error())
}
}
}
time.Sleep(1 * time.Second)
}
}
14 changes: 14 additions & 0 deletions go/master/pkg/batchscheduler/volcano.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchscheduler
26 changes: 26 additions & 0 deletions go/master/pkg/common/common_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 The DLRover Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestCommon(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Common Suite")
}
Loading

0 comments on commit 48e27df

Please sign in to comment.