Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
basnijholt committed Oct 24, 2024
1 parent 9d2378d commit 678fd22
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
3 changes: 3 additions & 0 deletions adaptive_scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Adaptive Scheduler."""

from adaptive_scheduler import client_support, scheduler, server_support, utils
from adaptive_scheduler._executor import SlurmExecutor, SlurmTask
from adaptive_scheduler._version import __version__
from adaptive_scheduler.scheduler import PBS, SLURM
from adaptive_scheduler.server_support import (
Expand All @@ -20,4 +21,6 @@
"SLURM",
"start_one_by_one",
"utils",
"SlurmExecutor",
"SlurmTask",
]
21 changes: 14 additions & 7 deletions adaptive_scheduler/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ class TaskID(NamedTuple):
sequence_index: int


class SLURMTask(Future):
class SlurmTask(Future):
"""A `Future` that loads the result from a `SequenceLearner`."""

__slots__ = ("executor", "task_id", "_state", "_last_mtime", "min_load_interval")

def __init__(
self,
executor: SLURMExecutor,
executor: SlurmExecutor,
task_id: TaskID,
min_load_interval: float = 1.0,
) -> None:
Expand Down Expand Up @@ -178,7 +178,7 @@ async def __aiter__(self) -> Any:


@dataclass
class SLURMExecutor(AdaptiveSchedulerExecutorBase):
class SlurmExecutor(AdaptiveSchedulerExecutorBase):
"""An executor that runs jobs on SLURM.
Similar to `concurrent.futures.Executor`, but for SLURM.
Expand Down Expand Up @@ -327,17 +327,20 @@ def __post_init__(self) -> None:
else:
self.folder = Path(self.folder)

def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> SLURMTask:
def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> SlurmTask:
if kwargs:
msg = "Keyword arguments are not supported"
raise ValueError(msg)
if fn not in self._sequence_mapping:
self._sequence_mapping[fn] = len(self._sequence_mapping)
if len(args) != 1:
msg = "Exactly one argument is required"
raise ValueError(msg)
sequence = self._sequences.setdefault(fn, [])
i = len(sequence)
sequence.append(args)
sequence.append(args[0])
task_id = TaskID(self._sequence_mapping[fn], i)
return SLURMTask(self, task_id)
return SlurmTask(self, task_id)

def _to_learners(self) -> tuple[list[SequenceLearner], list[Path]]:
learners = []
Expand All @@ -346,10 +349,14 @@ def _to_learners(self) -> tuple[list[SequenceLearner], list[Path]]:
learner = SequenceLearner(func, args_kwargs_list)
learners.append(learner)
assert isinstance(self.folder, Path)
fnames.append(self.folder / f"{func.__name__}.pickle")
name = func.__name__ if hasattr(func, "__name__") else ""
fnames.append(self.folder / f"{name}-{uuid.uuid4().hex}.pickle")
return learners, fnames

def finalize(self, *, start: bool = True) -> adaptive_scheduler.RunManager:
if self._run_manager is not None:
msg = "RunManager already initialized. Create a new SlurmExecutor instance."
raise RuntimeError(msg)
learners, fnames = self._to_learners()
assert self.folder is not None
self._run_manager = adaptive_scheduler.slurm_run(
Expand Down

0 comments on commit 678fd22

Please sign in to comment.