Skip to content

Commit

Permalink
Add jobby status command and executor (#54)
Browse files Browse the repository at this point in the history
* Add status command and executor

Also moves the previous only command into its own executor, called
`jobby submit` for now.

Incorporates the new logs streaming endpoint.

Docs and test directories are now included as well, until we decide
what to do with them in the automatic regen (next commit).

* Add shell script for automatic OpenAPI client regeneration

Needs a running FastAPI backend on localhost port 8000.

Downloads its OpenAPI spec into a unique temporary file, runs
openapi-gen in a docker container on it, and moves the generated
client code (and only the source code) into the `client` subpackage.

This includes the docs and test directories, which contain autogen'd
info that we might not care about.
  • Loading branch information
nicholasjng authored Aug 26, 2024
1 parent 8248267 commit ed9c52e
Show file tree
Hide file tree
Showing 50 changed files with 2,504 additions and 437 deletions.
24 changes: 22 additions & 2 deletions backend/src/jobs_server/models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
import re
from enum import StrEnum
from typing import Annotated, Any, TypeAlias
from typing import Annotated, Any, Self, TypeAlias

from jobs import JobOptions
from jobs.types import ExecutionMode
from pydantic import UUID4, AfterValidator, BaseModel, Field, StrictStr


Expand Down Expand Up @@ -32,6 +32,26 @@ def validate_image_ref(ref: str) -> str:
SubmissionContext: TypeAlias = dict[str, Any]


class ExecutionMode(StrEnum):
"""
ExecutionMode
"""

"""
allowed enum values
"""
LOCAL = "local"
DOCKER = "docker"
KUEUE = "kueue"
RAYCLUSTER = "raycluster"
RAYJOB = "rayjob"

@classmethod
def from_json(cls, json_str: str) -> Self:
"""Create an instance of ExecutionMode from a JSON string"""
return cls(json.loads(json_str))


class CreateJobModel(BaseModel):
name: str
file: str
Expand Down
3 changes: 1 addition & 2 deletions backend/src/jobs_server/runner/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
from typing import ClassVar, Self

from jobs import Image, Job
from jobs.types import ExecutionMode

from jobs_server.models import SubmissionContext, WorkloadIdentifier
from jobs_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier


class Runner(abc.ABC):
Expand Down
4 changes: 2 additions & 2 deletions backend/src/jobs_server/runner/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from jobs import Image, Job
from jobs.job import DockerResourceOptions

from jobs_server.models import SubmissionContext
from jobs_server.runner.base import ExecutionMode, Runner, _make_executor_command
from jobs_server.models import ExecutionMode, SubmissionContext
from jobs_server.runner.base import Runner, _make_executor_command
from jobs_server.utils.helpers import remove_none_values


Expand Down
4 changes: 2 additions & 2 deletions backend/src/jobs_server/runner/kueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from jobs.types import K8sResourceKind
from kubernetes import client

from jobs_server.models import SubmissionContext, WorkloadIdentifier
from jobs_server.runner.base import ExecutionMode, Runner, _make_executor_command
from jobs_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier
from jobs_server.runner.base import Runner, _make_executor_command
from jobs_server.utils.k8s import (
KubernetesNamespaceMixin,
gvk,
Expand Down
4 changes: 2 additions & 2 deletions backend/src/jobs_server/runner/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from ray.dashboard.modules.job.common import JobStatus
from ray.dashboard.modules.job.sdk import JobSubmissionClient

from jobs_server.models import SubmissionContext, WorkloadIdentifier
from jobs_server.runner.base import ExecutionMode, Runner, _make_executor_command
from jobs_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier
from jobs_server.runner.base import Runner, _make_executor_command
from jobs_server.utils.k8s import (
KubernetesNamespaceMixin,
gvk,
Expand Down
1 change: 0 additions & 1 deletion backend/src/jobs_server/services/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def __init__(self):
except config.ConfigException:
logging.warning(
"Could not load in-cluster config, attempting to load Kubeconfig",
exc_info=True,
)
config.load_kube_config()
self._in_cluster = False
Expand Down
21 changes: 21 additions & 0 deletions client/hack/openapi-regen.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash -eux

REPO_ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && cd ../.. && pwd )"
fname=openapi-$(date +%s).json
curl -o "$REPO_ROOT/$fname" http://localhost:8000/openapi.json

docker run --rm \
-v "$REPO_ROOT":/local \
openapitools/openapi-generator-cli \
generate \
-i /local/"$fname" \
-g python \
-o /local/openapi-client \
--additional-properties=generateSourceCodeOnly=true,packageName=openapi_client

cp -af "$REPO_ROOT"/openapi-client/openapi_client "$REPO_ROOT"/client/src

ruff format "$REPO_ROOT"/client/src/openapi_client/
ruff check --fix --unsafe-fixes "$REPO_ROOT"/client/src/openapi_client/
rm -rf "$REPO_ROOT"/openapi-client
rm "$REPO_ROOT/$fname"
7 changes: 5 additions & 2 deletions client/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ fallback_version = "0.0.0"
extend = "../pyproject.toml"
src = ["src"]

[tool.ruff.lint]
exclude = ["src/openapi_client/**"]
[tool.ruff.lint.per-file-ignores]
"src/openapi_client/**" = [
"B904", # raise-without-from-inside-except
"E721", # type-comparison
]

[tool.mypy]
ignore_missing_imports = true
Expand Down
70 changes: 57 additions & 13 deletions client/src/jobs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,89 @@
import openapi_client.configuration
from jobs import Image, Job
from jobs.submission_context import SubmissionContext
from jobs.types import ExecutionMode
from openapi_client import ExecutionMode


def submit(args: argparse.Namespace) -> None:
job = discover_job(args)

submit_job(job, args)


def status(args: argparse.Namespace) -> None:
api_config = openapi_client.Configuration(host="http://localhost:8000")

with openapi_client.ApiClient(api_config) as api:
client = openapi_client.JobManagementApi(api)

resp = client.status_jobs_uid_status_get(
uid=args.uid,
namespace=args.namespace,
)
pp(resp)


def _make_argparser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Run an example job either locally, or on a container execution platform",
description="The jobby command-line interface",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument(
subparsers = parser.add_subparsers(required=True)

# jobby submit, the job submission command
submission = subparsers.add_parser(
"submit",
description="Run an example job either locally, or on a container execution platform",
)

submission.add_argument(
"--image-name",
help="Image name to use when building a container image",
default="example:latest",
)

parser.add_argument(
submission.add_argument(
"--mode",
help="Job execution mode",
default="local",
choices=list(ExecutionMode),
type=ExecutionMode,
)

parser.add_argument(
submission.add_argument(
"--kueue-local-queue",
help="Name of the Kueue LocalQueue to submit the workload to",
default="user-queue",
)

parser.add_argument(
submission.add_argument(
"--ray-head-url",
help="URL of the Ray cluster head node",
default="http://localhost:8265",
)

parser.add_argument(
submission.add_argument(
"--namespace",
help="Kubernetes namespace to create resources in, defaults to currently active namespace",
)

parser.add_argument("entrypoint")
submission.add_argument("entrypoint")
submission.set_defaults(func=submit)

# jobby status, the status querying command
status_query = subparsers.add_parser(
"status", description="Query the status of a previously dispatched job"
)

# unique identifier of the job
status_query.add_argument("uid", metavar="<ID>")

status_query.add_argument(
"--namespace",
help="Kubernetes namespace the job was created in, defaults to currently active namespace",
)
status_query.set_defaults(func=status)

return parser

Expand All @@ -74,15 +116,19 @@ def _build_image(job: Job) -> Image:
host="http://localhost:8000",
)
with openapi_client.ApiClient(api_config) as api:
client = openapi_client.DefaultApi(api)
client = openapi_client.JobManagementApi(api)

# Job options sent to server do not need image options
opts = openapi_client.CreateJobModel(
name=job.name,
file=job.file,
image_ref=_build_image(job).tag,
mode=mode,
options=job.options,
options=openapi_client.JobOptions.model_validate(
job.options.model_dump()
)
if job.options
else None,
submission_context=SubmissionContext().to_dict(),
)
resp = client.submit_job_jobs_post(opts)
Expand Down Expand Up @@ -132,6 +178,4 @@ def main():
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)

args = _make_argparser().parse_args()
job = discover_job(args)

submit_job(job, args)
args.func(args)
21 changes: 0 additions & 21 deletions client/src/jobs/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os
from enum import Enum
from pathlib import Path
Expand All @@ -15,26 +14,6 @@ class K8sResourceKind(Enum):
LIMITS = "limits"


class ExecutionMode(str, Enum):
"""
ExecutionMode
"""

"""
allowed enum values
"""
LOCAL = "local"
DOCKER = "docker"
KUEUE = "kueue"
RAYCLUSTER = "raycluster"
RAYJOB = "rayjob"

@classmethod
def from_json(cls, json_str: str) -> Self:
"""Create an instance of ExecutionMode from a JSON string"""
return cls(json.loads(json_str))


class NoOptions(TypedDict, total=True):
pass

Expand Down
7 changes: 6 additions & 1 deletion client/src/openapi_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
__version__ = "1.0.0"

# import apis into sdk package
from openapi_client.api.default_api import DefaultApi
from openapi_client.api.job_management_api import JobManagementApi

# import ApiClient
from openapi_client.api_response import ApiResponse
Expand All @@ -31,6 +31,11 @@

# import models into sdk package
from openapi_client.models.create_job_model import CreateJobModel
from openapi_client.models.execution_mode import ExecutionMode
from openapi_client.models.http_validation_error import HTTPValidationError
from openapi_client.models.job_options import JobOptions
from openapi_client.models.resource_options import ResourceOptions
from openapi_client.models.scheduling_options import SchedulingOptions
from openapi_client.models.validation_error import ValidationError
from openapi_client.models.validation_error_loc_inner import ValidationErrorLocInner
from openapi_client.models.workload_identifier import WorkloadIdentifier
2 changes: 1 addition & 1 deletion client/src/openapi_client/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# flake8: noqa

# import apis into api package
from openapi_client.api.default_api import DefaultApi
from openapi_client.api.job_management_api import JobManagementApi
Loading

0 comments on commit ed9c52e

Please sign in to comment.