diff --git a/activitysim/abm/models/disaggregate_accessibility.py b/activitysim/abm/models/disaggregate_accessibility.py index aa2703e19..e5a66916b 100644 --- a/activitysim/abm/models/disaggregate_accessibility.py +++ b/activitysim/abm/models/disaggregate_accessibility.py @@ -15,7 +15,11 @@ from activitysim.abm.models.util import tour_destination from activitysim.abm.tables import shadow_pricing from activitysim.core import estimation, los, tracing, util, workflow -from activitysim.core.configuration.base import PreprocessorSettings, PydanticReadable +from activitysim.core.configuration.base import ( + ComputeSettings, + PreprocessorSettings, + PydanticReadable, +) from activitysim.core.configuration.logit import TourLocationComponentSettings from activitysim.core.expressions import assign_columns @@ -184,6 +188,8 @@ class DisaggregateAccessibilitySettings(PydanticReadable, extra="forbid"): If not supplied or None, will default to the chunk size in the location choice model settings. """ + compute_settings: ComputeSettings | None = None + def read_disaggregate_accessibility_yaml( state: workflow.State, file_name @@ -783,6 +789,11 @@ def get_disaggregate_logsums( if disagg_model_settings.explicit_chunk is not None: model_settings.explicit_chunk = disagg_model_settings.explicit_chunk + # Can set compute settings for disaggregate accessibility + # Otherwise this will be set to whatever is in the location model settings + if disagg_model_settings.compute_settings is not None: + model_settings.compute_settings = disagg_model_settings.compute_settings + # Include the suffix tags to pass onto downstream logsum models (e.g., tour mode choice) if model_settings.LOGSUM_SETTINGS: suffixes = util.concat_suffix_dict(disagg_model_settings.suffixes) diff --git a/activitysim/abm/models/joint_tour_participation.py b/activitysim/abm/models/joint_tour_participation.py index 778ae0f44..1c8f3af9d 100644 --- a/activitysim/abm/models/joint_tour_participation.py +++ b/activitysim/abm/models/joint_tour_participation.py @@ -20,8 +20,8 @@ ) from activitysim.core.configuration.base import ComputeSettings, PreprocessorSettings from activitysim.core.configuration.logit import LogitComponentSettings -from activitysim.core.util import assign_in_place, reindex from activitysim.core.exceptions import InvalidTravelError +from activitysim.core.util import assign_in_place, reindex logger = logging.getLogger(__name__) @@ -127,7 +127,7 @@ def get_tour_satisfaction(candidates, participate): def participants_chooser( state: workflow.State, - probs: pd.DataFrame, + probs_or_utils: pd.DataFrame, choosers: pd.DataFrame, spec: pd.DataFrame, trace_label: str, @@ -147,9 +147,10 @@ def participants_chooser( Parameters ---------- - probs : pandas.DataFrame + probs_or_utils : pandas.DataFrame Rows for choosers and columns for the alternatives from which they - are choosing. Values are expected to be valid probabilities across + are choosing. If running with explicit_error_terms, these are utilities. + Otherwise, values are expected to be valid probabilities across each row, e.g. they should sum to 1. choosers : pandas.dataframe simple_simulate choosers df @@ -166,7 +167,7 @@ def participants_chooser( """ - assert probs.index.equals(choosers.index) + assert probs_or_utils.index.equals(choosers.index) # choice is boolean (participate or not) model_settings = JointTourParticipationSettings.read_settings_file( @@ -202,7 +203,7 @@ def participants_chooser( "%s max iterations exceeded (%s).", trace_label, MAX_ITERATIONS ) diagnostic_cols = ["tour_id", "household_id", "composition", "adult"] - unsatisfied_candidates = candidates[diagnostic_cols].join(probs) + unsatisfied_candidates = candidates[diagnostic_cols].join(probs_or_utils) state.tracing.write_csv( unsatisfied_candidates, file_name="%s.UNSATISFIED" % trace_label, @@ -215,9 +216,31 @@ def participants_chooser( f"Forcing joint tour participation for {num_tours_remaining} tours." ) # anybody with probability > 0 is forced to join the joint tour - probs[choice_col] = np.where(probs[choice_col] > 0, 1, 0) - non_choice_col = [col for col in probs.columns if col != choice_col][0] - probs[non_choice_col] = 1 - probs[choice_col] + if state.settings.use_explicit_error_terms: + # need "is valid choice" such that we certainly choose those with non-zero values, + # and do not choose others. Let's use 3.0 as large value here. + probs_or_utils[choice_col] = np.where( + probs_or_utils[choice_col] > logit.UTIL_MIN, + 3.0, + logit.UTIL_UNAVAILABLE, + ) + non_choice_col = [ + col for col in probs_or_utils.columns if col != choice_col + ][0] + probs_or_utils[non_choice_col] = np.where( + probs_or_utils[choice_col] <= logit.UTIL_MIN, + 3.0, + logit.UTIL_UNAVAILABLE, + ) + else: + probs_or_utils[choice_col] = np.where( + probs_or_utils[choice_col] > 0, 1, 0 + ) + non_choice_col = [ + col for col in probs_or_utils.columns if col != choice_col + ][0] + probs_or_utils[non_choice_col] = 1 - probs_or_utils[choice_col] + if iter > MAX_ITERATIONS + 1: raise InvalidTravelError( f"{num_tours_remaining} tours could not be satisfied even with forcing participation" @@ -227,8 +250,13 @@ def participants_chooser( f"{num_tours_remaining} tours could not be satisfied after {iter} iterations" ) - choices, rands = logit.make_choices( - state, probs, trace_label=trace_label, trace_choosers=choosers + choice_function = ( + logit.make_choices_utility_based + if state.settings.use_explicit_error_terms + else logit.make_choices + ) + choices, rands = choice_function( + state, probs_or_utils, trace_label=trace_label, trace_choosers=choosers ) participate = choices == PARTICIPATE_CHOICE @@ -252,7 +280,7 @@ def participants_chooser( rands_list.append(rands[satisfied]) # remove candidates of satisfied tours - probs = probs[~satisfied] + probs_or_utils = probs_or_utils[~satisfied] candidates = candidates[~satisfied] logger.debug( @@ -401,6 +429,16 @@ def joint_tour_participation( if i not in model_settings.compute_settings.protect_columns: model_settings.compute_settings.protect_columns.append(i) + # TODO EET: this is related to the difference in nested logit and logit choice as per comment in + # make_choices_utility_based. As soon as alt_order_array is removed from arguments to + # make_choices_explicit_error_term_nl this guard can be removed + if state.settings.use_explicit_error_terms: + assert ( + nest_spec is None + ), "Nested logit model custom chooser for EET requires name_mapping, currently not implemented in jtp" + + custom_chooser = participants_chooser + choices = simulate.simple_simulate_by_chunk_id( state, choosers=candidates, @@ -409,7 +447,7 @@ def joint_tour_participation( locals_d=constants, trace_label=trace_label, trace_choice_name="participation", - custom_chooser=participants_chooser, + custom_chooser=custom_chooser, estimator=estimator, compute_settings=model_settings.compute_settings, ) diff --git a/activitysim/abm/models/trip_departure_choice.py b/activitysim/abm/models/trip_departure_choice.py index 7b34f8e74..a0bf95e5a 100644 --- a/activitysim/abm/models/trip_departure_choice.py +++ b/activitysim/abm/models/trip_departure_choice.py @@ -24,10 +24,10 @@ PreprocessorSettings, PydanticCompute, ) +from activitysim.core.exceptions import SegmentedSpecificationError from activitysim.core.skim_dataset import SkimDataset from activitysim.core.skim_dictionary import SkimDict from activitysim.core.util import reindex -from activitysim.core.exceptions import SegmentedSpecificationError logger = logging.getLogger(__name__) @@ -351,37 +351,51 @@ def choose_tour_leg_pattern( column_labels=["alternative", "utility"], ) - # convert to probabilities (utilities exponentiated and normalized to probs) - # probs is same shape as utilities, one row per chooser and one column for alternative - probs = logit.utils_to_probs( - state, utilities_df, trace_label=trace_label, trace_choosers=trip_segment - ) + if state.settings.use_explicit_error_terms: + utilities_df = logit.validate_utils( + state, utilities_df, trace_label=trace_label, trace_choosers=trip_segment + ) + # make choices + # positions is series with the chosen alternative represented as a column index in probs + # which is an integer between zero and num alternatives in the alternative sample + positions, rands = logit.make_choices_utility_based( + state, utilities_df, trace_label=trace_label, trace_choosers=trip_segment + ) + + del utilities_df + chunk_sizer.log_df(trace_label, "utilities_df", None) + else: + # convert to probabilities (utilities exponentiated and normalized to probs) + # probs is same shape as utilities, one row per chooser and one column for alternative + probs = logit.utils_to_probs( + state, utilities_df, trace_label=trace_label, trace_choosers=trip_segment + ) - chunk_sizer.log_df(trace_label, "probs", probs) + chunk_sizer.log_df(trace_label, "probs", probs) - del utilities_df - chunk_sizer.log_df(trace_label, "utilities_df", None) + del utilities_df + chunk_sizer.log_df(trace_label, "utilities_df", None) - if have_trace_targets: - state.tracing.trace_df( - probs, - tracing.extend_trace_label(trace_label, "probs"), - column_labels=["alternative", "probability"], + if have_trace_targets: + state.tracing.trace_df( + probs, + tracing.extend_trace_label(trace_label, "probs"), + column_labels=["alternative", "probability"], + ) + + # make choices + # positions is series with the chosen alternative represented as a column index in probs + # which is an integer between zero and num alternatives in the alternative sample + positions, rands = logit.make_choices( + state, probs, trace_label=trace_label, trace_choosers=trip_segment ) - # make choices - # positions is series with the chosen alternative represented as a column index in probs - # which is an integer between zero and num alternatives in the alternative sample - positions, rands = logit.make_choices( - state, probs, trace_label=trace_label, trace_choosers=trip_segment - ) + del probs + chunk_sizer.log_df(trace_label, "probs", None) chunk_sizer.log_df(trace_label, "positions", positions) chunk_sizer.log_df(trace_label, "rands", rands) - del probs - chunk_sizer.log_df(trace_label, "probs", None) - # shouldn't have chosen any of the dummy pad utilities assert positions.max() < max_sample_count diff --git a/activitysim/abm/models/util/cdap.py b/activitysim/abm/models/util/cdap.py index 21f42de82..48ec7a31a 100644 --- a/activitysim/abm/models/util/cdap.py +++ b/activitysim/abm/models/util/cdap.py @@ -999,11 +999,18 @@ def household_activity_choices( # add joint util to util utils = utils.add(joint_tour_utils) - probs = logit.utils_to_probs(state, utils, trace_label=trace_label) + if state.settings.use_explicit_error_terms: + utils = logit.validate_utils(state, utils, trace_label=trace_label) - # select an activity pattern alternative for each household based on probability - # result is a series indexed on _hh_index_ with the (0 based) index of the column from probs - idx_choices, rands = logit.make_choices(state, probs, trace_label=trace_label) + idx_choices, rands = logit.make_choices_utility_based( + state, utils, trace_label=trace_label + ) + else: + probs = logit.utils_to_probs(state, utils, trace_label=trace_label) + + # select an activity pattern alternative for each household based on probability + # result is a series indexed on _hh_index_ with the (0 based) index of the column from probs + idx_choices, rands = logit.make_choices(state, probs, trace_label=trace_label) # convert choice expressed as index into alternative name from util column label choices = pd.Series(utils.columns[idx_choices].values, index=utils.index) @@ -1021,16 +1028,20 @@ def household_activity_choices( "%s.hhsize%d_utils" % (trace_label, hhsize), column_labels=["expression", "household"], ) - state.tracing.trace_df( - probs, - "%s.hhsize%d_probs" % (trace_label, hhsize), - column_labels=["expression", "household"], - ) + + if not state.settings.use_explicit_error_terms: + state.tracing.trace_df( + probs, + "%s.hhsize%d_probs" % (trace_label, hhsize), + column_labels=["expression", "household"], + ) + state.tracing.trace_df( choices, "%s.hhsize%d_activity_choices" % (trace_label, hhsize), column_labels=["expression", "household"], ) + state.tracing.trace_df( rands, "%s.hhsize%d_rands" % (trace_label, hhsize), columns=[None, "rand"] ) diff --git a/activitysim/core/configuration/base.py b/activitysim/core/configuration/base.py index 6936e6d1a..aad266ff1 100644 --- a/activitysim/core/configuration/base.py +++ b/activitysim/core/configuration/base.py @@ -135,6 +135,10 @@ class ComputeSettings(PydanticBase): Sharrow settings for a component. """ + # Make this more general compute settings and use for explicit error term overrides + # Default None work for sub-components defined in getter below (eet_subcomponent) + use_explicit_error_terms: None | bool | dict[str, bool] = None + sharrow_skip: bool | dict[str, bool] = False """Skip sharrow when evaluating this component. @@ -218,6 +222,13 @@ def should_skip(self, subcomponent: str) -> bool: else: return bool(self.sharrow_skip) + def eet_subcomponent(self, subcomponent: str) -> bool: + """Check for EET overrides for a particular subcomponent.""" + if isinstance(self.use_explicit_error_terms, dict): + return self.use_explicit_error_terms.get(subcomponent, None) + else: + return self.use_explicit_error_terms + @contextmanager def pandas_option_context(self): """Context manager to set pandas options for compute settings.""" @@ -266,6 +277,7 @@ def subcomponent_settings(self, subcomponent: str) -> ComputeSettings: use_numba=self.use_numba, drop_unused_columns=self.drop_unused_columns, protect_columns=self.protect_columns, + use_explicit_error_terms=self.eet_subcomponent(subcomponent), ) diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py index 405560810..bded7123d 100644 --- a/activitysim/core/configuration/top.py +++ b/activitysim/core/configuration/top.py @@ -701,6 +701,7 @@ def _check_store_skims_in_shm(self): "memory_profile", "instrument", "sharrow", + "use_explicit_error_terms", ) """ Setting to log on startup. @@ -778,7 +779,18 @@ def _check_store_skims_in_shm(self): """ run checks to validate that YAML settings files are loadable and spec and coefficent csv can be resolved. - should catch many common errors early, including missing required configurations or specified coefficient labels without defined values. + should catch many common errors early, including missing required configurations or specified coefficient labels without defined values. + """ + + use_explicit_error_terms: bool = False + """ + Make choice from random utility model by drawing from distribution of unobserved + part of utility and taking the maximum of total utility. + + Defaults to standard Monte Carlo method, i.e., calculating probabilities and then + drawing a single uniform random number to draw from cumulative probabily. + + .. versionadded:: 1.x """ other_settings: dict[str, Any] = None diff --git a/activitysim/core/interaction_sample.py b/activitysim/core/interaction_sample.py index 52116638b..66e59eed8 100644 --- a/activitysim/core/interaction_sample.py +++ b/activitysim/core/interaction_sample.py @@ -18,15 +18,109 @@ workflow, ) from activitysim.core.configuration.base import ComputeSettings +from activitysim.core.exceptions import SegmentedSpecificationError from activitysim.core.skim_dataset import DatasetWrapper from activitysim.core.skim_dictionary import SkimWrapper -from activitysim.core.exceptions import SegmentedSpecificationError +from activitysim.core.workflow import State logger = logging.getLogger(__name__) DUMP = False +def make_sample_choices_utility_based( + state: workflow.State, + choosers, + utilities, + alternatives, + sample_size, + alternative_count, + alt_col_name, + allow_zero_probs, + trace_label, + chunk_sizer, +): + assert isinstance(utilities, pd.DataFrame) + assert utilities.shape == (len(choosers), alternative_count) + + assert isinstance(alternatives, pd.DataFrame) + assert len(alternatives) == alternative_count + + if allow_zero_probs: + zero_probs = ( + utilities.sum(axis=1) <= utilities.shape[1] * logit.UTIL_UNAVAILABLE + ) + if zero_probs.all(): + utils = pd.DataFrame( + columns=[alt_col_name, "rand", "prob", choosers.index.name] + ) + probs = pd.DataFrame(0.0, index=utils.index, columns=utils.columns) + return utils, probs + if zero_probs.any(): + # remove from sample + utilities = utilities[~zero_probs] + choosers = choosers[~zero_probs] + + utils_array = utilities.to_numpy() + chunk_sizer.log_df(trace_label, "utils_array", utils_array) + chosen_destinations = [] + + rands = state.get_rn_generator().gumbel_for_df(utilities, n=alternative_count) + chunk_sizer.log_df(trace_label, "rands", rands) + + # TODO-EET [janzill Jun2022]: using for-loop to keep memory usage low, an array of dimension + # (len(choosers), alternative_count, sample_size) can get very large. Probably better to + # use chunking for this. + for i in range(sample_size): + # created this once for memory logging + if i > 0: + rands = state.get_rn_generator().gumbel_for_df( + utilities, n=alternative_count + ) + chosen_destinations.append(np.argmax(utils_array + rands, axis=1)) + chosen_destinations = np.concatenate(chosen_destinations, axis=0) + + chunk_sizer.log_df(trace_label, "chosen_destinations", chosen_destinations) + + del utils_array + chunk_sizer.log_df(trace_label, "utils_array", None) + del rands + chunk_sizer.log_df(trace_label, "rands", None) + + chooser_idx = np.tile(np.arange(utilities.shape[0]), sample_size) + chunk_sizer.log_df(trace_label, "chooser_idx", chooser_idx) + + probs = logit.utils_to_probs( + state, + utilities, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + overflow_protection=not allow_zero_probs, + trace_choosers=choosers, + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + choices_df = pd.DataFrame( + { + alt_col_name: alternatives.index.values[chosen_destinations], + "prob": probs.to_numpy()[chooser_idx, chosen_destinations], + choosers.index.name: choosers.index.values[chooser_idx], + } + ) + chunk_sizer.log_df(trace_label, "choices_df", choices_df) + + del chooser_idx + chunk_sizer.log_df(trace_label, "chooser_idx", None) + del chosen_destinations + chunk_sizer.log_df(trace_label, "chosen_destinations", None) + + # handing this off to caller + chunk_sizer.log_df(trace_label, "probs", None) + chunk_sizer.log_df(trace_label, "choices_df", None) + + return choices_df, probs + + def make_sample_choices( state: workflow.State, choosers, @@ -191,12 +285,12 @@ def _interaction_sample( choices_df : pandas.DataFrame A DataFrame where index should match the index of the choosers DataFrame - and columns alt_col_name, prob, rand, pick_count + and columns alt_col_name, prob, pick_count + alt_col_name: int + the identifier of the alternatives prob: float the probability of the chosen alternative - rand: float - the rand that did the choosing pick_count : int number of duplicate picks for chooser, alt """ @@ -443,54 +537,55 @@ def _interaction_sample( state.tracing.dump_df(DUMP, utilities, trace_label, "utilities") - # convert to probabilities (utilities exponentiated and normalized to probs) - # probs is same shape as utilities, one row per chooser and one column for alternative - probs = logit.utils_to_probs( - state, - utilities, - allow_zero_probs=allow_zero_probs, - trace_label=trace_label, - trace_choosers=choosers, - overflow_protection=not allow_zero_probs, - ) - chunk_sizer.log_df(trace_label, "probs", probs) - - del utilities - chunk_sizer.log_df(trace_label, "utilities", None) - - if have_trace_targets: - state.tracing.trace_df( - probs, - tracing.extend_trace_label(trace_label, "probs"), - column_labels=["alternative", "probability"], + if compute_settings.use_explicit_error_terms is not None: + use_eet = compute_settings.use_explicit_error_terms + logger.info( + f"Interaction sample model-specific EET overrides for {trace_label}: eet = {use_eet}" ) + else: + use_eet = state.settings.use_explicit_error_terms if sample_size == 0: - # FIXME return full alternative set rather than sample - logger.info( - "Estimation mode for %s using unsampled alternatives" % (trace_label,) - ) + # Return full alternative set rather than sample + logger.info("Using unsampled alternatives for %s" % (trace_label,)) - index_name = probs.index.name + index_name = utilities.index.name choices_df = ( - pd.melt(probs.reset_index(), id_vars=[index_name]) + pd.melt( + utilities.reset_index(), + id_vars=[index_name], + value_name="prob", + var_name=alt_col_name, + ) .sort_values(by=index_name, kind="mergesort") .set_index(index_name) - .rename(columns={"value": "prob"}) - .drop(columns="variable") + .assign(prob=1) + .assign(pick_count=1) ) + chunk_sizer.log_df(trace_label, "choices_df", choices_df) - choices_df["pick_count"] = 1 - choices_df.insert( - 0, alt_col_name, np.tile(alternatives.index.values, len(choosers.index)) - ) + # utilities are numbered 0..n-1 so we need to map back to alt ids + alternative_map = pd.Series(alternatives.index).to_dict() + choices_df[alt_col_name] = choices_df[alt_col_name].map(alternative_map) + + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) return choices_df - else: - choices_df = make_sample_choices( + + if use_eet: + utilities = logit.validate_utils( + state, + utilities, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + trace_choosers=choosers, + ) + + choices_df, probs = make_sample_choices_utility_based( state, choosers, - probs, + utilities, alternatives, sample_size, alternative_count, @@ -500,45 +595,77 @@ def _interaction_sample( chunk_sizer=chunk_sizer, ) - chunk_sizer.log_df(trace_label, "choices_df", choices_df) + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) - if estimation.manager.enabled and sample_size > 0: - # we need to ensure chosen alternative is included in the sample - survey_choices = estimation.manager.get_survey_destination_choices( - state, choosers, trace_label - ) - if survey_choices is not None: - assert ( - survey_choices.index == choosers.index - ).all(), "survey_choices and choosers must have the same index" - survey_choices.name = alt_col_name - survey_choices = survey_choices.dropna().astype( - choices_df[alt_col_name].dtype + if estimation.manager.enabled and sample_size > 0: + + choices_df = _ensure_chosen_alts_in_sample( + alt_col_name, + alternatives, + choices_df, + choosers, + probs, + state, + trace_label, ) - # merge all survey choices onto choices_df - probs_df = probs.reset_index().melt( - id_vars=[choosers.index.name], - var_name=alt_col_name, - value_name="prob", + del probs + chunk_sizer.log_df(trace_label, "probs", None) + + else: + # convert to probabilities (utilities exponentiated and normalized to probs) + # probs is same shape as utilities, one row per chooser and one column for alternative + probs = logit.utils_to_probs( + state, + utilities, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + trace_choosers=choosers, + overflow_protection=not allow_zero_probs, + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) + + if have_trace_targets: + state.tracing.trace_df( + probs, + tracing.extend_trace_label(trace_label, "probs"), + column_labels=["alternative", "probability"], ) - # probs are numbered 0..n-1 so we need to map back to alt ids - zone_map = pd.Series(alternatives.index).to_dict() - probs_df[alt_col_name] = probs_df[alt_col_name].map(zone_map) - - survey_choices = pd.merge( - survey_choices, - probs_df, - on=[choosers.index.name, alt_col_name], - how="left", + + choices_df = make_sample_choices( + state, + choosers, + probs, + alternatives, + sample_size, + alternative_count, + alt_col_name, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + chunk_sizer=chunk_sizer, + ) + + chunk_sizer.log_df(trace_label, "choices_df", choices_df) + + if estimation.manager.enabled and sample_size > 0: + choices_df = _ensure_chosen_alts_in_sample( + alt_col_name, + alternatives, + choices_df, + choosers, + probs, + state, + trace_label, ) - survey_choices["rand"] = 0 - survey_choices["prob"].fillna(0, inplace=True) - choices_df = pd.concat([choices_df, survey_choices], ignore_index=True) - choices_df.sort_values(by=[choosers.index.name], inplace=True) - del probs - chunk_sizer.log_df(trace_label, "probs", None) + del probs + chunk_sizer.log_df(trace_label, "probs", None) + + chunk_sizer.log_df(trace_label, "choices_df", choices_df) # pick_count and pick_dup # pick_count is number of duplicate picks @@ -570,8 +697,10 @@ def _interaction_sample( column_labels=["sample_alt", "alternative"], ) - # don't need this after tracing - del choices_df["rand"] + if not state.settings.use_explicit_error_terms: + # don't need this after tracing + del choices_df["rand"] + chunk_sizer.log_df(trace_label, "choices_df", choices_df) # - NARROW @@ -582,6 +711,49 @@ def _interaction_sample( return choices_df +def _ensure_chosen_alts_in_sample( + alt_col_name, + alternatives: pd.DataFrame, + choices_df: pd.DataFrame, + choosers: pd.DataFrame, + probs: pd.DataFrame, + state: State, + trace_label, +) -> pd.DataFrame: + # we need to ensure chosen alternative is included in the sample + survey_choices = estimation.manager.get_survey_destination_choices( + state, choosers, trace_label + ) + if survey_choices is not None: + assert ( + survey_choices.index == choosers.index + ).all(), "survey_choices and choosers must have the same index" + survey_choices.name = alt_col_name + survey_choices = survey_choices.dropna().astype(choices_df[alt_col_name].dtype) + + # merge all survey choices onto choices_df + probs_df = probs.reset_index().melt( + id_vars=[choosers.index.name], + var_name=alt_col_name, + value_name="prob", + ) + # probs are numbered 0..n-1 so we need to map back to alt ids + zone_map = pd.Series(alternatives.index).to_dict() + probs_df[alt_col_name] = probs_df[alt_col_name].map(zone_map) + + survey_choices = pd.merge( + survey_choices, + probs_df, + on=[choosers.index.name, alt_col_name], + how="left", + ) + survey_choices["rand"] = 0 + survey_choices["prob"].fillna(0, inplace=True) + choices_df = pd.concat([choices_df, survey_choices], ignore_index=True) + choices_df.sort_values(by=[choosers.index.name], inplace=True) + return choices_df + + def interaction_sample( state: workflow.State, choosers: pd.DataFrame, @@ -675,6 +847,7 @@ def interaction_sample( # FIXME - legacy logic - not sure this is needed or even correct? sample_size = min(sample_size, len(alternatives.index)) + logger.info(f" --- interaction_sample sample size = {sample_size}") result_list = [] for ( diff --git a/activitysim/core/interaction_sample_simulate.py b/activitysim/core/interaction_sample_simulate.py index df1c53fa0..3c6a673ed 100644 --- a/activitysim/core/interaction_sample_simulate.py +++ b/activitysim/core/interaction_sample_simulate.py @@ -9,8 +9,8 @@ from activitysim.core import chunk, interaction_simulate, logit, tracing, util, workflow from activitysim.core.configuration.base import ComputeSettings -from activitysim.core.simulate import set_skim_wrapper_targets from activitysim.core.exceptions import SegmentedSpecificationError +from activitysim.core.simulate import set_skim_wrapper_targets logger = logging.getLogger(__name__) @@ -287,50 +287,88 @@ def _interaction_sample_simulate( column_labels=["alternative", "utility"], ) - # convert to probabilities (utilities exponentiated and normalized to probs) - # probs is same shape as utilities, one row per chooser and one column for alternative - if want_logsums: - probs, logsums = logit.utils_to_probs( + if state.settings.use_explicit_error_terms: + if want_logsums: + logsums = logit.utils_to_logsums( + utilities_df, allow_zero_probs=allow_zero_probs + ) + chunk_sizer.log_df(trace_label, "logsums", logsums) + + if skip_choice: + return choosers.join(logsums.to_frame("logsums")) + + utilities_df = logit.validate_utils( state, utilities_df, allow_zero_probs=allow_zero_probs, trace_label=trace_label, trace_choosers=choosers, - overflow_protection=not allow_zero_probs, - return_logsums=True, ) - chunk_sizer.log_df(trace_label, "logsums", logsums) - else: - probs = logit.utils_to_probs( - state, - utilities_df, - allow_zero_probs=allow_zero_probs, - trace_label=trace_label, - trace_choosers=choosers, - overflow_protection=not allow_zero_probs, + + if allow_zero_probs: + zero_probs = ( + utilities_df.sum(axis=1) + <= utilities_df.shape[1] * logit.UTIL_UNAVAILABLE + ) + if zero_probs.any(): + # copied from proabability below, fix when that gets fixed + # FIXME this is kind of gnarly, but we force choice of first alt + utilities_df.loc[ + zero_probs, 0 + ] = 3.0 # arbitrary value much larger than UTIL_UNAVAILABLE + + # positions is series with the chosen alternative represented as a column index in utilities_df + # which is an integer between zero and num alternatives in the alternative sample + positions, rands = logit.make_choices_utility_based( + state, utilities_df, trace_label=trace_label, trace_choosers=choosers ) - chunk_sizer.log_df(trace_label, "probs", probs) - del utilities_df - chunk_sizer.log_df(trace_label, "utilities_df", None) + del utilities_df + chunk_sizer.log_df(trace_label, "utilities_df", None) + else: + # convert to probabilities (utilities exponentiated and normalized to probs) + # probs is same shape as utilities, one row per chooser and one column for alternative + if want_logsums: + probs, logsums = logit.utils_to_probs( + state, + utilities_df, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + trace_choosers=choosers, + overflow_protection=not allow_zero_probs, + return_logsums=True, + ) + chunk_sizer.log_df(trace_label, "logsums", logsums) + else: + probs = logit.utils_to_probs( + state, + utilities_df, + allow_zero_probs=allow_zero_probs, + trace_label=trace_label, + trace_choosers=choosers, + overflow_protection=not allow_zero_probs, + ) + chunk_sizer.log_df(trace_label, "probs", probs) - if have_trace_targets: - state.tracing.trace_df( - probs, - tracing.extend_trace_label(trace_label, "probs"), - column_labels=["alternative", "probability"], - ) + del utilities_df + chunk_sizer.log_df(trace_label, "utilities_df", None) - if allow_zero_probs: - zero_probs = probs.sum(axis=1) == 0 - if zero_probs.any(): - # FIXME this is kind of gnarly, but we force choice of first alt - probs.loc[zero_probs, 0] = 1.0 + if have_trace_targets: + state.tracing.trace_df( + probs, + tracing.extend_trace_label(trace_label, "probs"), + column_labels=["alternative", "probability"], + ) - if skip_choice: - return choosers.join(logsums.to_frame("logsums")) + if allow_zero_probs: + zero_probs = probs.sum(axis=1) == 0 + if zero_probs.any(): + # FIXME this is kind of gnarly, but we force choice of first alt + probs.loc[zero_probs, 0] = 1.0 + + if skip_choice: + return choosers.join(logsums.to_frame("logsums")) - else: # make choices # positions is series with the chosen alternative represented as a column index in probs # which is an integer between zero and num alternatives in the alternative sample @@ -338,59 +376,59 @@ def _interaction_sample_simulate( state, probs, trace_label=trace_label, trace_choosers=choosers ) - chunk_sizer.log_df(trace_label, "positions", positions) - chunk_sizer.log_df(trace_label, "rands", rands) - del probs chunk_sizer.log_df(trace_label, "probs", None) - # shouldn't have chosen any of the dummy pad utilities - assert positions.max() < max_sample_count + chunk_sizer.log_df(trace_label, "positions", positions) + chunk_sizer.log_df(trace_label, "rands", rands) - # need to get from an integer offset into the alternative sample to the alternative index - # that is, we want the index value of the row that is offset by rows into the - # tranche of this choosers alternatives created by cross join of alternatives and choosers + # shouldn't have chosen any of the dummy pad utilities + assert positions.max() < max_sample_count - # resulting pandas Int64Index has one element per chooser row and is in same order as choosers - choices = alternatives[choice_column].take(positions + first_row_offsets) + # need to get from an integer offset into the alternative sample to the alternative index + # that is, we want the index value of the row that is offset by rows into the + # tranche of this choosers alternatives created by cross join of alternatives and choosers - # create a series with index from choosers and the index of the chosen alternative - choices = pd.Series(choices, index=choosers.index) + # resulting pandas Int64Index has one element per chooser row and is in same order as choosers + choices = alternatives[choice_column].take(positions + first_row_offsets) - chunk_sizer.log_df(trace_label, "choices", choices) + # create a series with index from choosers and the index of the chosen alternative + choices = pd.Series(choices, index=choosers.index) - if allow_zero_probs and zero_probs.any() and zero_prob_choice_val is not None: - # FIXME this is kind of gnarly, patch choice for zero_probs - choices.loc[zero_probs] = zero_prob_choice_val + chunk_sizer.log_df(trace_label, "choices", choices) - if have_trace_targets: - state.tracing.trace_df( - choices, - tracing.extend_trace_label(trace_label, "choices"), - columns=[None, trace_choice_name], - ) + if allow_zero_probs and zero_probs.any() and zero_prob_choice_val is not None: + # FIXME this is kind of gnarly, patch choice for zero_probs + choices.loc[zero_probs] = zero_prob_choice_val + + if have_trace_targets: + state.tracing.trace_df( + choices, + tracing.extend_trace_label(trace_label, "choices"), + columns=[None, trace_choice_name], + ) + state.tracing.trace_df( + rands, + tracing.extend_trace_label(trace_label, "rands"), + columns=[None, "rand"], + ) + if want_logsums: state.tracing.trace_df( - rands, - tracing.extend_trace_label(trace_label, "rands"), - columns=[None, "rand"], + logsums, + tracing.extend_trace_label(trace_label, "logsum"), + columns=[None, "logsum"], ) - if want_logsums: - state.tracing.trace_df( - logsums, - tracing.extend_trace_label(trace_label, "logsum"), - columns=[None, "logsum"], - ) - if want_logsums: - choices = choices.to_frame("choice") - choices["logsum"] = logsums + if want_logsums: + choices = choices.to_frame("choice") + choices["logsum"] = logsums - chunk_sizer.log_df(trace_label, "choices", choices) + chunk_sizer.log_df(trace_label, "choices", choices) - # handing this off to our caller - chunk_sizer.log_df(trace_label, "choices", None) + # handing this off to our caller + chunk_sizer.log_df(trace_label, "choices", None) - return choices + return choices def interaction_sample_simulate( diff --git a/activitysim/core/interaction_simulate.py b/activitysim/core/interaction_simulate.py index 0d8424139..ef6bbb121 100644 --- a/activitysim/core/interaction_simulate.py +++ b/activitysim/core/interaction_simulate.py @@ -15,8 +15,8 @@ from activitysim.core import chunk, logit, simulate, timing, tracing, util, workflow from activitysim.core.configuration.base import ComputeSettings -from activitysim.core.fast_eval import fast_eval from activitysim.core.exceptions import SegmentedSpecificationError +from activitysim.core.fast_eval import fast_eval logger = logging.getLogger(__name__) @@ -904,29 +904,42 @@ def _interaction_simulate( state.tracing.dump_df(DUMP, utilities, trace_label, "utilities") - # convert to probabilities (utilities exponentiated and normalized to probs) - # probs is same shape as utilities, one row per chooser and one column for alternative - probs = logit.utils_to_probs( - state, utilities, trace_label=trace_label, trace_choosers=choosers - ) - chunk_sizer.log_df(trace_label, "probs", probs) + if state.settings.use_explicit_error_terms: + utilities = logit.validate_utils( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) + positions, rands = logit.make_choices_utility_based( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) - del utilities - chunk_sizer.log_df(trace_label, "utilities", None) + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) - if have_trace_targets: - state.tracing.trace_df( - probs, - tracing.extend_trace_label(trace_label, "probs"), - column_labels=["alternative", "probability"], + else: + # convert to probabilities (utilities exponentiated and normalized to probs) + # probs is same shape as utilities, one row per chooser and one column for alternative + probs = logit.utils_to_probs( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) + + if have_trace_targets: + state.tracing.trace_df( + probs, + tracing.extend_trace_label(trace_label, "probs"), + column_labels=["alternative", "probability"], + ) + + # make choices + # positions is series with the chosen alternative represented as a column index in probs + # which is an integer between zero and num alternatives in the alternative sample + positions, rands = logit.make_choices( + state, probs, trace_label=trace_label, trace_choosers=choosers ) - # make choices - # positions is series with the chosen alternative represented as a column index in probs - # which is an integer between zero and num alternatives in the alternative sample - positions, rands = logit.make_choices( - state, probs, trace_label=trace_label, trace_choosers=choosers - ) chunk_sizer.log_df(trace_label, "positions", positions) chunk_sizer.log_df(trace_label, "rands", rands) diff --git a/activitysim/core/logit.py b/activitysim/core/logit.py index 105e18fec..816e1d0d2 100644 --- a/activitysim/core/logit.py +++ b/activitysim/core/logit.py @@ -22,6 +22,11 @@ EXP_UTIL_MIN = 1e-300 EXP_UTIL_MAX = np.inf +# TODO-EET: Figure out what type we want UTIL_MIN to be, currently np.float64 +UTIL_MIN = np.log(EXP_UTIL_MIN, dtype=np.float64) +UTIL_UNAVAILABLE = 1000.0 * (UTIL_MIN - 1.0) + + PROB_MIN = 0.0 PROB_MAX = 1.0 @@ -128,6 +133,70 @@ def utils_to_logsums(utils, exponentiated=False, allow_zero_probs=False): return logsums +def validate_utils( + state: workflow.State, + utils, + trace_label=None, + allow_zero_probs=False, + trace_choosers=None, +): + """ + Validate utilities to ensure non-available choices are treated the same in EET and MC. + For EET decisions, no conversion to probabilities is required because choices + are made on the basis of comparing utilities (only differences matter). + However, large negative utility values are used in practice to make choices + unavailable based on probability calculations, which boils down to evaluating + exp(utility). We here use this to define a minimum utility that corresponds + to an unavailable choice. + + Parameters + ---------- + utils : pandas.DataFrame + Rows should be choosers and columns should be alternatives. + + trace_label : str, optional + label for tracing bad utility or probability values + + allow_zero_probs : bool + if True value rows in which all utility alts are UTIL_MIN will be set to + UTIL_UNAVAILABLE. + + trace_choosers : pandas.dataframe + the choosers df (for interaction_simulate) to facilitate the reporting of hh_id + by report_bad_choices because it can't deduce hh_id from the interaction_dataset + which is indexed on index values from alternatives df + + Returns + ------- + utils : pandas.DataFrame + utils with values that would lead to zero probability replaced by UTIL_UNAVAILABLE + + """ + trace_label = tracing.extend_trace_label(trace_label, "validate_utils") + + utils_arr = utils.values + + np.putmask(utils_arr, utils_arr <= UTIL_MIN, UTIL_UNAVAILABLE) + + arr_sum = utils_arr.sum(axis=1) + + if not allow_zero_probs: + zero_probs = arr_sum <= utils_arr.shape[1] * UTIL_UNAVAILABLE + if zero_probs.any(): + report_bad_choices( + state, + zero_probs, + utils, + trace_label=tracing.extend_trace_label(trace_label, "zero_prob_utils"), + msg="all probabilities are zero", + trace_choosers=trace_choosers, + ) + + utils = pd.DataFrame(utils_arr, columns=utils.columns, index=utils.index) + + return utils + + def utils_to_probs( state: workflow.State, utils, @@ -275,6 +344,106 @@ def utils_to_probs( return probs +# TODO-EET: add doc string, tracing +def add_ev1_random(state: workflow.State, df: pd.DataFrame): + nest_utils_for_choice = df.copy() + nest_utils_for_choice += state.get_rn_generator().gumbel_for_df( + nest_utils_for_choice, n=nest_utils_for_choice.shape[1] + ) + return nest_utils_for_choice + + +def choose_from_tree( + nest_utils, all_alternatives, logit_nest_groups, nest_alternatives_by_name +): + for level, nest_names in logit_nest_groups.items(): + if level == 1: + next_level_alts = nest_alternatives_by_name[nest_names[0]] + continue + choice_this_level = nest_utils[nest_utils.index.isin(next_level_alts)].idxmax() + if choice_this_level in all_alternatives: + return choice_this_level + next_level_alts = nest_alternatives_by_name[choice_this_level] + raise ValueError("This should never happen - no alternative found") + + +# TODO-EET: add doc string, tracing +def make_choices_explicit_error_term_nl( + state, nested_utilities, alt_order_array, nest_spec, trace_label +): + """walk down the nesting tree and make choice at each level, which is the root of the next level choice.""" + nest_utils_for_choice = add_ev1_random(state, nested_utilities) + + all_alternatives = set(nest.name for nest in each_nest(nest_spec, type="leaf")) + logit_nest_groups = group_nest_names_by_level(nest_spec) + nest_alternatives_by_name = {n.name: n.alternatives for n in each_nest(nest_spec)} + + # Apply is slow. It could *maybe* be sped up by using the fact that the nesting structure is the same for all rows: + # Add ev1(0,1) to all entries (as is currently being done). Then, at each level, pick the maximum of the available + # composite alternatives and set the corresponding entry to 1 for each row, set all other alternatives at this level + # to zero. Once the tree is walked (all alternatives have been processed), take the product of the alternatives in + # each leaf's alternative list. Then pick the only alternative with entry 1, all others must be 0. + choices = nest_utils_for_choice.apply( + lambda x: choose_from_tree( + x, all_alternatives, logit_nest_groups, nest_alternatives_by_name + ), + axis=1, + ) + # TODO-EET: reporting like for zero probs + assert not choices.isnull().any(), f"No choice for {trace_label}" + choices = pd.Series(choices, index=nest_utils_for_choice.index) + + # In order for choice indexing to be consistent with MNL and cumsum MC choices, we need to index in the order + # alternatives were originally created before adding nest nodes that are not elemental alternatives + choices = choices.map({v: k for k, v in enumerate(alt_order_array)}) + + return choices + + +# TODO-EET: add doc string, tracing +def make_choices_explicit_error_term_mnl(state, utilities, trace_label): + utilities_incl_unobs = add_ev1_random(state, utilities) + choices = np.argmax(utilities_incl_unobs.to_numpy(), axis=1) + # TODO-EET: reporting like for zero probs + assert not np.isnan(choices).any(), f"No choice for {trace_label}" + choices = pd.Series(choices, index=utilities_incl_unobs.index) + return choices + + +def make_choices_explicit_error_term( + state, utilities, alt_order_array, nest_spec=None, trace_label=None +): + trace_label = tracing.extend_trace_label(trace_label, "make_choices_eet") + if nest_spec is None: + choices = make_choices_explicit_error_term_mnl(state, utilities, trace_label) + else: + choices = make_choices_explicit_error_term_nl( + state, utilities, alt_order_array, nest_spec, trace_label + ) + return choices + + +def make_choices_utility_based( + state: workflow.State, + utilities: pd.DataFrame, + name_mapping=None, + nest_spec=None, + trace_label: str | None = None, + trace_choosers=None, + allow_bad_probs=False, +) -> tuple[pd.Series, pd.Series]: + trace_label = tracing.extend_trace_label(trace_label, "make_choices_utility_based") + + # TODO-EET: index of choices for nested utilities is different than unnested - this needs to be consistent for + # turning indexes into alternative names to keep code changes to minimum for now + choices = make_choices_explicit_error_term( + state, utilities, name_mapping, nest_spec, trace_label + ) + # TODO-EET: rands - log all zeros for now + rands = pd.Series(np.zeros_like(utilities.index.values), index=utilities.index) + return choices, rands + + def make_choices( state: workflow.State, probs: pd.DataFrame, @@ -284,28 +453,23 @@ def make_choices( ) -> tuple[pd.Series, pd.Series]: """ Make choices for each chooser from among a set of alternatives. - Parameters ---------- probs : pandas.DataFrame Rows for choosers and columns for the alternatives from which they are choosing. Values are expected to be valid probabilities across each row, e.g. they should sum to 1. - trace_choosers : pandas.dataframe the choosers df (for interaction_simulate) to facilitate the reporting of hh_id by report_bad_choices because it can't deduce hh_id from the interaction_dataset which is indexed on index values from alternatives df - Returns ------- choices : pandas.Series Maps chooser IDs (from `probs` index) to a choice, where the choice is an index into the columns of `probs`. - rands : pandas.Series The random numbers used to make the choices (for debugging, tracing) - """ trace_label = tracing.extend_trace_label(trace_label, "make_choices") @@ -546,6 +710,7 @@ def _each_nest(spec: LogitNestSpec, parent_nest, post_order): nest.level = parent_nest.level + 1 nest.product_of_coefficients = parent_nest.product_of_coefficients nest.ancestors = parent_nest.ancestors + [name] + nest.coefficient = parent_nest.coefficient yield spec, nest @@ -602,3 +767,12 @@ def count_each_nest(spec, count): return 1 return count_each_nest(nest_spec, 0) if nest_spec is not None else 0 + + +def group_nest_names_by_level(nest_spec): + # group nests by level, returns {level: [nest.name at that level]} + depth = np.max([x.level for x in each_nest(nest_spec)]) + nest_levels = {x: [] for x in range(1, depth + 1)} + for n in each_nest(nest_spec): + nest_levels[n.level].append(n.name) + return nest_levels diff --git a/activitysim/core/pathbuilder.py b/activitysim/core/pathbuilder.py index 31393ceea..da241addc 100644 --- a/activitysim/core/pathbuilder.py +++ b/activitysim/core/pathbuilder.py @@ -991,16 +991,16 @@ def build_virtual_path( utilities_df.index = orig.index with memo("#TVPB build_virtual_path make_choices"): - probs = logit.utils_to_probs( - self.network_los.state, - utilities_df, - allow_zero_probs=True, - trace_label=trace_label, - overflow_protection=False, - ) - chunk_sizer.log_df(trace_label, "probs", probs) - if trace: + probs = logit.utils_to_probs( + self.network_los.state, + utilities_df, + allow_zero_probs=True, + trace_label=trace_label, + overflow_protection=False, + ) + chunk_sizer.log_df(trace_label, "probs", probs) + choices = override_choices utilities_df["choices"] = choices @@ -1008,21 +1008,44 @@ def build_virtual_path( probs["choices"] = choices self.trace_df(probs, trace_label, "probs") + del probs + chunk_sizer.log_df(trace_label, "probs", None) else: - choices, rands = logit.make_choices( - self.network_los.state, - probs, - allow_bad_probs=True, - trace_label=trace_label, - ) + if self.network_los.state.settings.use_explicit_error_terms: + utilities_df = logit.validate_utils( + self.network_los.state, + utilities_df, + allow_zero_probs=True, + trace_label=trace_label, + ) + choices, rands = logit.make_choices_utility_based( + self.network_los.state, + utilities_df, + trace_label=trace_label, + ) + else: + probs = logit.utils_to_probs( + self.network_los.state, + utilities_df, + allow_zero_probs=True, + trace_label=trace_label, + overflow_protection=False, + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + choices, rands = logit.make_choices( + self.network_los.state, + probs, + allow_bad_probs=True, + trace_label=trace_label, + ) + del probs + chunk_sizer.log_df(trace_label, "probs", None) chunk_sizer.log_df(trace_label, "rands", rands) del rands chunk_sizer.log_df(trace_label, "rands", None) - del probs - chunk_sizer.log_df(trace_label, "probs", None) - # we need to get path_set, btap, atap from path_df row with same seq and path_num # drop seq join column, but keep path_num of choice to override_choices when tracing columns_to_cache = ["btap", "atap", "path_set", "path_num"] diff --git a/activitysim/core/random.py b/activitysim/core/random.py index 37b197640..79e6f7e7a 100644 --- a/activitysim/core/random.py +++ b/activitysim/core/random.py @@ -9,8 +9,8 @@ import numpy as np import pandas as pd -from activitysim.core.util import reindex from activitysim.core.exceptions import DuplicateLoadableObjectError, TableIndexError +from activitysim.core.util import reindex from .tracing import print_elapsed_time @@ -194,6 +194,8 @@ def _generators_for_df(self, df): df_row_states = self.row_states.loc[df.index] + # https://numpy.org/doc/stable/reference/random/generator.html + # np.random.default_rng() prng = np.random.RandomState() for row in df_row_states.itertuples(): prng.seed(row.row_seed) @@ -245,6 +247,50 @@ def random_for_df(self, df, step_name, n=1): self.row_states.loc[df.index, "offset"] += n return rands + def gumbel_for_df(self, df, step_name, n=1): + """ + Return n floating point gumbel-distributed numbers for each row in df + using the appropriate random channel for each row. + + Subsequent calls (in the same step) will return the next rand for each df row + + The resulting array will be the same length (and order) as df + This method is designed to support alternative selection from a probability array + + The columns in df are ignored; the index name and values are used to determine + which random number sequence to to use. + + If "true pseudo random" behavior is desired (i.e. NOT repeatable) the set_base_seed + method (q.v.) may be used to globally reseed all random streams. + + Parameters + ---------- + df : pandas.DataFrame + df with index name and values corresponding to a registered channel + + n : int + number of rands desired per df row + + Returns + ------- + rands : 2-D ndarray + array the same length as df, with n floats in range [0, 1) for each df row + """ + + assert self.step_name + assert self.step_name == step_name + + # - reminder: prng must be called when yielded as generated sequence, not serialized + generators = self._generators_for_df(df) + + # rands = np.asanyarray([prng.gumbel(size=n) for prng in generators]) + # this is about 20% faster for large arrays, like for destination choice + rands = np.asanyarray([-np.log(-np.log(prng.rand(n))) for prng in generators]) + + # update offset for rows we handled + self.row_states.loc[df.index, "offset"] += n + return rands + def normal_for_df(self, df, step_name, mu, sigma, lognormal=False, size=None): """ Return a floating point random number in normal (or lognormal) distribution @@ -617,6 +663,42 @@ def random_for_df(self, df, n=1): rands = channel.random_for_df(df, self.step_name, n) return rands + def gumbel_for_df(self, df, n=1): + """ + Return a single floating point gumbel for each row in df + using the appropriate random channel for each row. + + Subsequent calls (in the same step) will return the next rand for each df row + + The resulting array will be the same length (and order) as df + This method is designed to support alternative selection from a probability array + + The columns in df are ignored; the index name and values are used to determine + which random number sequence to to use. + + We assume that we can identify the channel to used based on the name of df.index + This channel should have already been registered by a call to add_channel (q.v.) + + If "true pseudo random" behavior is desired (i.e. NOT repeatable) the set_base_seed + method (q.v.) may be used to globally reseed all random streams. + + Parameters + ---------- + df : pandas.DataFrame + df with index name and values corresponding to a registered channel + + n : int + number of rands desired (default 1) + + Returns + ------- + choices : 1-D ndarray the same length as df + a single float in range [0, 1) for each row in df + """ + channel = self.get_channel_for_df(df) + rands = channel.gumbel_for_df(df, self.step_name, n) + return rands + def normal_for_df(self, df, mu=0, sigma=1, broadcast=False, size=None): """ Return a single floating point normal random number in range (-inf, inf) for each row in df diff --git a/activitysim/core/simulate.py b/activitysim/core/simulate.py index c38e16a85..7796b0a8f 100644 --- a/activitysim/core/simulate.py +++ b/activitysim/core/simulate.py @@ -33,6 +33,7 @@ TemplatedLogitComponentSettings, ) from activitysim.core.estimation import Estimator +from activitysim.core.exceptions import ModelConfigurationError from activitysim.core.fast_eval import fast_eval from activitysim.core.simulate_consts import ( ALT_LOSER_UTIL, @@ -40,7 +41,6 @@ SPEC_EXPRESSION_NAME, SPEC_LABEL_NAME, ) -from activitysim.core.exceptions import ModelConfigurationError logger = logging.getLogger(__name__) @@ -1093,6 +1093,46 @@ def set_skim_wrapper_targets(df, skims, allow_partial_success: bool = True): # ) +def compute_nested_utilities(raw_utilities, nest_spec): + """ + compute nest utilities based on nesting coefficients + + For nest nodes this is the logsum of alternatives adjusted by nesting coefficient + + leaf <- raw_utility / nest_coefficient + nest <- ln(sum of exponentiated raw_utility of leaves) * nest_coefficient) + + Parameters + ---------- + raw_utilities : pandas.DataFrame + dataframe with the raw alternative utilities of all leaves + (what in non-nested logit would be the utilities of all the alternatives) + nest_spec : dict + Nest tree dict from the model spec yaml file + + Returns + ------- + nested_utilities : pandas.DataFrame + Will have the index of `raw_utilities` and columns for leaf and node utilities + """ + nested_utilities = pd.DataFrame(index=raw_utilities.index) + + for nest in logit.each_nest(nest_spec, post_order=True): + name = nest.name + if nest.is_leaf: + nested_utilities[name] = ( + raw_utilities[name].astype(float) / nest.product_of_coefficients + ) + else: + # the alternative nested_utilities will already have been computed due to post_order + with np.errstate(divide="ignore"): + nested_utilities[name] = nest.coefficient * np.log( + np.exp(nested_utilities[nest.alternatives]).sum(axis=1) + ) + + return nested_utilities + + def compute_nested_exp_utilities(raw_utilities, nest_spec): """ compute exponentiated nest utilities based on nesting coefficients @@ -1224,7 +1264,7 @@ def eval_mnl( choosers, spec, locals_d, - custom_chooser: CustomChooser_T, + custom_chooser: CustomChooser_T | None, estimator, log_alt_losers=False, want_logsums=False, @@ -1308,29 +1348,47 @@ def eval_mnl( column_labels=["alternative", "utility"], ) - probs = logit.utils_to_probs( - state, utilities, trace_label=trace_label, trace_choosers=choosers - ) - chunk_sizer.log_df(trace_label, "probs", probs) + if state.settings.use_explicit_error_terms: + utilities = logit.validate_utils( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) - del utilities - chunk_sizer.log_df(trace_label, "utilities", None) + if custom_chooser: + choices, rands = custom_chooser( + state, utilities, choosers, spec, trace_label + ) + else: + choices, rands = logit.make_choices_utility_based( + state, utilities, trace_label=trace_label + ) - if have_trace_targets: - # report these now in case make_choices throws error on bad_choices - state.tracing.trace_df( - probs, - "%s.probs" % trace_label, - column_labels=["alternative", "probability"], - ) + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) - if custom_chooser: - choices, rands = custom_chooser(state, probs, choosers, spec, trace_label) else: - choices, rands = logit.make_choices(state, probs, trace_label=trace_label) + probs = logit.utils_to_probs( + state, utilities, trace_label=trace_label, trace_choosers=choosers + ) + chunk_sizer.log_df(trace_label, "probs", probs) + + del utilities + chunk_sizer.log_df(trace_label, "utilities", None) - del probs - chunk_sizer.log_df(trace_label, "probs", None) + if have_trace_targets: + # report these now in case make_choices throws error on bad_choices + state.tracing.trace_df( + probs, + "%s.probs" % trace_label, + column_labels=["alternative", "probability"], + ) + + if custom_chooser: + choices, rands = custom_chooser(state, probs, choosers, spec, trace_label) + else: + choices, rands = logit.make_choices(state, probs, trace_label=trace_label) + + del probs + chunk_sizer.log_df(trace_label, "probs", None) if have_trace_targets: state.tracing.trace_df( @@ -1431,87 +1489,140 @@ def eval_nl( column_labels=["alternative", "utility"], ) - # exponentiated utilities of leaves and nests - nested_exp_utilities = compute_nested_exp_utilities(raw_utilities, nest_spec) - chunk_sizer.log_df(trace_label, "nested_exp_utilities", nested_exp_utilities) + if state.settings.use_explicit_error_terms: + # TODO-EET: Nested utility zero choice probability + raw_utilities = logit.validate_utils( + state, raw_utilities, allow_zero_probs=True, trace_label=trace_label + ) - del raw_utilities - chunk_sizer.log_df(trace_label, "raw_utilities", None) + # utilities of leaves and nests + nested_utilities = compute_nested_utilities(raw_utilities, nest_spec) + chunk_sizer.log_df(trace_label, "nested_utilities", nested_utilities) - if have_trace_targets: - state.tracing.trace_df( - nested_exp_utilities, - "%s.nested_exp_utilities" % trace_label, - column_labels=["alternative", "utility"], - ) + # TODO-EET: use nested_utiltites directly to compute logsums? + if want_logsums: + # logsum of nest root + # exponentiated utilities of leaves and nests + nested_exp_utilities = compute_nested_exp_utilities( + raw_utilities, nest_spec + ) + chunk_sizer.log_df( + trace_label, "nested_exp_utilities", nested_exp_utilities + ) + logsums = pd.Series(np.log(nested_exp_utilities.root), index=choosers.index) + chunk_sizer.log_df(trace_label, "logsums", logsums) + + # TODO-EET: index of choices for nested utilities is different than unnested - this needs to be consistent for + # turning indexes into alternative names to keep code changes to minimum for now + name_mapping = raw_utilities.columns.values + + del raw_utilities + chunk_sizer.log_df(trace_label, "raw_utilities", None) + + if custom_chooser: + choices, rands = custom_chooser( + state, + utilities=nested_utilities, + name_mapping=name_mapping, + choosers=choosers, + spec=spec, + nest_spec=nest_spec, + trace_label=trace_label, + ) + else: + choices, rands = logit.make_choices_utility_based( + state, + nested_utilities, + name_mapping=name_mapping, + nest_spec=nest_spec, + trace_label=trace_label, + ) - # probabilities of alternatives relative to siblings sharing the same nest - nested_probabilities = compute_nested_probabilities( - state, nested_exp_utilities, nest_spec, trace_label=trace_label - ) - chunk_sizer.log_df(trace_label, "nested_probabilities", nested_probabilities) + del nested_utilities + chunk_sizer.log_df(trace_label, "nested_utilities", None) - if want_logsums: - # logsum of nest root - logsums = pd.Series(np.log(nested_exp_utilities.root), index=choosers.index) - chunk_sizer.log_df(trace_label, "logsums", logsums) + else: + # exponentiated utilities of leaves and nests + nested_exp_utilities = compute_nested_exp_utilities(raw_utilities, nest_spec) + chunk_sizer.log_df(trace_label, "nested_exp_utilities", nested_exp_utilities) - del nested_exp_utilities - chunk_sizer.log_df(trace_label, "nested_exp_utilities", None) + del raw_utilities + chunk_sizer.log_df(trace_label, "raw_utilities", None) - if have_trace_targets: - state.tracing.trace_df( - nested_probabilities, - "%s.nested_probabilities" % trace_label, - column_labels=["alternative", "probability"], + if have_trace_targets: + state.tracing.trace_df( + nested_exp_utilities, + "%s.nested_exp_utilities" % trace_label, + column_labels=["alternative", "utility"], + ) + + # probabilities of alternatives relative to siblings sharing the same nest + nested_probabilities = compute_nested_probabilities( + state, nested_exp_utilities, nest_spec, trace_label=trace_label ) + chunk_sizer.log_df(trace_label, "nested_probabilities", nested_probabilities) - # global (flattened) leaf probabilities based on relative nest coefficients (in spec order) - base_probabilities = compute_base_probabilities( - nested_probabilities, nest_spec, spec - ) - chunk_sizer.log_df(trace_label, "base_probabilities", base_probabilities) + if want_logsums: + # logsum of nest root + logsums = pd.Series(np.log(nested_exp_utilities.root), index=choosers.index) + chunk_sizer.log_df(trace_label, "logsums", logsums) - del nested_probabilities - chunk_sizer.log_df(trace_label, "nested_probabilities", None) + del nested_exp_utilities + chunk_sizer.log_df(trace_label, "nested_exp_utilities", None) - if have_trace_targets: - state.tracing.trace_df( - base_probabilities, - "%s.base_probabilities" % trace_label, - column_labels=["alternative", "probability"], + if have_trace_targets: + state.tracing.trace_df( + nested_probabilities, + "%s.nested_probabilities" % trace_label, + column_labels=["alternative", "probability"], + ) + + # global (flattened) leaf probabilities based on relative nest coefficients (in spec order) + base_probabilities = compute_base_probabilities( + nested_probabilities, nest_spec, spec ) + chunk_sizer.log_df(trace_label, "base_probabilities", base_probabilities) - # note base_probabilities could all be zero since we allowed all probs for nests to be zero - # check here to print a clear message but make_choices will raise error if probs don't sum to 1 - BAD_PROB_THRESHOLD = 0.001 - no_choices = (base_probabilities.sum(axis=1) - 1).abs() > BAD_PROB_THRESHOLD + del nested_probabilities + chunk_sizer.log_df(trace_label, "nested_probabilities", None) - if no_choices.any(): - logit.report_bad_choices( - state, - no_choices, - base_probabilities, - trace_label=tracing.extend_trace_label(trace_label, "bad_probs"), - trace_choosers=choosers, - msg="base_probabilities do not sum to one", - ) + if have_trace_targets: + state.tracing.trace_df( + base_probabilities, + "%s.base_probabilities" % trace_label, + column_labels=["alternative", "probability"], + ) - if custom_chooser: - choices, rands = custom_chooser( - state, - base_probabilities, - choosers, - spec, - trace_label, - ) - else: - choices, rands = logit.make_choices( - state, base_probabilities, trace_label=trace_label - ) + # note base_probabilities could all be zero since we allowed all probs for nests to be zero + # check here to print a clear message but make_choices will raise error if probs don't sum to 1 + BAD_PROB_THRESHOLD = 0.001 + no_choices = (base_probabilities.sum(axis=1) - 1).abs() > BAD_PROB_THRESHOLD + + if no_choices.any(): + logit.report_bad_choices( + state, + no_choices, + base_probabilities, + trace_label=tracing.extend_trace_label(trace_label, "bad_probs"), + trace_choosers=choosers, + msg="base_probabilities do not sum to one", + ) + + if custom_chooser: + choices, rands = custom_chooser( + state, + base_probabilities, + choosers, + spec, + trace_label, + ) + else: + choices, rands = logit.make_choices( + state, base_probabilities, trace_label=trace_label + ) - del base_probabilities - chunk_sizer.log_df(trace_label, "base_probabilities", None) + del base_probabilities + chunk_sizer.log_df(trace_label, "base_probabilities", None) if have_trace_targets: state.tracing.trace_df( diff --git a/activitysim/core/test/test_logit.py b/activitysim/core/test/test_logit.py index e249475de..c82606981 100644 --- a/activitysim/core/test/test_logit.py +++ b/activitysim/core/test/test_logit.py @@ -70,6 +70,9 @@ def utilities(choosers, spec, test_data): ) +# TODO-EET: Add tests here! + + def test_utils_to_probs(utilities, test_data): state = workflow.State().default_settings() probs = logit.utils_to_probs(state, utilities, trace_label=None)