From 74bdc28e615755e2dfb6b9e02dfea08c0b8f3eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?N=C3=ADcollas=20Silva?= Date: Fri, 8 Apr 2022 22:43:10 +0100 Subject: [PATCH] Refactor environment (#70) * created the environment and inserted the split and filter methods, in addition to refactoring the DatasetLoader.py and dataset.py modules * fixed random split * removed comments * refactor DefaultDatasetLoader.py and dataset.py, created folder for filters and splits * Set setup and make * Saving progress in the enviornment refactor * Implement loader * Finish enviornment implementation * Fix loader and split-base * Adding loaders to the iRec * Delete .idea directory * environment integration: environment/load environment/split and environment/filter * environment integration: environment/load environment/split and environment/filter * finished the integration of the fixed and updated train-test load registry.py * fixed num_total_users/items * fixed imports * Fix validation * A simple example of tests * fixed return train test dataset * added documentation for load module * fixed assert * Added docstrings and type hints * Added docstrings and typehints * Update docs for dataset.py and fixed warnings * Fix simple returns * Fix bugs * Add bdd tests * updated requirements * removed unit test * refactor: removed idea directory * refactor: removed unnecessary Makefile * fixed erros in yaml * Update InteractionMetricEvaluator.py * remove: traitlets dependency in run_agent * feat: dev requirements included behave * remove: redundant setup file * refactor: removed all app branch changes Co-authored-by: thiagodks Co-authored-by: Nicollas Silva Co-authored-by: Thiago Silva <48692251+thiagodks@users.noreply.github.com> Co-authored-by: Carlos Mito Co-authored-by: heitor57 --- .gitignore | 1 + irec/{app => connector}/utils.py | 71 +-- irec/environment/__init__.py | 0 irec/environment/dataset.py | 80 +++ irec/environment/filter/filtering_by_items.py | 47 ++ 
irec/environment/filter/filtering_by_users.py | 47 ++ irec/environment/loader/full_data.py | 152 +++++ irec/environment/loader/train_test.py | 108 ++++ irec/environment/registry.py | 59 ++ irec/environment/split/base.py | 74 +++ irec/environment/split/randomised.py | 34 ++ irec/environment/split/temporal.py | 41 ++ irec/evaluation_policies/EvaluationPolicy.py | 3 +- irec/evaluation_policies/FixedInteraction.py | 3 +- .../InteractionMetricEvaluator.py | 4 +- irec/utils/DatasetLoader.py | 133 ---- irec/utils/Factory.py | 8 +- irec/utils/dataset.py | 573 ------------------ irec/value_functions/MostPopular.py | 2 + requirements_dev.txt | 2 + setup.cfg | 2 +- tests/__init__.py | 0 tests/bdd/__init__.py | 0 .../filter/filtering_by_items.feature | 50 ++ .../filter/filtering_by_users.feature | 50 ++ .../environment/split/randomised.feature | 28 + .../environment/split/temporal.feature | 28 + tests/bdd/steps/__init__.py | 0 tests/bdd/steps/given.py | 38 ++ tests/bdd/steps/then.py | 29 + tests/bdd/steps/when.py | 84 +++ 31 files changed, 988 insertions(+), 763 deletions(-) rename irec/{app => connector}/utils.py (97%) create mode 100644 irec/environment/__init__.py create mode 100644 irec/environment/dataset.py create mode 100644 irec/environment/filter/filtering_by_items.py create mode 100644 irec/environment/filter/filtering_by_users.py create mode 100644 irec/environment/loader/full_data.py create mode 100644 irec/environment/loader/train_test.py create mode 100644 irec/environment/registry.py create mode 100644 irec/environment/split/base.py create mode 100644 irec/environment/split/randomised.py create mode 100644 irec/environment/split/temporal.py delete mode 100644 irec/utils/DatasetLoader.py delete mode 100644 irec/utils/dataset.py create mode 100644 tests/__init__.py create mode 100644 tests/bdd/__init__.py create mode 100644 tests/bdd/features/environment/filter/filtering_by_items.feature create mode 100644 
tests/bdd/features/environment/filter/filtering_by_users.feature create mode 100644 tests/bdd/features/environment/split/randomised.feature create mode 100644 tests/bdd/features/environment/split/temporal.feature create mode 100644 tests/bdd/steps/__init__.py create mode 100644 tests/bdd/steps/given.py create mode 100644 tests/bdd/steps/then.py create mode 100644 tests/bdd/steps/when.py diff --git a/.gitignore b/.gitignore index f93944b1..c065e321 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ data/ *.aux *.log *.csv +.idea/ .vim/coc-settings.json # Byte-compiled / optimized / DLL files diff --git a/irec/app/utils.py b/irec/connector/utils.py similarity index 97% rename from irec/app/utils.py rename to irec/connector/utils.py index 16e6018c..33f55164 100755 --- a/irec/app/utils.py +++ b/irec/connector/utils.py @@ -11,7 +11,7 @@ import json from collections import defaultdict from pathlib import Path -from irec.utils.dataset import TrainTestDataset +from irec.environment.dataset import Dataset import collections from app import constants import matplotlib.ticker as mtick @@ -217,7 +217,7 @@ def load_settings(workdir): # with open( # workdir + sep + "settings" + sep + # "datasets_preprocessors_parameters.yaml") as f: - # d['datasets_preprocessors_parameters'] = yaml.load(f, Loader=loader) + # d['datasets_preprocessors_parameters'] = yaml.loader(f, Loader=loader) # d['datasets_preprocessors_parameters'] = { # k: { # **setting, @@ -270,7 +270,8 @@ def get_experiment_run_id(dm, evaluation_policy, itr_id): def run_interactor( agent, - traintest_dataset: TrainTestDataset, + train_dataset: Dataset, + test_dataset: Dataset, evaluation_policy: EvaluationPolicy, settings, forced_run, @@ -291,31 +292,19 @@ def run_interactor( log_custom_parameters(get_agent_run_parameters(settings)) interactions, acts_info = evaluation_policy.evaluate( - agent, traintest_dataset.train, traintest_dataset.test + agent, train_dataset, test_dataset ) fname = "./tmp/interactions.pickle" 
log_custom_artifact(fname, interactions) fname = "./tmp/acts_info.pickle" log_custom_artifact(fname, acts_info) - # create_path_to_file(fname) - # with open(fname,mode='wb') as f: - # pickle.dump(history_items_recommended,f) - # mlflow.log_artifact(f.name) def get_agent_id(agent_name, agent_parameters): - # agent_dict = class2dict(agent) return agent_name + "_" + json.dumps(agent_parameters, separators=(",", ":")) -# def get_agent_id(agent, template_parameters): -# agent_dict = class2dict(agent) -# new_agent_settings = update_nested_dict(template_parameters, agent_dict) -# return agent.name + '_' + json.dumps(new_agent_settings, -# separators=(',', ':')) - - def get_agent_id_from_settings(agent, settings): agent_settings = next( gen_dict_extract(agent.name, settings["agents_preprocessor_parameters"]) @@ -471,28 +460,20 @@ def load_dataset_experiment(settings): ) client = MlflowClient() - artifact_path = client.download_artifacts(run.info.run_id, "dataset.pickle") - traintest_dataset = pickle.load(open(artifact_path, "rb")) - return traintest_dataset - + train_artifact_path = client.download_artifacts(run.info.run_id, "train_dataset.pickle") + test_artifact_path = client.download_artifacts(run.info.run_id, "test_dataset.pickle") + train_dataset = pickle.load(open(train_artifact_path, "rb")) + test_dataset = pickle.load(open(test_artifact_path, "rb")) + return train_dataset, test_dataset -def run_agent(traintest_dataset, settings, forced_run): - # dataset_loader_parameters = settings["dataset_loaders"][ - # settings["defaults"]["dataset_loader"] - # ] +def run_agent(train_dataset, test_dataset, settings, forced_run): evaluation_policy_name = settings["defaults"]["evaluation_policy"] evaluation_policy_parameters = settings["evaluation_policies"][ evaluation_policy_name ] - # exec("import irec.value_functions.{}".format(value_function_name)) - # value_function = eval( - # "irec.value_functions.{}.{}".format( - # value_function_name, value_function_name - # ) - # 
)(**value_function_parameters) exec( f"from irec.evaluation_policies.{evaluation_policy_name} import {evaluation_policy_name}" ) @@ -504,7 +485,8 @@ def run_agent(traintest_dataset, settings, forced_run): agent = AgentFactory().create(settings["defaults"]["agent"], agent_parameters) run_interactor( agent=agent, - traintest_dataset=traintest_dataset, + train_dataset=train_dataset, + test_dataset=test_dataset, evaluation_policy=evaluation_policy, settings=settings, forced_run=forced_run, @@ -737,9 +719,6 @@ def generate_base(dataset_name, settings): dataset_loader_settings, ) ) - # client.log_param() - # for k,v in dataset_loader_settings.items(): - # log_param(k,v) from irec.utils.Factory import DatasetLoaderFactory @@ -747,10 +726,9 @@ def generate_base(dataset_name, settings): dataset_loader = dataset_loader_factory.create( dataset_name, dataset_loader_settings ) - dataset = dataset_loader.load() - - fname = "./tmp/dataset.pickle" - log_custom_artifact(fname, dataset) + train_dataset, test_dataset = dataset_loader.process() + log_custom_artifact("./tmp/train_dataset.pickle", train_dataset) + log_custom_artifact("./tmp/test_dataset.pickle", test_dataset) def download_data(dataset_names): @@ -804,7 +782,7 @@ def run_agent_with_dataset_parameters( for dataset_loader_name in dataset_loaders: current_settings = settings current_settings["defaults"]["dataset_loader"] = dataset_loader_name - traintest_dataset = load_dataset_experiment(settings) + train_dataset, test_dataset = load_dataset_experiment(settings) for agent_name in agents: current_settings["defaults"]["agent"] = agent_name current_settings["agents"][agent_name] = dataset_agents_parameters[ @@ -813,7 +791,8 @@ def run_agent_with_dataset_parameters( if tasks>1: f = executor.submit( run_agent, - traintest_dataset, + train_dataset, + test_dataset, copy.deepcopy(current_settings), forced_run, ) @@ -821,7 +800,7 @@ def run_agent_with_dataset_parameters( if len(futures) >= tasks: completed, futures = wait(futures, 
return_when=FIRST_COMPLETED) else: - run_agent(traintest_dataset,copy.deepcopy(current_settings),forced_run) + run_agent(train_dataset, test_dataset,copy.deepcopy(current_settings),forced_run) for f in futures: f.result() @@ -1273,22 +1252,22 @@ def evaluate_agent_with_dataset_parameters( ): from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED - from irec.utils.dataset import Dataset with ProcessPoolExecutor(max_workers=tasks) as executor: futures = set() for dataset_loader_name in dataset_loaders: settings["defaults"]["dataset_loader"] = dataset_loader_name - traintest_dataset = load_dataset_experiment(settings) + train_dataset, test_dataset = load_dataset_experiment(settings) data = np.vstack( - (traintest_dataset.train.data, traintest_dataset.test.data) + (train_dataset.data, test_dataset.data) ) dataset = Dataset(data) - dataset.update_from_data() + dataset.set_parameters() dataset.update_num_total_users_items() + for agent_name in agents: settings["defaults"]["agent"] = agent_name settings["agents"][agent_name] = dataset_agents_parameters[ @@ -1347,7 +1326,7 @@ def eval_agent_search( data = np.vstack((traintest.train.data, traintest.test.data)) dataset = copy.copy(traintest.train) dataset.data = data - dataset.update_from_data() + dataset.set_parameters() for agent_name in agents: settings["defaults"]["agent"] = agent_name for agent_og_parameters in agents_search_parameters[agent_name]: diff --git a/irec/environment/__init__.py b/irec/environment/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/irec/environment/dataset.py b/irec/environment/dataset.py new file mode 100644 index 00000000..f1f4fe6f --- /dev/null +++ b/irec/environment/dataset.py @@ -0,0 +1,80 @@ +from typing import List +import numpy as np + + +class Dataset: + + num_users = 0 + num_items = 0 + rate_domain = set() + max_uid = 0 + max_iid = 0 + mean_rating = 0 + min_rating = 0 + max_rating = 0 + + def __init__( + self, + data: np.ndarray + ): + 
"""__init__ + + Args: + data (np.ndarray): the data + + """ + self.data = data + self.num_total_users = 0 + self.num_total_items = 0 + + @staticmethod + def normalize_ids(ids: List) -> np.array: + """normalize_ids + + normalizes the ids by putting them in sequence + + Args: + ids (List): list of ids + + Returns: + result (np.array): the normalized ids + """ + unique_values = np.sort(np.unique(ids)) + result = np.searchsorted(unique_values, ids) + return result + + def reset_index(self): + """reset_index + + Resets user and item indices + + """ + self.data[:, 0] = self.normalize_ids(self.data[:, 0]) + self.data[:, 1] = self.normalize_ids(self.data[:, 1]) + + def set_parameters(self): + + """set_parameters + + Calculates and updates the database parameters + + """ + self.num_users = len(np.unique(self.data[:, 0])) + self.num_items = len(np.unique(self.data[:, 1])) + self.rate_domain = set(np.unique(self.data[:, 2])) + self.uids = np.unique(self.data[:, 0]).astype(int) + self.iids = np.unique(self.data[:, 1]).astype(int) + self.max_uid = np.max(self.uids) + self.max_iid = np.max(self.iids) + self.mean_rating = np.mean(self.data[:, 2]) + self.min_rating = np.min(self.data[:, 2]) + self.max_rating = np.max(self.data[:, 2]) + + def update_num_total_users_items(self, num_total_users=0, num_total_items=0): + """update_num_total_users_items + + Updates the total number of users and items + + """ + self.num_total_users = num_total_users if num_total_users > self.max_uid+1 else self.max_uid+1 + self.num_total_items = num_total_items if num_total_items > self.max_iid+1 else self.max_iid+1 diff --git a/irec/environment/filter/filtering_by_items.py b/irec/environment/filter/filtering_by_items.py new file mode 100644 index 00000000..0cb4b846 --- /dev/null +++ b/irec/environment/filter/filtering_by_items.py @@ -0,0 +1,47 @@ +import random +from pandas import DataFrame + + +class FilteringByItems: + """FilteringByItems. + This class contains different filtering by item approaches. 
+ """ + + def __init__(self): + pass + + @staticmethod + def min_ratings(df_dataset: DataFrame, min_ratings: int) -> DataFrame: + """min_ratings. + This function removes items whose total number of + ratings is less than [min_ratings]. + + Args: + df_dataset (DataFrame): the data to be filtered. + min_ratings (int): minimum number of ratings. + Returns: + The data filtered by the number of ratings. + """ + selected_items = dict( + df_dataset.groupby("itemId")["userId"].agg("count")[ + lambda ratings: ratings >= min_ratings + ] + ) + return df_dataset[df_dataset["itemId"].isin(selected_items)] + + @staticmethod + def num_items(df_dataset: DataFrame, num_items: int) -> DataFrame: + """num_items. + This function limits the number of distinct items in the dataset. + + Args: + df_dataset (DataFrame): the data to be filtered. + num_items (int): maximum number of items in the dataset. + Returns: + The data filtered by the number of items. + """ + try: + selected_items = random.sample(list(df_dataset["itemId"].unique()), num_items) + except: + return df_dataset + return df_dataset[df_dataset["itemId"].isin(selected_items)] diff --git a/irec/environment/filter/filtering_by_users.py b/irec/environment/filter/filtering_by_users.py new file mode 100644 index 00000000..143c0497 --- /dev/null +++ b/irec/environment/filter/filtering_by_users.py @@ -0,0 +1,47 @@ +import random +from pandas import DataFrame + + +class FilteringByUsers: + """FilteringByUsers. + This class contains different filtering by users approaches. + """ + + def __init__(self): + pass + + @staticmethod + def min_consumption(df_dataset: DataFrame, min_consumption: int) -> DataFrame: + """min_consumption. + This function removes users whose total number of + consumptions is less than [min_consumption]. + + Args: + df_dataset (DataFrame): the data to be filtered. + min_consumption (int): minimum number of items consumed by a user. + Returns: + The data filtered by the number of consumptions. 
+ """ + selected_users = dict( + df_dataset.groupby("userId")["itemId"].agg("count")[ + lambda consumption: consumption >= min_consumption + ] + ) + return df_dataset[df_dataset["userId"].isin(selected_users)] + + @staticmethod + def num_users(df_dataset: DataFrame, num_users: int) -> DataFrame: + """num_users. + This function limits the number of distinct users in the dataset. + + Args: + df_dataset (DataFrame): the data to be filtered. + num_users (int): maximum number of users in the dataset. + Returns: + The data filtered by the number of users. + """ + try: + selected_users = random.sample(list(df_dataset["userId"].unique()), num_users) + except: + return df_dataset + return df_dataset[df_dataset["userId"].isin(selected_users)] diff --git a/irec/environment/loader/full_data.py b/irec/environment/loader/full_data.py new file mode 100644 index 00000000..8b23dcee --- /dev/null +++ b/irec/environment/loader/full_data.py @@ -0,0 +1,152 @@ +from typing import Tuple, TypedDict +import pandas as pd +import numpy as np +import random + +from irec.environment.dataset import Dataset +from irec.environment.registry import FilterRegistry, SplitRegistry + +DatasetType = TypedDict('DatasetType', {'path': str, 'random_seed': float, 'file_delimiter': str, 'skip_head': bool}) +FilterUsersType = TypedDict('FilterUsersType', {'min_consumption': int, 'num_users': int}) +FilterItemsType = TypedDict('FilterItemsType', {'min_ratings': int, 'num_items': int}) +FilteringType = TypedDict('FilteringType', {'filter_users': FilterUsersType, 'filter_items': FilterItemsType}) +SplittingType = TypedDict('SplittingType', {'strategy': str, 'train_size': float, 'test_consumes': int}) + + +class DefaultLoader: + + def __init__(self, + dataset: DatasetType, + prefiltering: FilteringType, + splitting: SplittingType) -> None: + """__init__. 
+ + Args: + dataset (DatasetType): info required by the dataset + prefiltering (FilteringType): info required by the prefiltering + splitting (SplittingType): info required by the Splitting + """ + # dataset attributes + if "path" in dataset.keys(): + self.dataset_path = dataset["path"] + else: + raise IndexError("Dataset 'path' not exists. You must define your dataset path to be reader by the system.") + + self.random_seed = dataset["random_seed"] if "random_seed" in dataset.keys() else 0 + self.delimiter = dataset["file_delimiter"] if "file_delimiter" in dataset.keys() else "," + self.skip_rows = int(dataset["skip_head"]) if "skip_head" in dataset.keys() else 1 + + # filtering attributes + self.prefiltering = prefiltering + + # splitting attributes + self.test_consumes = splitting["test_consumes"] if "test_consumes" in splitting.keys() else 0 + self.strategy = splitting["strategy"] if "strategy" in splitting.keys() else "random" + self.train_size = splitting["train_size"] if "train_size" in splitting.keys() else 0.8 + + def _read(self) -> np.ndarray: + """_read + The data read according to the parameters specified. + The expected columns are userId, itemId, rating, timestamp + Returns: + data (np.ndarray): The data loaded + """ + data = np.loadtxt(self.dataset_path, + delimiter=self.delimiter, + skiprows=self.skip_rows) + + # TODO: check if timestamp exists. If not, create a sequential value for it + + return data + + def _filter(self, + data: np.array) -> np.ndarray: + """_filter + + Applies all filters specified in dataset_loaders.yaml + + Args: + data: the array of data previously read + + Returns: + data_df (np.array): The data filtered by the filters applied. 
+ """ + data_df = pd.DataFrame(data, columns=["userId", "itemId", "rating", "timestamp"]) + print(f"\nApplying filters...") + for key, filters in self.prefiltering.items(): + print(f"{key}:") + for filter_method, value in filters.items(): + print(f"\t {filter_method}: {value}") + data_df = getattr(FilterRegistry.get(key), filter_method)(data_df, value) + + return data_df.to_numpy() + + def _split(self, + dataset: Dataset) -> Tuple[Dataset, Dataset]: + """split + + Splits the data set into training and testing + + Args: + dataset (Dataset): an object of the dataset class + + Returns: + train_dataset (Dataset): the train + test_dataset (Dataset): the test + """ + num_train_users = round(dataset.num_users * self.train_size) + num_test_users = int(dataset.num_users - num_train_users) + # Get the required strategy + split_strategy = SplitRegistry.get(self.strategy)( + test_consumes=self.test_consumes, + train_size=self.train_size) + # Apply it in the data + test_uids = split_strategy.get_test_uids(dataset.data, num_test_users) + train_dataset, test_dataset = split_strategy.split_dataset(dataset.data, test_uids) + train_dataset.update_num_total_users_items( + num_total_users=dataset.num_total_users, + num_total_items=dataset.num_total_items + ) + test_dataset.update_num_total_users_items( + num_total_users=dataset.num_total_users, + num_total_items=dataset.num_total_items + ) + return train_dataset, test_dataset + + def process(self) -> Tuple[Dataset, Dataset]: + + """process + + Perform complete processing of the dataset: read -> filter (optional) -> split + + Return: + train_dataset (Dataset): the train + test_dataset (Dataset): the test + """ + + np.random.seed(self.random_seed) + random.seed(self.random_seed) + # Read the data + data = self._read() + # Create dataset + dataset = Dataset(data) + dataset.reset_index() + dataset.set_parameters() + dataset.update_num_total_users_items() + + # Apply filters if they were defined + if len(self.prefiltering) > 0: + 
filtered_data = self._filter(dataset.data) + # update dataset + dataset = Dataset(filtered_data) + dataset.reset_index() + dataset.set_parameters() + dataset.update_num_total_users_items() + + # Create train and test set + print(f"\nApplying splitting strategy: {self.strategy}\n") + train_dataset, test_dataset = self._split(dataset) + print("train:", train_dataset.num_total_items, train_dataset.num_total_users) + print("test:", test_dataset.num_total_items, test_dataset.num_total_users) + + return train_dataset, test_dataset diff --git a/irec/environment/loader/train_test.py b/irec/environment/loader/train_test.py new file mode 100644 index 00000000..a85fd24c --- /dev/null +++ b/irec/environment/loader/train_test.py @@ -0,0 +1,108 @@ +from typing import TypedDict, Tuple +import numpy as np +from irec.environment.dataset import Dataset + +TrainDatasetType = TypedDict('TrainDatasetType', {'path': str, 'file_delimiter': str, 'skip_head': bool}) +TestDatasetType = TypedDict('TestDatasetType', {'path': str, 'file_delimiter': str, 'skip_head': bool}) +ValidationDatasetType = TypedDict('ValidationDatasetType', {'path': str, 'file_delimiter': str, 'skip_head': bool}) +DatasetType = TypedDict('DatasetType', {'train': TrainDatasetType, 'test': TestDatasetType}) + + +class TrainTestLoader: + + def __init__( + self, + dataset: DatasetType) -> None: + """__init__. + + Args: + dataset (DatasetType): info required by the dataset + """ + + assert len(dataset.keys()) == 2, "You must define files for train and test sets." 
+ self.dataset_params = dataset + + def _set_attributes(self, + dataset: DatasetType, + split_type: str) -> None: + + """_set_attributes + + Set dataset attributes + + Args: + dataset (DatasetType): dictionary with training and test datasets + split_type (str): split type (train or test) + + """ + + if split_type in dataset.keys() and "path" in dataset[split_type].keys(): + self.path = dataset[split_type]["path"] + self.delimiter = dataset[split_type]["file_delimiter"] \ + if "file_delimiter" in dataset[split_type].keys() else "," + self.skip_rows = int(dataset[split_type]["skip_head"]) \ + if "skip_head" in dataset[split_type].keys() else 1 + else: + raise IndexError(f"You must define your {split_type} data and its path to be reader by the system.") + + @staticmethod + def _read(path: str, + delimiter: str, + skiprows: int) -> np.ndarray: + """_read + + The data read according to the parameters specified. + + Args: + path (str): dataset directory + delimiter (str): file delimiter + skiprows (str): used to skip or not the file header + + Return: + data (np.ndarray): the data + """ + data = np.loadtxt(path, + delimiter=delimiter, + skiprows=skiprows) + return data + + def process(self) -> Tuple[Dataset, Dataset]: + + """process + + reads the dataset and gets information about the dataset + + Returns: + train_dataset (Dataset): the train + test_dataset (Dataset): the test + """ + + self._set_attributes(self.dataset_params, split_type="train") + train_data = self._read(self.path, + self.delimiter, + self.skip_rows) + train_dataset = Dataset(train_data) + train_dataset.set_parameters() + + self._set_attributes(self.dataset_params, split_type="test") + test_data = self._read(self.path, + self.delimiter, + self.skip_rows) + test_dataset = Dataset(test_data) + test_dataset.set_parameters() + + num_total_users = max(train_dataset.max_uid, test_dataset.max_uid) + 1 + num_total_items = max(train_dataset.max_iid, test_dataset.max_iid) + 1 + + 
train_dataset.update_num_total_users_items( + num_total_users=num_total_users, + num_total_items=num_total_items + ) + test_dataset.update_num_total_users_items( + num_total_users=num_total_users, + num_total_items=num_total_items + ) + print("Test shape:", test_dataset.data.shape) + print("Train shape:", train_dataset.data.shape) + + return train_dataset, test_dataset diff --git a/irec/environment/registry.py b/irec/environment/registry.py new file mode 100644 index 00000000..8202b943 --- /dev/null +++ b/irec/environment/registry.py @@ -0,0 +1,59 @@ +from __future__ import annotations +from typing import List + + +class SplitRegistry: + + from irec.environment.split.randomised import Random + from irec.environment.split.temporal import Temporal + + _splitting = { + "temporal": Temporal, + "random": Random, + } + + @classmethod + def all(cls: SplitRegistry) -> List[str]: + return list(cls._splitting.keys()) + + @classmethod + def get(cls: SplitRegistry, name: str): + return cls._splitting[name] + + +class FilterRegistry: + + from irec.environment.filter.filtering_by_items import FilteringByItems + from irec.environment.filter.filtering_by_users import FilteringByUsers + + _filters = { + "filter_users": FilteringByUsers, + "filter_items": FilteringByItems, + } + + @classmethod + def all(cls: FilterRegistry) -> List[str]: + return list(cls._filters.keys()) + + @classmethod + def get(cls: FilterRegistry, name: str): + return cls._filters[name] + + +class LoaderRegistry: + + from irec.environment.loader.train_test import TrainTestLoader + from irec.environment.loader.full_data import DefaultLoader + + _loader = { + "TrainTestLoader": TrainTestLoader, + "DefaultLoader": DefaultLoader, + } + + @classmethod + def all(cls: LoaderRegistry) -> List[str]: + return list(cls._loader.keys()) + + @classmethod + def get(cls: LoaderRegistry, name: str): + return cls._loader[name] diff --git a/irec/environment/split/base.py b/irec/environment/split/base.py new file mode 100644 index 
00000000..6be1a36a --- /dev/null +++ b/irec/environment/split/base.py @@ -0,0 +1,74 @@ +import numpy as np + +from typing import List, Tuple +from irec.environment.dataset import Dataset +from pandas import DataFrame + + +class SplitStrategy: + """SplitStrategy. + + Base class for the splitting strategies. + """ + + def __init__(self, train_size: float, test_consumes: int): + """__init__. + + Args: + train_size (float): defines the train size in percentage [0, 1]. + test_consumes (int): minimum number of items a user must consume to be a candidate. + """ + + self.train_size = train_size + self.test_consumes = test_consumes + + def _get_users_candidate(self, data_df: DataFrame) -> List[int]: + """_get_users_candidate. + + This method gets the IDs of the users with more than or equal + to [self.test_consumes] consumptions. + + Args: + data_df (DataFrame): the data to be splitted. + Returns: + List of the valid candidate users. + """ + + users_items_consumed = data_df.groupby("userId").count().iloc[:, 0] + test_candidate_users = list( + users_items_consumed[users_items_consumed >= self.test_consumes] + .to_dict() + .keys() + ) + return test_candidate_users + + @staticmethod + def split_dataset( + data: np.ndarray, + test_uids: List[int] + ) -> Tuple[Dataset, Dataset]: + """split_dataset. + + Method responsible for performing the splitting. + + Args: + data (np.ndarray): the data to be splitted. + test_uids (List[int]): list of the users ID in the testset. + Returns: + A tuple containing the trainset and the testset. 
+ """ + + data_isin_test_uids = np.isin(data[:, 0], test_uids) + + train_data = data[~data_isin_test_uids, :] + train_dataset = Dataset(train_data) + train_dataset.set_parameters() + + test_data = data[data_isin_test_uids, :] + test_dataset = Dataset(test_data) + test_dataset.set_parameters() + + print("Test shape:", test_dataset.data.shape) + print("Train shape:", train_dataset.data.shape) + + return train_dataset, test_dataset diff --git a/irec/environment/split/randomised.py b/irec/environment/split/randomised.py new file mode 100644 index 00000000..7b686644 --- /dev/null +++ b/irec/environment/split/randomised.py @@ -0,0 +1,34 @@ +import numpy as np +import pandas as pd + +from .base import SplitStrategy +from random import sample +from pandas import DataFrame + + +class Random(SplitStrategy): + """Random. + + A strategy that randomly selects data for the + training and testing sets; + + """ + + def get_test_uids( + self, + data: np.ndarray, + num_test_users: int + ) -> np.array: + """get_test_uids. + Performs a random splitting. + Args: + data (ndarray): the data to be split. + num_test_users (int): total number of users in the test set. + Returns: + A list of the users IDs that will be in the test set. + """ + data_df = pd.DataFrame(data, columns=["userId", "itemId", "rating", "timestamp"]) + test_candidate_users = self._get_users_candidate(data_df) + test_uids = np.array(sample(test_candidate_users, + k=num_test_users)) + return test_uids diff --git a/irec/environment/split/temporal.py b/irec/environment/split/temporal.py new file mode 100644 index 00000000..ce5a0101 --- /dev/null +++ b/irec/environment/split/temporal.py @@ -0,0 +1,41 @@ +import numpy as np +import pandas as pd + +from .base import SplitStrategy +from pandas import DataFrame + + +class Temporal(SplitStrategy): + """Random. + + A strategy that splits the user-item interactions based + on timestamp. 
The idea is to avoid introducing a bias in users' + consumption history by breaking the chronological order of the + events. + """ + + def get_test_uids( + self, + data: np.ndarray, + num_test_users: int + ) -> np.array: + """get_test_uids. + Performs the temporal splitting strategy. + Args: + data (ndarray): the data to be split. + num_test_users (int): total number of users in the test set. + Returns: + A list of the users IDs that will be in the test set. + """ + data_df = pd.DataFrame(data, columns=["userId", "itemId", "rating", "timestamp"]) + test_candidate_users = self._get_users_candidate(data_df) + test_candidate_users = np.array(test_candidate_users, dtype=int) + users_start_time = data_df.groupby("userId").min()["timestamp"].to_numpy() + test_uids = np.array( + list( + test_candidate_users[ + list(reversed(np.argsort(users_start_time[test_candidate_users]))) + ] + )[:num_test_users] + ) + return test_uids diff --git a/irec/evaluation_policies/EvaluationPolicy.py b/irec/evaluation_policies/EvaluationPolicy.py index 019eeed5..342f6ca4 100644 --- a/irec/evaluation_policies/EvaluationPolicy.py +++ b/irec/evaluation_policies/EvaluationPolicy.py @@ -1,6 +1,5 @@ -from irec.utils.dataset import Dataset +from irec.environment.dataset import Dataset from irec.agents import Agent -import irec.value_functions """Evaluation Policies. 
diff --git a/irec/evaluation_policies/FixedInteraction.py b/irec/evaluation_policies/FixedInteraction.py index f2021c58..8ef0e3d9 100644 --- a/irec/evaluation_policies/FixedInteraction.py +++ b/irec/evaluation_policies/FixedInteraction.py @@ -1,7 +1,7 @@ from irec.ActionCollection import OneUserActionCollection from .EvaluationPolicy import EvaluationPolicy from threadpoolctl import threadpool_limits -from irec.utils.dataset import Dataset +from irec.environment.dataset import Dataset from collections import defaultdict from irec.agents import Agent from tqdm import tqdm @@ -75,7 +75,6 @@ def evaluate(self, model, train_dataset, test_dataset): items_not_recommended = np.nonzero(not_recommended)[0] # items_score, info = model.action_estimates((uid,items_not_recommended)) # best_items = items_not_recommended[np.argpartition(items_score,-self.interaction_size)[-self.interaction_size:]] - actions, info = model.act( OneUserActionCollection(uid, items_not_recommended), self.interaction_size, diff --git a/irec/metric_evaluators/InteractionMetricEvaluator.py b/irec/metric_evaluators/InteractionMetricEvaluator.py index eb093ad3..9ca300d8 100644 --- a/irec/metric_evaluators/InteractionMetricEvaluator.py +++ b/irec/metric_evaluators/InteractionMetricEvaluator.py @@ -2,7 +2,7 @@ from irec.value_functions.Entropy import Entropy from .MetricEvaluator import MetricEvaluator from collections import defaultdict -from irec.utils import dataset +from irec.environment.dataset import Dataset import irec.value_functions import scipy.sparse import numpy as np @@ -29,7 +29,7 @@ def __init__( self.interactions_to_evaluate = list(range(self.num_interactions)) self.iterations_to_evaluate = self.interactions_to_evaluate - if isinstance(ground_truth_dataset, dataset.Dataset): + if isinstance(ground_truth_dataset, Dataset): self.ground_truth_consumption_matrix = scipy.sparse.csr_matrix( ( self.ground_truth_dataset.data[:, 2], diff --git a/irec/utils/DatasetLoader.py 
b/irec/utils/DatasetLoader.py deleted file mode 100644 index 90ea3ce0..00000000 --- a/irec/utils/DatasetLoader.py +++ /dev/null @@ -1,133 +0,0 @@ -from . import dataset -import random -import numpy as np - - -class DatasetLoader: - pass - -class DefaultDatasetLoader: - def __init__(self, dataset_path, prefiltering, splitting, random_seed) -> None: - self.dataset_path = dataset_path - self.prefiltering = prefiltering - self.test_consumes = prefiltering["test_consumes"] - self.strategy = splitting["strategy"] - self.train_size = splitting["train_size"] - self.random_seed = random_seed - - def load(self): - random.seed(self.random_seed) - np.random.seed(self.random_seed) - - default_processor = dataset.DefaultDataset() - data = default_processor.process(self.dataset_path) - data = default_processor.prefiltering(data, self.prefiltering) - - traintest_processor = dataset.TrainTestConsumption( - strategy=self.strategy, - test_consumes=self.test_consumes, - train_size=self.train_size, - ) - res = traintest_processor.process(data) - return res - -class DefaultValidationDatasetLoader: - def __init__(self, dataset_path, prefiltering, splitting , random_seed) -> None: - self.dataset_path = dataset_path - self.prefiltering = prefiltering - self.test_consumes = prefiltering["test_consumes"] - self.strategy = splitting["strategy"] - self.train_size = splitting["train_size"] - self.random_seed = random_seed - - def load(self): - random.seed(self.random_seed) - np.random.seed(self.random_seed) - - default_processor = dataset.DefaultDataset() - data = default_processor.process(self.dataset_path) - data = default_processor.prefiltering(data, self.prefiltering) - - traintest_processor = dataset.TrainTestConsumption( - strategy=self.strategy, - test_consumes=self.test_consumes, - train_size=self.train_size, - ) - res = traintest_processor.process(traintest_processor.process(data).train) - - return res - -class ML100kDatasetLoader: - def __init__( - self, dataset_path, crono, 
random_seed, test_consumes, train_size - ) -> None: - self.dataset_path = dataset_path - self.crono = crono - self.random_seed = random_seed - self.test_consumes = test_consumes - self.train_size = train_size - - def load(self): - np.random.seed(self.random_seed) - random.seed(self.random_seed) - ml100k_processor = dataset.MovieLens100k() - data = ml100k_processor.process(self.dataset_path) - - traintest_processor = dataset.TrainTestConsumption( - crono=self.crono, - test_consumes=self.test_consumes, - train_size=self.train_size, - ) - res = traintest_processor.process(data) - return res - -class TRTEDatasetLoader: - def __init__( - self, - dataset_path - # self, dataset_path, crono, random_seed, test_consumes, train_size - ) -> None: - self.dataset_path = dataset_path - # self.crono = crono - # self.random_seed = random_seed - # self.test_consumes = test_consumes - # self.train_size = train_size - - def load(self): - # np.random.seed(self.random_seed) - trte_processor = dataset.TRTE() - data = trte_processor.process(self.dataset_path) - res = data - # traintest_processor = dataset.TrainTestConsumption( - # crono=self.crono, - # test_consumes=self.test_consumes, - # train_size=self.train_size, - # ) - # res = traintest_processor.process(data) - return res - - -class TRTEValidationDatasetLoader: - def __init__( - self, dataset_path, crono, random_seed, test_consumes, train_size - ) -> None: - self.dataset_path = dataset_path - self.crono = crono - self.random_seed = random_seed - self.test_consumes = test_consumes - self.train_size = train_size - - def load(self): - np.random.seed(self.random_seed) - random.seed(self.random_seed) - trte_processor = dataset.TRTE() - data = trte_processor.process(self.dataset_path) - - traintest_processor = dataset.TrainTestConsumption( - crono=self.crono, - test_consumes=self.test_consumes, - train_size=self.train_size, - ) - res = traintest_processor.process(data.train) - - return res diff --git a/irec/utils/Factory.py 
b/irec/utils/Factory.py index 47112cfe..127fdcf3 100644 --- a/irec/utils/Factory.py +++ b/irec/utils/Factory.py @@ -1,6 +1,6 @@ -from . import DatasetLoader -import irec.utils.dataset as dataset +# from irec.environment.loader.full_data import DatasetLoader import irec.agents +from irec.environment.registry import LoaderRegistry class Factory: @@ -124,5 +124,5 @@ class DatasetLoaderFactory(Factory): def create(self, dataset_name, dataset_settings): dataset_class_name = list(dataset_settings.keys())[0] dataset_parameters = list(dataset_settings.values())[0] - dataset_class = eval("irec.utils.DatasetLoader." + dataset_class_name) - return dataset_class(**dataset_settings[dataset_class_name]) + dataset_class = LoaderRegistry.get(dataset_class_name)(**dataset_settings[dataset_class_name]) + return dataset_class \ No newline at end of file diff --git a/irec/utils/dataset.py b/irec/utils/dataset.py deleted file mode 100644 index 1f7a010e..00000000 --- a/irec/utils/dataset.py +++ /dev/null @@ -1,573 +0,0 @@ -import pandas as pd -from irec import value_functions -import numpy as np -import irec.value_functions -import random -import scipy.sparse -import os -import numpy as np -import os -from copy import copy - - -def _si(x): - du0 = np.sort(np.unique(x)) - ind0 = np.searchsorted(du0, x) - return ind0 - - -class DatasetPreprocessor: - def __init__(self, name, dataset_descriptor, preprocessor, *args, **kwargs): - super().__init__(*args, **kwargs) - self.name = name - self.dataset_descriptor = dataset_descriptor - self.preprocessor = preprocessor - - -class Pipeline: - def __init__(self, steps=None, *args, **kwargs): - super().__init__(*args, **kwargs) - if steps is None: - self.steps = [] - else: - self.steps = steps - - def process(self, data): - buf = data - for element in self.steps: - buf = element.process(buf) - return buf - - -class DatasetDescriptor: - def __init__(self, dataset_dir, *args, **kwargs): - super().__init__(*args, **kwargs) - self.dataset_dir = 
dataset_dir - - -class Dataset: - def __init__( - self, - data, - num_total_users=None, - num_total_items=None, - num_users=None, - num_items=None, - rate_domain=None, - uids=None, - ): - self.data = data - self.num_users = num_users - self.num_items = num_items - self.rate_domain = rate_domain - self.uids = uids - self.num_total_users = num_total_users - self.num_total_items = num_total_items - - def update_from_data(self): - self.num_users = len(np.unique(self.data[:, 0])) - self.num_items = len(np.unique(self.data[:, 1])) - self.rate_domain = set(np.unique(self.data[:, 2])) - self.uids = np.unique(self.data[:, 0]).astype(int) - self.iids = np.unique(self.data[:, 1]).astype(int) - self.max_uid = np.max(self.uids) - self.max_iid = np.max(self.iids) - self.mean_rating = np.mean(self.data[:, 2]) - self.min_rating = np.min(self.data[:, 2]) - self.max_rating = np.max(self.data[:, 2]) - - def update_num_total_users_items(self): - # self.num_total_users = self.num_users - # self.num_total_items = self.num_items - self.num_total_users = self.max_uid + 1 - self.num_total_items = self.max_iid + 1 - # print(self.num_total_users) - # print(self.num_total_items) - - # self.consumption_matrix = scipy.sparse.csr_matrix((self.data[:,2],(self..data[:,0],self.train_dataset.data[:,1])),(self.train_dataset.users_num,self.train_dataset.items_num)) - - -class TrainTestDataset: - def __init__(self, train, test): - self.train = train - self.test = test - - -class DataProcessor: - def __init__(self, *args, **kwargs): - del args, kwargs - - -class TRTE(DataProcessor): - def process(self, dataset_dir): - # dataset_dir = dataset_descriptor.dataset_dir - train_data = np.loadtxt(os.path.join(dataset_dir, "train.data"), delimiter="::") - test_data = np.loadtxt(os.path.join(dataset_dir, "test.data"), delimiter="::") - - dataset = Dataset(np.vstack([train_data, test_data])) - dataset.update_from_data() - dataset.update_num_total_users_items() - train_dataset = copy(dataset) - train_dataset.data 
= train_data - train_dataset.update_from_data() - test_dataset = copy(dataset) - test_dataset.data = test_data - test_dataset.update_from_data() - return TrainTestDataset(train=train_dataset, test=test_dataset) - - -class TRTEPopular(DataProcessor): - def __init__(self, items_rate, *args, **kwargs): - super().__init__(*args, **kwargs) - self.items_rate = items_rate - - def process(self, train_dataset_and_test_dataset): - train_dataset = train_dataset_and_test_dataset[0] - test_dataset = train_dataset_and_test_dataset[1] - data = np.vstack((test_dataset.data, train_dataset.data)) - dataset = Dataset(data) - dataset.update_from_data() - dataset.update_num_total_users_items() - num_items_to_sample = int(self.items_rate * dataset.num_total_items) - consumption_matrix = scipy.sparse.csr_matrix( - (dataset.data[:, 2], (dataset.data[:, 0], dataset.data[:, 1])), - (dataset.num_total_users, dataset.num_total_items), - ) - items_popularity = irec.value_functions.MostPopular.get_items_popularity( - consumption_matrix - ) - top_popular_items = np.argsort(items_popularity)[::-1][num_items_to_sample] - test_dataset.data = test_dataset.data[ - test_dataset.data[:, 1].isin(top_popular_items) - ] - test_dataset.update_from_data() - train_dataset.data = train_dataset.data[ - train_dataset.data[:, 1].isin(top_popular_items) - ] - train_dataset.update_from_data() - - # train_dataset.data[train_dataset.data[:,1].isin(top_popular_items)] - - # train_dataset, test_dataset = ttc.process(train_dataset) - return train_dataset, test_dataset - - -class TRTERandom(DataProcessor): - def __init__( - self, min_ratings, random_seed, probability_keep_item, *args, **kwargs - ): - super().__init__(*args, **kwargs) - self.min_ratings = min_ratings - self.random_seed = random_seed - self.probability_keep_item = probability_keep_item - - def process(self, train_dataset_and_test_dataset): - train_dataset = train_dataset_and_test_dataset[0] - test_dataset = train_dataset_and_test_dataset[1] - # ttc = 
TrainTestConsumption(self.train_size, self.test_consumes, - # self.crono, self.random_seed) - # train_dataset, test_dataset = ttc.process(train_dataset) - return train_dataset, test_dataset - - -class MovieLens100k(DataProcessor): - def process(self, dataset_dir): - data = np.loadtxt(os.path.join(dataset_dir, "u.data"), delimiter="\t") - data[:, 0] = data[:, 0] - 1 - data[:, 1] = data[:, 1] - 1 - dataset = Dataset(data) - dataset.update_from_data() - dataset.update_num_total_users_items() - return dataset - - -class DefaultDataset(DataProcessor): - - def filter_users(self, df_dataset, filters): - if filters == None: return df_dataset - def min_consumption(df_dataset, min_consumption): - selected_users = dict(df_dataset.groupby(0)[1].agg("count")[lambda consumption: consumption >= min_consumption]) - return df_dataset[df_dataset[0].isin(selected_users)] - - def num_users(df_dataset, num_users): - try: selected_users = random.sample(list(df_dataset[0].unique()), num_users) - except: return df_dataset - return df_dataset[df_dataset[0].isin(selected_users)] - - for filter_user in filters: - df_dataset = eval(filter_user)(df_dataset, filters[filter_user]) - - return df_dataset - - def filter_items(self, df_dataset, filters): - - def min_ratings(df_dataset, min_ratings): - selected_items = dict(df_dataset.groupby(1)[0].agg("count")[lambda ratings: ratings >= min_ratings]) - return df_dataset[df_dataset[1].isin(selected_items)] - - def num_items(df_dataset, num_items): - try: selected_items = random.sample(list(df_dataset[1].unique()), num_items) - except: return df_dataset - return df_dataset[df_dataset[1].isin(selected_items)] - - for filter_item in filters: - df_dataset = eval(filter_item)(df_dataset, filters[filter_item]) - - return df_dataset - - - def prefiltering(self, ds, filters): - - del filters["test_consumes"] - if len(filters) == 0: return ds - data_df = pd.DataFrame(ds.data) - - print("Applying filters...") - for key, filters in filters.items(): - 
print("\t", key, filters) - data_df = eval(f"self.{key}")(data_df, filters) - - dataset = Dataset(data_df.to_numpy()) - dataset.update_from_data() - dataset.update_num_total_users_items() - return dataset - - def process(self, dataset_dir): - data = np.loadtxt( - os.path.join(dataset_dir, "ratings.csv"), delimiter=",", skiprows=1 - ) - - data[:, 0] = _si(data[:, 0]) - data[:, 1] = _si(data[:, 1]) - - dataset = Dataset(data) - dataset.update_from_data() - dataset.update_num_total_users_items() - # print(max(dataset.uids), dataset.num_total_users) - return dataset - - -class MovieLens1M(DataProcessor): - def process(self, dataset_dir): - data = np.loadtxt(os.path.join(dataset_dir, "ratings.dat"), delimiter="::") - iids = dict() - for i, iid in enumerate(np.unique(data[:, 1])): - iids[iid] = i - data[:, 1] = np.vectorize(lambda x: iids[x])(data[:, 1]) - data[:, 0] = data[:, 0] - 1 - dataset = Dataset(data) - dataset.update_from_data() - dataset.update_num_total_users_items() - return dataset - - -class MovieLens10M(DataProcessor): - def process(self, dataset_dir): - data = np.loadtxt(os.path.join(dataset_dir, "ratings.dat"), delimiter="::") - iids = dict() - for i, iid in enumerate(np.unique(data[:, 1])): - iids[iid] = i - data[:, 1] = np.vectorize(lambda x: iids[x])(data[:, 1]) - data[:, 0] = data[:, 0] - 1 - dataset = Dataset(data) - dataset.update_from_data() - dataset.update_num_total_users_items() - return dataset - - -class Netflix: - def _netflix_read_ratings(self, fileName): - file = open(fileName, "r") - file.readline() - numratings = np.sum([1 for line in open(fileName)]) - usersId = np.zeros(numratings, dtype=np.int32) - itemsId = np.zeros(numratings, dtype=np.int32) - timestamp = np.zeros(numratings, dtype=np.uint64) - ratings = np.zeros(numratings, dtype=np.uint8) - - file = open(fileName, "r") - file.readline() - cont = 0 - for row in file: - values = row.split(",") - uid, iid, rating, ts = ( - int(float(values[0])), - int(float(values[1])), - values[2], 
- int(float(values[3].replace("\n", ""))), - ) - usersId[cont] = uid - itemsId[cont] = iid - ratings[cont] = rating - timestamp[cont] = ts - cont += 1 - - file.close() - return usersId, itemsId, ratings, timestamp, numratings - - def process(self, dataset_dir): - # base_dir = self.BASES_DIRS[self.base] - # u_train, i_train, r_train, t_train, numr_train = _netflix_read_ratings( - # dataset_dir + "train.data" - # ) - # u_test, i_test, r_test, t_test, numr_test = _netflix_read_ratings( - # dataset_dir + "test.data" - # ) - # test_data = np.array((u_test, i_test, r_test, t_test)) - # train_data = np.array((u_train, i_train, r_train, t_train)) - - usersId, itemsId, ratings, timestamp, numratings = self._netflix_read_ratings( - dataset_dir + "ratings.csv" - ) - dataset = np.array([usersId, itemsId, ratings, timestamp]).T - # return train_data, test_data - dataset = Dataset(dataset) - dataset.update_from_data() - dataset.update_num_total_users_items() - return dataset - - -class TrainTestConsumption(DataProcessor): - def __init__(self, train_size, test_consumes, strategy, *args, **kwargs): - super().__init__(*args, **kwargs) - self.train_size = train_size - self.test_consumes = test_consumes - self.strategy = strategy - - def splitting(self, strategy, data_df, num_test_users): - - users_items_consumed = data_df.groupby(0).count().iloc[:, 0] - test_candidate_users = list( - users_items_consumed[users_items_consumed >= self.test_consumes] - .to_dict() - .keys() - ) - - if strategy == "temporal": - test_candidate_users = np.array(test_candidate_users, dtype=int) - users_start_time = data_df.groupby(0).min()[3].to_numpy() - test_uids = np.array( - list( - test_candidate_users[ - list( - reversed(np.argsort(users_start_time[test_candidate_users])) - ) - ] - )[:num_test_users] - ) - elif strategy == "random": - test_uids = np.array(random.sample(test_candidate_users, k=num_test_users)) - else: - raise ValueError(f'The split strategy {strategy} does not exist', 'temporal', 
'random') - - train_uids = np.array(list(set(range(len(data_df[0].unique()))) - set(test_uids))) - - return train_uids, test_uids - - def process(self, ds): - data = ds.data - - data[:, 0] = _si(data[:, 0]) - data[:, 1] = _si(data[:, 1]) - - ds = Dataset(data) - ds.update_from_data() - ds.update_num_total_users_items() - - num_users = len(np.unique(data[:, 0])) - num_train_users = round(num_users * (self.train_size)) - num_test_users = int(num_users - num_train_users) - data_df = pd.DataFrame(data) - - train_uids, test_uids = self.splitting(self.strategy, data_df, num_test_users) - - data_isin_test_uids = np.isin(data[:, 0], test_uids) - - train_dataset = copy(ds) - train_dataset.data = data[~data_isin_test_uids, :] - ds.update_from_data() - test_dataset = copy(ds) - test_dataset.data = data[data_isin_test_uids, :] - ds.update_from_data() - print("Test shape:", test_dataset.data.shape) - print("Train shape:", train_dataset.data.shape) - return TrainTestDataset(train=train_dataset, test=test_dataset) - - -class TRTETrainValidation(DataProcessor): - def __init__(self, train_size, test_consumes, crono, random_seed, *args, **kwargs): - super().__init__(*args, **kwargs) - self.train_size = train_size - self.test_consumes = test_consumes - self.crono = crono - self.random_seed = random_seed - - def process(self, train_dataset_and_test_dataset): - train_dataset = train_dataset_and_test_dataset[0] - test_dataset = train_dataset_and_test_dataset[1] - ttc = TrainTestConsumption( - self.train_size, self.test_consumes, self.crono, self.random_seed - ) - train_dataset, test_dataset = ttc.process(train_dataset) - return train_dataset, test_dataset - - -class TRTESample(DataProcessor): - def __init__(self, items_rate, sample_method, *args, **kwargs): - super().__init__(*args, **kwargs) - self.items_rate = items_rate - self.sample_method = sample_method - - def process(self, train_dataset_and_test_dataset): - train_dataset = train_dataset_and_test_dataset[0] - test_dataset = 
train_dataset_and_test_dataset[1] - dataset = Dataset(np.vstack([train_dataset.data, test_dataset.data])) - dataset.update_from_data() - dataset.update_num_total_users_items() - consumption_matrix = scipy.sparse.csr_matrix( - (dataset.data[:, 2], (dataset.data[:, 0], dataset.data[:, 1])), - (dataset.num_total_users, dataset.num_total_items), - ) - num_items_to_sample = int(self.items_rate * dataset.num_total_items) - if self.sample_method == "entropy": - items_values = irec.value_functions.Entropy.Entropy.get_items_entropy( - consumption_matrix - ) - elif self.sample_method == "popularity": - items_values = ( - irec.value_functions.MostPopular.MostPopular.get_items_popularity( - consumption_matrix - ) - ) - - best_items = np.argpartition(items_values, -num_items_to_sample)[ - -num_items_to_sample: - ] - dataset.data = dataset.data[np.isin(dataset.data[:, 1], best_items), :] - - new_iids = dict() - for i, iid in enumerate(np.unique(dataset.data[:, 1])): - new_iids[iid] = i - for i in range(len(dataset.data)): - dataset.data[i, 1] = new_iids[dataset.data[i, 1]] - - dataset.update_from_data() - dataset.update_num_total_users_items() - - train_uids = train_dataset.uids - test_uids = test_dataset.uids - - train_dataset = copy(dataset) - train_dataset.data = dataset.data[np.isin(dataset.data[:, 0], train_uids)] - train_dataset.update_from_data() - - test_dataset = copy(dataset) - test_dataset.data = dataset.data[np.isin(dataset.data[:, 0], test_uids)] - test_dataset.update_from_data() - return train_dataset, test_dataset - - -class PopularityFilter(DataProcessor): - def __init__(self, keep_popular, num_items_threshold, *args, **kwargs): - super().__init__(*args, **kwargs) - self.keep_popular = keep_popular - self.num_items_threshold = num_items_threshold - - def process(self, train_dataset_and_test_dataset): - train_dataset = train_dataset_and_test_dataset[0] - test_dataset = train_dataset_and_test_dataset[1] - dataset = Dataset(np.vstack([train_dataset.data, 
test_dataset.data])) - dataset.update_from_data() - dataset.update_num_total_users_items() - consumption_matrix = scipy.sparse.csr_matrix( - (dataset.data[:, 2], (dataset.data[:, 0], dataset.data[:, 1])), - (dataset.num_total_users, dataset.num_total_items), - ) - # num_items_to_sample = int(self.items_rate * dataset.num_total_items) - items_values = irec.value_functions.MostPopular.get_items_popularity( - consumption_matrix - ) - items_sorted = np.argsort(items_values)[::-1] - if self.keep_popular: - items_to_keep = items_sorted[: self.num_items_threshold] - else: - items_to_keep = items_sorted[self.num_items_threshold :] - - dataset.data = dataset.data[np.isin(dataset.data[:, 1], items_to_keep), :] - - new_iids = dict() - for i, iid in enumerate(np.unique(dataset.data[:, 1])): - new_iids[iid] = i - for i in range(len(dataset.data)): - dataset.data[i, 1] = new_iids[dataset.data[i, 1]] - new_uids = dict() - for i, uid in enumerate(np.unique(dataset.data[:, 0])): - new_uids[uid] = i - for i in range(len(dataset.data)): - dataset.data[i, 0] = new_uids[dataset.data[i, 0]] - - dataset.update_from_data() - dataset.update_num_total_users_items() - - # train_uids = train_dataset.uids - # test_uids = test_dataset.uids - - # train_dataset = copy(dataset) - # train_dataset.data = dataset.data[np.isin(dataset.data[:, 0], - # train_uids)] - # train_dataset.update_from_data() - - # test_dataset = copy(dataset) - # test_dataset.data = dataset.data[np.isin(dataset.data[:, 0], test_uids)] - # test_dataset.update_from_data() - return dataset - - -class CombineTrainTest(DataProcessor): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def process(self, train_dataset_and_test_dataset): - train_dataset = train_dataset_and_test_dataset[0] - test_dataset = train_dataset_and_test_dataset[1] - dataset = Dataset(np.vstack([train_dataset.data, test_dataset.data])) - dataset.update_from_data() - dataset.update_num_total_users_items() - return dataset - - -class 
PopRemoveEnt(DataProcessor): - def __init__(self, num_items_threshold, new_rating, *args, **kwargs): - super().__init__(*args, **kwargs) - # self.keep_popular = keep_popular - self.num_items_threshold = num_items_threshold - self.new_rating = new_rating - - def process(self, train_dataset_and_test_dataset): - train_dataset = train_dataset_and_test_dataset[0] - test_dataset = train_dataset_and_test_dataset[1] - dataset = Dataset(np.vstack([train_dataset.data, test_dataset.data])) - dataset.update_from_data() - dataset.update_num_total_users_items() - consumption_matrix = scipy.sparse.csr_matrix( - (dataset.data[:, 2], (dataset.data[:, 0], dataset.data[:, 1])), - (dataset.num_total_users, dataset.num_total_items), - ) - # num_items_to_sample = int(self.items_rate * dataset.num_total_items) - items_values = irec.value_functions.MostPopular.get_items_popularity( - consumption_matrix - ) - items_sorted = np.argsort(items_values)[::-1] - # if self.keep_popular: - items_to_keep = items_sorted[: self.num_items_threshold] - # else: - # items_to_keep = items_sorted[self.num_items_threshold:] - train_dataset.data[ - np.isin(train_dataset.data[:, 1], items_to_keep), 2 - ] = self.new_rating - test_dataset.data[ - np.isin(test_dataset.data[:, 1], items_to_keep), 2 - ] = self.new_rating - # dataset.update_from_data() - # dataset.update_num_total_users_items() - return train_dataset, test_dataset diff --git a/irec/value_functions/MostPopular.py b/irec/value_functions/MostPopular.py index 31a5f8b8..0aa88b27 100644 --- a/irec/value_functions/MostPopular.py +++ b/irec/value_functions/MostPopular.py @@ -45,6 +45,8 @@ def reset(self, observation): def action_estimates(self, candidate_actions): uid = candidate_actions[0] candidate_items = candidate_actions[1] + # print("items pop:", len(self.items_popularity), self.items_popularity, "\n") + # print("candidate items:", len(candidate_items), candidate_items) items_score = self.items_popularity[candidate_items] return items_score, None 
diff --git a/requirements_dev.txt b/requirements_dev.txt index f9abe375..8598fe42 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -5,3 +5,5 @@ flake8>=3.8 tox>=3.24 types-PyYAML types-cachetools +behave_pandas +behave diff --git a/setup.cfg b/setup.cfg index 2d306101..79b47ca0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -63,4 +63,4 @@ nn = irec = py.typed [flake8] -max-line-length = 180 +max-line-length = 120 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/bdd/__init__.py b/tests/bdd/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/bdd/features/environment/filter/filtering_by_items.feature b/tests/bdd/features/environment/filter/filtering_by_items.feature new file mode 100644 index 00000000..bd349f7e --- /dev/null +++ b/tests/bdd/features/environment/filter/filtering_by_items.feature @@ -0,0 +1,50 @@ +Feature: These are the scenarios to filter the dataset by items + + Scenario: test min_ratings if the comparison is greater or greater and equal + Given a dataset containing + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 2 | 3.5 | 101 | + | 1 | 1 | 4.0 | 102 | + | 2 | 2 | 4.5 | 103 | + | 2 | 1 | 4.5 | 104 | + | 3 | 1 | 4.5 | 105 | + | 2 | 3 | 4.5 | 106 | + And using the minimum of expected ratings as "2" + When filtered by items with min_ratings + Then the output should be equal to + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 2 | 3.5 | 101 | + | 1 | 1 | 4.0 | 102 | + | 2 | 2 | 4.5 | 103 | + | 2 | 1 | 4.5 | 104 | + | 3 | 1 | 4.5 | 105 | + + Scenario: test min_ratings when it results in a empty df + Given a dataset containing + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 2 | 3.5 | 101 | + | 1 | 1 | 4.0 | 102 | + | 2 | 2 | 4.5 | 103 | + And using the minimum of expected ratings as "5" + When filtered by items with min_ratings + Then the output should be equal 
to + | int | int | float | int | + | userId | itemId | rating | timestamp | + + Scenario: test num_items selection + Given a dataset containing + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 2 | 3.5 | 101 | + | 1 | 1 | 4.0 | 102 | + | 2 | 2 | 4.5 | 103 | + And using the number of items as "1" + And random seed equals to "0" + When filtered by items with num_items + Then the output should be equal to + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 1 | 4.0 | 102 | diff --git a/tests/bdd/features/environment/filter/filtering_by_users.feature b/tests/bdd/features/environment/filter/filtering_by_users.feature new file mode 100644 index 00000000..0cd8ba6e --- /dev/null +++ b/tests/bdd/features/environment/filter/filtering_by_users.feature @@ -0,0 +1,50 @@ +Feature: These are the scenarios to filter the dataset by users + + Scenario: test min_consumption if the comparison is greater or greater and equal + Given a dataset containing + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 2 | 3.5 | 101 | + | 1 | 1 | 4.0 | 102 | + | 2 | 2 | 4.5 | 103 | + | 2 | 1 | 4.5 | 104 | + | 3 | 1 | 4.5 | 105 | + | 2 | 3 | 4.5 | 106 | + And using the minimum of expected ratings as "2" + When filtered by users with min_consumption + Then the output should be equal to + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 2 | 3.5 | 101 | + | 1 | 1 | 4.0 | 102 | + | 2 | 2 | 4.5 | 103 | + | 2 | 1 | 4.5 | 104 | + | 2 | 3 | 4.5 | 106 | + + Scenario: test min_consumption when it results in a empty df + Given a dataset containing + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 2 | 3.5 | 101 | + | 1 | 1 | 4.0 | 102 | + | 2 | 2 | 4.5 | 103 | + And using the minimum of expected ratings as "5" + When filtered by users with min_consumption + Then the output should be equal to + | int | int | float | int | + | userId | itemId | rating | timestamp | + + 
Scenario: test num_users selection + Given a dataset containing + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 2 | 3.5 | 101 | + | 1 | 1 | 4.0 | 102 | + | 2 | 2 | 4.5 | 103 | + And using the number of users as "1" + And random seed equals to "0" + When filtered by users with num_users + Then the output should be equal to + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 2 | 2 | 4.5 | 103 | diff --git a/tests/bdd/features/environment/split/randomised.feature b/tests/bdd/features/environment/split/randomised.feature new file mode 100644 index 00000000..f8d85e90 --- /dev/null +++ b/tests/bdd/features/environment/split/randomised.feature @@ -0,0 +1,28 @@ +Feature: These are the scenarios to filter the dataset randomly + + Scenario: test random users selection + Given a dataset containing + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 0 | 1 | 3.5 | 101 | + | 0 | 0 | 4.0 | 102 | + | 1 | 1 | 4.5 | 103 | + | 1 | 0 | 4.5 | 104 | + | 2 | 0 | 4.5 | 105 | + | 1 | 2 | 4.5 | 106 | + And using the train size as "0.8" + And using the tests consumes as "1" + And random seed equals to "0" + When split randomly + Then the train output should be equal to + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 1 | 1 | 4.5 | 103 | + | 1 | 0 | 4.5 | 104 | + | 2 | 0 | 4.5 | 105 | + | 1 | 2 | 4.5 | 106 | + And the test output should be equal to + | int | int | float | int | + | userId | itemId | rating | timestamp | + | 0 | 1 | 3.5 | 101 | + | 0 | 0 | 4.0 | 102 | diff --git a/tests/bdd/features/environment/split/temporal.feature b/tests/bdd/features/environment/split/temporal.feature new file mode 100644 index 00000000..51110c1b --- /dev/null +++ b/tests/bdd/features/environment/split/temporal.feature @@ -0,0 +1,28 @@ +Feature: These are the scenarios to filter the dataset temporally + + Scenario: test temporal users selection + Given a dataset containing + | int | int | float | 
# tests/bdd/steps/given.py
"""Behave "Given"/"And" steps that stash scenario parameters on the context.

Each step parses one value from the feature file and stores it as an
attribute on ``context`` for the subsequent "When" steps to consume.
The step-matcher strings must stay in sync with the .feature files.
"""
from behave import given, step
from behave_pandas import table_to_dataframe


@given('a dataset containing')
def step_impl(context):
    """Convert the scenario's Gherkin table into a pandas DataFrame."""
    table = context.table
    context.input = table
    context.input_df = table_to_dataframe(table)


@step('using the minimum of expected ratings as "{min_ratings}"')
def step_impl(context, min_ratings):
    """Store the minimum-ratings threshold (shared by item and user filters)."""
    context.min_ratings = int(min_ratings)


@step('using the number of items as "{num_items}"')
def step_impl(context, num_items):
    """Store how many items the filter should keep."""
    context.num_items = int(num_items)


@step('using the number of users as "{num_users}"')
def step_impl(context, num_users):
    """Store how many users the filter should keep."""
    context.num_users = int(num_users)


@step('random seed equals to "{random_seed}"')
def step_impl(context, random_seed):
    """Store the RNG seed used to make sampling steps deterministic."""
    context.random_seed = int(random_seed)


@step('using the train size as "{train_size}"')
def step_impl(context, train_size):
    """Store the train fraction (0..1) for the split steps."""
    context.train_size = float(train_size)


@step('using the tests consumes as "{test_consumes}"')
def step_impl(context, test_consumes):
    """Store the minimum consumption required of a test user."""
    context.test_consumes = int(test_consumes)
# tests/bdd/steps/then.py
"""Behave "Then" steps: compare step results against expected Gherkin tables."""
import numpy as np

from behave import then, step
from behave_pandas import table_to_dataframe


@then('the output should be equal to')
def step_impl(context):
    """Assert the filter result equals the expected table, cell by cell."""
    context.expected_df = table_to_dataframe(context.table)
    assert np.array_equal(context.result, context.expected_df), f"The result actually is: \n {context.result}"


@then('the array output should be equal to')
def step_impl(context):
    """Assert the result equals the expected table's userId column as an array."""
    context.expected_df = table_to_dataframe(context.table)
    context.expected_array = np.array(context.expected_df["userId"])
    assert np.array_equal(context.result, context.expected_array), f"The result actually is: \n {context.result}"


@then("the train output should be equal to")
def step_impl(context):
    """Assert the train split equals the expected table."""
    context.expected_train_df = table_to_dataframe(context.table)
    assert np.array_equal(context.train_df, context.expected_train_df), f"The result actually is: \n {context.train_df}"


@step("the test output should be equal to")
def step_impl(context):
    """Assert the test split equals the expected table."""
    context.expected_test_df = table_to_dataframe(context.table)
    assert np.array_equal(context.test_df, context.expected_test_df), f"The result actually is: \n {context.test_df}"


# tests/bdd/steps/when.py
"""Behave "When" steps: run the irec filters and splitters under test."""
import random

import numpy as np
import pandas as pd
from behave import when

from irec.environment.dataset import Dataset
from irec.environment.filter.filtering_by_items import FilteringByItems
from irec.environment.filter.filtering_by_users import FilteringByUsers
from irec.environment.split.randomised import Random
from irec.environment.split.temporal import Temporal

# Column names and dtypes shared by both split steps.
_DTYPES = {"userId": int,
           "itemId": int,
           "rating": float,
           "timestamp": int}


def _run_split(context, splitter):
    """Run *splitter* on the scenario dataset and store train/test frames.

    Shared by the 'split randomly' and 'split temporal' steps, which
    previously duplicated this entire body. Sets ``context.train_df`` and
    ``context.test_df`` as typed DataFrames for the "Then" comparisons.
    """
    num_users = len(context.input_df["userId"].unique())
    # round() matches the original num_train_users computation; the test
    # partition gets the remainder.
    num_test_users = num_users - round(num_users * context.train_size)
    dataset = Dataset(data=context.input_df.to_numpy())
    dataset.reset_index()
    test_uids = splitter.get_test_uids(dataset.data, num_test_users)
    train_dataset, test_dataset = splitter.split_dataset(dataset.data, test_uids)
    context.train_df = pd.DataFrame(train_dataset.data, columns=_DTYPES.keys()).astype(_DTYPES)
    context.test_df = pd.DataFrame(test_dataset.data, columns=_DTYPES.keys()).astype(_DTYPES)


@when('filtered by items with min_ratings')
def step_impl(context):
    """Keep only items with at least ``context.min_ratings`` ratings."""
    context.result = FilteringByItems.min_ratings(context.input_df, context.min_ratings)


@when('filtered by items with num_items')
def step_impl(context):
    """Sample ``context.num_items`` items; seeded for reproducibility."""
    random.seed(context.random_seed)
    context.result = FilteringByItems.num_items(context.input_df, context.num_items)


@when('filtered by users with min_consumption')
def step_impl(context):
    """Keep only users with enough interactions.

    NOTE(review): reuses ``context.min_ratings`` from the shared
    "minimum of expected ratings" given-step — confirm this is intended
    rather than a separate min_consumption parameter.
    """
    context.result = FilteringByUsers.min_consumption(context.input_df, context.min_ratings)


@when('filtered by users with num_users')
def step_impl(context):
    """Sample ``context.num_users`` users.

    Bug fix: the feature file sets a random seed for this scenario, but the
    step never applied it, so the sampled user set was non-deterministic.
    Seed here exactly as the num_items step does (presumably the filter uses
    the ``random`` module — verify against FilteringByUsers.num_users).
    """
    random.seed(context.random_seed)
    context.result = FilteringByUsers.num_users(context.input_df, context.num_users)


@when('split randomly')
def step_impl(context):
    """Split the dataset into train/test with the Random splitter."""
    _run_split(context, Random(train_size=context.train_size,
                               test_consumes=context.test_consumes))


@when('split temporal')
def step_impl(context):
    """Split the dataset into train/test with the Temporal splitter."""
    _run_split(context, Temporal(train_size=context.train_size,
                                 test_consumes=context.test_consumes))