Skip to content

Commit

Permalink
[feat][kubectl-plugin] add scale command
Browse files Browse the repository at this point in the history
to scale a RayCluster's worker group.

closes #110

## Example Usage

```console
$ kubectl ray scale cluster -h                                                                                                                (base)
Scale a Ray cluster's worker group.

Usage:
  ray scale cluster (WORKERGROUP) (-c/--ray-cluster RAYCLUSTER) (-r/--replicas N) [flags]

Examples:
  # Scale a Ray cluster's worker group to 3 replicas
  kubectl ray scale cluster my-workergroup --ray-cluster my-raycluster --replicas 3

$ kubectl ray scale default-group --ray-cluster NONEXISTENT --replicas 0
Error: failed to scale worker group default-group in Ray cluster NONEXISTENT in namespace default: rayclusters.ray.io "NONEXISTENT" not found

$ kubectl ray scale DEADBEEF --ray-cluster dxia-test --replicas 1
Error: worker group DEADBEEF not found in Ray cluster dxia-test in namespace default. Available worker groups: default-group, another-group, yet-another-group

$ kubectl ray scale default-group --ray-cluster dxia-test --replicas 3
Scaled worker group default-group in Ray cluster dxia-test in namespace default from 0 to 3 replicas

$ kubectl ray scale default-group --ray-cluster dxia-test --replicas 1
Scaled worker group default-group in Ray cluster dxia-test in namespace default from 3 to 1 replicas

$ kubectl ray scale default-group --ray-cluster dxia-test --replicas -1
Error: must specify -r/--replicas with a non-negative integer
```

Signed-off-by: David Xia <david@davidxia.com>
  • Loading branch information
davidxia committed Feb 17, 2025
1 parent 7b13f94 commit a356189
Show file tree
Hide file tree
Showing 4 changed files with 416 additions and 0 deletions.
2 changes: 2 additions & 0 deletions kubectl-plugin/pkg/cmd/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/get"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/job"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/log"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/scale"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/session"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/cmd/version"
)
Expand All @@ -35,6 +36,7 @@ func NewRayCommand(streams genericiooptions.IOStreams) *cobra.Command {
cmd.AddCommand(version.NewVersionCommand(streams))
cmd.AddCommand(create.NewCreateCommand(streams))
cmd.AddCommand(kubectlraydelete.NewDeleteCommand(streams))
cmd.AddCommand(scale.NewScaleCommand(streams))

return cmd
}
26 changes: 26 additions & 0 deletions kubectl-plugin/pkg/cmd/scale/scale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package scale

import (
"fmt"
"strings"

"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
)

func NewScaleCommand(streams genericclioptions.IOStreams) *cobra.Command {
cmd := &cobra.Command{
Use: "scale",
Short: "Scale a Ray resource",
SilenceUsage: true,
Run: func(cmd *cobra.Command, args []string) {
if len(args) > 0 {
fmt.Println(fmt.Errorf("unknown command(s) %q", strings.Join(args, " ")))
}
cmd.HelpFunc()(cmd, args)
},
}

cmd.AddCommand(NewScaleClusterCommand(streams))
return cmd
}
144 changes: 144 additions & 0 deletions kubectl-plugin/pkg/cmd/scale/scale_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package scale

import (
"context"
"fmt"
"io"
"os"
"strings"

"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/kubectl/pkg/util/templates"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)

type ScaleClusterOptions struct {
configFlags *genericclioptions.ConfigFlags
ioStreams *genericclioptions.IOStreams
replicas *int32
workerGroup string
cluster string
}

var (
scaleLong = templates.LongDesc(`
Scale a Ray cluster's worker group.
`)

scaleExample = templates.Examples(`
# Scale a Ray cluster's worker group to 3 replicas
kubectl ray scale cluster my-workergroup --ray-cluster my-raycluster --replicas 3
`)
)

func NewScaleClusterOptions(streams genericclioptions.IOStreams) *ScaleClusterOptions {
return &ScaleClusterOptions{
configFlags: genericclioptions.NewConfigFlags(true),
ioStreams: &streams,
replicas: new(int32),
}
}

func NewScaleClusterCommand(streams genericclioptions.IOStreams) *cobra.Command {
options := NewScaleClusterOptions(streams)
cmdFactory := cmdutil.NewFactory(options.configFlags)

cmd := &cobra.Command{
Use: "cluster (WORKERGROUP) (-c/--ray-cluster RAYCLUSTER) (-r/--replicas N)",
Short: "Scale a Ray cluster's worker group",
Long: scaleLong,
Example: scaleExample,
SilenceUsage: true,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Complete(args); err != nil {
return err
}
if err := options.Validate(); err != nil {
return err
}
k8sClient, err := client.NewClient(cmdFactory)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
return options.Run(cmd.Context(), k8sClient, os.Stdout)
},
}

cmd.Flags().StringVarP(&options.cluster, "ray-cluster", "c", "", "Ray cluster of the worker group")
cobra.CheckErr(cmd.MarkFlagRequired("ray-cluster"))
cmd.Flags().Int32VarP(options.replicas, "replicas", "r", -1, "Desired number of replicas in worker group")
options.configFlags.AddFlags(cmd.Flags())
return cmd
}

func (options *ScaleClusterOptions) Complete(args []string) error {
if *options.configFlags.Namespace == "" {
*options.configFlags.Namespace = "default"
}

options.workerGroup = args[0]

return nil
}

func (options *ScaleClusterOptions) Validate() error {
config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig()
if err != nil {
return fmt.Errorf("error retrieving raw config: %w", err)
}

if !util.HasKubectlContext(config, options.configFlags) {
return fmt.Errorf("no context is currently set, use %q or %q to select a new one", "--context", "kubectl config use-context <context>")
}

if options.cluster == "" {
return fmt.Errorf("must specify -c/--ray-cluster")
}

if options.replicas == nil || *options.replicas < 0 {
return fmt.Errorf("must specify -r/--replicas with a non-negative integer")
}

return nil
}

func (options *ScaleClusterOptions) Run(ctx context.Context, k8sClient client.Client, writer io.Writer) error {
cluster, err := k8sClient.RayClient().RayV1().RayClusters(*options.configFlags.Namespace).Get(ctx, options.cluster, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to scale worker group %s in Ray cluster %s in namespace %s: %w", options.workerGroup, options.cluster, *options.configFlags.Namespace, err)
}

// find the index of the worker group
var workerGroups []string
workerGroupIndex := -1
for i, workerGroup := range cluster.Spec.WorkerGroupSpecs {
workerGroups = append(workerGroups, workerGroup.GroupName)
if workerGroup.GroupName == options.workerGroup {
workerGroupIndex = i
}
}
if workerGroupIndex == -1 {
return fmt.Errorf("worker group %s not found in Ray cluster %s in namespace %s. Available worker groups: %s", options.workerGroup, options.cluster, *options.configFlags.Namespace, strings.Join(workerGroups, ", "))
}

previousReplicas := *cluster.Spec.WorkerGroupSpecs[workerGroupIndex].Replicas
if previousReplicas == *options.replicas {
fmt.Fprintf(writer, "worker group %s in Ray cluster %s in namespace %s already has %d replicas. Skipping\n", options.workerGroup, options.cluster, *options.configFlags.Namespace, previousReplicas)
return nil
}

cluster.Spec.WorkerGroupSpecs[workerGroupIndex].Replicas = options.replicas
_, err = k8sClient.RayClient().RayV1().RayClusters(*options.configFlags.Namespace).Update(ctx, cluster, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to scale worker group %s in Ray cluster %s in namespace %s: %w", options.workerGroup, options.cluster, *options.configFlags.Namespace, err)
}

fmt.Fprintf(writer, "Scaled worker group %s in Ray cluster %s in namespace %s from %d to %d replicas\n", options.workerGroup, options.cluster, *options.configFlags.Namespace, previousReplicas, *options.replicas)
return nil
}
Loading

0 comments on commit a356189

Please sign in to comment.