From d4d0dfc924044327aee932a461f7b8bf230e2dfe Mon Sep 17 00:00:00 2001 From: Shahar Bar <33932594+shaharbar1@users.noreply.github.com> Date: Thu, 5 Sep 2024 15:28:40 +0300 Subject: [PATCH] Add offline policy evaluation module and update dependencies ### Changes * Introduced `offline_policy_evaluator.py` with classes for propensity score estimation and offline policy evaluation. * Updated `pyproject.toml` to include new dependencies: `bokeh`, `obp` and `optuna`. Further adjusted existing dependencies to compatible versions. * Added class method to PyBanditsBaseModel on base.py to allow seeing default values for arguments that were not passed to the model. * Added test_offline_policy_evaluator.py as a test suite for the OfflinePolicyEvaluator. * Added `visualize_via_bokeh` and `in_jupyter_notebook` utility functions. --- pybandits/base.py | 4 + pybandits/offline_policy_evaluator.py | 1072 ++++++++++++++++++++++++ pybandits/utils.py | 64 +- pyproject.toml | 16 +- tests/test_offline_policy_evaluator.py | 310 +++++++ 5 files changed, 1459 insertions(+), 7 deletions(-) create mode 100644 pybandits/offline_policy_evaluator.py create mode 100644 tests/test_offline_policy_evaluator.py diff --git a/pybandits/base.py b/pybandits/base.py index 97e42d8..bf195d4 100644 --- a/pybandits/base.py +++ b/pybandits/base.py @@ -48,6 +48,10 @@ class PyBanditsBaseModel(BaseModel, extra="forbid"): BaseModel of the PyBandits library. """ + @classmethod + def _get_value(cls, key: str, values: Dict[str, Any]) -> Any: + return values.get(key, cls.model_fields[key].default) + class Model(PyBanditsBaseModel, ABC): """ diff --git a/pybandits/offline_policy_evaluator.py b/pybandits/offline_policy_evaluator.py new file mode 100644 index 0000000..7cf319e --- /dev/null +++ b/pybandits/offline_policy_evaluator.py @@ -0,0 +1,1072 @@ +import os +from copy import deepcopy +from functools import partial +from itertools import product +from math import floor +from multiprocessing import Pool, cpu_count +from typing import Any, Dict, List, Literal, Optional, Tuple, Union + +import numpy as np +import optuna +import pandas as pd +from bokeh.models import ColumnDataSource, TabPanel +from bokeh.plotting import figure +from loguru import logger +from obp.ope import OffPolicyEvaluation +from obp.ope.estimators import ( + BalancedInverseProbabilityWeighting, # BIPW + BaseOffPolicyEstimator, + DirectMethod, # DM + DoublyRobust, # DR + DoublyRobustWithShrinkage, # DRos + InverseProbabilityWeighting, # IPW + SelfNormalizedDoublyRobust, # SNDR + SelfNormalizedInverseProbabilityWeighting, # SNIPW + SubGaussianDoublyRobust, # Sub-DR + SubGaussianInverseProbabilityWeighting, # Sub-IPW + SwitchDoublyRobust, # Switch-DR +) +from optuna import Trial +from pydantic import NonNegativeInt, PositiveInt, PrivateAttr, field_validator, model_validator, validate_call +from sklearn.base import ClassifierMixin, TransformerMixin +from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier +from sklearn.linear_model import LogisticRegression +from sklearn.model_selection import cross_val_score +from sklearn.neural_network import MLPClassifier +from sklearn.preprocessing import LabelEncoder, OneHotEncoder +from tqdm import tqdm +from typing_extensions import Self + +from pybandits.base import ActionId, BaseMab, Float01, PyBanditsBaseModel +from pybandits.utils import ( + in_jupyter_notebook, + visualize_via_bokeh, +) + +optuna.logging.enable_propagation() # Propagate logs to the root logger. 
+optuna.logging.disable_default_handler() # Stop showing logs in sys.stderr. + + +class _FunctionEstimator(PyBanditsBaseModel, ClassifierMixin, arbitrary_types_allowed=True): + """ + This class provides functionality for model optimization using hyperparameter tuning via Optuna, + and prediction with optimized or default machine learning models. + It is used to estimate the propensity score and expected reward. + + + Parameters + ---------- + estimator_type : Optional[Literal["logreg", "gbm", "rf", "mlp"]] + The model type to optimize. + + fast_fit : bool + Whether to use the default parameter set for the model. + + action_one_hot_encoder : OneHotEncoder + Fitted one hot encoder for action encoding. + + n_trials : int + Number of trials for the Optuna optimization process. + + verbose : bool + Whether to log detailed information during the optimization process. + + study_name : Optional[str] + Name of the study to be created by Optuna. + + multi_action_prediction : bool + Whether to predict for all actions or only for real action. + + """ + + estimator_type: Literal["logreg", "gbm", "rf", "mlp"] + fast_fit: bool + action_one_hot_encoder: OneHotEncoder = OneHotEncoder(sparse=False) + n_trials: int + verbose: bool + study_name: Optional[str] = None + multi_action_prediction: bool + _model: Union[LogisticRegression, GradientBoostingClassifier, RandomForestClassifier, MLPClassifier] = PrivateAttr() + _model_mapping = { + "mlp": MLPClassifier, + "rf": RandomForestClassifier, + "logreg": LogisticRegression, + "gbm": GradientBoostingClassifier, + } + + def _pre_process(self, batch: Dict[str, Any]) -> np.ndarray: + """ + Preprocess the feature vectors to be used for regression model training. + This method concatenates the context vector and action context vectors. + + Parameters + ---------- + batch : Dict[str, Any] + The batch of data containing context, action, and action context. + + Returns + ------- + np.ndarray + A concatenated array of context and action context, shape (n_rounds, n_features_context + dim_action_context). + """ + context = batch["context"] + action = batch["action_ids"] + return np.concatenate([context, self.action_one_hot_encoder.transform(action.reshape((-1, 1)))], axis=1) + + def _sample_parameter_space(self, trial: Trial) -> Dict[str, Union[str, int, float]]: + """ + Define the hyperparameter search space for a given model type in Optuna. + + The search space is dynamically selected based on the model type being optimized. + + Parameters + ---------- + trial : optuna.Trial + A single trial in the Optuna optimization process. + + Returns + ------- + dict + A dictionary representing the search space for the model's hyperparameters. 
+ """ + + if self.estimator_type == "mlp": + return { + "hidden_layer_sizes": 2 ** trial.suggest_int("hidden_layer_sizes", 2, 6), + "activation": trial.suggest_categorical("activation", ["relu", "logistic", "tanh"]), + "solver": trial.suggest_categorical("solver", ["lbfgs", "sgd", "adam"]), + "alpha": np.sqrt(10) ** -trial.suggest_int("learning_rate_init", 0, 10), + "max_iter": 1000, + "learning_rate_init": np.sqrt(10) ** -trial.suggest_int("learning_rate_init", 0, 6), + } + elif self.estimator_type == "rf": + return { + "max_depth": trial.suggest_int("max_depth", 2, 5), + "criterion": trial.suggest_categorical("criterion", ["gini", "entropy"]), + "max_features": trial.suggest_int("max_features", 1, 3), + "n_estimators": trial.suggest_int("n_estimators", 10, 50), + "n_jobs": -1, + } + elif self.estimator_type == "logreg": + return { + "tol": trial.suggest_float("tol", 0.00001, 0.0001), + "C": trial.suggest_float("C", 0.05, 3), + "solver": trial.suggest_categorical("solver", ["newton-cg", "lbfgs", "liblinear", "sag", "saga"]), + "max_iter": 1000, + "n_jobs": -1, + } + elif self.estimator_type == "gbm": + return { + "n_estimators": trial.suggest_int("n_estimators", 10, 100), + "learning_rate": np.sqrt(10) ** -trial.suggest_int("learning_rate_init", 0, 6), + "max_depth": trial.suggest_int("max_depth", 2, 10), + } + + def _objective(self, trial: Trial, feature_set: np.ndarray, label: np.ndarray) -> float: + """ + Objective function for Optuna optimization. + + This function trains a model using cross-validation and returns the negative accuracy + to be minimized. + + Parameters + ---------- + trial : Trial + A single trial in the Optuna optimization process. + + feature_set : np.ndarray + The training dataset, containing context and encoded actions. + + label : np.ndarray + The labels for the dataset. + + Returns + ------- + score : float + The score to be maximized by Optuna. + """ + params = self._sample_parameter_space(trial) + model = self._model_mapping[self.estimator_type](**params) + score = cross_val_score(model, feature_set, label).mean() + trial.set_user_attr("model_params", params) + + return score + + def _optimize(self, feature_set: np.ndarray, label: np.ndarray, study: optuna.Study) -> dict: + """ + Optimize the model's hyperparameters using Optuna. + + Parameters + ---------- + feature_set : np.ndarray + The training dataset, containing 'context' and 'action_ids' keys. + + study : optuna.Study + The Optuna study object to store optimization results. + + Returns + ------- + best_params : dict + The best set of hyperparameters found by Optuna. + """ + + study.optimize(lambda trial: self._objective(trial, feature_set, label), n_trials=self.n_trials) + + best_params = study.best_trial.user_attrs["model_params"] + if self.verbose: + logger.info(f"Optuna best model with optimized parameters for {self.estimator_type}:\n {best_params}") + + return best_params + + @validate_call(config=dict(arbitrary_types_allowed=True)) + def fit(self, X: dict, y: np.ndarray) -> Self: + """ + Fit the model using the given dataset X and labels y. + + Parameters + ---------- + X : dict + The dataset containing 'context' and 'action_ids' keys. + y : np.ndarray + The labels for the dataset. + + Returns + ------- + self : _FunctionEstimator + The fitted model. 
+ """ + feature_set = self._pre_process(X) + if self.fast_fit: + model_parameters = {} + else: + pruner = optuna.pruners.MedianPruner() + sampler = optuna.samplers.TPESampler(multivariate=True, group=True) + study = optuna.create_study( + direction="maximize", study_name=self.study_name, pruner=pruner, sampler=sampler + ) + model_parameters = self._optimize(feature_set, y, study) + + model = self._model_mapping[self.estimator_type](**model_parameters) + model.fit(feature_set, y) + self._model = model + return self + + @validate_call + def predict(self, X: dict) -> np.ndarray: + """ + Predict the labels for the given dataset X. + + Parameters + ---------- + X : dict + The dataset containing 'context' and 'action_ids' keys. + + Returns + ------- + prediction : np.ndarray + The predicted labels for the dataset. + """ + if not self._model: + raise AttributeError("Model has not been fitted yet.") + + if self.multi_action_prediction: + specific_action_X = X.copy() + prediction = np.empty((X["n_rounds"], len(X["unique_actions"]), 1)) + for action_index, action in enumerate(X["unique_actions"]): + specific_action_X["action_ids"] = np.array([action] * X["n_rounds"]) + specific_action_feature_set = self._pre_process(specific_action_X) + specific_action_prediction = self._model.predict_proba(specific_action_feature_set)[:, :1] + prediction[:, action_index, :] = specific_action_prediction + else: + feature_set = self._pre_process(X) + prediction = self._model.predict_proba(feature_set)[:, 0] + return prediction + + +class OfflinePolicyEvaluator(PyBanditsBaseModel, arbitrary_types_allowed=True): + """ + Class to conduct OPE with multiple OPE estimators + + Reference: https://arxiv.org/abs/2008.07146 https://github.com/st-tech/zr-obp + Open Bandit Dataset and Pipeline: Towards Realistic and Reproducible Off-Policy Evaluation + + + Parameters + ---------- + logged_data : pd.DataFrame + Logging data set + split_prop: Float01 + Proportion of dataset used as training set + propensity_score_model_type: Literal["logreg", "gbm", "rf", "mlp", "batch_empirical", "empirical", "propensity_score"] + Method used to compute/estimate propensity score pi_b (propensity_score, logging / behavioral policy). + expected_reward_model_type: Literal["logreg", "gbm", "rf", "mlp"] + Method used to estimate expected reward for each action a in the training set. + n_trials : Optional[int] + Number of trials for the Optuna optimization process. + fast_fit : bool + Whether to use the default parameter set for the function estimator models. + ope_metrics: Optional[List[BaseOffPolicyEstimator]] + List of OPE estimators used to evaluate the policy value of evaluation policy. + All available estimators are if not specified. + batch_feature: str + Column name for batch as available in logged_data + action_feature: str + Column name for action as available in logged_data + reward_feature: Union[str, List[str]] + Column name for reward as available in logged_data + contextual_features: Optional[List[str]] + Column names for contextual features as available in logged_data + cost_feature: Optional[str] + Column name for cost as available in logged_data; used for bandit with cost control + group_feature: Optional[str] + Column name for group definition feature as available in logged_data; available from simulated data + to define samples with similar contextual profile + true_reward_feature: Optional[Union[str, List[str]]] + Column names for reward proba distribution features as available in simulated logged_data. 
Used to compute ground truth + propensity_score_feature : Optional[str] + Column name for propensity score as available in logged_data; used for evaluation of the policy value + verbose : bool + Whether to log detailed information during the optimization process. + """ + + logged_data: pd.DataFrame + split_prop: Float01 + propensity_score_model_type: Literal[ + "logreg", "gbm", "rf", "mlp", "batch_empirical", "empirical", "propensity_score" + ] + expected_reward_model_type: Literal["logreg", "gbm", "rf", "mlp"] + importance_weights_model_type: Literal["logreg", "gbm", "rf", "mlp"] + scaler: Optional[Union[TransformerMixin, Dict[str, TransformerMixin]]] = None + n_trials: Optional[int] = 100 + fast_fit: bool = False + ope_metrics: Optional[List[BaseOffPolicyEstimator]] + batch_feature: str + action_feature: str + reward_feature: Union[str, List[str]] + contextual_features: Optional[List[str]] = None + cost_feature: Optional[str] = None + group_feature: Optional[str] = None + true_reward_feature: Optional[Union[str, List[str]]] = None + propensity_score_feature: Optional[str] = None + verbose: bool = False + _train_data: Optional[Dict[str, Any]] = PrivateAttr() + _test_data: Optional[Dict[str, Any]] = PrivateAttr() + _estimated_expected_reward: Optional[Dict[str, np.ndarray]] = None + _estimated_importance_weights: Optional[np.ndarray] = None + _action_one_hot_encoder = OneHotEncoder(sparse=False) + _propensity_score_epsilon = 1e-08 + + @field_validator("split_prop", mode="before") + @classmethod + def check_split_prop(cls, value): + if value == 0 or value == 1: + raise ValueError("split_prop should be strictly between 0 and 1") + return value + + @field_validator("reward_feature", mode="before") + @classmethod + def unify_reward_feature(cls, value): + return value if isinstance(value, list) else [value] + + @field_validator("true_reward_feature", mode="before") + @classmethod + def unify_true_reward_feature(cls, value): + return value if isinstance(value, list) else [value] if value is not None else None + + @field_validator("ope_metrics", mode="before") + @classmethod + def populate_ope_metrics(cls, value): + return ( + value + if value is not None + else [ + DirectMethod(), + InverseProbabilityWeighting(), + DoublyRobust(), + DoublyRobustWithShrinkage(), + SelfNormalizedInverseProbabilityWeighting(), + SelfNormalizedDoublyRobust(), + BalancedInverseProbabilityWeighting(), + SubGaussianDoublyRobust(), + SubGaussianInverseProbabilityWeighting(), + SwitchDoublyRobust(), + ] + ) + + @model_validator(mode="before") + @classmethod + def check_batch_feature(cls, values): + if values["batch_feature"] not in values["logged_data"]: + raise AttributeError("Batch feature missing from logged data.") + if not ( + pd.api.types.is_datetime64_ns_dtype(values["logged_data"][values["batch_feature"]]) + or pd.api.types.is_integer_dtype(values["logged_data"][values["batch_feature"]]) + ): + raise TypeError(f"Column {values['batch_feature']} should be either date or int type") + return values + + @model_validator(mode="before") + @classmethod + def check_action_feature(cls, values): + if values["action_feature"] not in values["logged_data"]: + raise AttributeError("Action feature missing from logged data.") + return values + + @model_validator(mode="after") + def check_propensity_score_estimation_method(self): + if self.propensity_score_model_type == "propensity_score": + if self.propensity_score_feature is None: + raise ValueError( + "Propensity score feature should be defined when using it as 
propensity_score_model_type" + ) + return self + + @model_validator(mode="after") + def check_reward_features(self): + if not all([reward in self.logged_data for reward in self.reward_feature]): + raise AttributeError("Reward feature missing from logged data.") + if self.true_reward_feature: + if not all([true_reward in self.logged_data for true_reward in self.true_reward_feature]): + raise AttributeError("True reward feature missing from logged data.") + if len(self.reward_feature) != len(self.true_reward_feature): + raise ValueError("Reward and true reward features should have the same length") + + return self + + @model_validator(mode="before") + @classmethod + def check_optional_scalar_features(cls, values): + for feature in [ + "cost_feature", + "group_feature", + "propensity_score_feature", + ]: + value = cls._get_value(feature, values) + if value is not None and value not in values["logged_data"]: + raise AttributeError(f"{feature} missing from logged data.") + return values + + @model_validator(mode="before") + @classmethod + def check_contextual_features(cls, values): + value = cls._get_value("contextual_features", values) + if value is not None and not set(value).issubset(values["logged_data"].columns): + raise AttributeError("contextual_features missing from logged data.") + return values + + @model_validator(mode="before") + @classmethod + def check_model_optimization(cls, values): + n_trials_value = cls._get_value("n_trials", values) + fast_fit_value = cls._get_value("fast_fit", values) + + if (n_trials_value is None or fast_fit_value is None) and values["propensity_score_model_type"] not in [ + "logreg", + "gbm", + "rf", + "mlp", + ]: + raise ValueError("The requested propensity score model requires n_trials and fast_fit to be well defined") + if (n_trials_value is None or fast_fit_value is None) and any( + [isinstance(estimator, DirectMethod) for estimator in values["ope_metrics"]] + ): + raise ValueError( + "The requested offline policy evaluation metrics model require estimation of the expected reward. 
" + "Thus, n_trials and fast_fit to be well defined" + ) + return values + + def model_post_init(self, __context: Any) -> None: + # Extract batches for train and test set + self._extract_batches() + + # Estimate propensity score in the train and test set + self._estimate_propensity_score() + + # Estimate expected reward estimator and predict in the test set, used by DM- and DR-based metrics + if any([isinstance(estimator, (DirectMethod, DoublyRobust)) for estimator in self.ope_metrics]): + self._estimate_expected_reward() + + def _extract_batches(self): + """ + Create list of dictionaries, one for training set and on for testing set as required by obp package + + """ + logged_data = self.logged_data.sort_values(by=self.batch_feature) + unique_batch = logged_data[self.batch_feature].unique() + split_batch = unique_batch[int(floor(len(unique_batch)) * self.split_prop)] + + # add list of actions in dict in order to avoid test set with n_actions + # lower than nb of total actions + unique_actions = sorted(self.logged_data[self.action_feature].unique().tolist()) + action_label_encoder = LabelEncoder() + for batch_idx in tqdm(range(2)): + # extract samples batch + if batch_idx == 0: + extracted_batch = self.logged_data[self.logged_data[self.batch_feature] <= split_batch] + else: + extracted_batch = self.logged_data[self.logged_data[self.batch_feature] > split_batch] + extracted_batch = extracted_batch.reset_index(drop=True) + + # dict data set information for OPE + action_ids = extracted_batch[self.action_feature].values + if batch_idx == 0: + self._action_one_hot_encoder.fit(np.array(unique_actions).reshape((-1, 1))) + reward = extracted_batch[self.reward_feature].values + + # if cost control bandit + if self.cost_feature is not None: + cost = extracted_batch[self.cost_feature].values + else: + cost = None + + # if contextual information required + if self.contextual_features is not None: + if self.scaler is not None: + if type(self.scaler) is dict: + if batch_idx == 0: + x_scale = np.array( + pd.concat( + [ + self.scaler[feature].fit_transform(np.array(extracted_batch[[feature]])) + for feature in self.contextual_features + ], + axis=1, + ) + ) + else: + x_scale = np.array( + pd.concat( + [ + self.scaler[feature].transform(np.array(extracted_batch[[feature]])) + for feature in self.contextual_features + ], + axis=1, + ) + ) + else: + if batch_idx == 0: + x_scale = self.scaler.fit_transform(np.array(extracted_batch[self.contextual_features])) + else: + x_scale = self.scaler.transform(np.array(extracted_batch[self.contextual_features])) + else: + x_scale = np.array(extracted_batch[self.contextual_features]) + else: + x_scale = np.zeros((len(action_ids), 0)) # zero-columns 2d array to allow concatenation later + + # extract data for policy information + policy_information_cols = [ + self.batch_feature, + self.action_feature, + ] + self.reward_feature + if self.group_feature: + policy_information_cols.append(self.group_feature) + + policy_information = extracted_batch[policy_information_cols] + + # reward probability distribution as used during simulation process if available + ground_truth = extracted_batch[self.true_reward_feature] if self.true_reward_feature else None + + # propensity_score may be available from simulation: propensity_score is added to the dict + propensity_score = ( + extracted_batch[self.propensity_score_feature].values if self.propensity_score_feature else None + ) + if batch_idx == 0: + action_label_encoder.fit(unique_actions) + actions = 
action_label_encoder.transform(action_ids) + + # Store information in a dictionary as required by obp package + data_batch = { + "n_rounds": len(action_ids), # number of samples + "n_action": len(unique_actions), # number of actions + "unique_actions": unique_actions, # list of actions in the whole data set + "action_ids": action_ids, # action identifiers + "action": actions, # encoded action identifiers for obp bandit_feedback + "position": None, + "reward": reward, # samples' reward + "propensity_score": propensity_score, # propensity score, pi_b(a|x), vector + "context": x_scale, # the matrix of features i.e. context + "data": policy_information, # data array with informative features + "ground_truth": ground_truth, # true reward probability for each action and samples, list of list + "cost": cost, # samples' action cost for bandit with cost control + } + if batch_idx == 0: + self._train_data = data_batch + else: + self._test_data = data_batch + + def _estimate_propensity_score_empirical( + self, batch: Dict[str, Any], groupby_cols: List[str], inner_groupby_cols: Optional[List[str]] = None + ) -> np.ndarray: + """ + Empirical propensity score computation based on batches average + + Parameters + ---------- + batch: Dict[str, Any] + Dataset dictionary + groupby_cols : List[str] + Columns to group by + inner_groupby_cols : Optional[List[str]] + Columns to group by after the first groupby + + Returns + ------- + propensity_score : np.ndarray + computed propensity score for each of the objectives + """ + inner_groupby_cols = [] if inner_groupby_cols is None else inner_groupby_cols + overall_groupby_cols = groupby_cols + inner_groupby_cols + # number of recommended actions per group and batch + grouped_data = batch["data"].groupby(overall_groupby_cols)[self.reward_feature[0]].count() + + # proportion of recommended actions per group + if inner_groupby_cols: + empirical_distribution = pd.DataFrame( + grouped_data / grouped_data.groupby(inner_groupby_cols).sum() + ).reset_index() + else: + empirical_distribution = pd.DataFrame(grouped_data / grouped_data.sum()).reset_index() + + empirical_distribution.columns = overall_groupby_cols + ["propensity_score"] + + # deal with missing segment after group by + if len(overall_groupby_cols) > 1: + all_combinations = pd.DataFrame( + list(product(*[empirical_distribution[col].unique() for col in overall_groupby_cols])), + columns=overall_groupby_cols, + ) + + # Merge with the original dataframe, filling missing values in 'c' with 0 + empirical_distribution = pd.merge( + all_combinations, empirical_distribution, on=groupby_cols + inner_groupby_cols, how="left" + ).fillna(0) + + # extract propensity_score in the test set for user according to group and action recommended + matching_df = pd.DataFrame({k: batch["data"][k] for k in overall_groupby_cols}) + merged_df = pd.merge( + matching_df, + empirical_distribution[overall_groupby_cols + ["propensity_score"]], + how="left", # left join to ensure we get all rows from the batch + on=overall_groupby_cols, + ) + propensity_score = merged_df["propensity_score"].values + + return propensity_score + + def _empirical_averaged_propensity_score(self, batch: Dict[str, Any]) -> np.ndarray: + """ + Empirical propensity score computation based on batches average + + Parameters + ---------- + batch : Dict[str, Any] + obp dict batch. 
+ + Returns + ------ + : np.ndarray + updated obp dict with propensity_score computed + """ + + return self._estimate_propensity_score_empirical( + batch=batch, groupby_cols=[self.action_feature], inner_groupby_cols=[self.batch_feature] + ) + + def _empirical_propensity_score(self, batch: Dict[str, Any]) -> np.ndarray: + """ + Propensity score empirical computation based on data set average + + Parameters + ---------- + batch : Dict[str, Any] + obp dict batch. + + Return + ------ + : np.ndarray + updated obp dict with propensity_score computed + """ + + return self._estimate_propensity_score_empirical(batch=batch, groupby_cols=[self.action_feature]) + + def _estimate_propensity_score(self): + """ + Compute/approximate propensity score based on different methods in the train and test set. + Different approaches may be evaluated when logging policy is unknown. + """ + if not self.contextual_features: + # if no contextual features, propensity score is directly defined by the action taken, + # thus uniformly set to 1 + train_propensity_score = np.ones(self._train_data["n_rounds"]) + test_propensity_score = np.ones(self._test_data["n_rounds"]) + logger.warning( + f"No contextual features available, " + f"overriding the requested propensity_score_model_type={self.propensity_score_model_type} " + f"using uniform propensity score" + ) + else: + if self.propensity_score_model_type == "batch_empirical": + if self.verbose: + logger.info("Data batch-empirical estimation of propensity score.") + + # Empirical approach: propensity score pi_b computed as action means per samples batch + train_propensity_score = self._empirical_propensity_score(self._train_data) + test_propensity_score = self._empirical_propensity_score(self._test_data) + + elif self.propensity_score_model_type == "empirical": + if self.verbose: + logger.info("Data empirical estimation of propensity score.") + + # Empirical approach: propensity score pi_b computed as action means per samples batch + train_propensity_score = self._empirical_averaged_propensity_score(self._train_data) + test_propensity_score = self._empirical_averaged_propensity_score(self._test_data) + + elif self.propensity_score_model_type == "propensity_score": + if self.verbose: + logger.info("Data given value of propensity score.") + + train_propensity_score = self._train_data["propensity_score"] + test_propensity_score = self._test_data["propensity_score"] + + else: # self.propensity_score_model_type in ["gbm", "rf", "logreg", "mlp"] + if self.verbose: + logger.info( + f"Data prediction of propensity score based on {self.propensity_score_model_type} model." + ) + propensity_score_estimator = _FunctionEstimator( + estimator_type=self.propensity_score_model_type, + fast_fit=self.fast_fit, + action_one_hot_encoder=self._action_one_hot_encoder, + n_trials=self.n_trials, + verbose=self.verbose, + study_name=f"{self.propensity_score_model_type}_propensity_score", + multi_action_prediction=False, + ) + propensity_score_estimator.fit(X=self._train_data, y=self._train_data["action"]) + train_propensity_score = np.clip( + propensity_score_estimator.predict(self._train_data), self._propensity_score_epsilon, 1 + ) + test_propensity_score = np.clip( + propensity_score_estimator.predict(self._test_data), self._propensity_score_epsilon, 1 + ) + self._train_data["pscore"] = train_propensity_score + self._test_data["pscore"] = test_propensity_score + + def _estimate_expected_reward(self): + """ + Compute expected reward (q(x, a) also called r(x, a)) or each round and action. 
+ The resulting array is required to compute DirectMethod and DoublyRobust. + """ + if self.verbose: + logger.info(f"Data prediction of expected reward based on {self.expected_reward_model_type} model.") + estimated_expected_reward = {} + for reward_feature, reward in zip(self.reward_feature, self._train_data["reward"].T): + expected_reward_model = _FunctionEstimator( + estimator_type=self.expected_reward_model_type, + fast_fit=self.fast_fit, + action_one_hot_encoder=self._action_one_hot_encoder, + n_trials=self.n_trials, + verbose=self.verbose, + study_name=f"{self.expected_reward_model_type}_expected_reward", + multi_action_prediction=True, + ) + + expected_reward_model.fit(X=self._train_data, y=reward.T) + + # predict in test set + estimated_expected_reward[reward_feature] = expected_reward_model.predict(self._test_data) + self._estimated_expected_reward = estimated_expected_reward + + def _estimate_importance_weights(self, mab: BaseMab): + """ + Compute importance weights induced by the behavior and evaluation policies. + The resulting array is required to compute BalancedInverseProbabilityWeighting. + """ + if self.verbose: + logger.info(f"Data prediction of importance weights based on {self.importance_weights_model_type} model.") + + importance_weights_model = _FunctionEstimator( + estimator_type=self.importance_weights_model_type, + fast_fit=self.fast_fit, + action_one_hot_encoder=self._action_one_hot_encoder, + n_trials=self.n_trials, + verbose=self.verbose, + study_name=f"{self.importance_weights_model_type}_importance_weights", + multi_action_prediction=False, + ) + train_data = deepcopy(self._train_data) + mab_data = self._train_data["context"] if self.contextual_features else self._train_data["n_rounds"] + selected_actions = _mab_predict(mab, mab_data) + train_data["action_ids"] = np.concatenate((train_data["action_ids"], selected_actions), axis=0) + train_data["context"] = np.concatenate((train_data["context"], train_data["context"]), axis=0) + y = np.concatenate((np.zeros(len(selected_actions)), np.ones(len(selected_actions))), axis=0) + importance_weights_model.fit(X=train_data, y=y) + + # predict in test set + self._estimated_importance_weights = importance_weights_model.predict(self._test_data) + + def _estimate_policy( + self, + mab: BaseMab, + n_mc_experiments: PositiveInt = 1000, + n_cores: Optional[NonNegativeInt] = None, + ) -> np.ndarray: + """ + Estimate policy via Monte Carlo (MC) sampling based on sampling distribution of each action a in the test set. + + Reference: Estimation Considerations in Contextual Bandit + https://arxiv.org/pdf/1711.07077.pdf + Reference: Debiased Off-Policy Evaluation for Recommendation Systems + https://arxiv.org/pdf/2002.08536.pdf + Reference: CAB: Continuous Adaptive Blending for Policy Evaluation and Learning + https://arxiv.org/pdf/1811.02672.pdf + + Parameters + ---------- + mab : BaseMab + Multi-armed bandit to be evaluated + n_mc_experiments: PositiveInt + Number of MC sampling rounds. 
Default: 1000 + n_cores: Optional[NonNegativeInt], all available cores if not specified + Number of cores used for multiprocessing + + Returns + ------- + estimated_policy: np.ndarray (nb samples, nb actions, 1) + action probabilities for each action and samples + """ + if self.verbose: + logger.info("Data prediction of expected policy based on Monte Carlo experiments.") + n_cores = n_cores or cpu_count() + + # using MC, create a () best actions matrix + mc_actions = [] + mab_data = self._test_data["context"] if self.contextual_features else self._test_data["n_rounds"] + predict_func = partial(_mab_predict, mab, mab_data) + with Pool(processes=n_cores) as pool: + # predict best action for a new prior parameters draw + # using argmax(p(r|a, x)) with a in the list of actions + for mc_action in tqdm(pool.imap_unordered(predict_func, range(n_mc_experiments))): + mc_actions.append(mc_action) + + # finalize the dataframe shape to #samples X #mc experiments + mc_actions = pd.DataFrame(mc_actions).T + + # for each sample / each action, count the occurrence frequency during MC iteration + estimated_policy = np.zeros((self._test_data["n_rounds"], len(self._test_data["unique_actions"]))) + mc_action_counts = mc_actions.apply(pd.Series.value_counts, axis=1).fillna(0) + + for u in tqdm(range(self._test_data["n_rounds"])): + estimated_policy[u, :] = ( + mc_action_counts.iloc[u, :].reindex(self._test_data["unique_actions"], fill_value=0).values + / mc_actions.shape[1] + ) + estimated_policy = estimated_policy[..., None] + return estimated_policy + + def evaluate( + self, + mab: BaseMab, + n_mc_experiments: int = 1000, + save_path: Optional[str] = None, + visualize: bool = True, + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Execute the OPE process with multiple estimators simultaneously. + + Parameters + ---------- + mab : BaseMab + Multi-armed bandit model to be evaluated + n_mc_experiments : int + Number of Monte Carlo experiments for policy estimation + save_path : Optional[str] + Path to save the results. Nothing is saved if not specified. 
+ visualize : bool + Whether to visualize the results of the OPE process + + Returns + ------- + estimated_policy_value_df : pd.DataFrame + Estimated policy values + estimated_interval_df : pd.DataFrame + Estimated confidence intervals + """ + if visualize and not save_path and not in_jupyter_notebook(): + raise ValueError("save_path is required for visualization when not running in a Jupyter notebook") + + # Estimate policy in the test set + estimated_policy = self._estimate_policy(mab=mab, n_mc_experiments=n_mc_experiments) + + if any([isinstance(estimator, BalancedInverseProbabilityWeighting) for estimator in self.ope_metrics]): + self._estimate_importance_weights(mab) + + # Instantiate class to conduct OPE by multiple estimators simultaneously + single_objective_test_data = self._test_data.copy() + multi_objective_estimated_policy_value_df = pd.DataFrame() + multi_objective_estimated_interval_df = pd.DataFrame() + for reward_feature in self.reward_feature: + if self.verbose: + logger.info(f"Offline Policy Evaluation for {reward_feature}.") + single_objective_test_data["reward"] = self._test_data["reward"][ + :, self.reward_feature.index(reward_feature) + ] + estimated_expected_reward = ( + self._estimated_expected_reward[reward_feature] if self._estimated_expected_reward else None + ) + ope = OffPolicyEvaluation(bandit_feedback=single_objective_test_data, ope_estimators=self.ope_metrics) + # Summarize policy values and their confidence intervals estimated by OPE estimators + estimated_policy_value_df, estimated_interval_df = ope.summarize_off_policy_estimates( + action_dist=estimated_policy, + estimated_rewards_by_reg_model=estimated_expected_reward, + estimated_importance_weights=self._estimated_importance_weights, + ) + multi_objective_estimated_policy_value_df = pd.concat( + [multi_objective_estimated_policy_value_df, estimated_policy_value_df.assign(objective=reward_feature)], + axis=0, + ) + multi_objective_estimated_interval_df = pd.concat( + [multi_objective_estimated_interval_df, estimated_interval_df.assign(objective=reward_feature)], axis=0 + ) + if save_path: + multi_objective_estimated_policy_value_df.to_csv(os.path.join(save_path, "estimated_policy_value.csv")) + multi_objective_estimated_interval_df.to_csv(os.path.join(save_path, "estimated_interval.csv")) + + if visualize: + self._visualize_results(save_path, multi_objective_estimated_interval_df) + + return multi_objective_estimated_policy_value_df, multi_objective_estimated_interval_df + + def update_and_evaluate( + self, + mab: BaseMab, + n_mc_experiments: int = 1000, + save_path: Optional[str] = None, + visualize: bool = True, + with_test: bool = False, + ) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Execute update of the multi-armed bandit based on the logged data, + followed by the OPE process with multiple estimators simultaneously. + + Parameters + ---------- + mab : BaseMab + Multi-armed bandit model to be updated and evaluated + n_mc_experiments : int + Number of Monte Carlo experiments for policy estimation + save_path : Optional[str] + Path to save the results. Nothing is saved if not specified. 
+ visualize : bool + Whether to visualize the results of the OPE process + with_test : bool + Whether to update the bandit model with the test data + + Returns + ------- + estimated_policy_value_df : pd.DataFrame + Estimated policy values + estimated_interval_df : pd.DataFrame + Estimated confidence intervals + """ + self._update_mab(mab, self._train_data) + if with_test: + self._update_mab(mab, self._test_data) + return self.evaluate(mab, n_mc_experiments, save_path, visualize) + + def _update_mab(self, mab: BaseMab, data: Dict[str, Any]): + """ + Update the multi-armed bandit model based on the logged data. + + Parameters + ---------- + mab : BaseMab + Multi-armed bandit model to be updated. + data : Dict[str, Any] + Data used to update the bandit model. + """ + if self.verbose: + logger.info(f"Offline policy update for {type(mab)}.") + kwargs = {"context": data["context"]} if self.contextual_features else {} + mab.update(actions=data["action_ids"].tolist(), rewards=np.squeeze(data["reward"]).tolist(), **kwargs) + + def _visualize_results(self, save_path: Optional[str], multi_objective_estimated_interval_df: pd.DataFrame): + """ + Visualize the results of the OPE process. + + Parameters + ---------- + save_path : Optional[str] + Path to save the visualization results. Required if not running in a Jupyter notebook. + multi_objective_estimated_interval_df : pd.DataFrame + Estimated confidence intervals + """ + + tabs = [] + grouped_df = multi_objective_estimated_interval_df.groupby("objective") + tools = "crosshair, pan, wheel_zoom, box_zoom, reset, hover, save" + + tooltips = [ + ("Estimator", "@estimators"), + ("Estimated policy value", "@values"), + ("Lower CI", "@lower"), + ("Upper CI", "@upper"), + ] + for group_name, estimated_interval_df in grouped_df: + source = ColumnDataSource( + data=dict( + estimators=estimated_interval_df.index.tolist(), + values=estimated_interval_df["mean"], + lower=estimated_interval_df["95.0% CI (lower)"], + upper=estimated_interval_df["95.0% CI (upper)"], + ) + ) + fig = figure( + title=f"Policy value estimates for {group_name} objective", + x_axis_label="Estimator", + y_axis_label="Estimated policy value (\u00b1 95% CI)", + sizing_mode="inherit", + x_range=source.data["estimators"], + tools=tools, + tooltips=tooltips, + ) + fig.vbar(x="estimators", top="values", width=0.9, source=source) + + # Add error bars for confidence intervals + fig.segment( + x0="estimators", y0="lower", x1="estimators", y1="upper", source=source, line_width=2, color="black" + ) # error bar line + fig.vbar( + x="estimators", width=0.1, bottom="lower", top="upper", source=source, color="black" + ) # error bar cap + + fig.xgrid.grid_line_color = None + + tabs.append(TabPanel(child=fig, title=f"{group_name}")) + + output_path = os.path.join(save_path, "multi_objective_estimated_policy.html") if save_path else None + visualize_via_bokeh(tabs=tabs, output_path=output_path) + + +def _mab_predict(mab: BaseMab, mab_data: Union[np.ndarray, PositiveInt], mc_experiment: int = 0) -> List[ActionId]: + """ + bandit action probabilities prediction in test set + + Parameters + ---------- + mab : BaseMab + Multi-armed bandit model + mab_data : Union[np.ndarray, PositiveInt] + test data used to update the bandit model; context or number of samples. + mc_experiment : int + placeholder for multi processing + + Returns + ------- + actions: List[ActionId] of shape (n_samples,) + The actions selected by the multi-armed bandit model. 
+ """ + mab_output = mab.predict(context=mab_data) if type(mab_data) is np.ndarray else mab.predict(n_samples=mab_data) + actions = mab_output[0] + return actions diff --git a/pybandits/utils.py b/pybandits/utils.py index 45a69a6..2bbe384 100644 --- a/pybandits/utils.py +++ b/pybandits/utils.py @@ -1,6 +1,9 @@ import json -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union +from bokeh.io import curdoc, output_file, output_notebook, save, show +from bokeh.models import InlineStyleSheet, TabPanel, Tabs +from IPython import get_ipython from pydantic import validate_call JSONSerializable = Union[str, int, float, bool, None, List["JSONSerializable"], Dict[str, "JSONSerializable"]] @@ -19,3 +22,62 @@ def to_serializable_dict(d: Dict[str, Any]) -> Dict[str, JSONSerializable]: """ return json.loads(json.dumps(d, default=dict)) + + +def in_jupyter_notebook() -> bool: + """ + Check if the code is running in a Jupyter notebook. + + Reference: https://stackoverflow.com/a/39662359 + + Returns + ------- + bool + True if the code is running in a Jupyter notebook, False otherwise. + """ + + try: + shell = get_ipython().__class__.__name__ + + if shell == "ZMQInteractiveShell": + return True # Jupyter notebook or qtconsole + + elif shell == "TerminalInteractiveShell": + return False # Terminal running IPython + + else: + return False # Other type (likely shouldn't happen) + + except NameError: + return False # Probably standard Python interpreter + + +def visualize_via_bokeh(output_path: Optional[str], tabs: List[TabPanel]): + """ + Visualize output to either a Jupyter notebook or an HTML file. + + Parameters + ---------- + output_path : Optional[str] + Path to the output file. Required if not running in a Jupyter notebook. 
+ """ + + if in_jupyter_notebook(): + output_notebook() + else: + if output_path is None: + raise ValueError("output_path is required when not running in a Jupyter notebook.") + output_file(output_path) + + # Add a Div model to the Bokeh layout for flexible tabs + css = """ + :host(.bk-Tabs) .bk-header { + flex-wrap: wrap !important; + } + """ + stylesheet = InlineStyleSheet(css=css) + curdoc().title = "Visual report" + if in_jupyter_notebook(): + show(Tabs(tabs=tabs, stylesheets=[stylesheet])) + else: + save(Tabs(tabs=tabs, sizing_mode="stretch_both", stylesheets=[stylesheet])) diff --git a/pyproject.toml b/pyproject.toml index 565164e..4a866db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pybandits" -version = "0.5.1" +version = "0.6.0" description = "Python Multi-Armed Bandit Library" authors = [ "Dario d'Andrea ", @@ -13,19 +13,23 @@ license = "MIT License" readme = "README.md" [tool.poetry.dependencies] -python = ">=3.8.1,<3.12" +python = ">=3.8.1,<3.11" loguru = "^0.6.0" -numpy = "^1.24.2" +numpy = "^1.23.5" pydantic = "^2.8.2" -scipy = "^1.10.1" +scipy = "^1.9.3" pymc = "^5.3.0" -scikit-learn = "^1.2.2" +scikit-learn = "^1.1.3" +obp = "^0.5.7" +optuna = "^3.6.0" +pyarrow = "^6.0.0" +bokeh = "^3.1.0" [tool.poetry.group.dev.dependencies] hypothesis = "^6.68.2" pytest = "^7.2.2" tox = "^4.4.7" -pandas = "^1.5.3" +pandas = "^1.5.2" pre-commit = "^3.1.1" nbdev = "^2.3.12" rich = "^13.3.2" diff --git a/tests/test_offline_policy_evaluator.py b/tests/test_offline_policy_evaluator.py new file mode 100644 index 0000000..3ac78ef --- /dev/null +++ b/tests/test_offline_policy_evaluator.py @@ -0,0 +1,310 @@ +from tempfile import TemporaryDirectory +from typing import Dict, List, Optional, Union, get_args, get_type_hints + +import numpy as np +import pandas as pd +import pytest +from hypothesis import given, settings +from hypothesis import strategies as st +from matplotlib.pyplot import close +from obp.ope import ( + BaseOffPolicyEstimator, +) +from pydantic import PositiveInt +from pytest_mock import MockerFixture +from sklearn.base import TransformerMixin +from sklearn.preprocessing import MinMaxScaler + +from pybandits.cmab import create_cmab_bernoulli_cc_cold_start, create_cmab_bernoulli_cold_start +from pybandits.offline_policy_evaluator import OfflinePolicyEvaluator +from pybandits.smab import ( + create_smab_bernoulli_cc_cold_start, + create_smab_bernoulli_cold_start, + create_smab_bernoulli_mo_cc_cold_start, + create_smab_bernoulli_mo_cold_start, +) + + +@pytest.fixture(scope="module") +def logged_data(n_samples=100, n_actions=3, n_batches=3, n_rewards=2, n_groups=2, n_features=10): + unique_actions = [f"a{i}" for i in range(n_actions)] + action_ids = np.random.choice(unique_actions, n_samples * n_batches) + batches = [i for i in range(n_batches) for _ in range(n_samples)] + rewards = [np.random.randint(2, size=(n_samples * n_batches)) for _ in range(n_rewards)] + action_true_rewards = {(a, r): np.random.rand() for a in unique_actions for r in range(n_rewards)} + true_rewards = [ + np.array([action_true_rewards[(a, r)] for a in action_ids]).reshape(n_samples * n_batches) + for r in range(n_rewards) + ] + groups = np.random.randint(n_groups, size=n_samples * n_batches) + action_costs = {action: np.random.rand() for action in unique_actions} + costs = np.array([action_costs[a] for a in action_ids]) + context = np.random.rand(n_samples * n_batches, n_features) + action_propensity_score = {action: np.random.rand() for action in unique_actions} + 
propensity_score = np.array([action_propensity_score[a] for a in action_ids]) + return pd.DataFrame( + { + "batch": batches, + "action_id": action_ids, + "cost": costs, + "group": groups, + **{f"reward_{r}": rewards[r] for r in range(n_rewards)}, + **{f"true_reward_{r}": true_rewards[r] for r in range(n_rewards)}, + **{f"context_{i}": context[:, i] for i in range(n_features)}, + "propensity_score": propensity_score, + } + ) + + +# validate failure for empty logged_data +def test_empty_logged_data( + split_prop=0.5, + n_trials=10, + verbose=False, + batch_feature="batch", + action_feature="action_id", + reward_feature="reward", + propensity_score_model_type="empirical", + expected_reward_model_type="logreg", + importance_weights_model_type="logreg", +): + with pytest.raises(AttributeError): + OfflinePolicyEvaluator( + logged_data=pd.DataFrame(), + split_prop=split_prop, + propensity_score_model_type=propensity_score_model_type, + expected_reward_model_type=expected_reward_model_type, + importance_weights_model_type=importance_weights_model_type, + n_trials=n_trials, + ope_metrics=None, + batch_feature=batch_feature, + action_feature=action_feature, + reward_feature=reward_feature, + verbose=verbose, + ) + + +@pytest.mark.usefixtures("logged_data") +@given( + split_prop=st.sampled_from([0.0, 1.0]), + n_trials=st.just(10), + ope_metrics=st.just(None), + verbose=st.just(False), + batch_feature=st.just("batch"), + action_feature=st.just("action_id"), + reward_feature=st.just("reward"), + propensity_score_model_type=st.just("empirical"), + expected_reward_model_type=st.just("logreg"), + importance_weights_model_type=st.just("logreg"), +) +# validate failure for extreme split_prop values +def test_initialization_extreme_split_prop( + logged_data: MockerFixture, + split_prop: float, + n_trials: PositiveInt, + ope_metrics: Optional[List[BaseOffPolicyEstimator]], + verbose: bool, + batch_feature: str, + action_feature: str, + reward_feature: str, + propensity_score_model_type: str, + expected_reward_model_type: str, + importance_weights_model_type: str, +): + with pytest.raises(ValueError): + OfflinePolicyEvaluator( + logged_data=logged_data, + split_prop=split_prop, + propensity_score_model_type=propensity_score_model_type, + expected_reward_model_type=expected_reward_model_type, + importance_weights_model_type=importance_weights_model_type, + n_trials=n_trials, + ope_metrics=ope_metrics, + batch_feature=batch_feature, + action_feature=action_feature, + reward_feature=reward_feature, + true_reward_feature=[reward_feature, reward_feature], + verbose=verbose, + ) + + +# validate failure for invalid initialization parameters +def test_initialization_mismatches( + logged_data: MockerFixture, + split_prop=0.5, + n_trials=10, + ope_metrics=None, + verbose=False, + batch_feature="batch", + action_feature="action_id", + reward_feature="reward_0", + propensity_score_model_type="empirical", + expected_reward_model_type="logreg", + importance_weights_model_type="logreg", +): + # more true_reward_features than rewards + with pytest.raises(ValueError): + OfflinePolicyEvaluator( + logged_data=logged_data, + split_prop=split_prop, + propensity_score_model_type=propensity_score_model_type, + expected_reward_model_type=expected_reward_model_type, + importance_weights_model_type=importance_weights_model_type, + n_trials=n_trials, + ope_metrics=ope_metrics, + batch_feature=batch_feature, + action_feature=action_feature, + reward_feature=reward_feature, + true_reward_feature=[reward_feature, reward_feature], + 
verbose=verbose, + ) + # missing propensity_score_feature + with pytest.raises(ValueError): + OfflinePolicyEvaluator( + logged_data=logged_data, + split_prop=split_prop, + propensity_score_model_type="propensity_score", + expected_reward_model_type=expected_reward_model_type, + importance_weights_model_type=importance_weights_model_type, + n_trials=n_trials, + ope_metrics=ope_metrics, + batch_feature=batch_feature, + action_feature=action_feature, + reward_feature=reward_feature, + visualize=False, + ) + # missing context + with pytest.raises(AttributeError): + OfflinePolicyEvaluator( + logged_data=logged_data, + split_prop=split_prop, + propensity_score_model_type=propensity_score_model_type, + expected_reward_model_type=expected_reward_model_type, + importance_weights_model_type=importance_weights_model_type, + n_trials=n_trials, + ope_metrics=ope_metrics, + batch_feature=batch_feature, + action_feature=action_feature, + reward_feature=reward_feature, + verbose=False, + contextual_features=["non_existent"], + ) + + +@pytest.mark.usefixtures("logged_data") +@settings(deadline=None) +@given( + split_prop=st.just(0.5), + n_trials=st.just(10), + fast_fit=st.booleans(), + scaler=st.sampled_from([None, MinMaxScaler()]), + verbose=st.booleans(), + visualize=st.booleans(), + propensity_score_model_type=st.sampled_from( + get_args(get_type_hints(OfflinePolicyEvaluator)["propensity_score_model_type"]) + ), + expected_reward_model_type=st.sampled_from( + get_args(get_type_hints(OfflinePolicyEvaluator)["expected_reward_model_type"]) + ), + importance_weights_model_type=st.sampled_from( + get_args(get_type_hints(OfflinePolicyEvaluator)["importance_weights_model_type"]) + ), + batch_feature=st.just("batch"), + action_feature=st.just("action_id"), + reward_feature=st.sampled_from(["reward_0", ["reward_0", "reward_1"]]), + context=st.booleans(), + group_feature=st.sampled_from(["group", None]), + cost_feature=st.sampled_from(["cost", None]), + propensity_score_feature=st.just("propensity_score"), + n_mc_experiments=st.just(4), + update=st.booleans(), +) +# test various OfflinePolicyEvaluator configurations to validate that everything works +def test_running_configuration( + logged_data: MockerFixture, + split_prop: float, + n_trials: PositiveInt, + fast_fit: bool, + scaler: Optional[Union[TransformerMixin, Dict[str, TransformerMixin]]], + verbose: bool, + visualize: bool, + propensity_score_model_type: str, + expected_reward_model_type: str, + importance_weights_model_type: str, + batch_feature: str, + action_feature: str, + reward_feature: Union[str, List[int]], + context: bool, + group_feature: Optional[str], + cost_feature: Optional[str], + propensity_score_feature: Optional[str], + n_mc_experiments: int, + update: bool, +): + if context and type(reward_feature) is List: + pass # CmabMO and CmabMOCC are not supported yet + true_reward_feature = ( + f"true_{reward_feature}" if isinstance(reward_feature, str) else [f"true_{r}" for r in reward_feature] + ) + contextual_features = [col for col in logged_data.columns if col.startswith("context")] if context else None + unique_actions = logged_data["action_id"].unique() + if cost_feature: + action_ids_cost = { + action_id: logged_data["cost"][logged_data["action_id"] == action_id].iloc[0] + for action_id in unique_actions + } + if context: + if cost_feature: + if type(reward_feature) is list: + return # CmabMOCC is not supported yet + else: + mab = create_cmab_bernoulli_cc_cold_start( + action_ids_cost=action_ids_cost, n_features=len(contextual_features) + 
) + else: + if type(reward_feature) is list: + return # CmabMO is not supported yet + else: + mab = create_cmab_bernoulli_cold_start( + action_ids=set(unique_actions), n_features=len(contextual_features) + ) + else: + if cost_feature: + if type(reward_feature) is list: + mab = create_smab_bernoulli_mo_cc_cold_start( + action_ids_cost=action_ids_cost, n_objectives=len(reward_feature) + ) + else: + mab = create_smab_bernoulli_cc_cold_start(action_ids_cost=action_ids_cost) + else: + if type(reward_feature) is list: + mab = create_smab_bernoulli_mo_cold_start( + action_ids=set(unique_actions), n_objectives=len(reward_feature) + ) + else: + mab = create_smab_bernoulli_cold_start(action_ids=set(unique_actions)) + evaluator = OfflinePolicyEvaluator( + logged_data=logged_data, + split_prop=split_prop, + n_trials=n_trials, + fast_fit=fast_fit, + scaler=scaler, + ope_metrics=None, + verbose=verbose, + propensity_score_model_type=propensity_score_model_type, + expected_reward_model_type=expected_reward_model_type, + importance_weights_model_type=importance_weights_model_type, + batch_feature=batch_feature, + action_feature=action_feature, + reward_feature=reward_feature, + true_reward_feature=true_reward_feature, + contextual_features=contextual_features, + group_feature=group_feature, + cost_feature=cost_feature, + propensity_score_feature=propensity_score_feature, + ) + execution_func = evaluator.update_and_evaluate if update else evaluator.evaluate + with TemporaryDirectory() as tmp_dir: + execution_func(mab=mab, visualize=visualize, n_mc_experiments=n_mc_experiments, save_path=tmp_dir) + if visualize: + close("all") # close all figures to avoid memory leak
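
---

For reviewers: below is a minimal, illustrative usage sketch of the new `OfflinePolicyEvaluator` (not part of the patch itself). It only relies on constructor arguments and calls that appear in this diff and in `tests/test_offline_policy_evaluator.py`; the synthetic data, column names (`batch`, `action_id`, `reward_0`, `context_0`) and parameter values are arbitrary placeholders mirroring the test fixture, not a prescribed configuration.

```python
import numpy as np
import pandas as pd

from pybandits.cmab import create_cmab_bernoulli_cold_start
from pybandits.offline_policy_evaluator import OfflinePolicyEvaluator

# Synthetic logging data shaped like the test fixture: one row per logged round.
rng = np.random.default_rng(0)
n_rounds, actions = 300, ["a0", "a1", "a2"]
logged_data = pd.DataFrame(
    {
        "batch": np.repeat([0, 1, 2], n_rounds // 3),  # integer batch index (train/test split key)
        "action_id": rng.choice(actions, n_rounds),    # action taken by the logging policy
        "reward_0": rng.integers(0, 2, n_rounds),      # observed binary reward
        "context_0": rng.random(n_rounds),             # a contextual feature
    }
)

evaluator = OfflinePolicyEvaluator(
    logged_data=logged_data,
    split_prop=0.5,                            # first half of batches -> train, rest -> test
    propensity_score_model_type="empirical",   # estimate pi_b from logged action frequencies
    expected_reward_model_type="logreg",
    importance_weights_model_type="logreg",
    n_trials=10,                               # Optuna budget (unused here because fast_fit=True)
    fast_fit=True,                             # skip hyperparameter tuning, use default models
    ope_metrics=None,                          # None -> all supported OBP estimators
    batch_feature="batch",
    action_feature="action_id",
    reward_feature="reward_0",
    contextual_features=["context_0"],
)

# Evaluate a cold-start contextual Bernoulli bandit on the held-out batches.
mab = create_cmab_bernoulli_cold_start(action_ids=set(actions), n_features=1)
policy_value_df, interval_df = evaluator.evaluate(mab=mab, n_mc_experiments=10, visualize=False)
```

`evaluate` returns the estimated policy values and their confidence intervals per objective; `update_and_evaluate` can be used instead to first update the bandit on the training batches, as exercised in the test suite.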