Refactor parallelism to support PyTorch and only use shared memory #393

Merged: 28 commits, May 10, 2024
Commits (28)
3553be0
add seedbank with types to deps
mdekstrand May 9, 2024
b257ebe
upgrade parallel code
mdekstrand May 9, 2024
8132426
add top-level imports
mdekstrand May 9, 2024
fc6f959
switch parallel tests to new parallel logic
mdekstrand May 9, 2024
991aada
fix broken torch for large tensors
mdekstrand May 9, 2024
e91ae80
use new invoker for recommend & predict
mdekstrand May 9, 2024
b3f7ae6
re-enable multi-CPU item-item
mdekstrand May 9, 2024
9f80ec1
Wrap tensor serialization to handle CSR tensors.
mdekstrand May 9, 2024
1457563
yeet the old parallel lib
mdekstrand May 9, 2024
88b0a17
remove remaining use of old parallel code
mdekstrand May 9, 2024
548fe82
clean up isolation tests
mdekstrand May 9, 2024
8712a41
remove reference to persist
mdekstrand May 9, 2024
4ba39de
update docs
mdekstrand May 9, 2024
ff217e2
only use torch serialization for the model
mdekstrand May 10, 2024
424637b
remove ALS isolation test
mdekstrand May 10, 2024
b4dd445
switch to process pool executor
mdekstrand May 10, 2024
6cfa6f1
use Torch pool context for multiprocessing
mdekstrand May 10, 2024
05f5bb5
Remove unused import
mdekstrand May 10, 2024
e42daab
overhaul parallel configuration
mdekstrand May 10, 2024
ca96c4f
ignore CSR tensor warnings in worker processes
mdekstrand May 10, 2024
15ae55b
Use shared memory manager for numpy arrays
mdekstrand May 10, 2024
f8fb050
Just use an MP pool for concurrency
mdekstrand May 10, 2024
9072786
rename ensure_init
mdekstrand May 10, 2024
96e4bbd
ensure parallel setup before running torch
mdekstrand May 10, 2024
a57dfe0
configure both layers of PyTorch parallelism
mdekstrand May 10, 2024
3bdb7f7
default to 4 backend threads per thread
mdekstrand May 10, 2024
5562980
speed up large model test
mdekstrand May 10, 2024
c4c68ab
initialize backend threads
mdekstrand May 10, 2024
1 change: 0 additions & 1 deletion conftest.py
@@ -6,7 +6,6 @@

import logging
import os
import time
import warnings

from seedbank import initialize, numpy_rng
1 change: 0 additions & 1 deletion docs/internals.rst
@@ -6,5 +6,4 @@ Neither LensKit users nor algorithm developers are likely to need to use this
code directly.

.. toctree::
sharing
parallel
13 changes: 4 additions & 9 deletions docs/parallel.rst
@@ -1,10 +1,10 @@
Parallel Execution
------------------

.. py:module:: lenskit.util.parallel
.. py:module:: lenskit.parallel

LensKit uses :py:class:`concurrent.futures.ProcessPoolExecutor` to parallelize batch
operations (see :py:mod:`lenskit.batch`).
LensKit uses a custom API wrapping :py:class:`multiprocessing.pool.Pool` to
parallelize batch operations (see :py:mod:`lenskit.batch`).

The basic idea of this API is to create an *invoker* that has a model and a function,
and then passing lists of argument sets to the function::
@@ -13,6 +13,7 @@
results = list(func.map(args))

The model is persisted into shared memory to be used by the worker processes.
PyTorch tensors, including those on CUDA devices, are shared.

Parallel Model Ops
~~~~~~~~~~~~~~~~~~
@@ -25,9 +26,3 @@ Parallel Model Ops
:members:


Single Process Isolation
~~~~~~~~~~~~~~~~~~~~~~~~

We also have a single-process isolation function that runs a function in a subprocess.

.. autofunction:: run_sp
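
A minimal sketch of the invoker pattern that the updated docs/parallel.rst describes. The ratings frame is made-up illustration data, and the worker-function argument order (shared model first, then one argument set) is an assumption based on how _predict_user is used in lenskit/batch/_predict.py further down.

import pandas as pd

from lenskit.algorithms.bias import Bias
from lenskit.parallel import invoker

# Tiny made-up ratings frame, just enough to fit a model.
ratings = pd.DataFrame(
    {"user": [1, 1, 2, 2], "item": [10, 20, 10, 30], "rating": [4.0, 3.0, 5.0, 2.0]}
)

algo = Bias()
algo.fit(ratings)

def _score_user(model, user):
    # Worker function: the shared model arrives first, then one argument set
    # (argument order assumed from _predict_user in lenskit/batch/_predict.py).
    return model.predict_for_user(user, [10, 20, 30])

# The model is persisted into shared memory once; each worker process reuses it.
with invoker(algo, _score_user, n_jobs=2) as worker:
    results = list(worker.map([1, 2]))
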
38 changes: 0 additions & 38 deletions docs/sharing.rst

This file was deleted.

3 changes: 2 additions & 1 deletion envs/lenskit-py3.10-ci.yaml
@@ -35,7 +35,6 @@ dependencies:
- ruff>=0.2
- scikit-learn>=1.1
- scipy>=1.9.0
- seedbank>=0.1.0
- setuptools>=64
- setuptools_scm>=8
- sphinx-autobuild>=2021
@@ -44,5 +43,7 @@ dependencies:
- tqdm>=4
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- seedbank>=0.2.0a1
- unbeheader~=1.3
3 changes: 2 additions & 1 deletion envs/lenskit-py3.10-dev.yaml
@@ -40,7 +40,6 @@ dependencies:
- ruff>=0.2
- scikit-learn>=1.1
- scipy>=1.9.0
- seedbank>=0.1.0
- setuptools>=64
- setuptools_scm>=8
- sphinx-autobuild>=2021
@@ -53,5 +52,7 @@ dependencies:
- tqdm>=4
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- seedbank>=0.2.0a1
- unbeheader~=1.3
3 changes: 2 additions & 1 deletion envs/lenskit-py3.10-doc.yaml
@@ -23,7 +23,6 @@ dependencies:
- pandas<3,>=1.5
- pytorch==2.*
- scipy>=1.9.0
- seedbank>=0.1.0
- sphinx>=4.2
- sphinx_rtd_theme>=0.5
- sphinxcontrib-bibtex>=2.0
@@ -32,4 +31,6 @@ dependencies:
- threadpoolctl>=3.0
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- seedbank>=0.2.0a1
3 changes: 2 additions & 1 deletion envs/lenskit-py3.10-test.yaml
@@ -27,9 +27,10 @@ dependencies:
- pytest==7.*
- pytorch==2.*
- scipy>=1.9.0
- seedbank>=0.1.0
- tbb
- threadpoolctl>=3.0
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- seedbank>=0.2.0a1
3 changes: 2 additions & 1 deletion envs/lenskit-py3.11-ci.yaml
@@ -35,7 +35,6 @@ dependencies:
- ruff>=0.2
- scikit-learn>=1.1
- scipy>=1.9.0
- seedbank>=0.1.0
- setuptools>=64
- setuptools_scm>=8
- sphinx-autobuild>=2021
@@ -44,5 +43,7 @@ dependencies:
- tqdm>=4
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- seedbank>=0.2.0a1
- unbeheader~=1.3
3 changes: 2 additions & 1 deletion envs/lenskit-py3.11-dev.yaml
@@ -40,7 +40,6 @@ dependencies:
- ruff>=0.2
- scikit-learn>=1.1
- scipy>=1.9.0
- seedbank>=0.1.0
- setuptools>=64
- setuptools_scm>=8
- sphinx-autobuild>=2021
@@ -53,5 +52,7 @@ dependencies:
- tqdm>=4
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- seedbank>=0.2.0a1
- unbeheader~=1.3
3 changes: 2 additions & 1 deletion envs/lenskit-py3.11-doc.yaml
@@ -23,7 +23,6 @@ dependencies:
- pandas<3,>=1.5
- pytorch==2.*
- scipy>=1.9.0
- seedbank>=0.1.0
- sphinx>=4.2
- sphinx_rtd_theme>=0.5
- sphinxcontrib-bibtex>=2.0
@@ -32,4 +31,6 @@ dependencies:
- threadpoolctl>=3.0
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- seedbank>=0.2.0a1
3 changes: 2 additions & 1 deletion envs/lenskit-py3.11-test.yaml
@@ -27,9 +27,10 @@ dependencies:
- pytest==7.*
- pytorch==2.*
- scipy>=1.9.0
- seedbank>=0.1.0
- tbb
- threadpoolctl>=3.0
- pip
- pip:
- manylog>=0.1.0a3
- progress-api>=0.1.0a6
- seedbank>=0.2.0a1
5 changes: 3 additions & 2 deletions lenskit/algorithms/item_knn.py
@@ -16,10 +16,11 @@
import numpy as np
import pandas as pd
import torch
from progress_api import Progress, make_progress
from progress_api import Progress

from lenskit import ConfigWarning, DataWarning, util
from lenskit.data.matrix import DimStats, sparse_ratings, sparse_row_stats
from lenskit.parallel import ensure_parallel_init
from lenskit.util.logging import pbh_update, progress_handle

from . import Predictor
@@ -465,7 +466,7 @@ def fit(self, ratings, **kwargs):
ratings(pandas.DataFrame):
(user,item,rating) data for computing item similarities.
"""
util.check_env()
ensure_parallel_init()
# Training proceeds in 2 steps:
# 1. Normalize item vectors to be mean-centered and unit-normalized
# 2. Compute similarities with pairwise dot products
1 change: 0 additions & 1 deletion lenskit/batch/__init__.py
@@ -10,4 +10,3 @@

from ._predict import predict # noqa: F401
from ._recommend import recommend # noqa: F401
from ._train import train_isolated # noqa: F401
3 changes: 2 additions & 1 deletion lenskit/batch/_predict.py
@@ -10,6 +10,7 @@
import pandas as pd

from .. import util
from ..parallel import invoker

_logger = logging.getLogger(__name__)
_rec_context = None
@@ -81,7 +82,7 @@ def predict(algo, pairs, *, n_jobs=None, **kwargs):
nusers = pairs["user"].nunique()

timer = util.Stopwatch()
with util.parallel.invoker(algo, _predict_user, n_jobs=n_jobs) as worker:
with invoker(algo, _predict_user, n_jobs=n_jobs) as worker:
del algo # maybe free some memory

_logger.info(
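
A hedged usage sketch of batch.predict(), whose signature appears in the hunk above. The ratings and pairs frames are illustrative; n_jobs is simply forwarded to the new invoker.

import pandas as pd

from lenskit import batch
from lenskit.algorithms.bias import Bias

ratings = pd.DataFrame(
    {"user": [1, 1, 2, 2], "item": [10, 20, 10, 30], "rating": [4.0, 3.0, 5.0, 2.0]}
)
algo = Bias()
algo.fit(ratings)

# (user, item) pairs to score; n_jobs is passed through to the invoker.
pairs = pd.DataFrame({"user": [1, 2], "item": [30, 20]})
preds = batch.predict(algo, pairs, n_jobs=2)
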
15 changes: 7 additions & 8 deletions lenskit/batch/_recommend.py
@@ -12,7 +12,7 @@

from .. import util
from ..algorithms import Recommender
from ..sharing import PersistedModel
from ..parallel import invoker

_logger = logging.getLogger(__name__)

@@ -72,19 +72,18 @@
n_jobs = kwargs["nprocs"]
warnings.warn("nprocs is deprecated, use n_jobs", DeprecationWarning)

if not isinstance(algo, PersistedModel):
rec_algo = Recommender.adapt(algo)
if candidates is None and rec_algo is not algo:
warnings.warn("no candidates provided and algo is not a recommender, unlikely to work")
algo = rec_algo
del rec_algo
rec_algo = Recommender.adapt(algo)
if candidates is None and rec_algo is not algo:
warnings.warn("no candidates provided and algo is not a recommender, unlikely to work")

Codecov (codecov/patch) warning on lenskit/batch/_recommend.py#L77: added line #L77 was not covered by tests.
algo = rec_algo
del rec_algo

if "ratings" in kwargs:
warnings.warn("Providing ratings to recommend is not supported", DeprecationWarning)

candidates = __standard_cand_fun(candidates)

with util.parallel.invoker(algo, _recommend_user, n_jobs=n_jobs) as worker:
with invoker(algo, _recommend_user, n_jobs=n_jobs) as worker:
_logger.info("recommending with %s for %d users (n_jobs=%s)", str(algo), len(users), n_jobs)
del algo
timer = util.Stopwatch()
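
A hedged usage sketch of the recommend path above; the data is illustrative. Adapting the scorer with Recommender.adapt up front keeps rec_algo and algo identical, so the no-candidates warning in the diff does not fire.

import pandas as pd

from lenskit import batch
from lenskit.algorithms import Recommender
from lenskit.algorithms.item_knn import ItemItem

ratings = pd.DataFrame(
    {"user": [1, 1, 2, 2, 3], "item": [10, 20, 10, 30, 20], "rating": [4.0, 3.0, 5.0, 2.0, 4.5]}
)

# Adapt the scorer to a top-N recommender before fitting.
algo = Recommender.adapt(ItemItem(10))
algo.fit(ratings)  # ItemItem.fit() now calls ensure_parallel_init() (see item_knn.py diff)

# n_jobs is forwarded to the invoker, which shares the fitted model with workers.
recs = batch.recommend(algo, [1, 2, 3], 5, n_jobs=2)
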
62 changes: 0 additions & 62 deletions lenskit/batch/_train.py

This file was deleted.

22 changes: 22 additions & 0 deletions lenskit/parallel/__init__.py
@@ -0,0 +1,22 @@
# This file is part of LensKit.
# Copyright (C) 2018-2023 Boise State University
# Copyright (C) 2023-2024 Drexel University
# Licensed under the MIT license, see LICENSE.md for details.
# SPDX-License-Identifier: MIT

"""
LensKit parallel computation support.
"""

from __future__ import annotations

from .config import ensure_parallel_init, get_parallel_config, initialize
from .invoker import ModelOpInvoker, invoker

__all__ = [
"initialize",
"get_parallel_config",
"ensure_parallel_init",
"invoker",
"ModelOpInvoker",
]
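
A hedged sketch of the public surface this new module exports. The argument-free calls to initialize() and get_parallel_config() are assumptions; the diff itself only shows ensure_parallel_init() called with no arguments.

from lenskit.parallel import ensure_parallel_init, get_parallel_config, initialize

# Explicit setup, e.g. at the top of a script or worker entry point
# (an argument-free call is assumed to be valid).
initialize()

# Library code can call this defensively before running Torch operations;
# presumably a no-op once initialize() has already run, as ItemItem.fit() does above.
ensure_parallel_init()

# Inspect whatever configuration object the module exposes (fields not specified in this PR).
print(get_parallel_config())
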