diff --git a/lenskit-funksvd/tests/test_funksvd.py b/lenskit-funksvd/tests/test_funksvd.py
index 002152b14..49bdb5f40 100644
--- a/lenskit-funksvd/tests/test_funksvd.py
+++ b/lenskit-funksvd/tests/test_funksvd.py
@@ -13,10 +13,11 @@
 
 from pytest import approx, mark
 
-from lenskit.data import Dataset, ItemList, from_interactions_df
-from lenskit.data.bulk import dict_to_df, iter_item_lists
+from lenskit import batch
+from lenskit.data import Dataset, ItemList, ItemListCollection, UserIDKey, from_interactions_df
 from lenskit.funksvd import FunkSVDScorer
 from lenskit.metrics import call_metric, quick_measure_model
+from lenskit.pipeline.common import predict_pipeline
 from lenskit.testing import BasicComponentTests, ScorerTests, wantjit
 
 _log = logging.getLogger(__name__)
@@ -169,15 +170,17 @@ def test_fsvd_save_load(ml_ds: Dataset):
 def test_fsvd_known_preds(ml_ds: Dataset):
     algo = FunkSVDScorer(15, iterations=125, lrate=0.001)
     _log.info("training %s on ml data", algo)
-    algo.train(ml_ds)
+    pipe = predict_pipeline(algo, fallback=False)
+    pipe.train(ml_ds)
 
     dir = Path(__file__).parent
     pred_file = dir / "funksvd-preds.csv"
     _log.info("reading known predictions from %s", pred_file)
     known_preds = pd.read_csv(str(pred_file))
+    known = ItemListCollection.from_df(known_preds, UserIDKey)
 
-    preds = {u: algo(u, il) for (u, il) in iter_item_lists(known_preds)}
-    preds = dict_to_df(preds)
+    preds = batch.predict(pipe, known, n_jobs=1)
+    preds = preds.to_df().drop(columns=["prediction"], errors="ignore")
     known_preds.rename(columns={"prediction": "expected"}, inplace=True)
 
     merged = pd.merge(known_preds, preds)
diff --git a/lenskit/lenskit/batch/__init__.py b/lenskit/lenskit/batch/__init__.py
index 301a4e98c..4a25fa95e 100644
--- a/lenskit/lenskit/batch/__init__.py
+++ b/lenskit/lenskit/batch/__init__.py
@@ -10,9 +10,81 @@
 
 from __future__ import annotations
 
-from ._predict import predict
-from ._recommend import recommend
+from typing import Mapping, Sequence
+
+from lenskit.data import ID, GenericKey, ItemList, ItemListCollection, UserIDKey
+from lenskit.pipeline import Pipeline
+
 from ._results import BatchResults
 from ._runner import BatchPipelineRunner, InvocationSpec
 
 __all__ = ["BatchPipelineRunner", "BatchResults", "InvocationSpec", "predict", "recommend"]
+
+
+def predict(
+    pipeline: Pipeline,
+    test: ItemListCollection[GenericKey] | Mapping[ID, ItemList],
+    *,
+    n_jobs: int | None = None,
+    **kwargs,
+) -> ItemListCollection[GenericKey]:
+    """
+    Convenience function to batch-generate rating predictions (or other per-item
+    scores) from a pipeline. This is a batch version of :func:`lenskit.predict`.
+
+    Stability:
+        Caller
+    """
+
+    runner = BatchPipelineRunner(n_jobs=n_jobs)
+    runner.predict()
+    outs = runner.run(pipeline, test)
+    return outs.output("predictions")  # type: ignore
+
+
+def score(
+    pipeline: Pipeline,
+    test: ItemListCollection[GenericKey] | Mapping[ID, ItemList],
+    *,
+    n_jobs: int | None = None,
+    **kwargs,
+) -> ItemListCollection[GenericKey]:
+    """
+    Convenience function to batch-generate personalized scores from a pipeline.
+    This is a batch version of :func:`lenskit.score`.
+
+    Stability:
+        Caller
+    """
+
+    runner = BatchPipelineRunner(n_jobs=n_jobs)
+    runner.score()
+    outs = runner.run(pipeline, test)
+    return outs.output("scores")  # type: ignore
+
+
+def recommend(
+    pipeline: Pipeline,
+    users: Sequence[ID | UserIDKey],
+    n: int | None = None,
+    candidates=None,
+    *,
+    n_jobs: int | None = None,
+    **kwargs,
+) -> ItemListCollection[UserIDKey]:
+    """
+    Convenience function to batch-generate recommendations from a pipeline. This
+    is a batch version of :func:`lenskit.recommend`.
+
+    .. todo::
+
+        Support more inputs than just user IDs.
+
+    Stability:
+        Caller
+    """
+
+    runner = BatchPipelineRunner(n_jobs=n_jobs)
+    runner.recommend(n=n)
+    outs = runner.run(pipeline, users)
+    return outs.output("recommendations")  # type: ignore
diff --git a/lenskit/lenskit/batch/_predict.py b/lenskit/lenskit/batch/_predict.py
deleted file mode 100644
index 57f621cd6..000000000
--- a/lenskit/lenskit/batch/_predict.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# This file is part of LensKit.
-# Copyright (C) 2018-2023 Boise State University
-# Copyright (C) 2023-2024 Drexel University
-# Licensed under the MIT license, see LICENSE.md for details.
-# SPDX-License-Identifier: MIT
-from __future__ import annotations
-
-import logging
-from typing import Mapping
-
-from lenskit.data import ID, GenericKey, ItemList, ItemListCollection
-from lenskit.pipeline import Pipeline
-
-from ._runner import BatchPipelineRunner
-
-_logger = logging.getLogger(__name__)
-
-
-def predict(
-    pipeline: Pipeline,
-    test: ItemListCollection[GenericKey] | Mapping[ID, ItemList],
-    *,
-    n_jobs: int | None = None,
-    **kwargs,
-) -> ItemListCollection[GenericKey]:
-    """
-    Convenience function to batch-generate rating predictions (or other per-item
-    scores) from a pipeline.
-
-    Stability:
-        Caller
-    """
-
-    runner = BatchPipelineRunner(n_jobs=n_jobs)
-    runner.predict()
-    outs = runner.run(pipeline, test)
-    return outs.output("predictions")  # type: ignore
diff --git a/lenskit/lenskit/batch/_recommend.py b/lenskit/lenskit/batch/_recommend.py
deleted file mode 100644
index 9165910e8..000000000
--- a/lenskit/lenskit/batch/_recommend.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# This file is part of LensKit.
-# Copyright (C) 2018-2023 Boise State University
-# Copyright (C) 2023-2024 Drexel University
-# Licensed under the MIT license, see LICENSE.md for details.
-# SPDX-License-Identifier: MIT
-from __future__ import annotations
-
-import logging
-from typing import Sequence
-
-from lenskit.data import ID, ItemListCollection, UserIDKey
-from lenskit.pipeline import Pipeline
-
-from ._runner import BatchPipelineRunner
-
-_logger = logging.getLogger(__name__)
-
-
-def recommend(
-    pipeline: Pipeline,
-    users: Sequence[ID | UserIDKey],
-    n: int | None = None,
-    candidates=None,
-    *,
-    n_jobs: int | None = None,
-    **kwargs,
-) -> ItemListCollection[UserIDKey]:
-    """
-    Convenience function to batch-generate recommendations from a pipeline.
-
-    Stability:
-        Caller
-    """
-
-    runner = BatchPipelineRunner(n_jobs=n_jobs)
-    runner.recommend(n=n)
-    outs = runner.run(pipeline, users)
-    return outs.output("recommendations")
diff --git a/lenskit/lenskit/batch/_runner.py b/lenskit/lenskit/batch/_runner.py
index 8572ab2cf..39ff04195 100644
--- a/lenskit/lenskit/batch/_runner.py
+++ b/lenskit/lenskit/batch/_runner.py
@@ -68,9 +68,22 @@ def __init__(self, *, n_jobs: int | None = None):
     def add_invocation(self, inv: InvocationSpec):
         self.invocations.append(inv)
 
+    def score(self, component: str = "scorer", *, output: str = "scores"):
+        """
+        Request the batch run to generate test item scores.
+
+        Args:
+            component:
+                The name of the scoring component to run.
+            output:
+                The name of the results in the output dictionary.
+        """
+        self.add_invocation(InvocationSpec("score", {component: output}, "test-items"))
+
     def predict(self, component: str = "rating-predictor", *, output: str = "predictions"):
         """
-        Request the batch run to generate test item scores or rating predictins.
+        Request the batch run to generate test item rating predictions. This is
+        identical to :meth:`score`, but with different defaults.
 
         Args:
             component:
@@ -78,7 +91,7 @@ def predict(self, component: str = "rating-predictor", *, output: str = "predict
             output:
                 The name of the results in the output dictionary.
         """
-        self.add_invocation(InvocationSpec("predict-ratings", {component: output}, "test-items"))
+        return self.score(component, output=output)
 
     def recommend(
         self, component: str = "recommender", *, output: str = "recommendations", **extra: Any
diff --git a/lenskit/lenskit/data/bulk.py b/lenskit/lenskit/data/bulk.py
deleted file mode 100644
index 0b5f4b381..000000000
--- a/lenskit/lenskit/data/bulk.py
+++ /dev/null
@@ -1,125 +0,0 @@
-"""
-Functions for working with bulk per-user data.
-
-.. note::
-
-    This is a provisional API to enable incremental forward development while
-    we develop more flexible abstractions for collections of data indexed by
-    user or other keys.
-"""
-
-from collections.abc import Mapping
-from typing import Iterator, cast, overload
-
-import pandas as pd
-
-from .items import ItemList
-from .schemas import (
-    ITEM_COMPAT_COLUMN,
-    USER_COLUMN,
-    USER_COMPAT_COLUMN,
-    column_name,
-    normalize_columns,
-)
-from .types import ID, Column
-
-
-def dict_to_df(data: Mapping[ID, ItemList | None], *, column: str = USER_COLUMN) -> pd.DataFrame:
-    """
-    Convert a dictionary mapping user IDs to item lists into a data frame.
-    Missing item lists are excluded.
-
-    Args:
-        data:
-            The dictionary of data.
-        column:
-            The column, to support dictionaries mapped by things other than user
-            IDs.
-    """
-
-    df = pd.concat(
-        {u: il.to_df(numbers=False) for (u, il) in data.items() if il is not None},
-        names=[column],
-    )
-    df = df.reset_index(column)
-    df = df.reset_index(drop=True)
-    return df
-
-
-def dict_from_df(
-    df: pd.DataFrame, *, column: Column = USER_COMPAT_COLUMN, item_col: Column = ITEM_COMPAT_COLUMN
-) -> dict[ID, ItemList]:
-    """
-    Convert a dictionary mapping user IDs to item lists into a data frame.
-
-    Args:
-        df:
-            The data frame.
-        column:
-            The column, to support dictionaries mapped by things other than user IDs.
-    """
-    df = normalize_columns(df, column, item_col)
-    return {u: ItemList.from_df(udf) for (u, udf) in df.groupby(column_name(column))}  # type: ignore
-
-
-def group_df(df: pd.DataFrame, *, column: Column = USER_COMPAT_COLUMN, item_col=ITEM_COMPAT_COLUMN):
-    """
-    Group a data frame by a specified column, possibly checking for and
-    normalizing the names of other columns as well. The default options group
-    by ``user_id`` and require an ``item_id`` column, allowing the compatibility
-    aliases ``user`` and ``item``.
-    """
-    df = normalize_columns(df, column, item_col)
-    col = column_name(column)
-    return df.groupby(col)
-
-
-@overload
-def count_item_lists(data: Mapping[ID, ItemList | None]) -> int: ...
-@overload
-def count_item_lists(
-    data: pd.DataFrame | Mapping[ID, ItemList | None],
-    *,
-    column: Column = USER_COMPAT_COLUMN,
-) -> int: ...
-def count_item_lists(
-    data: pd.DataFrame | Mapping[ID, ItemList | None],
-    *,
-    column: Column = USER_COMPAT_COLUMN,
-) -> int:
-    if isinstance(data, pd.DataFrame):
-        data = normalize_columns(data, column)
-        return data[column_name(column)].nunique()
-    else:
-        return len(data)
-
-
-@overload
-def iter_item_lists(
-    data: Mapping[ID, ItemList | None],
-) -> Iterator[tuple[ID, ItemList | None]]: ...
-@overload
-def iter_item_lists(
-    data: pd.DataFrame | Mapping[ID, ItemList | None],
-    *,
-    column: Column = USER_COMPAT_COLUMN,
-    item_col=ITEM_COMPAT_COLUMN,
-) -> Iterator[tuple[ID, ItemList | None]]: ...
-def iter_item_lists(
-    data: pd.DataFrame | Mapping[ID, ItemList | None],
-    *,
-    column: Column = USER_COMPAT_COLUMN,
-    item_col=ITEM_COMPAT_COLUMN,
-) -> Iterator[tuple[ID, ItemList | None]]:
-    """
-    Iterate over item lists identified by keys. When the input is a data frame,
-    the column names may be specified; the default options group by ``user_id``
-    and require an ``item_id`` column, allowing the compatibility aliases
-    ``user`` and ``item``.
-    """
-    if isinstance(data, pd.DataFrame):
-        for key, df in group_df(data, column=column, item_col=item_col):
-            yield cast(ID, key), ItemList.from_df(df)
-
-    else:
-        yield from data.items()
diff --git a/lenskit/lenskit/pipeline/common.py b/lenskit/lenskit/pipeline/common.py
index d8a37ac29..b6f308ae8 100644
--- a/lenskit/lenskit/pipeline/common.py
+++ b/lenskit/lenskit/pipeline/common.py
@@ -158,3 +158,54 @@ def topn_pipeline(
         builder.predicts_ratings(fallback=BiasScorer())
 
     return builder.build(name)
+
+
+def predict_pipeline(
+    scorer: Component,
+    *,
+    fallback: bool | Component = True,
+    n: int = -1,
+    name: str | None = None,
+) -> Pipeline:
+    """
+    Create a pipeline that predicts ratings, but does **not** include any
+    ranking capabilities. Mostly useful for testing and historical purposes.
+    The resulting pipeline **must** be called with an item list.
+
+    Stability:
+        Caller
+
+    Args:
+        scorer:
+            The scorer to use in the pipeline (it will be added with the component
+            name ``scorer``, see :ref:`pipeline-names`).
+        fallback:
+            Whether to use a fallback predictor when the scorer cannot score.
+            When configured, the ``scorer`` node is the scorer, and the
+            ``rating-predictor`` node applies the fallback.
+        n:
+            The recommendation list length to configure in the pipeline.
+        name:
+            The pipeline name.
+    """
+    from lenskit.basic.bias import BiasScorer
+
+    pipe = Pipeline(name=name)
+
+    query = pipe.create_input("query", RecQuery, ID, ItemList)
+    items = pipe.create_input("items", ItemList)
+
+    lookup = pipe.add_component("history-lookup", UserTrainingHistoryLookup(), query=query)
+
+    score = pipe.add_component("scorer", scorer, query=lookup, items=items)
+
+    if fallback is True:
+        fallback = BiasScorer()
+
+    if fallback is False:
+        pipe.alias("rating-predictor", score)
+    else:
+        backup = pipe.add_component("fallback-predictor", fallback, query=lookup, items=items)
+        pipe.add_component("rating-predictor", FallbackScorer(), primary=score, fallback=backup)
+
+    return pipe
diff --git a/lenskit/tests/batch/test_batch_pipeline.py b/lenskit/tests/batch/test_batch_pipeline.py
index 7e4ccbe57..dc67cb533 100644
--- a/lenskit/tests/batch/test_batch_pipeline.py
+++ b/lenskit/tests/batch/test_batch_pipeline.py
@@ -13,7 +13,7 @@
 from pytest import approx, fixture, mark
 
 from lenskit.basic import BiasScorer, PopScorer
-from lenskit.batch import BatchPipelineRunner, predict, recommend
+from lenskit.batch import BatchPipelineRunner, predict, recommend, score
 from lenskit.data import Dataset, ItemList, UserIDKey, from_interactions_df
 from lenskit.data.convert import normalize_interactions_df
 from lenskit.metrics import NDCG, RBP, RMSE, RunAnalysis
@@ -64,6 +64,21 @@ def test_predict_single(mlb: MLB):
     assert preds >= 1 and preds <= 5
 
 
+def test_score_single(mlb: MLB):
+    res = score(mlb.pipeline, {1: ItemList([31])}, n_jobs=1)
+
+    assert len(res) == 1
+    uid, result = next(iter(res))
+    assert isinstance(uid, UserIDKey)
+    assert uid.user_id == 1
+    assert len(result) == 1
+    assert result.ids()[0] == 31
+
+    preds = result.field("score")
+    assert preds is not None
+    assert preds >= 1 and preds <= 5
+
+
 def test_recommend_user(mlb: MLB):
     user = 5
 
diff --git a/lenskit/tests/data/test_bulk.py b/lenskit/tests/data/test_bulk.py
deleted file mode 100644
index ca299716b..000000000
--- a/lenskit/tests/data/test_bulk.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# This file is part of LensKit.
-# Copyright (C) 2018-2023 Boise State University
-# Copyright (C) 2023-2024 Drexel University
-# Licensed under the MIT license, see LICENSE.md for details.
-# SPDX-License-Identifier: MIT
-
-"""
-Test the data type utilities in splits.
-""" - -import numpy as np -import pandas as pd - -from lenskit.data.bulk import dict_from_df, iter_item_lists - - -def test_dict_from_df(rng, ml_ratings: pd.DataFrame): - ml_ratings = ml_ratings.rename(columns={"user": "user_id", "item": "item_id"}) - users = dict_from_df(ml_ratings) - assert len(users) == ml_ratings["user_id"].nunique() - assert set(users.keys()) == set(ml_ratings["user_id"]) - - for uid in rng.choice(ml_ratings["user_id"].unique(), 25): - items = users[uid] - udf = ml_ratings[ml_ratings["user_id"] == uid] - assert len(items) == len(udf) - assert np.all(np.unique(items.ids()) == np.unique(udf["item_id"])) - - tot = sum(len(il) for il in users.values()) - assert tot == len(ml_ratings) - - -def test_iter_df(ml_ratings: pd.DataFrame): - counts = {} - for key, il in iter_item_lists(ml_ratings): - counts[key] = len(il) - - counts = pd.Series(counts) - in_counts = ml_ratings.value_counts("user_id") - assert len(counts) == len(in_counts) - counts, in_counts = counts.align(in_counts, join="outer") - assert np.all(counts == in_counts) diff --git a/lenskit/tests/models/test_knn_item_item.py b/lenskit/tests/models/test_knn_item_item.py index cb0957cca..c878b4a29 100644 --- a/lenskit/tests/models/test_knn_item_item.py +++ b/lenskit/tests/models/test_knn_item_item.py @@ -22,12 +22,12 @@ from lenskit.basic.history import UserTrainingHistoryLookup from lenskit.batch import BatchPipelineRunner from lenskit.data import ItemList, ItemListCollection, UserIDKey, Vocabulary, from_interactions_df -from lenskit.data.bulk import dict_to_df, iter_item_lists from lenskit.diagnostics import ConfigWarning, DataWarning from lenskit.knn.item import ItemKNNScorer from lenskit.metrics import MAE, RBP, RMSE, RecipRank, RunAnalysis, call_metric, quick_measure_model from lenskit.operations import score from lenskit.pipeline import RecPipelineBuilder, topn_pipeline +from lenskit.pipeline.common import predict_pipeline from lenskit.splitting import SampleFrac, crossfold_users from lenskit.testing import BasicComponentTests, ScorerTests, wantjit from lenskit.util.torch import inference_mode @@ -458,7 +458,7 @@ def test_ii_known_preds(ml_ds): from lenskit import batch iknn = ItemKNNScorer(20, min_sim=1.0e-6) - pipe = topn_pipeline(iknn) + pipe = predict_pipeline(iknn, fallback=False) # noqa: F821 _log.info("training %s on ml data", iknn) pipe.train(ml_ds) _log.info("model means: %s", iknn.item_means_) @@ -467,12 +467,10 @@ def test_ii_known_preds(ml_ds): pred_file = dir / "item-item-preds.csv" _log.info("reading known predictions from %s", pred_file) known_preds = pd.read_csv(str(pred_file)) + known = ItemListCollection.from_df(known_preds, UserIDKey) - preds = { - user: score(pipe, query=user, items=ItemList(kps, prediction=False)) - for (user, kps) in iter_item_lists(known_preds) - } - preds = dict_to_df(preds) + preds = batch.predict(pipe, known, n_jobs=1) + preds = preds.to_df().drop(columns=["prediction"], errors="ignore") merged = pd.merge(known_preds.rename(columns={"prediction": "expected"}), preds) assert len(merged) == len(preds) diff --git a/lenskit/tests/models/test_knn_user_user.py b/lenskit/tests/models/test_knn_user_user.py index ad246377f..afcb76c49 100644 --- a/lenskit/tests/models/test_knn_user_user.py +++ b/lenskit/tests/models/test_knn_user_user.py @@ -14,10 +14,17 @@ from pytest import approx, fail, mark -from lenskit.data import Dataset, ItemList, RecQuery, from_interactions_df -from lenskit.data.bulk import dict_to_df, iter_item_lists +from lenskit.data import ( + Dataset, + ItemList, + 
+    ItemListCollection,
+    RecQuery,
+    UserIDKey,
+    from_interactions_df,
+)
 from lenskit.knn import UserKNNScorer
 from lenskit.metrics import call_metric, quick_measure_model
+from lenskit.pipeline.common import predict_pipeline, topn_pipeline
 from lenskit.testing import BasicComponentTests, ScorerTests
 
 _log = logging.getLogger(__name__)
@@ -210,20 +217,19 @@ def test_uu_known_preds(ml_ds: Dataset):
     from lenskit import batch
 
     uknn = UserKNNScorer(30, min_sim=1.0e-6)
+    pipe = predict_pipeline(uknn, fallback=False)
     _log.info("training %s on ml data", uknn)
-    uknn.train(ml_ds)
+    pipe.train(ml_ds)
 
     dir = Path(__file__).parent
     pred_file = dir / "user-user-preds.csv"
    _log.info("reading known predictions from %s", pred_file)
     known_preds = pd.read_csv(str(pred_file))
     _log.info("generating %d known predictions", len(known_preds))
+    known = ItemListCollection.from_df(known_preds, UserIDKey)
 
-    preds = {
-        user: uknn(user, ItemList(kps, prediction=False))
-        for (user, kps) in iter_item_lists(known_preds)
-    }
-    preds = dict_to_df(preds)
+    preds = batch.predict(pipe, known, n_jobs=1)
+    preds = preds.to_df().drop(columns=["prediction"], errors="ignore")
 
     merged = pd.merge(known_preds.rename(columns={"prediction": "expected"}), preds)
     assert len(merged) == len(preds)
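
Usage sketch (illustrative, not part of the patch): how the new ``batch.predict`` wrapper and ``predict_pipeline`` fit together, replacing the deleted ``lenskit.data.bulk`` helpers used by the known-prediction tests above. The hand-built ratings frame below is an assumption for illustration; everything else uses only APIs that appear in this diff::

    import pandas as pd

    from lenskit import batch
    from lenskit.basic import BiasScorer
    from lenskit.data import ItemListCollection, UserIDKey, from_interactions_df
    from lenskit.pipeline.common import predict_pipeline

    # Hand-built ratings frame; from_interactions_df expects user_id/item_id columns.
    ratings = pd.DataFrame(
        {
            "user_id": [1, 1, 2, 2, 3],
            "item_id": [10, 20, 10, 30, 20],
            "rating": [4.0, 3.5, 5.0, 2.0, 4.5],
        }
    )
    ds = from_interactions_df(ratings)

    # Predict-only pipeline; fallback=False leaves items the scorer cannot
    # score unscored instead of filling them in with a bias model.
    pipe = predict_pipeline(BiasScorer(), fallback=False)
    pipe.train(ds)

    # Batch-generate predictions for (user, items) test points, the same way
    # the updated known-prediction tests do; n_jobs=1 keeps the run in-process.
    test = ItemListCollection.from_df(ratings, UserIDKey)
    preds = batch.predict(pipe, test, n_jobs=1)
    print(preds.to_df())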