Skip to content

Commit

Permalink
various improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
epwalsh committed Sep 8, 2023
1 parent 1566e3e commit d8307bb
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 2 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ use patch releases for compatibility fixes instead.

## Unreleased

### Added

- Added `.priority` convenience property to `Job` data model class.
- Added `Beaker.job.url()` method to get the URL for a job.

### Fixed

- Fixed a bug with `Beaker.cluster.utilization()` that resulted in inflated numbers for the amount of running jobs.

## [v1.20.1](https://github.com/allenai/beaker-py/releases/tag/v1.20.1) - 2023-09-01

### Fixed
Expand Down
1 change: 1 addition & 0 deletions beaker/data_model/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ClusterUtilization(BaseModel):
cluster: Cluster
running_jobs: int
queued_jobs: int
running_preemptible_jobs: int
nodes: Tuple[NodeUtilization, ...]

@property
Expand Down
12 changes: 12 additions & 0 deletions beaker/data_model/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,18 @@ def was_preempted(self) -> bool:
CanceledCode.user_preemption,
}

@property
def priority(self) -> Optional[Priority]:
"""
Get the priority of the job.
"""
if self.session is not None:
return self.session.priority
elif self.execution is not None:
return self.execution.spec.context.priority
else:
return None

def check(self):
"""
:raises JobFailedError: If the job failed or was canceled.
Expand Down
1 change: 1 addition & 0 deletions beaker/data_model/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class NodeUtilization(BaseModel):
hostname: str
limits: NodeResources
running_jobs: int
running_preemptible_jobs: int
used: NodeResources
free: NodeResources
cordoned: bool = False
21 changes: 19 additions & 2 deletions beaker/services/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,24 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization:

running_jobs = 0
queued_jobs = 0
running_preemptible_jobs = 0
node_to_util: Dict[str, Dict[str, Union[int, float]]] = {
node.id: {"running_jobs": 0, "gpus_used": 0, "cpus_used": 0.0} for node in nodes
node.id: {
"running_jobs": 0,
"running_preemptible_jobs": 0,
"gpus_used": 0,
"cpus_used": 0.0,
}
for node in nodes
}

for job in self.beaker.job.list(cluster=cluster, finalized=False):
if job.status.current == CurrentJobStatus.running:
if job.status.current in (CurrentJobStatus.running, CurrentJobStatus.idle):
if job.node not in node_to_util:
continue
running_jobs += 1
if job.priority == Priority.preemptible:
running_preemptible_jobs += 1
elif job.status.current == CurrentJobStatus.created:
queued_jobs += 1

Expand All @@ -215,6 +226,8 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization:

node_util = node_to_util[job.node]
node_util["running_jobs"] += 1
if job.priority == Priority.preemptible:
node_util["running_preemptible_jobs"] += 1
if job.requests is not None:
if job.requests.gpu_count is not None:
node_util["gpus_used"] += job.requests.gpu_count
Expand All @@ -225,13 +238,17 @@ def utilization(self, cluster: Union[str, Cluster]) -> ClusterUtilization:
cluster=cluster,
running_jobs=running_jobs,
queued_jobs=queued_jobs,
running_preemptible_jobs=running_preemptible_jobs,
nodes=tuple(
[
NodeUtilization(
id=node.id,
hostname=node.hostname,
limits=node.limits,
running_jobs=int(node_to_util[node.id]["running_jobs"]),
running_preemptible_jobs=int(
node_to_util[node.id]["running_preemptible_jobs"]
),
used=NodeResources(
gpu_count=None
if node.limits.gpu_count is None
Expand Down
4 changes: 4 additions & 0 deletions beaker/services/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,7 @@ def display_name(j: Job) -> str:
finally:
if owned_progress:
progress.stop()

def url(self, job: Union[str, Job]) -> str:
job_id = job.id if isinstance(job, Job) else job
return f"{self.config.agent_address}/job/{self.url_quote(job_id)}"

0 comments on commit d8307bb

Please sign in to comment.