Replace mode pipeline #892
@@ -156,3 +156,19 @@ def predict_cluster_confidence_discounting(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
    labels = copy.deepcopy(labels)
    for l in labels: l["p"] *= confidence_coeff
    return labels

def predict_gradient_boosted_decision_tree(trip, max_confidence=None, first_confidence=None, confidence_multiplier=None):
    # load application config
    model_type = eamtc.get_model_type()
    model_storage = eamtc.get_model_storage()
    labels, n = eamur.predict_labels_with_gbdt(trip, model_type, model_storage)
    if n <= 0:  # no model data, or the trip didn't match a cluster
        logging.debug(f"In predict_gradient_boosted_decision_tree: n={n}; returning as-is")
        return labels

    # discount the predictions by a coefficient derived from the match count n
    confidence_coeff = n_to_confidence_coeff(n, max_confidence, first_confidence, confidence_multiplier)
    logging.debug(f"In predict_gradient_boosted_decision_tree: n={n}; discounting with coefficient {confidence_coeff}")

    labels = copy.deepcopy(labels)
    for l in labels: l["p"] *= confidence_coeff
    return labels
Comment on lines +172 to +174: concretely, this is also wrong because there will not be a label array or probabilities.
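For context on that comment: the discounting loop above assumes the label-assist prediction format, a list of dicts that each carry a probability "p". A minimal sketch of that assumed shape (the values, and the "labels" key, are illustrative assumptions, not actual output):

# shape assumed by the discounting loop (illustrative, not real output):
labels = [
    {"labels": {"mode_confirm": "bike", "purpose_confirm": "work"}, "p": 0.62},
    {"labels": {"mode_confirm": "walk", "purpose_confirm": "work"}, "p": 0.38},
]
# by contrast, GradientBoostedDecisionTree.predict (in the last file below)
# returns a bare list of predicted mode strings, e.g. ["bike"], so there is
# no "p" field to discount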
@@ -0,0 +1,102 @@
# Standard imports
import logging
import random
import copy

# Our imports
import emission.storage.pipeline_queries as epq
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.core.wrapper.labelprediction as ecwl
import emission.core.wrapper.entry as ecwe
import emission.analysis.classification.inference.labels.inferrers as eacili
import emission.analysis.classification.inference.labels.ensembles as eacile


# For each algorithm in ecwl.AlgorithmTypes that runs on a trip (e.g., not the ensemble, which
# runs on the results of other algorithms), primary_algorithms specifies a corresponding
# function in eacili to run. This makes it easy to plug in additional algorithms later.
primary_algorithms = {
    ecwl.AlgorithmTypes.GRADIENT_BOOSTED_DECISION_TREE: eacili.predict_gradient_boosted_decision_tree
}

# ensemble specifies which algorithm in eacile to run.
# This makes it easy to test various ways of combining various algorithms.
ensemble = eacile.ensemble_first_prediction


# Does all the work necessary for a given user
def infer_labels(user_id):
    time_query = epq.get_time_range_for_label_inference(user_id)
Comment: again, this is not the time range to query for, because that will return the time range for the label inference algorithm. You are your own algorithm and you need your own time range.

Comment: This will break the pipeline unless changed.
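One possible shape for the fix the reviewer is asking for, assuming the codebase follows its usual one-pipeline-state-per-stage pattern; the function name here is hypothetical, not the actual epq API:

# hypothetical sketch: give the replaced-mode step its own pipeline stage
# and query its own time range, rather than reusing label inference's
def infer_replaced_modes(user_id):
    time_query = epq.get_time_range_for_replaced_mode_inference(user_id)  # hypothetical helper
    ...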
    try:
        lip = LabelInferencePipeline()
        lip.user_id = user_id
        lip.run_prediction_pipeline(user_id, time_query)
        if lip.last_trip_done is None:
            logging.debug("After run, last_trip_done == None, must be early return")
        epq.mark_label_inference_done(user_id, lip.last_trip_done)
    except:
        logging.exception("Error while inferring labels, timestamp is unchanged")
        epq.mark_label_inference_failed(user_id)

# Code structure based on emission.analysis.classification.inference.mode.pipeline
# and emission.analysis.classification.inference.mode.rule_engine
class LabelInferencePipeline:
Comment: again, this needs to change for clarity.
    def __init__(self):
        self._last_trip_done = None

    @property
    def last_trip_done(self):
        return self._last_trip_done

    # For a given user and time range, runs all the primary algorithms and ensemble, saves results
    # to the database, and records progress
    def run_prediction_pipeline(self, user_id, time_range):
        self.ts = esta.TimeSeries.get_time_series(user_id)
        self.toPredictTrips = esda.get_entries(
            esda.CLEANED_TRIP_KEY, user_id, time_query=time_range)
        for cleaned_trip in self.toPredictTrips:
            # Create an inferred trip
            cleaned_trip_dict = copy.copy(cleaned_trip)["data"]
            inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)
Comment on lines +60 to +62: you have basically copy-pasted the other pipeline.py; you need to understand how it works and adapt it to be a separate step.
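One way to read that suggestion, sketched under the assumption that the existing label pipeline already writes "analysis/inferred_trip" entries (the key string comes from the create_entry call above): a separate replaced-mode step would consume those entries rather than re-creating them from cleaned trips.

# hypothetical sketch of a separate step: read the inferred trips that the
# label pipeline already wrote, instead of creating them again here
to_predict = esda.get_entries("analysis/inferred_trip", user_id, time_query=time_range)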
            # Run the algorithms and the ensemble, store results
            results = self.compute_and_save_algorithms(inferred_trip)
            ensemble = self.compute_and_save_ensemble(inferred_trip, results)

            # Put final results into the inferred trip and store it
            inferred_trip["data"]["cleaned_trip"] = cleaned_trip.get_id()
            inferred_trip["data"]["inferred_labels"] = ensemble["prediction"]
            self.ts.insert(inferred_trip)

            if self._last_trip_done is None or self._last_trip_done["data"]["end_ts"] < cleaned_trip["data"]["end_ts"]:
                self._last_trip_done = cleaned_trip
||
# This is where the labels for a given trip are actually predicted. | ||
# Though the only information passed in is the trip object, the trip object can provide the | ||
# user_id and other potentially useful information. | ||
def compute_and_save_algorithms(self, trip): | ||
predictions = [] | ||
for algorithm_id, algorithm_fn in primary_algorithms.items(): | ||
prediction = algorithm_fn(trip) | ||
lp = ecwl.Labelprediction() | ||
lp.trip_id = trip.get_id() | ||
lp.algorithm_id = algorithm_id | ||
lp.prediction = prediction | ||
lp.start_ts = trip["data"]["start_ts"] | ||
lp.end_ts = trip["data"]["end_ts"] | ||
self.ts.insert_data(self.user_id, "inference/labels", lp) | ||
predictions.append(lp) | ||
return predictions | ||
|
||
# Combine all our predictions into a single ensemble prediction. | ||
# As a placeholder, we just take the first prediction. | ||
# TODO: implement a real combination algorithm. | ||
def compute_and_save_ensemble(self, trip, predictions): | ||
il = ecwl.Labelprediction() | ||
il.trip_id = trip.get_id() | ||
il.start_ts = trip["data"]["start_ts"] | ||
il.end_ts = trip["data"]["end_ts"] | ||
(il.algorithm_id, il.prediction) = ensemble(trip, predictions) | ||
self.ts.insert_data(self.user_id, "analysis/inferred_labels", il) | ||
return il |
@@ -0,0 +1,87 @@
import logging
from typing import Dict, List, Optional, Tuple

import sklearn.ensemble as ske

import emission.analysis.modelling.trip_model.trip_model as eamuu
import emission.analysis.modelling.trip_model.util as eamtu
import emission.analysis.modelling.trip_model.config as eamtc
import emission.core.wrapper.confirmedtrip as ecwc


class GradientBoostedDecisionTree(eamuu.TripModel):

    is_incremental: bool = False  # overwritten during __init__

    def __init__(self, config=None):
        """
        Instantiate a gradient boosted decision tree for all users.

        This uses the sklearn implementation of a gradient boosted
        decision tree to classify unlabeled replacement modes.

        https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingClassifier.html

        Replacement modes are considered to be the second-best choice for
        a given trip (i.e., the mode that would have been chosen if the
        actual choice hadn't been available). These labels are gathered
        from the user along with the chosen mode and trip purpose after
        the trip takes place.

        The model is currently trained on data from all users.
        """
        if config is None:
            config = eamtc.get_config_value_or_raise('model_parameters.gbdt')
            logging.debug('GradientBoostedDecisionTree loaded model config from file')
        else:
            logging.debug('GradientBoostedDecisionTree using model config argument')
        expected_keys = [
            'incremental_evaluation',
            'feature_list',
            'dependent_var'
        ]
        for k in expected_keys:
            if config.get(k) is None:
                msg = f"gbdt trip model config missing expected key {k}"
                raise KeyError(msg)
        self.is_incremental = config['incremental_evaluation']
        # use the sklearn implementation of a GBDT
        self.gbdt = ske.GradientBoostingClassifier(n_estimators=50)
        self.feature_list = config['feature_list']
        self.dependent_var = config['dependent_var']

    def fit(self, trips: List[ecwc.Confirmedtrip]):
        """Train the model on labeled trips; each extracted feature row
        corresponds to the label at the matching index.

        :param trips: list of labeled confirmed trips to train from
        """
        logging.debug(f'fit called with {len(trips)} trips')
        unlabeled = list(filter(lambda t: len(t['data']['user_input']) == 0, trips))
        if len(unlabeled) > 0:
            msg = f'model.fit cannot be called with unlabeled trips, found {len(unlabeled)}'
            raise Exception(msg)
        X_train, y_train = self.extract_features(trips)
        self.gbdt.fit(X_train, y_train)
        logging.info(f"gradient boosted decision tree model fit to {len(X_train)} rows of trip data")
        logging.info(f"training features were {X_train.columns}")

    def predict(self, trip: ecwc.Confirmedtrip) -> List[str]:
        logging.debug("running gradient boosted mode prediction")
        X_test, _ = self.extract_features(trip, is_prediction=True)
        y_pred = self.gbdt.predict(X_test)
        if y_pred is None:
            logging.debug(f"unable to predict mode for trip {trip}")
            return []
        else:
            logging.debug(f"made predictions {y_pred}")
            return y_pred

    def to_dict(self) -> Dict:
        return self.gbdt.get_params()

    def from_dict(self, model: Dict):
        # set_params takes keyword arguments, so the dict must be unpacked
        self.gbdt.set_params(**model)

    def extract_features(self, trips, is_prediction=False) -> Tuple:
        return eamtu.get_replacement_mode_features(self.feature_list, self.dependent_var, trips, is_prediction)
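A brief usage sketch of this class as written; the config values and trip variables below are illustrative assumptions, and only the key names come from expected_keys above:

# hypothetical usage, with assumed feature names
config = {
    "incremental_evaluation": False,
    "feature_list": ["distance", "duration"],  # assumed feature names
    "dependent_var": "replaced_mode",
}
model = GradientBoostedDecisionTree(config)
model.fit(labeled_trips)              # labeled_trips: List[ecwc.Confirmedtrip]
prediction = model.predict(new_trip)  # list of predicted replacement modes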
Comment: this seems like it is just a copy/paste of the previous predict_cluster_confidence_discounting. Why does this have to be in the labels directory anyway? labels is for predicting labels based on other labels; replaced_mode is for predicting the replaced mode based on other characteristics (e.g. demographics). So while it is appropriate to have this be inspired by the label assist algorithm, it is its own algorithm/model, and for clarity, it should be in its own directory. Its scaffolding can be similar to the label assist, but it is not a label assist.