Skip to content

Commit

Permalink
Allow triggering the JobManager loop (#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
basnijholt authored Sep 11, 2024
1 parent 08e45c0 commit 6f3e6c3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
7 changes: 7 additions & 0 deletions adaptive_scheduler/_server_support/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def __init__(
# Other attributes
self.n_started = 0
self._request_times: dict[str, str] = {}
self._trigger_event = asyncio.Event()

# Command line launcher options
self.save_dataframe = save_dataframe
Expand Down Expand Up @@ -262,6 +263,7 @@ async def _manage(self) -> None:
if await sleep_unless_task_is_done(
self.database_manager.task, # type: ignore[arg-type]
self.interval,
self._trigger_event,
): # if true, we are done
return
except asyncio.CancelledError: # noqa: PERF203
Expand All @@ -281,5 +283,10 @@ async def _manage(self) -> None:
if await sleep_unless_task_is_done(
self.database_manager.task, # type: ignore[arg-type]
5,
self._trigger_event,
): # if true, we are done
return

def trigger(self) -> None:
"""External method to trigger the _manage loop to continue."""
self._trigger_event.set()
23 changes: 17 additions & 6 deletions adaptive_scheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1187,22 +1187,33 @@ def _time_between(start: str, end: str) -> float:
async def sleep_unless_task_is_done(
task: asyncio.Task,
sleep_duration: float,
trigger_event: asyncio.Event | None = None,
) -> bool:
"""Sleep for an interval, unless the task is done before then."""
"""Sleep for an interval, unless the task or a trigger event is done earlier."""
# Create the sleep task separately
sleep_task = asyncio.create_task(asyncio.sleep(sleep_duration))
tasks_to_wait = [sleep_task, task]

# Await both the sleep_task and the passed task
# Add the trigger_event to the wait list if provided
if trigger_event:
# Create a task for the event wait
event_task = asyncio.create_task(trigger_event.wait())
tasks_to_wait.append(event_task)

# Await all tasks until any one is done
done, pending = await asyncio.wait(
[sleep_task, task],
tasks_to_wait,
return_when=asyncio.FIRST_COMPLETED,
)

# Cancel only the sleep_task if it's pending
# Cancel only the sleep task if it's pending
if sleep_task in pending:
sleep_task.cancel()
return True # means that the task is done
return False

if trigger_event is not None and event_task.done():
trigger_event.clear()

return task.done()


def _update_progress_for_paths(
Expand Down
2 changes: 1 addition & 1 deletion adaptive_scheduler/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def _info_html(run_manager: RunManager) -> str:
n_done = sum(1 for job in dbm.as_dicts() if job["is_done"])
n_failed = len(dbm.failed)
n_failed_color = "red" if n_failed > 0 else "black"
n_unscheduled = len(dbm.learners) - n_running - n_pending - n_done
n_unscheduled = len(dbm.fnames) - n_running - n_pending - n_done
n_unscheduled_color = "orange" if n_unscheduled > 0 else "black"

status = run_manager.status()
Expand Down

0 comments on commit 6f3e6c3

Please sign in to comment.