diff --git a/.vscode/ltex.dictionary.en-US.txt b/.vscode/ltex.dictionary.en-US.txt
index eba16ce28..9fc80886b 100644
--- a/.vscode/ltex.dictionary.en-US.txt
+++ b/.vscode/ltex.dictionary.en-US.txt
@@ -4,3 +4,7 @@ YYYY
 CalVer
 backported
 semver-stable
+invokers
+lenskit
+invoker
+CUDA
diff --git a/.vscode/ltex.hiddenFalsePositives.en-US.txt b/.vscode/ltex.hiddenFalsePositives.en-US.txt
index 9bcc82e76..394fdfe8d 100644
--- a/.vscode/ltex.hiddenFalsePositives.en-US.txt
+++ b/.vscode/ltex.hiddenFalsePositives.en-US.txt
@@ -1 +1,2 @@
 {"rule":"COMMA_COMPOUND_SENTENCE","sentence":"^\\QThis combination does mean that we may sometimes release a minor revision with the previous\nyear's major version number, if there are breaking changes in progress but not yet ready for\nrelease and we need to release new features or fixes for the current major version.\\E$"}
+{"rule":"MISSING_GENITIVE","sentence":"^\\QPyTorch tensors, including those on CUDA devices, are shared.\\E$"}
diff --git a/conftest.py b/conftest.py
index b2b43dadd..3d455e7c5 100644
--- a/conftest.py
+++ b/conftest.py
@@ -11,6 +11,7 @@ from seedbank import initialize, numpy_rng
 
 from pytest import fixture
+from hypothesis import settings
 
 logging.getLogger("numba").setLevel(logging.INFO)
@@ -52,3 +53,6 @@ def pytest_collection_modifyitems(items):
         if evm is not None and slm is None:
             _log.debug("adding slow mark to %s", item)
             item.add_marker("slow")
+
+
+settings.register_profile("default", deadline=1000)
diff --git a/docs/batch.rst b/docs/batch.rst
index 782028fa4..c7c97e664 100644
--- a/docs/batch.rst
+++ b/docs/batch.rst
@@ -1,3 +1,5 @@
+.. _batch:
+
 Batch-Running Recommenders
 ==========================
 
diff --git a/docs/conf.py b/docs/conf.py
index e90e0e32a..ad3c2514f 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -121,6 +121,9 @@
     "binpickle": ("https://binpickle.lenskit.org/en/stable/", None),
     "csr": ("https://csr.lenskit.org/en/latest/", None),
     "seedbank": ("https://seedbank.lenskit.org/en/latest/", None),
+    "progress_api": ("https://progress-api.readthedocs.io/en/latest/", None),
+    "manylog": ("https://manylog.readthedocs.io/en/latest/", None),
+    "torch": ("https://pytorch.org/docs/stable/", None),
 }
 
 autodoc_default_options = {"members": True, "member-order": "bysource", "show-inheritance": True}
diff --git a/docs/index.rst b/docs/index.rst
index 4ed6bcf5a..463fb2fb9 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -43,6 +43,7 @@ Resources
    batch
    evaluation/index
    documenting
+   parallel
 
 .. toctree::
    :maxdepth: 1
diff --git a/docs/internals.rst b/docs/internals.rst
index 4912867c5..2934e5fcf 100644
--- a/docs/internals.rst
+++ b/docs/internals.rst
@@ -4,6 +4,3 @@ LensKit Internals
 
 These modules are primarily for internal infrastructural support in Lenskit.
 Neither LensKit users nor algorithm developers are likely to need to use this
 code directly.
-
-.. toctree::
-    parallel
diff --git a/docs/parallel.rst b/docs/parallel.rst
index 13eab4360..b69abe67c 100644
--- a/docs/parallel.rst
+++ b/docs/parallel.rst
@@ -3,8 +3,70 @@ Parallel Execution
 
 .. py:module:: lenskit.parallel
 
+LensKit supports various forms of parallel execution, each controlled by its
+own environment variable:
+
+- :ref:`Batch operations <batch>` using :ref:`multi-process execution
+  <parallel-model-ops>`.
+- Parallel model training. For most models provided by LensKit, this is
+  implemented with PyTorch JIT parallelism (:func:`torch.jit.fork`).
+- Parallel computation in the various backends (BLAS, MKL, Torch, etc.).
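+
+To see how the settings below resolve on a particular machine, you can inspect
+the parallel configuration directly (a minimal sketch; it assumes the fields of
+the returned configuration object are named after the :py:func:`initialize`
+parameters)::
+
+    from lenskit.parallel import ensure_parallel_init, get_parallel_config
+
+    ensure_parallel_init()  # apply environment variables and defaults if needed
+    cfg = get_parallel_config()
+    print(cfg.processes, cfg.threads, cfg.backend_threads, cfg.child_threads)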
+
+Other models compatible with LensKit may use their own parallel processing logic.
+
+Configuring Parallelism
+~~~~~~~~~~~~~~~~~~~~~~~
+
+LensKit provides four knobs for configuring parallelism, each of which has a
+corresponding environment variable and parameter to :py:func:`initialize`. The
+environment variables are:
+
+.. envvar:: LK_NUM_PROCS
+
+    The number of processes to use for batch operations. Defaults to the number
+    of CPUs or 4, whichever is smaller.
+
+.. envvar:: LK_NUM_THREADS
+
+    The number of threads to use for parallel model building. Defaults to the
+    number of CPUs or 8, whichever is smaller.
+
+    This number is passed to :func:`torch.set_num_interop_threads` to set up the
+    Torch JIT thread count.
+
+.. envvar:: LK_NUM_BACKEND_THREADS
+
+    The number of threads to be used by backend compute engines. Defaults to up
+    to 4 backend threads per training thread, depending on the capacity of the
+    machine::
+
+        max(min(NCPUS // LK_NUM_THREADS, 4), 1)
+
+    This is passed to :func:`torch.set_num_threads` (to control PyTorch internal
+    parallelism) and to the underlying BLAS layer (via `threadpoolctl`_).
+
+.. envvar:: LK_NUM_CHILD_THREADS
+
+    The number of backend threads to be used in worker processes spawned by
+    batch evaluation. Defaults to 4 per process, capped by the number of CPUs
+    available::
+
+        max(min(NCPUS // LK_NUM_PROCS, 4), 1)
+
+    Workers have both the process and thread counts set to 1.
+
+.. _threadpoolctl: https://github.com/joblib/threadpoolctl
+
+.. autofunction:: initialize
+
+.. autofunction:: ensure_parallel_init
+
+.. _parallel-model-ops:
+
+Parallel Model Ops
+~~~~~~~~~~~~~~~~~~
+
 LensKit uses a custom API wrapping :py:class:`multiprocessing.pool.Pool` to
-paralellize batch operations (see :py:mod:`lenskit.batch`).
+parallelize batch operations (see :py:mod:`lenskit.batch`).
 
 The basic idea of this API is to create an *invoker* that has a model and a function,
 and then passing lists of argument sets to the function::
@@ -15,14 +77,20 @@ and then passing lists of argument sets to the function::
 
 The model is persisted into shared memory to be used by the worker processes.
 PyTorch tensors, including those on CUDA devices, are shared.
 
-Parallel Model Ops
-~~~~~~~~~~~~~~~~~~
+LensKit users will generally not need to use parallel op invokers directly, but
+they are useful if you are implementing new parallel batch operations. They may
+also be useful for other kinds of analysis.
 
 .. autofunction:: invoker
 
-.. autofunction:: proc_count
-
 .. autoclass:: ModelOpInvoker
    :members:
 
+Logging and Progress
+~~~~~~~~~~~~~~~~~~~~
+
+Multi-process op invokers automatically set up logging and progress reporting to
+work across processes using the :py:mod:`manylog` package. Op invokers can also
+report the progress of queued jobs to a :py:class:`progress_api.Progress`.
+
+.. autofunction:: invoke_progress
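+
+For example, batch recommendation wires progress reporting and the invoker
+together roughly like this (a sketch modeled on :py:mod:`lenskit.batch`, where
+``_logger``, ``algo``, ``users``, and ``_recommend_user`` stand in for your own
+logger, model, task list, and pickleable task function)::
+
+    with (
+        invoke_progress(_logger, "recommending", len(users), unit="user") as progress,
+        invoker(algo, _recommend_user, n_jobs=4, progress=progress) as worker,
+    ):
+        results = list(worker.map(users))
+
+The invoker moves each task through the ``dispatched``, ``in-progress``, and
+``finished`` progress states as it is queued, started, and completed.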
diff --git a/envs/lenskit-py3.10-ci.yaml b/envs/lenskit-py3.10-ci.yaml
index 52a151bbd..76dfb8ce0 100644
--- a/envs/lenskit-py3.10-ci.yaml
+++ b/envs/lenskit-py3.10-ci.yaml
@@ -43,7 +43,7 @@ dependencies:
   - tqdm>=4
   - pip
   - pip:
-      - manylog>=0.1.0a3
-      - progress-api>=0.1.0a6
+      - manylog>=0.1.0a5
+      - progress-api>=0.1.0a9
       - seedbank>=0.2.0a1
       - unbeheader~=1.3
diff --git a/envs/lenskit-py3.10-dev.yaml b/envs/lenskit-py3.10-dev.yaml
index 50b1b9e47..d6d552c92 100644
--- a/envs/lenskit-py3.10-dev.yaml
+++ b/envs/lenskit-py3.10-dev.yaml
@@ -52,7 +52,7 @@ dependencies:
   - tqdm>=4
   - pip
   - pip:
-      - manylog>=0.1.0a3
-      - progress-api>=0.1.0a6
+      - manylog>=0.1.0a5
+      - progress-api>=0.1.0a9
       - seedbank>=0.2.0a1
       - unbeheader~=1.3
diff --git a/envs/lenskit-py3.10-doc.yaml b/envs/lenskit-py3.10-doc.yaml
index 695df08bb..b17c55bda 100644
--- a/envs/lenskit-py3.10-doc.yaml
+++ b/envs/lenskit-py3.10-doc.yaml
@@ -31,6 +31,6 @@ dependencies:
   - threadpoolctl>=3.0
   - pip
   - pip:
-      - manylog>=0.1.0a3
-      - progress-api>=0.1.0a6
+      - manylog>=0.1.0a5
+      - progress-api>=0.1.0a9
       - seedbank>=0.2.0a1
diff --git a/envs/lenskit-py3.10-test.yaml b/envs/lenskit-py3.10-test.yaml
index ad2b0dd50..2001c153d 100644
--- a/envs/lenskit-py3.10-test.yaml
+++ b/envs/lenskit-py3.10-test.yaml
@@ -31,6 +31,6 @@ dependencies:
   - threadpoolctl>=3.0
   - pip
   - pip:
-      - manylog>=0.1.0a3
-      - progress-api>=0.1.0a6
+      - manylog>=0.1.0a5
+      - progress-api>=0.1.0a9
       - seedbank>=0.2.0a1
diff --git a/envs/lenskit-py3.11-ci.yaml b/envs/lenskit-py3.11-ci.yaml
index e50554765..affbcba54 100644
--- a/envs/lenskit-py3.11-ci.yaml
+++ b/envs/lenskit-py3.11-ci.yaml
@@ -43,7 +43,7 @@ dependencies:
   - tqdm>=4
   - pip
   - pip:
-      - manylog>=0.1.0a3
-      - progress-api>=0.1.0a6
+      - manylog>=0.1.0a5
+      - progress-api>=0.1.0a9
       - seedbank>=0.2.0a1
       - unbeheader~=1.3
diff --git a/envs/lenskit-py3.11-dev.yaml b/envs/lenskit-py3.11-dev.yaml
index f6823bfdf..69392a96d 100644
--- a/envs/lenskit-py3.11-dev.yaml
+++ b/envs/lenskit-py3.11-dev.yaml
@@ -52,7 +52,7 @@ dependencies:
   - tqdm>=4
   - pip
   - pip:
-      - manylog>=0.1.0a3
-      - progress-api>=0.1.0a6
+      - manylog>=0.1.0a5
+      - progress-api>=0.1.0a9
       - seedbank>=0.2.0a1
       - unbeheader~=1.3
diff --git a/envs/lenskit-py3.11-doc.yaml b/envs/lenskit-py3.11-doc.yaml
index d7efce6f3..819a54c0e 100644
--- a/envs/lenskit-py3.11-doc.yaml
+++ b/envs/lenskit-py3.11-doc.yaml
@@ -31,6 +31,6 @@ dependencies:
   - threadpoolctl>=3.0
   - pip
   - pip:
-      - manylog>=0.1.0a3
-      - progress-api>=0.1.0a6
+      - manylog>=0.1.0a5
+      - progress-api>=0.1.0a9
       - seedbank>=0.2.0a1
diff --git a/envs/lenskit-py3.11-test.yaml b/envs/lenskit-py3.11-test.yaml
index a6de0af20..7e9c3d6b1 100644
--- a/envs/lenskit-py3.11-test.yaml
+++ b/envs/lenskit-py3.11-test.yaml
@@ -31,6 +31,6 @@ dependencies:
   - threadpoolctl>=3.0
   - pip
   - pip:
-      - manylog>=0.1.0a3
-      - progress-api>=0.1.0a6
+      - manylog>=0.1.0a5
+      - progress-api>=0.1.0a9
       - seedbank>=0.2.0a1
diff --git a/lenskit/batch/_predict.py b/lenskit/batch/_predict.py
index 5c571fca1..fc76dd2ec 100644
--- a/lenskit/batch/_predict.py
+++ b/lenskit/batch/_predict.py
@@ -10,10 +10,9 @@
 import pandas as pd
 
 from .. import util
-from ..parallel import invoker
+from ..parallel import invoke_progress, invoker
 
 _logger = logging.getLogger(__name__)
-_rec_context = None
 
 
 def _predict_user(model, req):
@@ -82,7 +81,10 @@ def predict(algo, pairs, *, n_jobs=None, **kwargs):
     nusers = pairs["user"].nunique()
 
     timer = util.Stopwatch()
-    with invoker(algo, _predict_user, n_jobs=n_jobs) as worker:
+    with (
+        invoke_progress(_logger, "predictions", nusers, unit="user") as progress,
+        invoker(algo, _predict_user, n_jobs=n_jobs, progress=progress) as worker,
+    ):
         del algo  # maybe free some memory
 
         _logger.info(
diff --git a/lenskit/batch/_recommend.py b/lenskit/batch/_recommend.py
index 0ae0049e2..c3eca1a44 100644
--- a/lenskit/batch/_recommend.py
+++ b/lenskit/batch/_recommend.py
@@ -9,10 +9,10 @@
 
 import numpy as np
 import pandas as pd
 
 from .. import util
 from ..algorithms import Recommender
-from ..parallel import invoker
+from ..parallel import invoke_progress, invoker
 
 _logger = logging.getLogger(__name__)
 
@@ -83,8 +83,11 @@ def recommend(algo, users, n, candidates=None, *, n_jobs=None, **kwargs):
 
     candidates = __standard_cand_fun(candidates)
 
-    with invoker(algo, _recommend_user, n_jobs=n_jobs) as worker:
-        _logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs)
+    _logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs)
+    with (
+        invoke_progress(_logger, "recommending", len(users), unit="user") as progress,
+        invoker(algo, _recommend_user, n_jobs=n_jobs, progress=progress) as worker,
+    ):
         del algo
         timer = util.Stopwatch()
         results = worker.map((user, n, candidates(user)) for user in users)
diff --git a/lenskit/parallel/__init__.py b/lenskit/parallel/__init__.py
index 00bada6b3..7944933f4 100644
--- a/lenskit/parallel/__init__.py
+++ b/lenskit/parallel/__init__.py
@@ -11,12 +11,13 @@
 from __future__ import annotations
 
 from .config import ensure_parallel_init, get_parallel_config, initialize
-from .invoker import ModelOpInvoker, invoker
+from .invoker import ModelOpInvoker, invoke_progress, invoker
 
 __all__ = [
     "initialize",
     "get_parallel_config",
     "ensure_parallel_init",
     "invoker",
+    "invoke_progress",
    "ModelOpInvoker",
 ]
diff --git a/lenskit/parallel/config.py b/lenskit/parallel/config.py
index 85692e826..e03982c12 100644
--- a/lenskit/parallel/config.py
+++ b/lenskit/parallel/config.py
@@ -43,28 +43,18 @@ def initialize(
 
     Args:
         processes:
-            The number of processes to use for multiprocessing evaluations.
-            Configured from ``LK_NUM_PROCS``. Defaults to the number of CPUs or
-            4, whichever is smaller.
+            The number of processes to use for multiprocessing evaluations (see
+            :envvar:`LK_NUM_PROCS`).
         threads:
             The number of threads to use for parallel model training and similar
-            operations. This is passed to
-            :func:`torch.set_num_interop_threads`. Environment variable is
-            ``LK_NUM_THREADS``. Defaults to the number of CPUs or 8, whichever
-            is smaller, to avoid runaway thread coordination overhead on large
-            machines.
+            operations (see :envvar:`LK_NUM_THREADS`).
         backend_threads:
-            The number of threads underlying computational engines should use.
-            This is passed to :func:`torch.set_num_threads` and to the BLAS
-            threading layer. Configured from ``LK_NUM_BACKEND_THREADS``.
+            The number of threads underlying computational engines should use
+            (see :envvar:`LK_NUM_BACKEND_THREADS`).
        child_threads:
             The number of threads backends are allowed to use in the worker
-            processes in multiprocessing operations. This is like
-            ``backend_threads``, except it is passed to the underlying libraries
-            in worker processes. Environment variable is
-            ``LK_NUM_CHILD_THREADS``. Defaults is computed from the number of
-            CPUs with a max of 4 threads per worker. Child processes set both
-            ``processes`` and ``threads`` to 1.
+            processes in multiprocessing operations (see
+            :envvar:`LK_NUM_CHILD_THREADS`).
     """
     global _config
     if _config:
@@ -137,6 +127,6 @@ def _resolve_parallel_config(
         backend_threads = max(min(ncpus // threads, 4), 1)
 
     if child_threads is None:
-        child_threads = min(ncpus // processes, 4)
+        child_threads = max(min(ncpus // processes, 4), 1)
 
     return ParallelConfig(processes, threads, backend_threads, child_threads)
diff --git a/lenskit/parallel/invoker.py b/lenskit/parallel/invoker.py
index e8655dab9..c2fb4f266 100644
--- a/lenskit/parallel/invoker.py
+++ b/lenskit/parallel/invoker.py
@@ -8,8 +8,11 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
+from logging import Logger
 from typing import Any, Callable, Generic, Iterable, Iterator, Optional, TypeAlias, TypeVar
 
+from progress_api import Progress, make_progress
+
 from lenskit.parallel.config import ensure_parallel_init, get_parallel_config
 
 M = TypeVar("M")
@@ -18,20 +21,46 @@
 InvokeOp: TypeAlias = Callable[[M, A], R]
 
 
+def invoke_progress(
+    logger: str | Logger | None = None,
+    label: str | None = None,
+    total: int | None = None,
+    unit: str | None = None,
+) -> Progress:
+    """
+    Create a progress bar for parallel tasks. It is configured with the task
+    states used by :func:`invoker`.
+
+    See :func:`make_progress` for details on the parameter meanings.
+    """
+    return make_progress(
+        logger, label, total, outcomes="finished", states=["in-progress", "dispatched"], unit=unit
+    )
+
+
 def invoker(
     model: M,
     func: InvokeOp[M, A, R],
     n_jobs: Optional[int] = None,
+    progress: Progress | None = None,
 ) -> ModelOpInvoker[A, R]:
     """
     Get an appropriate invoker for performing operations on ``model``.
 
     Args:
-        model(obj): The model object on which to perform operations.
-        func(function): The function to call. The function must be pickleable.
-        n_jobs(int or None):
+        model: The model object on which to perform operations.
+        func: The function to call. The function must be pickleable.
+        n_jobs:
             The number of processes to use for parallel operations. If
             ``None``, will call :func:`proc_count` with a maximum default
             process count of 4.
+        progress:
+            A progress bar to use to report status.
+            It should have the following states:
+
+            * dispatched
+            * in-progress
+            * finished
+
+            One can be created with :func:`invoke_progress`.
 
     Returns:
         ModelOpInvoker:
@@ -44,11 +73,11 @@ def invoker(
     if n_jobs == 1:
         from .sequential import InProcessOpInvoker
 
-        return InProcessOpInvoker(model, func)
+        return InProcessOpInvoker(model, func, progress)
     else:
         from .pool import ProcessPoolOpInvoker
 
-        return ProcessPoolOpInvoker(model, func, n_jobs)
+        return ProcessPoolOpInvoker(model, func, n_jobs, progress)
 
 
 class ModelOpInvoker(ABC, Generic[A, R]):
diff --git a/lenskit/parallel/pool.py b/lenskit/parallel/pool.py
index d65ce28d0..7e73e81a2 100644
--- a/lenskit/parallel/pool.py
+++ b/lenskit/parallel/pool.py
@@ -9,13 +9,13 @@
 import logging
 import multiprocessing as mp
-import pickle
 from concurrent.futures import ProcessPoolExecutor
 from multiprocessing.managers import SharedMemoryManager
 from typing import Generic, Iterable, Iterator
 
 import manylog
 import seedbank
+from progress_api import Progress, null_progress
 
 from . import worker
 from .config import get_parallel_config
@@ -27,23 +27,28 @@
 
 
 class ProcessPoolOpInvoker(ModelOpInvoker[A, R], Generic[M, A, R]):
+    progress: Progress
     manager: SharedMemoryManager
     pool: ProcessPoolExecutor
 
-    def __init__(self, model: M, func: InvokeOp[M, A, R], n_jobs: int):
+    def __init__(
+        self, model: M, func: InvokeOp[M, A, R], n_jobs: int, progress: Progress | None = None
+    ):
         _log.debug("persisting function")
         ctx = mp.get_context("spawn")
         _log.info("setting up process pool w/ %d workers", n_jobs)
         kid_tc = get_parallel_config().child_threads
         seed = seedbank.root_seed()
-        log_addr = ensure_log_listener()
+        manylog.initialize()
+        self.progress = progress or null_progress()
         self.manager = SharedMemoryManager()
         self.manager.start()
+        prog_uuid = manylog.share_progress(self.progress)
         try:
-            cfg = worker.WorkerConfig(kid_tc, seed, log_addr)
-            job = worker.WorkerContext(func, model)
+            cfg = worker.WorkerConfig(kid_tc, seed)
+            job = worker.WorkerContext(func, model, prog_uuid)
             job = shm_serialize(job, self.manager)
             self.pool = ProcessPoolExecutor(n_jobs, ctx, worker.initalize, (cfg, job))
         except Exception as e:
@@ -51,7 +56,15 @@ def __init__(self, model: M, func: InvokeOp[M, A, R], n_jobs: int):
             raise e
 
     def map(self, tasks: Iterable[A]) -> Iterator[R]:
-        return self.pool.map(worker.worker, tasks)
+        return self.pool.map(worker.worker, self._task_iter(tasks))
+
+    def _task_iter(self, tasks: Iterable[A]):
+        """
+        Yield the tasks, recording each as dispatched before it is yielded.
+        """
+        for task in tasks:
+            self.progress.update(1, "dispatched")
+            yield task
 
     def shutdown(self):
         self.pool.shutdown()
diff --git a/lenskit/parallel/sequential.py b/lenskit/parallel/sequential.py
index eb3ed7a2c..5fe95cc49 100644
--- a/lenskit/parallel/sequential.py
+++ b/lenskit/parallel/sequential.py
@@ -9,6 +9,8 @@
 import logging
 from typing import Generic, Iterable, Iterator
 
+from progress_api import Progress, null_progress
+
 from .invoker import A, InvokeOp, M, ModelOpInvoker, R
 
 _log = logging.getLogger(__name__)
@@ -17,15 +19,20 @@
 class InProcessOpInvoker(ModelOpInvoker[A, R], Generic[M, A, R]):
     model: M
     function: InvokeOp[M, A, R]
+    progress: Progress
 
-    def __init__(self, model: M, func: InvokeOp[M, A, R]):
+    def __init__(self, model: M, func: InvokeOp[M, A, R], progress: Progress | None = None):
         _log.info("setting up in-process worker")
         self.model = model
         self.function = func
+        self.progress = progress or null_progress()
 
     def map(self, tasks: Iterable[A]) -> Iterator[R]:
-        proc = ft.partial(self.function, self.model)
-        return map(proc, tasks)
+        for task in tasks:
+            self.progress.update(1, "in-progress")
+            res = self.function(self.model, task)
+            self.progress.update(1, "finished", "in-progress")
+            yield res
 
     def shutdown(self):
         del self.model
diff --git a/lenskit/parallel/worker.py b/lenskit/parallel/worker.py
index 3e12c1a61..69c916204 100644
--- a/lenskit/parallel/worker.py
+++ b/lenskit/parallel/worker.py
@@ -12,10 +12,12 @@
 import pickle
 import warnings
 from typing import Any
+from uuid import UUID
 
 import manylog
 import seedbank
 from numpy.random import SeedSequence
+from progress_api import Progress
 from typing_extensions import Generic, NamedTuple
 
 from .config import initialize as init_parallel
@@ -26,22 +28,23 @@
 
 __work_context: WorkerContext
+__progress: Progress
 
 
 class WorkerConfig(NamedTuple):
     threads: int
     seed: SeedSequence
-    log_addr: str
 
 
 class WorkerContext(NamedTuple, Generic[M, A, R]):
     func: InvokeOp[M, A, R]
     model: M
+    progress: UUID
 
 
 def initalize(cfg: WorkerConfig, ctx: ModelData) -> None:
-    global __work_context
-    manylog.init_worker_logging(cfg.log_addr)
+    global __work_context, __progress
+    manylog.initialize()
     init_parallel(processes=1, threads=1, backend_threads=cfg.threads, child_threads=1)
 
     seed = seedbank.derive_seed(mp.current_process().name, base=cfg.seed)
@@ -54,9 +57,13 @@ def initalize(cfg: WorkerConfig, ctx: ModelData) -> None:
         _log.error("deserialization failed: %s", e)
         raise e
 
+    __progress = manylog.connect_progress(__work_context.progress)
+
     _log.debug("worker %d ready (process %s)", os.getpid(), mp.current_process())
 
 
 def worker(arg: Any) -> Any:
+    __progress.update(1, "in-progress", "dispatched")
     res = __work_context.func(__work_context.model, arg)
+    __progress.update(1, "finished", "in-progress")
     return res
diff --git a/pyproject.toml b/pyproject.toml
index 24a1c16fa..95cd78940 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,8 +31,8 @@ dependencies = [
     "threadpoolctl >=3.0",
     "binpickle >= 0.3.2",
     "seedbank >= 0.2.0a1",  # p2c: -p
-    "progress-api >=0.1.0a6",  # p2c: -p
-    "manylog >=0.1.0a3",  # p2c: -p
+    "progress-api >=0.1.0a9",  # p2c: -p
+    "manylog >=0.1.0a5",  # p2c: -p
     "csr >= 0.5",
 ]