From 2d088721d1c693e6bb8d6792867881a74fa74e78 Mon Sep 17 00:00:00 2001 From: Pete Date: Fri, 8 Sep 2023 15:30:01 -0700 Subject: [PATCH] various improvements (#254) --- CHANGELOG.md | 9 +++++++++ beaker/data_model/cluster.py | 1 + beaker/data_model/job.py | 12 ++++++++++++ beaker/data_model/node.py | 1 + beaker/services/cluster.py | 21 +++++++++++++++++++-- beaker/services/job.py | 4 ++++ 6 files changed, 46 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 008a2a9..01d28b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,15 @@ use patch releases for compatibility fixes instead. ## Unreleased +### Added + +- Added `.priority` convenience property to `Job` data model class. +- Added `Beaker.job.url()` method to get the URL for a job. + +### Fixed + +- Fixed a bug with `Beaker.cluster.utilization()` that resulted in an inflated count of running jobs. + ## [v1.20.1](https://github.com/allenai/beaker-py/releases/tag/v1.20.1) - 2023-09-01 ### Fixed diff --git a/beaker/data_model/cluster.py b/beaker/data_model/cluster.py index 231afa6..891fef5 100644 --- a/beaker/data_model/cluster.py +++ b/beaker/data_model/cluster.py @@ -73,6 +73,7 @@ class ClusterUtilization(BaseModel): cluster: Cluster running_jobs: int queued_jobs: int + running_preemptible_jobs: int nodes: Tuple[NodeUtilization, ...] @property diff --git a/beaker/data_model/job.py b/beaker/data_model/job.py index 04a7729..08f1703 100644 --- a/beaker/data_model/job.py +++ b/beaker/data_model/job.py @@ -202,6 +202,18 @@ def was_preempted(self) -> bool: CanceledCode.user_preemption, } + @property + def priority(self) -> Optional[Priority]: + """ + Get the priority of the job. + """ + if self.session is not None: + return self.session.priority + elif self.execution is not None: + return self.execution.spec.context.priority + else: + return None + def check(self): """ :raises JobFailedError: If the job failed or was canceled. 
diff --git a/beaker/data_model/node.py b/beaker/data_model/node.py index 5f374d9..29573d0 100644 --- a/beaker/data_model/node.py +++ b/beaker/data_model/node.py @@ -31,6 +31,7 @@ class NodeUtilization(BaseModel): hostname: str limits: NodeResources running_jobs: int + running_preemptible_jobs: int used: NodeResources free: NodeResources cordoned: bool = False diff --git a/beaker/services/cluster.py b/beaker/services/cluster.py index 33312ee..b940c0f 100644 --- a/beaker/services/cluster.py +++ b/beaker/services/cluster.py @@ -199,13 +199,24 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization: running_jobs = 0 queued_jobs = 0 + running_preemptible_jobs = 0 node_to_util: Dict[str, Dict[str, Union[int, float]]] = { - node.id: {"running_jobs": 0, "gpus_used": 0, "cpus_used": 0.0} for node in nodes + node.id: { + "running_jobs": 0, + "running_preemptible_jobs": 0, + "gpus_used": 0, + "cpus_used": 0.0, + } + for node in nodes } for job in self.beaker.job.list(cluster=cluster, finalized=False): - if job.status.current == CurrentJobStatus.running: + if job.status.current in (CurrentJobStatus.running, CurrentJobStatus.idle): + if job.node not in node_to_util: + continue running_jobs += 1 + if job.priority == Priority.preemptible: + running_preemptible_jobs += 1 elif job.status.current == CurrentJobStatus.created: queued_jobs += 1 @@ -215,6 +226,8 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization: node_util = node_to_util[job.node] node_util["running_jobs"] += 1 + if job.priority == Priority.preemptible: + node_util["running_preemptible_jobs"] += 1 if job.requests is not None: if job.requests.gpu_count is not None: node_util["gpus_used"] += job.requests.gpu_count @@ -225,6 +238,7 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization: cluster=cluster, running_jobs=running_jobs, queued_jobs=queued_jobs, + running_preemptible_jobs=running_preemptible_jobs, nodes=tuple( [ NodeUtilization( @@ -232,6 +246,9 
@@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization: hostname=node.hostname, limits=node.limits, running_jobs=int(node_to_util[node.id]["running_jobs"]), + running_preemptible_jobs=int( + node_to_util[node.id]["running_preemptible_jobs"] + ), used=NodeResources( gpu_count=None if node.limits.gpu_count is None diff --git a/beaker/services/job.py b/beaker/services/job.py index d52f4bd..c81b3f1 100644 --- a/beaker/services/job.py +++ b/beaker/services/job.py @@ -605,3 +605,7 @@ def display_name(j: Job) -> str: finally: if owned_progress: progress.stop() + + def url(self, job: Union[str, Job]) -> str: + job_id = job.id if isinstance(job, Job) else job + return f"{self.config.agent_address}/job/{self.url_quote(job_id)}"