@@ -35,7 +35,7 @@ import (
3535const (
3636 dashboardAddr = "http://localhost:8265"
3737 clusterTimeout = 120.0
38- portforwardtimeout = 60.0
38+ portForwardTimeout = 60.0
3939)
4040
4141type SubmitJobOptions struct {
@@ -45,6 +45,7 @@ type SubmitJobOptions struct {
4545 workerNodeSelectors map [string ]string
4646 headNodeSelectors map [string ]string
4747 logColor string
48+ address string
4849 image string
4950 fileName string
5051 workingDir string
@@ -86,8 +87,9 @@ type JobInfo struct {
8687
8788var (
8889 jobSubmitLong = templates .LongDesc (`
89- Submit Ray job to Ray cluster as one would using Ray CLI e.g. 'ray job submit ENTRYPOINT'. Command supports all options that 'ray job submit' supports, except '--address'.
90- If Ray cluster is already setup, use 'kubectl ray session' instead.
90+ Submit Ray job to Ray cluster as one would using Ray CLI e.g. 'ray job submit ENTRYPOINT'.
91+ If Ray cluster is already setup, use 'kubectl ray session' instead. If '--address' is set, we connect directly
92+ without port-forwarding; if empty, we port-forward to localhost:8265.
9193
9294 If no RayJob YAML file is specified, the command will create a default RayJob for the user.
9395
@@ -149,6 +151,7 @@ func NewJobSubmitCommand(cmdFactory cmdutil.Factory, streams genericclioptions.I
149151 },
150152 }
151153 cmd .Flags ().StringVarP (& options .fileName , "filename" , "f" , "" , "Path and name of the Ray Job YAML file" )
154+ cmd .Flags ().StringVar (& options .address , "address" , "" , "Ray Dashboard base URL (e.g., https://ray.example.com). If set, skips port-forwarding." )
152155 cmd .Flags ().StringVar (& options .submissionID , "submission-id" , "" , "ID to specify for the Ray job. If not provided, one will be generated" )
153156 cmd .Flags ().StringVar (& options .runtimeEnv , "runtime-env" , "" , "Path and name to the runtime env YAML file." )
154157 cmd .Flags ().StringVar (& options .workingDir , "working-dir" , "" , "Directory containing files that your job will run in" )
@@ -307,6 +310,11 @@ func (options *SubmitJobOptions) Validate(cmd *cobra.Command) error {
307310 }
308311 }
309312
313+ if cmd .Flags ().Changed ("address" ) {
314+ if strings .TrimSpace (options .address ) == "" {
315+ return fmt .Errorf ("--address was provided but is empty" )
316+ }
317+ }
310318 return nil
311319}
312320
@@ -421,55 +429,62 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
421429 return fmt .Errorf ("Timed out waiting for cluster" )
422430 }
423431
424- svcName , err := k8sClients .GetRayHeadSvcName (ctx , options .namespace , util .RayCluster , options .cluster )
425- if err != nil {
426- return fmt .Errorf ("Failed to find service name: %w" , err )
427- }
428-
429- // start port forward section
430- portForwardCmd := portforward .NewCmdPortForward (factory , * options .ioStreams )
431- portForwardCmd .SetArgs ([]string {"service/" + svcName , fmt .Sprintf ("%d:%d" , 8265 , 8265 )})
432-
433- // create new context for port-forwarding so we can cancel the context to stop the port forwarding only
434- portforwardctx , cancel := context .WithCancel (ctx )
435- defer cancel ()
436- go func () {
437- fmt .Printf ("Port Forwarding service %s\n " , svcName )
438- if err := portForwardCmd .ExecuteContext (portforwardctx ); err != nil {
439- log .Fatalf ("Error occurred while port-forwarding Ray dashboard: %v" , err )
432+ if options .address == "" {
433+ svcName , err := k8sClients .GetRayHeadSvcName (ctx , options .namespace , util .RayCluster , options .cluster )
434+ if err != nil {
435+ return fmt .Errorf ("Failed to find service name: %w" , err )
440436 }
441- }()
442437
443- // Wait for port forward to be ready
444- var portforwardReady bool
445- portforwardWaitStartTime := time .Now ()
446- currTime = portforwardWaitStartTime
438+ // start port forward section
439+ portForwardCmd := portforward .NewCmdPortForward (factory , * options .ioStreams )
440+ portForwardCmd .SetArgs ([]string {"service/" + svcName , fmt .Sprintf ("%d:%d" , 8265 , 8265 )})
441+
442+ // create new context for port-forwarding so we can cancel the context to stop the port forwarding only
443+ portForwardCtx , cancel := context .WithCancel (ctx )
444+ defer cancel ()
445+ go func () {
446+ fmt .Printf ("Port forwarding service %s\n " , svcName )
447+ if err := portForwardCmd .ExecuteContext (portForwardCtx ); err != nil {
448+ log .Fatalf ("Error occurred while port-forwarding Ray dashboard: %v" , err )
449+ }
450+ }()
447451
448- portforwardCheckRequest , err := http .NewRequestWithContext (ctx , http .MethodGet , dashboardAddr , nil )
449- if err != nil {
450- return fmt .Errorf ("Error occurred when trying to create request to probe cluster endpoint: %w" , err )
451- }
452- httpClient := http.Client {
453- Timeout : 5 * time .Second ,
454- }
455- fmt .Printf ("Waiting for portforwarding..." )
456- for ! portforwardReady && currTime .Sub (portforwardWaitStartTime ).Seconds () <= portforwardtimeout {
457- time .Sleep (2 * time .Second )
458- rayDashboardResponse , err := httpClient .Do (portforwardCheckRequest )
452+ // Wait for port forward to be ready
453+ var portForwardReady bool
454+ portForwardWaitStartTime := time .Now ()
455+ currTime = portForwardWaitStartTime
456+
457+ portforwardCheckRequest , err := http .NewRequestWithContext (ctx , http .MethodGet , dashboardAddr , nil )
459458 if err != nil {
460- err = fmt .Errorf ("Error occurred when waiting for portforwarding: %w" , err )
461- fmt .Println (err )
459+ return fmt .Errorf ("Error occurred when trying to create request to probe cluster endpoint: %w" , err )
462460 }
463- if rayDashboardResponse . StatusCode >= 200 && rayDashboardResponse . StatusCode < 300 {
464- portforwardReady = true
461+ httpClient := http. Client {
462+ Timeout : 5 * time . Second ,
465463 }
466- rayDashboardResponse .Body .Close ()
467- currTime = time .Now ()
468- }
469- if ! portforwardReady {
470- return fmt .Errorf ("Timed out waiting for port forwarding" )
464+ fmt .Printf ("Waiting for port forwarding..." )
465+ for ! portForwardReady && currTime .Sub (portForwardWaitStartTime ).Seconds () <= portForwardTimeout {
466+ time .Sleep (2 * time .Second )
467+ rayDashboardResponse , err := httpClient .Do (portforwardCheckRequest )
468+ if err != nil {
469+ err = fmt .Errorf ("Error occurred when waiting for port forwarding: %w" , err )
470+ fmt .Println (err )
471+ currTime = time .Now ()
472+ continue
473+ }
474+ if rayDashboardResponse .StatusCode >= 200 && rayDashboardResponse .StatusCode < 300 {
475+ portForwardReady = true
476+ }
477+ rayDashboardResponse .Body .Close ()
478+ currTime = time .Now ()
479+ }
480+ if ! portForwardReady {
481+ return fmt .Errorf ("Timed out waiting for port forwarding" )
482+ }
483+ options .address = dashboardAddr
484+ fmt .Printf ("Port forwarding started on %s\n " , options .address )
485+ } else {
486+ fmt .Printf ("Using address %s (no port-forwarding)\n " , options .address )
471487 }
472- fmt .Printf ("Portforwarding started on %s\n " , dashboardAddr )
473488
474489 // If submission ID is not provided by the user, generate one.
475490 if options .submissionID == "" {
@@ -608,7 +623,11 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
608623}
609624
610625func (options * SubmitJobOptions ) raySubmitCmd () ([]string , error ) {
611- raySubmitCmd := []string {"ray" , "job" , "submit" , "--address" , dashboardAddr }
626+ addr := options .address
627+ if addr == "" {
628+ addr = dashboardAddr
629+ }
630+ raySubmitCmd := []string {"ray" , "job" , "submit" , "--address" , addr }
612631
613632 if len (options .runtimeEnv ) > 0 {
614633 raySubmitCmd = append (raySubmitCmd , "--runtime-env" , options .runtimeEnv )
0 commit comments