-
Notifications
You must be signed in to change notification settings - Fork 12
Add initial MultiRunManager
#242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 10 commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
66f6fc7
Add MultiRunManager
basnijholt 06f8e01
test
basnijholt cc3e3eb
Merge remote-tracking branch 'origin/main' into multi-run-manager
basnijholt e2fe1e0
.
basnijholt 75bdfae
.
basnijholt a490bf2
.
basnijholt 4f14ab9
.
basnijholt bb61bda
executor
basnijholt ac33e8d
fix types
basnijholt a9d7126
awaitable
basnijholt 4d5b22b
throttle
basnijholt bfe3d59
await
basnijholt fc2fe45
set_result
basnijholt b4b2844
throttle load
basnijholt 94ea467
set
basnijholt e78c5d2
slots
basnijholt f41cea6
.
basnijholt b8aec70
order
basnijholt 5f2a8b6
.
basnijholt 6930961
docs
basnijholt 59558ca
.
basnijholt 24b475f
commemt
basnijholt 956b9aa
add str
basnijholt 67af153
TaskID
basnijholt 9d2378d
cleanup
basnijholt 678fd22
rename
basnijholt 728d330
add new
basnijholt 860e5f9
revert
basnijholt a4f485c
check filesize instead
basnijholt 6390f15
.
basnijholt e8a8784
fix load
basnijholt 4da2e42
Auto-scale load time
basnijholt b7f3efa
.
basnijholt 6fd8c64
Merge remote-tracking branch 'origin/main' into multi-run-manager
basnijholt a212dbb
fix
basnijholt 272ec5d
multiple args
basnijholt ce7dcbc
rename
basnijholt 483cd33
Merge remote-tracking branch 'origin/main' into multi-run-manager
basnijholt File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
from __future__ import annotations | ||
|
||
import abc | ||
import asyncio | ||
import uuid | ||
from concurrent.futures import Executor, Future | ||
from dataclasses import dataclass, field | ||
from pathlib import Path | ||
from typing import TYPE_CHECKING, Any, Literal | ||
|
||
from adaptive import SequenceLearner | ||
|
||
import adaptive_scheduler | ||
|
||
if TYPE_CHECKING: | ||
from collections.abc import Callable, Iterable | ||
|
||
from adaptive_scheduler.utils import _DATAFRAME_FORMATS, EXECUTOR_TYPES, GoalTypes | ||
|
||
|
||
class AdaptiveSchedulerExecutorBase(Executor): | ||
_run_manager: adaptive_scheduler.RunManager | None | ||
|
||
@abc.abstractmethod | ||
def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future: | ||
pass | ||
|
||
@abc.abstractmethod | ||
def finalize(self, *, start: bool = True) -> adaptive_scheduler.RunManager: | ||
"""Finalize the executor and return the RunManager.""" | ||
|
||
def map( # type: ignore[override] | ||
self, | ||
fn: Callable[..., Any], | ||
/, | ||
*iterables: Iterable[Any], | ||
timeout: float | None = None, | ||
chunksize: int = 1, | ||
) -> list[Future]: | ||
tasks = [] | ||
if timeout is not None: | ||
msg = "Timeout not implemented" | ||
raise NotImplementedError(msg) | ||
if chunksize != 1: | ||
msg = "Chunksize not implemented" | ||
raise NotImplementedError(msg) | ||
for args in zip(*iterables, strict=True): | ||
task = self.submit(fn, *args) | ||
tasks.append(task) | ||
return tasks | ||
|
||
def shutdown( | ||
self, | ||
wait: bool = True, # noqa: FBT001, FBT002 | ||
*, | ||
cancel_futures: bool = False, | ||
) -> None: | ||
if not wait: | ||
msg = "Non-waiting shutdown not implemented" | ||
raise NotImplementedError(msg) | ||
if cancel_futures: | ||
msg = "Cancelling futures not implemented" | ||
raise NotImplementedError(msg) | ||
if self._run_manager is not None: | ||
self._run_manager.cancel() | ||
|
||
|
||
class SLURMTask(Future):
    """Future for a single call registered with a ``SLURMExecutor``.

    ``id_`` is ``(learner_index, point_index)``: the first entry selects the
    learner in the executor's ``RunManager``, the second is the key looked up
    in that learner's ``data``. The task is finished once that key is present.
    """

    # Seconds to wait between two polls of the learner while being awaited.
    _poll_interval: float = 1.0

    def __init__(self, executor: SLURMExecutor, id_: tuple[int, int]) -> None:
        super().__init__()
        self.executor = executor
        self.id_ = id_
        # NOTE(review): this reuses the private ``_state`` attribute of
        # ``concurrent.futures.Future`` so the inherited ``done()`` (used in
        # ``__await__``) follows the polled state — confirm this coupling to
        # the stdlib implementation is intended.
        self._state: Literal["PENDING", "RUNNING", "FINISHED", "CANCELLED"] = "PENDING"

    def _get(self) -> Any | None:
        """Updates the state of the task and returns the result if the task is finished."""
        index = self.id_[1]
        learner = self._learner()
        if index in learner.data:
            self._state = "FINISHED"
            return learner.data[index]
        self._state = "PENDING"
        return None

    def __repr__(self) -> str:
        # Refresh a pending task, but only once a RunManager exists: a repr
        # must not raise just because ``finalize()`` has not been called yet.
        if self._state == "PENDING" and self.executor._run_manager is not None:
            self._get()
        return f"SLURMTask(id_={self.id_}, state={self._state})"

    def _learner(self, *, load: bool = True) -> SequenceLearner:
        """Return this task's learner, reloading it from its file if requested.

        The learner is only reloaded when ``load`` is true and the learner
        does not already report ``done()``.

        Raises
        ------
        RuntimeError
            If the executor has no ``RunManager`` yet.
        """
        i_learner, _ = self.id_
        run_manager = self.executor._run_manager
        if run_manager is None:
            # Explicit raise instead of ``assert`` so the check survives ``-O``.
            msg = "RunManager not initialized. Call finalize() first."
            raise RuntimeError(msg)
        learner = run_manager.learners[i_learner]
        if load and not learner.done():
            learner.load(run_manager.fnames[i_learner])
        return learner

    def result(self, timeout: float | None = None) -> Any:
        """Return the result if it is available; this never blocks.

        Raises
        ------
        NotImplementedError
            If ``timeout`` is given.
        RuntimeError
            If ``finalize()`` has not been called on the executor, or the
            result is not available yet.
        """
        if timeout is not None:
            msg = "Timeout not implemented for SLURMTask"
            raise NotImplementedError(msg)
        if self.executor._run_manager is None:
            msg = "RunManager not initialized. Call finalize() first."
            raise RuntimeError(msg)
        result = self._get()
        if self._state == "FINISHED":
            return result
        msg = "Task not finished"
        raise RuntimeError(msg)

    def __await__(self) -> Any:
        """Poll the learner every ``_poll_interval`` seconds until finished.

        Errors raised while polling are forwarded to the awaiter instead of
        being swallowed by the event loop (which would leave the awaiter
        waiting forever).
        """
        if self.executor._run_manager is None:
            msg = "RunManager not initialized. Call finalize() first."
            raise RuntimeError(msg)

        loop = asyncio.get_running_loop()
        fut = loop.create_future()

        def wakeup() -> None:
            if fut.done():  # the awaiting coroutine was cancelled meanwhile
                return
            try:
                if not self.done():
                    self._get()
            except Exception as exc:  # noqa: BLE001
                fut.set_exception(exc)
                return
            if self.done():
                fut.set_result(None)
            else:
                # ``call_later`` rather than ``call_soon``: each poll may
                # reload the learner from disk, so do not spin the loop.
                loop.call_later(self._poll_interval, wakeup)

        loop.call_soon(wakeup)
        yield from fut
        return self.result()

    async def __aiter__(self) -> Any:
        # NOTE(review): as an ``async def`` this returns a coroutine, not an
        # asynchronous iterator, so ``async for`` over a task will fail.
        # Behaviour is kept as-is; confirm whether this method is needed.
        await self
        return self.result()
|
||
|
||
class _ArgsSplatter:
    """Adapter that calls ``func(*args)`` when given one tuple of arguments.

    ``submit`` stores the positional arguments of every call as a single tuple
    in the learner's sequence. ``SequenceLearner`` is documented to call its
    function with one sequence element, so without this adapter ``fn`` would
    receive the tuple itself instead of the unpacked arguments.

    Defined at module level (not as a closure or lambda) so that instances are
    picklable by class reference whenever ``func`` itself is.
    """

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func

    def __call__(self, args: tuple[Any, ...]) -> Any:
        return self.func(*args)


@dataclass
class SLURMExecutor(AdaptiveSchedulerExecutorBase):
    """Executor that collects calls and runs them through ``adaptive_scheduler.slurm_run``.

    Calls registered with :meth:`submit` are grouped per function; on
    :meth:`finalize` every group becomes one ``SequenceLearner`` and all
    learners are handed to ``adaptive_scheduler.slurm_run``.
    """

    # The public fields below are forwarded unchanged to
    # ``adaptive_scheduler.slurm_run`` by ``finalize``; see that function for
    # their meaning.
    partition: str | None = None
    nodes: int | None = 1
    cores_per_node: int | None = 1
    goal: GoalTypes | None = None
    # Replaced in ``__post_init__`` by a ``Path``; when ``None`` a unique
    # directory below ``<cwd>/.adaptive_scheduler`` is chosen.
    folder: Path | None = None
    name: str = "adaptive"
    num_threads: int = 1
    save_interval: float = 300
    log_interval: float = 300
    job_manager_interval: float = 60
    cleanup_first: bool = True
    save_dataframe: bool = True
    dataframe_format: _DATAFRAME_FORMATS = "pickle"
    max_fails_per_job: int = 50
    max_simultaneous_jobs: int = 100
    exclusive: bool = False
    executor_type: EXECUTOR_TYPES = "process-pool"
    extra_scheduler: list[str] | None = None
    extra_run_manager_kwargs: dict[str, Any] | None = None
    extra_scheduler_kwargs: dict[str, Any] | None = None
    # function -> list of positional-argument tuples, in submission order.
    _sequences: dict[Callable[..., Any], list[Any]] = field(default_factory=dict)
    # function -> index of its learner; insertion order matches ``_sequences``.
    _sequence_mapping: dict[Callable[..., Any], int] = field(default_factory=dict)
    # Passed to ``slurm_run`` as ``quiet``.
    _quiet: bool = True
    # Set by ``finalize``.
    _run_manager: adaptive_scheduler.RunManager | None = None

    def __post_init__(self) -> None:
        if self.folder is None:
            self.folder = Path.cwd() / ".adaptive_scheduler" / uuid.uuid4().hex  # type: ignore[operator]
        else:
            self.folder = Path(self.folder)

    def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> SLURMTask:
        """Register ``fn(*args)`` and return the task that tracks its result.

        Nothing is executed until :meth:`finalize` is called.

        Raises
        ------
        ValueError
            If keyword arguments are given.
        """
        if kwargs:
            msg = "Keyword arguments are not supported"
            raise ValueError(msg)
        if fn not in self._sequence_mapping:
            self._sequence_mapping[fn] = len(self._sequence_mapping)
        sequence = self._sequences.setdefault(fn, [])
        i = len(sequence)
        sequence.append(args)
        id_ = (self._sequence_mapping[fn], i)
        return SLURMTask(self, id_)

    def _to_learners(self) -> tuple[list[SequenceLearner], list[Path]]:
        """Build one ``SequenceLearner`` and one file name per submitted function.

        The file stem is the function's ``__name__`` (falling back to its type
        name for callables without one). If a stem was already used by an
        earlier function, the learner index is appended so two functions never
        share a file.
        """
        assert self.folder is not None
        learners: list[SequenceLearner] = []
        fnames: list[Path] = []
        used_stems: set[str] = set()
        for i, (func, args_list) in enumerate(self._sequences.items()):
            # Unpack the stored argument tuple when the learner calls ``func``.
            learners.append(SequenceLearner(_ArgsSplatter(func), args_list))
            stem = getattr(func, "__name__", type(func).__name__)
            if stem in used_stems:
                stem = f"{stem}-{i}"
            used_stems.add(stem)
            fnames.append(self.folder / f"{stem}.pickle")
        return learners, fnames

    def finalize(self, *, start: bool = True) -> adaptive_scheduler.RunManager:
        """Create the ``RunManager`` for all submitted calls and optionally start it.

        Parameters
        ----------
        start
            When true, ``start()`` is called on the new ``RunManager``.

        Returns
        -------
        The ``RunManager`` returned by ``adaptive_scheduler.slurm_run``; it is
        also stored on the executor as ``_run_manager``.
        """
        learners, fnames = self._to_learners()
        assert self.folder is not None
        self._run_manager = adaptive_scheduler.slurm_run(
            learners=learners,
            fnames=fnames,
            partition=self.partition,
            nodes=self.nodes,
            cores_per_node=self.cores_per_node,
            goal=self.goal,
            folder=self.folder,
            name=self.name,
            num_threads=self.num_threads,
            save_interval=self.save_interval,
            log_interval=self.log_interval,
            job_manager_interval=self.job_manager_interval,
            cleanup_first=self.cleanup_first,
            save_dataframe=self.save_dataframe,
            dataframe_format=self.dataframe_format,
            max_fails_per_job=self.max_fails_per_job,
            max_simultaneous_jobs=self.max_simultaneous_jobs,
            exclusive=self.exclusive,
            executor_type=self.executor_type,
            extra_scheduler=self.extra_scheduler,
            extra_run_manager_kwargs=self.extra_run_manager_kwargs,
            extra_scheduler_kwargs=self.extra_scheduler_kwargs,
            quiet=self._quiet,
        )
        if start:
            self._run_manager.start()
        return self._run_manager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.