def predict_gradient_boosted_decision_tree(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
    """Predict labels for ``trip`` with the stored gradient boosted decision tree model.

    The model type and storage location are read from the application config
    and the prediction itself is delegated to ``eamur.predict_labels_with_gbdt``.

    :param trip: the trip to predict labels for
    :param max_confidence: unused; kept so the signature matches
        ``predict_cluster_confidence_discounting``
    :param first_confidence: unused; see ``max_confidence``
    :param confidence_multiplier: unused; see ``max_confidence``
    :return: the predicted labels; when ``n <= 0`` the labels are returned
        exactly as produced, otherwise a deep copy is returned
    """
    # load application config
    model_type = eamtc.get_model_type()
    model_storage = eamtc.get_model_storage()
    labels, n = eamur.predict_labels_with_gbdt(trip, model_type, model_storage)
    if n <= 0:  # No model data or no prediction could be made
        logging.debug(f"In predict_gradient_boosted_decision_tree: n={n}; returning as-is")
        return labels

    # Confidence discounting is intentionally not applied here: the coefficient
    # computation (n_to_confidence_coeff) was disabled for this predictor, and
    # multiplying by the never-assigned `confidence_coeff` raised a NameError
    # for every successful prediction.
    return copy.deepcopy(labels)
# ensemble specifies which algorithm in eacile to run.
ensemble = eacile.ensemble_first_prediction


# Does all the work necessary for a given user
def infer_labels(user_id):
    """Run label inference for ``user_id`` and record the pipeline outcome.

    On success the stage is marked done with the last processed trip (which is
    None when nothing was processed); on any error the stage is marked failed
    and the exception is logged rather than propagated.
    """
    time_query = epq.get_time_range_for_label_inference(user_id)
    try:
        lip = LabelInferencePipeline()
        lip.user_id = user_id
        lip.run_prediction_pipeline(user_id, time_query)
        if lip.last_trip_done is None:
            logging.debug("After run, last_trip_done == None, must be early return")
        epq.mark_label_inference_done(user_id, lip.last_trip_done)
    except Exception:
        # `except Exception` (not a bare except) so that KeyboardInterrupt and
        # SystemExit still terminate the process instead of being recorded as
        # an inference failure.
        logging.exception("Error while inferring labels, timestamp is unchanged")
        epq.mark_label_inference_failed(user_id)


# Code structure based on emission.analysis.classification.inference.mode.pipeline
# and emission.analysis.classification.inference.mode.rule_engine
class LabelInferencePipeline:
    """Runs every primary algorithm plus the ensemble over a user's cleaned trips."""

    def __init__(self):
        self._last_trip_done = None
        # populated by run_prediction_pipeline
        self.user_id = None
        self.ts = None
        self.toPredictTrips = None

    @property
    def last_trip_done(self):
        """The cleaned trip with the greatest ``end_ts`` processed so far, or None."""
        return self._last_trip_done

    # For a given user and time range, runs all the primary algorithms and ensemble, saves results
    # to the database, and records progress
    def run_prediction_pipeline(self, user_id, time_range):
        """Create, label and store an inferred trip for each cleaned trip in ``time_range``.

        :param user_id: the user whose trips are processed
        :param time_range: time query restricting which cleaned trips are read
        """
        # Set here as well as in infer_labels so the pipeline also works when
        # it is driven directly; the compute_and_save_* helpers read self.user_id.
        self.user_id = user_id
        self.ts = esta.TimeSeries.get_time_series(user_id)
        self.toPredictTrips = esda.get_entries(
            esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
        for cleaned_trip in self.toPredictTrips:
            # Create an inferred trip
            cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
            inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)

            # Run the algorithms and the ensemble, store results
            results = self.compute_and_save_algorithms(inferred_trip)
            # named differently from the module-level `ensemble` callable to avoid shadowing it
            ensemble_prediction = self.compute_and_save_ensemble(inferred_trip, results)

            # Put final results into the inferred trip and store it
            inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
            inferred_trip["data"]["inferred_labels"] = ensemble_prediction["prediction"]
            self.ts.insert(inferred_trip)

            if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
                self._last_trip_done = cleaned_trip

    # This is where the labels for a given trip are actually predicted.
    # Though the only information passed in is the trip object, the trip object can provide the
    # user_id and other potentially useful information.
    def compute_and_save_algorithms(self, trip):
        """Run each entry of ``primary_algorithms`` on ``trip`` and store its prediction.

        :return: list of the stored ``Labelprediction`` objects, one per algorithm
        """
        predictions = []
        for algorithm_id, algorithm_fn in primary_algorithms.items():
            prediction = algorithm_fn(trip)
            lp = ecwl.Labelprediction()
            lp.trip_id = trip.get_id()
            lp.algorithm_id = algorithm_id
            lp.prediction = prediction
            lp.start_ts = trip["data"]["start_ts"]
            lp.end_ts = trip["data"]["end_ts"]
            self.ts.insert_data(self.user_id, "inference/labels", lp)
            predictions.append(lp)
        return predictions

    # Combine all our predictions into a single ensemble prediction.
    # As a placeholder, we just take the first prediction.
    # TODO: implement a real combination algorithm.
    def compute_and_save_ensemble(self, trip, predictions):
        """Combine ``predictions`` with the module-level ``ensemble`` function and store the result.

        :return: the stored ``Labelprediction`` holding the ensemble result
        """
        il = ecwl.Labelprediction()
        il.trip_id = trip.get_id()
        il.start_ts = trip["data"]["start_ts"]
        il.end_ts = trip["data"]["end_ts"]
        (il.algorithm_id, il.prediction) = ensemble(trip, predictions)
        self.ts.insert_data(self.user_id, "analysis/inferred_labels", il)
        return il
class GradientBoostedDecisionTree(eamuu.TripModel):

    is_incremental: bool = False  # overwritten during __init__

    def __init__(self, config=None):
        """
        Instantiate a gradient boosted decision tree for all users.

        This uses the sklearn implementation of a gradient boosted
        decision tree to classify unlabeled replacement modes.

        https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingClassifier.html

        Replacement modes are considered to be the second-best choice for
        a given trip (i.e., what mode would have been chosen if the actual
        choice wasn't available). These labels are gathered from the user
        along with the chosen mode and trip purpose after the trip takes place.

        The model is currently trained on data from all users.

        :param config: optional model configuration; when None it is read from
            the 'model_parameters.gbdt' entry of the trip model config
        :raises KeyError: if the configuration lacks one of the expected keys
        """
        if config is None:
            config = eamtc.get_config_value_or_raise('model_parameters.gbdt')
            logging.debug('GradientBoostedDecisionTree loaded model config from file')
        else:
            logging.debug('GradientBoostedDecisionTree using model config argument')
        expected_keys = [
            'incremental_evaluation',
            'feature_list',
            'dependent_var'
        ]
        for k in expected_keys:
            if config.get(k) is None:
                msg = f"gbdt trip model config missing expected key {k}"
                raise KeyError(msg)
        self.is_incremental = config['incremental_evaluation']
        # use the sklearn implementation of a GBDT
        self.gbdt = ske.GradientBoostingClassifier(n_estimators=50)
        self.feature_list = config['feature_list']
        self.dependent_var = config['dependent_var']

    def fit(self, trips: List[ecwc.Confirmedtrip]):
        """train the model on a list of labeled trips

        :param trips: labeled trips; features and the dependent variable are
            extracted from them via extract_features
        :raises Exception: if any trip has an empty 'user_input'
        """
        logging.debug(f'fit called with {len(trips)} trips')
        unlabeled = [t for t in trips if len(t['data']['user_input']) == 0]
        if len(unlabeled) > 0:
            msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}'
            raise Exception(msg)
        X_train, y_train = self.extract_features(trips)
        self.gbdt.fit(X_train, y_train)
        logging.info(f"gradient boosted decision tree model fit to {len(X_train)} rows of trip data")
        logging.info(f"training features were {X_train.columns}")

    def predict(self, trip: ecwc.Confirmedtrip) -> List[str]:
        """predict the dependent variable for the given trip(s)

        :param trip: trip (or trips) to extract prediction features from
        :return: the classifier's predictions, one per extracted feature row
        """
        logging.debug("running gradient boosted mode prediction")
        # the dependent variable is not extracted when predicting
        X_test, _ = self.extract_features(trip, is_prediction=True)
        y_pred = self.gbdt.predict(X_test)
        logging.debug(f"made predictions {y_pred}")
        return y_pred

    def to_dict(self) -> Dict:
        """return the estimator parameters reported by sklearn's get_params()

        NOTE(review): get_params() covers constructor parameters only; it looks
        like fitted state is not part of this dictionary -- confirm before
        relying on a stored model for prediction.
        """
        return self.gbdt.get_params()

    def from_dict(self, model: Dict):
        """restore estimator parameters previously produced by to_dict()"""
        # set_params takes keyword arguments; passing the dict positionally raises TypeError
        self.gbdt.set_params(**model)

    def extract_features(self, trips: ecwc.Confirmedtrip, is_prediction=False) -> List[float]:
        """delegate to util.get_replacement_mode_features with this model's configuration

        :return: whatever get_replacement_mode_features returns (an X, y pair)
        """
        return eamtu.get_replacement_mode_features(self.feature_list, self.dependent_var, trips, is_prediction)


# --- emission/analysis/modelling/trip_model/model_storage.py ---
def load_model_all_users(model_type: eamum.ModelType, model_storage: ModelStorage) -> Optional[Dict]:
    """load the replacement model shared across all users from a model storage location

    :param model_type: expected type of model stored
    :param model_storage: storage format
    :return: the model representation as a Python Dict or None
    :raises: TypeError if loaded model has different type than expected type,
             has no model type, or the storage type is unknown
             KeyError if the ModelType is not known
    """
    if model_storage != ModelStorage.DOCUMENT_DATABASE:
        storage_types_str = ",".join(ModelStorage.names())
        msg = (
            f"unknown model storage type {model_storage}, must be one of "
            f"{{{storage_types_str}}}"
        )
        raise TypeError(msg)

    # NOTE(review): 0 appears to act as a placeholder user id for the model
    # shared by all users -- confirm against esma.ModelStorage.get_model_storage
    ms = esma.ModelStorage.get_model_storage(0)
    latest_model_entry = ms.get_current_model(key=esda.REPLACE_MODEL_STORE_KEY)

    if latest_model_entry is None:
        logging.debug(f'no {model_type.name} model found')
        return None

    write_ts = latest_model_entry['metadata']['write_ts']
    logging.debug(f'retrieved latest trip model recorded at timestamp {write_ts}')
    logging.debug(latest_model_entry)

    # parse str to enum for ModelType
    latest_model_type_str = latest_model_entry.get('data', {}).get('model_type')
    if latest_model_type_str is None:
        raise TypeError('stored model does not have a model type')
    latest_model_type = eamum.ModelType.from_str(latest_model_type_str)

    # validate and return (the entry is known to be non-None at this point)
    if latest_model_type != model_type:
        msg = (
            f"loading model has model type '{latest_model_type.name}' "
            f"but was expected to have model type {model_type.name}"
        )
        raise TypeError(msg)
    return latest_model_entry['data']['model']
def predict_labels_with_gbdt(
    trip: ecwc.Confirmedtrip,
    model_type = eamumt.ModelType.GRADIENT_BOOSTED_DECISION_TREE,
    model_storage = eamums.ModelStorage.DOCUMENT_DATABASE,
    model_config = None):
    """
    invoke the stored all-users prediction model to predict labels for a trip.

    :param trip: the trip to predict labels for
    :param model_type: type of prediction model to run
    :param model_storage: location to read/write models
    :param model_config: optional configuration for model, for debugging purposes
    :return: a tuple (predictions, n); ([], -1) when no model is stored,
             otherwise n is the number of predictions unless the model's
             predict() already returned a (predictions, n) tuple
    """
    model = _load_stored_trip_model_all_users(model_type, model_storage, model_config)
    if model is None:
        return [], -1
    result = model.predict(trip)
    # Some models (see predict_labels_with_n) already return (predictions, n);
    # the gradient boosted model returns only the predictions, so unpacking its
    # result as a 2-tuple would fail or silently split the predictions.
    if isinstance(result, tuple):
        return result
    return result, len(result)


def _load_stored_trip_model_all_users(
    model_type: eamumt.ModelType,
    model_storage: eamums.ModelStorage,
    model_config = None) -> Optional[eamuu.TripModel]:
    """helper to build a prediction model class with the
    contents of a stored model shared across all users.

    :param model_type: TripModel type configured for this OpenPATH server
    :param model_storage: storage type
    :param model_config: optional configuration for model, for debugging purposes
    :return: model, or None if no shared model is stored
    """
    model_dict = eamums.load_model_all_users(model_type, model_storage)
    if model_dict is None:
        return None
    model = model_type.build(model_config)
    model.from_dict(model_dict)
    return model
class SupportVectorMachine(eamuu.TripModel):

    is_incremental: bool = False  # overwritten during __init__
    is_initialized: bool = False  # overwritten during first fit()

    def __init__(self, config=None):
        """
        Instantiate a linear support vector machine for all users.

        This uses the sklearn implementation of a support vector machine
        to classify unlabeled replacement modes. The SVM is linear, and is fit
        with the more general SGDClassifier class which can accommodate online
        learning:

        https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDClassifier.html

        For anyone looking to implement a different online learning model in the
        future, here is a list of sklearn models that implement "partial_fit"
        and would be candidates for the online learning approach implemented
        here:

        https://scikit-learn.org/0.15/modules/scaling_strategies.html

        Replacement modes are considered to be the second-best choice for
        a given trip (i.e., what mode would have been chosen if the actual
        choice wasn't available). These labels are gathered from the user
        along with the chosen mode and trip purpose after the trip takes place.

        The model is currently trained on data from all users.

        :param config: optional model configuration; when None it is read from
            the 'model_parameters.svm' entry of the trip model config
        :raises KeyError: if the configuration lacks one of the expected keys
        """
        # Imported explicitly: a bare `import sklearn` is not guaranteed to load
        # the linear_model submodule, so `sklearn.linear_model` may not resolve.
        from sklearn.linear_model import SGDClassifier

        if config is None:
            config = eamtc.get_config_value_or_raise('model_parameters.svm')
            logging.debug('SupportVectorMachine loaded model config from file')
        else:
            logging.debug('SupportVectorMachine using model config argument')
        expected_keys = [
            'incremental_evaluation',
            'feature_list',
            'dependent_var'
        ]
        for k in expected_keys:
            if config.get(k) is None:
                msg = f"svm trip model config missing expected key {k}"
                raise KeyError(msg)
        self.is_incremental = config['incremental_evaluation']
        # use the sklearn implementation of a svm
        self.svm = SGDClassifier()
        self.feature_list = config['feature_list']
        self.dependent_var = config['dependent_var']

    def fit(self, trips: List[ecwc.Confirmedtrip]):
        """train the model on a list of labeled trips.

        If using an incremental model, the initial call to fit passes the
        list of known classes from the config ('dependent_var.classes') to
        partial_fit. The config file is used to store a lookup for known
        classes for each categorical feature. This prevents the need to store
        a lookup in the model itself, which must be updated every time the
        model sees a new class or feature OR when it is given an incremental
        training request that does not contain every feature class etc.

        :param trips: labeled trips to train from
        :raises Exception: if any trip has an empty 'user_input'
        :raises ValueError: if an incremental update fails inside partial_fit
        """
        logging.debug(f'fit called with {len(trips)} trips')
        unlabeled = [t for t in trips if len(t['data']['user_input']) == 0]
        if len(unlabeled) > 0:
            msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}'
            raise Exception(msg)
        X_train, y_train = self.extract_features(trips)
        if not self.is_incremental:
            # if not incremental, just train regularly
            self.svm.fit(X_train, y_train)
        elif not self.is_initialized:
            # the first partial_fit call must be told every class it may ever see
            logging.debug('initializing incremental model fit')
            self.svm.partial_fit(X_train, y_train, classes=self.dependent_var['classes'])
            self.is_initialized = True
        else:
            # for all future partial fits, there is no need to pass the classes again
            logging.debug('updating incremental model fit')
            try:
                self.svm.partial_fit(X_train, y_train)
            except ValueError as err:
                # keep the original sklearn error attached as the cause
                raise ValueError("Error in incremental fit: Likely an unseen feature or dependent class was found") from err
        logging.info(f"support vector machine model fit to {len(X_train)} rows of trip data")
        logging.info(f"training features were {X_train.columns}")

    def predict(self, trip: ecwc.Confirmedtrip) -> List[str]:
        """predict the dependent variable for the given trip(s)

        :param trip: trip (or trips) to extract prediction features from
        :return: the classifier's predictions, one per extracted feature row
        """
        logging.debug("running support vector mode prediction")
        # the dependent variable is not extracted when predicting
        X_test, _ = self.extract_features(trip, is_prediction=True)
        y_pred = self.svm.predict(X_test)
        logging.debug(f"made predictions {y_pred}")
        return y_pred

    def to_dict(self) -> Dict:
        """return the estimator parameters reported by sklearn's get_params()

        NOTE(review): get_params() covers constructor parameters only; it looks
        like fitted state is not part of this dictionary -- confirm before
        relying on a stored model for prediction.
        """
        return self.svm.get_params()

    def from_dict(self, model: Dict):
        """restore estimator parameters previously produced by to_dict()"""
        # set_params takes keyword arguments; passing the dict positionally raises TypeError
        self.svm.set_params(**model)

    def extract_features(self, trips: ecwc.Confirmedtrip, is_prediction=False) -> List[float]:
        """delegate to util.get_replacement_mode_features with this model's configuration

        :return: whatever get_replacement_mode_features returns (an X, y pair)
        """
        return eamtu.get_replacement_mode_features(self.feature_list, self.dependent_var, trips, is_prediction)
# TODO: Needs to be split into 1 function which reads demographic data, 1 function which formats features
def get_survey_df(uuid_list, survey_features):
    """Build a dataframe of demographic survey answers for the given users.

    :param uuid_list: flat list of user ids to look up survey responses for
    :param survey_features: feature names as written in the config, i.e.
        carrying the leading "survey." identifier
    :return: dataframe with one row per stored survey response, one column per
        entry of ``survey_features`` plus a ``user_id`` column
    :raises ValueError: if no survey response is stored for any requested user
    """
    # we use the "survey." identifier to separate out the features which require survey attention in the config, but do not need it in actual key
    survey_features_response = [".".join(x.split(".")[1:]) for x in survey_features]
    # retrieve survey response records for any user that has supplied a trip which is being trained on
    all_survey_results = list(edb.get_timeseries_db().find({"user_id": {"$in": uuid_list}, "metadata.key": "manual/demographic_survey"}))
    if not all_survey_results:
        raise ValueError("no demographic survey responses found for the requested users")
    # the challenge is that one of the first dictionary keys changes across users, so we cannot apply a single json_normalize and take feature values
    # each unique key would end up as its own column, even if it is really the same feature as others, and would be NaN for all other users
    result = []
    for survey in all_survey_results:
        doc_response = survey['data']['jsonDocResponse']
        # this is the id that changes across users
        response_id = list(doc_response.keys())[0]
        response = pd.json_normalize(doc_response[response_id])
        response = response[survey_features_response].copy()
        response.columns = survey_features
        response['user_id'] = survey["user_id"]
        result.append(response)
    # reset_index(inplace=True) returns None, which made this function return
    # None; use the non-inplace form so the dataframe is actually returned
    return pd.concat(result, axis=0).reset_index(drop=True)


def get_replacement_mode_features(feature_list, dependent_var, trips: "List[ecwc.Confirmedtrip]", is_prediction=False) -> Tuple:
    """extract the features needed to perform replacement mode modeling from a set of
    trips.

    recodes variables that are categorical.

    :param feature_list: features to gather from each trip, mapped to the known
        classes of each categorical feature
    :type feature_list: Dict[str, Optional[List[str]]]
    :param dependent_var: description of the feature to predict; its 'name'
        entry is the column to extract
    :type dependent_var: Dict
    :param trips: all trips to extract features from (a single trip is accepted too)
    :type trips: List[ecwc.Confirmedtrip]
    :param is_prediction: when True the dependent variable is not extracted
    :type is_prediction: bool
    :return: the X features (dataframe) and y (None when is_prediction is True)
    :raises ValueError: if a categorical feature holds a value outside its
        configured classes
    """
    # a single dict-like trip is normalized to a one-element list so that the
    # per-trip iteration below behaves the same as pd.json_normalize does
    if isinstance(trips, dict):
        trips = [trips]
    # get dataframe from json trips
    trips_df = pd.json_normalize(trips)
    # any features that are part of the demographic survey require special attention
    # the first nested value of the survey data responses changes depending on the user/response
    feature_names = list(feature_list.keys())
    survey_features = [x for x in feature_names if x.startswith('survey.')]
    nonsurvey_features = [x for x in feature_names if not x.startswith('survey.')]
    if len(survey_features) > 0:
        # flat list of ids; a nested list never matches the "$in" query
        uuid_list = [trip['user_id'] for trip in trips]
        survey_df = get_survey_df(uuid_list, survey_features)
        # one survey row per user, joined onto every trip of that user; a
        # column-wise concat would pair trips and responses by position only
        survey_df = survey_df.drop_duplicates(subset='user_id', keep='last')
        merged = trips_df[nonsurvey_features + ['user_id']].merge(survey_df, on='user_id', how='left')
        X = merged.drop(columns='user_id')
    else:
        X = trips_df[nonsurvey_features]
    # any features that are strings must be encoded as numeric variables
    # we use one-hot encoding for categorical variables
    # https://pbpython.com/pandas_dtypes.html
    dummies = []
    numeric = []
    for col in X:
        column = X[col]
        # object or string column == categorical variable
        if pd.api.types.is_object_dtype(column) or pd.api.types.is_string_dtype(column):
            cat_col = pd.Categorical(column, categories=feature_list[col])
            # values outside the configured classes become NaN; refuse them
            if cat_col.isnull().any():
                raise ValueError(f"Cannot predict on unseen classes in: {col}")
            dummies.append(pd.get_dummies(cat_col, prefix=col))
        else:
            numeric.append(column)
    # numeric columns first, followed by the one-hot encoded columns
    numeric.extend(dummies)
    X = pd.concat(numeric, axis=1)
    # only extract dependent var if fitting a new model
    # for the dependent variable of a classification model, sklearn will accept strings
    # so no need to recode these to numeric and deal with complications of storing labels
    # and decoding them later
    if is_prediction:
        y = None
    else:
        y = trips_df[dependent_var['name']]
    return X, y
"metrics/daily_mean_median_speed" INFERRED_LABELS_KEY = "inference/labels" TRIP_MODEL_STORE_KEY = "inference/trip_model" +REPLACE_MODEL_STORE_KEY = "inference/replace_model" # General methods diff --git a/emission/tests/modellingTests/TestReplacementTripModels.py b/emission/tests/modellingTests/TestReplacementTripModels.py new file mode 100644 index 000000000..e4fe4b2a7 --- /dev/null +++ b/emission/tests/modellingTests/TestReplacementTripModels.py @@ -0,0 +1,417 @@ +import unittest +import emission.core.get_database as edb +import emission.analysis.modelling.trip_model.gradient_boosted_decision_tree as eamtg +import emission.analysis.modelling.trip_model.support_vector_machine as eamts +import emission.tests.modellingTests.modellingTestAssets as etmm +import logging +import pandas as pd +import random + + +class TestReplacementTripModels(unittest.TestCase): + + def setUp(self) -> None: + logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', + level=logging.DEBUG) + + + def testSmoke(self): + """ + the model should fit and predict on normal data without errors + """ + # though we cannot use mode_confirm or purpose_confirm to predict, they are required for mock trip generation + # for now, just pass it to user-label and sensed-label data + label_data = { + "mode_confirm": ['drive'], + "replaced_mode": ['drive','walk'], + "purpose_confirm": ['walk'] + } + # generate $n trips. 
+ n = 20 + m = 5 + trips = etmm.generate_mock_trips( + user_id="joe", + trips=n, + origin=(0, 0), + destination=(1, 1), + label_data=label_data, + sensed_label_data=label_data, + within_threshold=m, + threshold=0.001, # ~ 111 meters in degrees WGS84 + ) + # pass in a test configuration + model_config = { + "incremental_evaluation": False, + "feature_list": { + "data.inferred_labels.mode_confirm": [ + "walk", + "bike", + "drive" + ] + }, + "dependent_var": { + "name": "data.user_input.replaced_mode", + "classes": [ + "walk", + "bike", + "drive", + ] + } + } + model = eamtg.GradientBoostedDecisionTree(model_config) + model.fit(trips) + model.predict(trips) + model = eamts.SupportVectorMachine(model_config) + model.fit(trips) + model.predict(trips) + + +#TODO: These tests were written using a prior version of the model which did not include the sensed data. +# So sensed features were actually being read from the 'user_input' key rather than 'inferred_labels' +# The above test shows how these can be set up to read from inferred labels, however they are dependent on the +# 'add_sensed_labels()' function in modellingTestAssets.py. Once that function correctly samples the labels +# (instead of just using drive for every label) these tests should work again. The exception is 'testFull' which +# will rely on a new function which adds mock-demographic-data to the database. See todo in get_survey_df() under util.py +# for more info on that. + + # def testUnseenFeatures(self): + # """ + # if the input classes for a feature change throw sklearn error + # """ + # train_label_data = { + # "mode_confirm": ['drive'], + # "purpose_confirm": ['work', 'home', 'school'], + # "replaced_mode": ['drive','walk'] + # } + # test_label_data = { + # "mode_confirm": ['walk','transit'], + # "purpose_confirm": ['work', 'home', 'school'], + # "replaced_mode": ['drive','walk'] + # } + # # generate $n trips. 
+ # n = 20 + # m = 5 + # train_trips = etmm.generate_mock_trips( + # user_id="joe", + # trips=n, + # origin=(0, 0), + # destination=(1, 1), + # label_data=train_label_data, + # sensed_label_data=train_label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # test_trips = etmm.generate_mock_trips( + # user_id="joe", + # trips=n, + # origin=(0, 0), + # destination=(1, 1), + # label_data=test_label_data, + # sensed_label_data=test_label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # # pass in a test configuration + # model_config = { + # "incremental_evaluation": False, + # "feature_list": { + # "data.inferred_labels.mode_confirm": [ + # "drive" + # ] + # }, + # "dependent_var": { + # "name": "data.user_input.replaced_mode", + # "classes": [ + # "drive", + # "walk" + # ] + # } + # } + # model = eamtg.GradientBoostedDecisionTree(model_config) + # model.fit(train_trips) + # with self.assertRaises(ValueError): + # model.predict(test_trips) + # model = eamts.SupportVectorMachine(model_config) + # model.fit(train_trips) + # with self.assertRaises(ValueError): + # model.predict(test_trips) + + + # def testFull(self): + # """ + # the model should handle survey, trip, and user input features + # """ + # label_data = { + # "mode_confirm": ['walk', 'bike', 'transit'], + # "purpose_confirm": ['work', 'home', 'school'], + # "replaced_mode": ['drive','walk','bike','transit'] + # } + # # generate $n trips. 
+ # n = 20 + # m = 5 + # # for the sake of testing, need a UUID that correlates to a survey response in the database; use the first one + # all_survey_results = list(edb.get_timeseries_db().find({"metadata.key": "manual/demographic_survey"})) + # sample_uuid = all_survey_results[0]['user_id'] + # trips = etmm.generate_mock_trips( + # user_id=sample_uuid, + # trips=n, + # origin=(0, 0), + # destination=(1, 1), + # label_data=label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # print(trips[0]) + # # pass in a test configuration + # model_config = { + # "incremental_evaluation": False, + # "feature_list": { + # "data.inferred_labels.mode_confirm": [ + # "walk", + # "bike", + # "transit" + # ], + # "data.distance": None, + # "survey.group_hg4zz25.How_old_are_you": [ + # '0___25_years_old', + # '26___55_years_old', + # '56___70_years_old' + # ], + # "survey.group_hg4zz25.Are_you_a_student": [ + # 'not_a_student', + # 'yes' + # ], + # "survey.group_pa5ah98.Please_identify_which_category": [ + # '0_to__49_999', + # '_50_000_to__99_999', + # '100_000_or_more' + # ] + # }, + # "dependent_var": { + # "name": "data.user_input.replaced_mode", + # "classes": [ + # "drive", + # "walk", + # "bike", + # "transit" + # ] + # } + # } + # model = eamtg.GradientBoostedDecisionTree(model_config) + # model.fit(trips) + # y = model.predict(trips) + # # No class in predictions that's not in training data + # for predicted_class in pd.unique(y): + # self.assertIn(predicted_class, model_config['dependent_var']['classes']) + + # model = eamts.SupportVectorMachine(model_config) + # model.fit(trips) + # y = model.predict(trips) + # # No class in predictions that's not in training data + # for predicted_class in pd.unique(y): + # self.assertIn(predicted_class, model_config['dependent_var']['classes']) + + + # def testIncremental(self): + # """ + # the model should fit and predict incrementally on normal data without errors + # """ + # label_data = { 
+ # "mode_confirm": ['walk', 'bike', 'transit'], + # "purpose_confirm": ['work', 'home', 'school'], + # "replaced_mode": ['drive','walk'] + # } + # # generate $n trips. + # n = 20 + # m = 5 + # initial_trips = etmm.generate_mock_trips( + # user_id="joe", + # trips=n, + # origin=(0, 0), + # destination=(1, 1), + # label_data=label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # additional_trips = etmm.generate_mock_trips( + # user_id="joe", + # trips=n*5, + # origin=(0, 0), + # destination=(1, 1), + # label_data=label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # # pass in a test configuration + # model_config = { + # "incremental_evaluation": True, + # "feature_list": { + # "data.inferred_labels.mode_confirm": [ + # "walk", + # "bike", + # "transit" + # ] + # }, + # "dependent_var": { + # "name": "data.user_input.replaced_mode", + # "classes": [ + # "drive", + # "walk", + # "bike", + # "transit" + # ] + # } + # } + # model = eamts.SupportVectorMachine(model_config) + # # Start with some initialization data + # model.fit(initial_trips) + # # Train on additional sets of data and predict for initial data + # for i in range(0, 5): + # model.fit(additional_trips[i:(i+1)*n]) + # model.predict(initial_trips) + + + # def testUnseenClassesIncremental(self): + # """ + # if the input classes for a feature change throw sklearn error + # """ + # train_label_data = { + # "mode_confirm": ['walk', 'bike', 'transit'], + # "purpose_confirm": ['work', 'home', 'school'], + # "replaced_mode": ['drive','walk'] + # } + # test_label_data = { + # "mode_confirm": ['drive'], + # "purpose_confirm": ['work', 'home', 'school'], + # "replaced_mode": ['drive','walk'] + # } + # # generate $n trips. 
+ # n = 20 + # m = 5 + # initial_trips = etmm.generate_mock_trips( + # user_id="joe", + # trips=n, + # origin=(0, 0), + # destination=(1, 1), + # label_data=train_label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # additional_trips = etmm.generate_mock_trips( + # user_id="joe", + # trips=n*5, + # origin=(0, 0), + # destination=(1, 1), + # label_data=train_label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # test_trips = etmm.generate_mock_trips( + # user_id="joe", + # trips=n, + # origin=(0, 0), + # destination=(1, 1), + # label_data=test_label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # # pass in a test configuration + # model_config = { + # "incremental_evaluation": False, + # "feature_list": { + # "data.inferred_labels.mode_confirm": [ + # "walk", + # "bike", + # "transit" + # ] + # }, + # "dependent_var": { + # "name": "data.user_input.replaced_mode", + # "classes": [ + # "drive", + # "walk", + # "bike", + # "transit" + # ] + # } + # } + # model = eamts.SupportVectorMachine(model_config) + # # Start with some initialization data + # model.fit(initial_trips) + # # Train on additional sets of data + # for i in range(0, 5): + # model.fit(additional_trips[i:(i+1)*n]) + # # If an unseen class is introduced, allow sklearn to throw error + # with self.assertRaises(ValueError): + # model.predict(test_trips) + + + # def testPredictions(self): + # """ + # with a fixed seed, the model should make consistent predictions + # """ + # random.seed(42) + # label_data = { + # "mode_confirm": ['walk', 'bike', 'transit'], + # "purpose_confirm": ['work', 'home', 'school'], + # "replaced_mode": ['drive','walk','bike','transit'] + # } + # # generate $n trips. 
+ # n = 20 + # m = 5 + # trips = etmm.generate_mock_trips( + # user_id="joe", + # trips=n, + # origin=(0, 0), + # destination=(1, 1), + # label_data=label_data, + # within_threshold=m, + # threshold=0.001, # ~ 111 meters in degrees WGS84 + # ) + # # pass in a test configuration + # model_config = { + # "incremental_evaluation": False, + # "feature_list": { + # "data.inferred_labels.mode_confirm": [ + # "walk", + # "bike", + # "transit" + # ] + # }, + # "dependent_var": { + # "name": "data.user_input.replaced_mode", + # "classes": [ + # "drive", + # "walk", + # "bike", + # "transit" + # ] + # } + # } + # model = eamtg.GradientBoostedDecisionTree(model_config) + # model.fit(trips) + # y = model.predict(trips) + # # Test that predicted == expected + # expected_result = [ + # 'transit', 'bike', 'transit', 'bike', 'transit', 'bike', 'drive', 'transit', + # 'transit', 'drive', 'transit', 'transit', 'bike', 'bike', 'bike', 'transit', + # 'transit', 'transit', 'bike', 'drive' + # ] + # for i, prediction in enumerate(y): + # self.assertEqual(prediction, expected_result[i]) + + # model = eamts.SupportVectorMachine(model_config) + # # there is a separate random number generator in SGDClassifier that + # # must be fixed to get consistent predictions + # model.svm.random_state = (3) + # model.fit(trips) + # y = model.predict(trips) + # # Test that predicted == expected + # # note that it seems with a small dataset the svm tends to predict a single category + # expected_result = [ + # 'drive', 'drive', 'transit', 'drive', 'drive', 'drive', 'drive', 'transit', + # 'transit', 'drive', 'drive', 'transit', 'drive', 'drive', 'drive', 'transit', + # 'drive', 'transit', 'drive', 'drive' + # ] + # for i, prediction in enumerate(y): + # self.assertEqual(prediction, expected_result[i]) diff --git a/emission/tests/modellingTests/modellingTestAssets.py b/emission/tests/modellingTests/modellingTestAssets.py index 879a3a2ca..674eb6f70 100644 --- 
a/emission/tests/modellingTests/modellingTestAssets.py +++ b/emission/tests/modellingTests/modellingTestAssets.py @@ -1,7 +1,9 @@ import random +import string from typing import Optional, Tuple, List, Dict from uuid import UUID import emission.analysis.modelling.trip_model.trip_model as eamtm +import emission.core.common as ecc import emission.core.wrapper.confirmedtrip as ecwc import emission.core.wrapper.entry as ecwe @@ -120,6 +122,7 @@ def build_mock_trip( "type": "Point", "coordinates": destination }, + "distance": ecc.calDistance(origin, destination), "user_input": labels } @@ -132,6 +135,7 @@ def generate_mock_trips( origin, destination, label_data = None, + sensed_label_data = None, within_threshold = None, start_ts: None = None, end_ts: None = None, @@ -143,7 +147,7 @@ def generate_mock_trips( within a threshold from the provided o/d pair, and some have labels. some other ones can be sampled to appear outside of the threshold of the o/d locations. - label_data is an optional dictionary with labels and sample weights, for example: + label_data/sensed_label-data is optional dictionary with labels and sample weights, for example: { "mode_confirm": ['walk', 'bike'], "replaced_mode": ['drive', 'tnc'], @@ -160,6 +164,7 @@ def generate_mock_trips( :param origin: origin coordinates :param destination: destination coordinates :param label_data: dictionary of label data, see above, defaults to None + :param sensed_label_data: dictionary of sensed data, see above, defaults to None :param within_threshold: number of trips that should fall within the provided distance threshold in degrees WGS84, defaults to None :param threshold: distance threshold in WGS84 for sampling, defaults to 0.01 @@ -187,12 +192,21 @@ def generate_mock_trips( purpose_weights=label_data.get('purpose_weights') ) trip = build_mock_trip(user_id, o, d, labels, start_ts, end_ts) + if sensed_label_data is not None: + trip = add_sensed_labels(trip, sensed_label_data) result.append(trip) 
def add_sensed_labels(trip, sensed_label_data):
    """
    Attach mock sensed (inferred) labels to a mock trip.

    For every label in sensed_label_data one candidate value is picked
    uniformly at random and stored under that label's own key in
    trip['data']['inferred_labels'], replacing whatever was stored there
    before. If no label yields a value, the trip is left untouched.

    :param trip: mock trip; must support trip['data'][...] item assignment
    :param sensed_label_data: dictionary mapping a label name (for example
        'mode_confirm') to a list of candidate values. Entries whose key
        ends in '_weights' and entries without candidates are ignored;
        sampling weights are not applied here.
    :return: the same trip object, modified in place
    """
    inferred_labels = {
        label: random.choice(options)
        for label, options in sensed_label_data.items()
        # '*_weights' entries (e.g. 'purpose_weights') carry sampling weights,
        # not label values, so they must not become inferred labels.
        if options and not label.endswith('_weights')
    }
    # Assign once, after all labels are collected, so that one label can no
    # longer clobber another (previously every label overwrote 'mode_confirm').
    if inferred_labels:
        trip['data']['inferred_labels'] = inferred_labels
    return trip