Fix memory usage #751

Merged: 9 commits, Nov 12, 2023

Changes from all commits

2 changes: 1 addition & 1 deletion activitysim/abm/models/parking_location_choice.py
@@ -318,7 +318,7 @@ def parking_location(
     if "trip_period" not in trips_merged_df:
         # TODO: resolve this to the skim time period index not the label, it will be faster
         trips_merged_df["trip_period"] = network_los.skim_time_period_label(
-            trips_merged_df[proposed_trip_departure_period]
+            trips_merged_df[proposed_trip_departure_period], as_cat=True
         )
         model_settings["TRIP_DEPARTURE_PERIOD"] = "trip_period"

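The `as_cat=True` changes in this and the following files all exploit the same fact: an ordered categorical stores one small integer code per row plus a single shared table of labels, while an object-dtype string column stores a full Python string reference per row. A minimal sketch of the size difference, with made-up period labels and row counts (not ActivitySim code):

```python
import numpy as np
import pandas as pd

# Hypothetical period labels and row count, for illustration only.
labels = pd.Index(["EA", "AM", "MD", "PM", "EV"])
codes = np.random.randint(0, len(labels), size=5_000_000)

# Object-dtype strings: one Python object reference per row.
as_str = pd.Series(labels[codes].astype(object))

# Ordered categorical: one int8 code per row plus the shared label index.
as_cat = pd.Series(pd.Categorical.from_codes(codes, categories=labels, ordered=True))

print(as_str.memory_usage(deep=True))  # on the order of a few hundred MB
print(as_cat.memory_usage(deep=True))  # on the order of 5 MB
```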
2 changes: 1 addition & 1 deletion activitysim/abm/models/trip_mode_choice.py
@@ -73,7 +73,7 @@ def trip_mode_choice(
     # setup skim keys
     assert "trip_period" not in trips_merged
     trips_merged["trip_period"] = network_los.skim_time_period_label(
-        trips_merged.depart
+        trips_merged.depart, as_cat=True
     )

     orig_col = "origin"
16 changes: 10 additions & 6 deletions activitysim/abm/models/util/logsums.py
@@ -75,10 +75,10 @@ def compute_logsums(
     # FIXME - are we ok with altering choosers (so caller doesn't have to set these)?
     if (in_period_col is not None) and (out_period_col is not None):
         choosers["in_period"] = network_los.skim_time_period_label(
-            choosers[in_period_col]
+            choosers[in_period_col], as_cat=True
         )
         choosers["out_period"] = network_los.skim_time_period_label(
-            choosers[out_period_col]
+            choosers[out_period_col], as_cat=True
         )
     elif ("in_period" not in choosers.columns) and (
         "out_period" not in choosers.columns
@@ -92,17 +92,21 @@ def compute_logsums(
             and tour_purpose in model_settings["OUT_PERIOD"]
         ):
             choosers["in_period"] = network_los.skim_time_period_label(
-                model_settings["IN_PERIOD"][tour_purpose]
+                model_settings["IN_PERIOD"][tour_purpose],
+                as_cat=True,
+                broadcast_to=choosers.index,
             )
             choosers["out_period"] = network_los.skim_time_period_label(
-                model_settings["OUT_PERIOD"][tour_purpose]
+                model_settings["OUT_PERIOD"][tour_purpose],
+                as_cat=True,
+                broadcast_to=choosers.index,
             )
         else:
             choosers["in_period"] = network_los.skim_time_period_label(
-                model_settings["IN_PERIOD"]
+                model_settings["IN_PERIOD"], as_cat=True, broadcast_to=choosers.index
            )
            choosers["out_period"] = network_los.skim_time_period_label(
-                model_settings["OUT_PERIOD"]
+                model_settings["OUT_PERIOD"], as_cat=True, broadcast_to=choosers.index
            )
    else:
        logger.error("Choosers table already has columns 'in_period' and 'out_period'.")
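When the period is a single label from model settings rather than a per-row value, `broadcast_to` lets that label be expanded over the choosers index without ever materializing one string per chooser. A sketch of the idea, assuming a hypothetical five-period label set:

```python
import pandas as pd

# Hypothetical ordered period labels, standing in for the skim's time axis.
time_label_dtype = pd.CategoricalDtype(["EA", "AM", "MD", "PM", "EV"], ordered=True)

def broadcast_label(label: str, index: pd.Index) -> pd.Series:
    # A scalar broadcast into a categorical Series stores one code per row,
    # never one string per row.
    return pd.Series(label, index=index, dtype=time_label_dtype)

in_period = broadcast_label("AM", pd.RangeIndex(1_000_000))
print(in_period.memory_usage(deep=True))  # ~1 MB rather than tens of MB of strings
```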
8 changes: 6 additions & 2 deletions activitysim/abm/models/util/mode.py
@@ -131,8 +131,12 @@ def run_tour_mode_choice_simulate(
     assert ("in_period" not in choosers) and ("out_period" not in choosers)
     in_time = skims["in_time_col_name"]
     out_time = skims["out_time_col_name"]
-    choosers["in_period"] = network_los.skim_time_period_label(choosers[in_time])
-    choosers["out_period"] = network_los.skim_time_period_label(choosers[out_time])
+    choosers["in_period"] = network_los.skim_time_period_label(
+        choosers[in_time], as_cat=True
+    )
+    choosers["out_period"] = network_los.skim_time_period_label(
+        choosers[out_time], as_cat=True
+    )

     expressions.annotate_preprocessors(
         state, choosers, locals_dict, skims, model_settings, trace_label
50 changes: 34 additions & 16 deletions activitysim/abm/models/util/vectorize_tour_scheduling.py
@@ -185,6 +185,12 @@ def dedupe_alt_tdd(state: workflow.State, alt_tdd, tour_purpose, trace_label):

     logger.info("tdd_alt_segments specified for representative logsums")

+    if tdd_segments is not None:
+        # apply categorical dtypes
+        tdd_segments["time_period"] = tdd_segments["time_period"].astype(
+            alt_tdd["out_period"].dtype
+        )
+
     with chunk.chunk_log(
         state, tracing.extend_trace_label(trace_label, "dedupe_alt_tdd")
     ) as chunk_sizer:
@@ -328,11 +334,12 @@ def compute_tour_scheduling_logsums(
     assert "out_period" not in alt_tdd
     assert "in_period" not in alt_tdd

-    # FIXME:MEMORY
-    # These two lines each generate a massive array of strings,
-    # using a bunch of RAM and slowing things down.
-    alt_tdd["out_period"] = network_los.skim_time_period_label(alt_tdd["start"])
-    alt_tdd["in_period"] = network_los.skim_time_period_label(alt_tdd["end"])
+    alt_tdd["out_period"] = network_los.skim_time_period_label(
+        alt_tdd["start"], as_cat=True
+    )
+    alt_tdd["in_period"] = network_los.skim_time_period_label(
+        alt_tdd["end"], as_cat=True
+    )

     alt_tdd["duration"] = alt_tdd["end"] - alt_tdd["start"]

@@ -383,17 +390,28 @@ def compute_tour_scheduling_logsums(

     # tracing.log_runtime(model_name=trace_label, start_time=t0)

-    # redupe - join the alt_tdd_period logsums to alt_tdd to get logsums for alt_tdd
-    logsums = (
-        pd.merge(
-            alt_tdd.reset_index(),
-            deduped_alt_tdds.reset_index(),
-            on=[index_name] + redupe_columns,
-            how="left",
-        )
-        .set_index(index_name)
-        .logsums
-    )
+    logsums = pd.Series(data=0, index=alt_tdd.index, dtype=np.float64)
+    left_on = [alt_tdd.index]
+    right_on = [deduped_alt_tdds.index]
+    for i in redupe_columns:
+        if (
+            alt_tdd[i].dtype == "category"
+            and alt_tdd[i].dtype.ordered
+            and alt_tdd[i].dtype == deduped_alt_tdds[i].dtype
+        ):
+            left_on += [alt_tdd[i].cat.codes]
+            right_on += [deduped_alt_tdds[i].cat.codes]
+        else:
+            left_on += [alt_tdd[i].to_numpy()]
+            right_on += [deduped_alt_tdds[i].to_numpy()]
+
+    logsums.iloc[:] = pd.merge(
+        pd.DataFrame(index=alt_tdd.index),
+        deduped_alt_tdds.logsums,
+        left_on=left_on,
+        right_on=right_on,
+        how="left",
+    ).logsums.to_numpy()
     chunk_sizer.log_df(trace_label, "logsums", logsums)

     del deduped_alt_tdds
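The rewritten redupe step merges on the categoricals' integer codes instead of their labels, which is only safe when both sides share the identical ordered dtype (hence the dtype checks above). A toy illustration of the same technique, with hypothetical columns and values:

```python
import pandas as pd

# Toy redupe: attach a logsum to each row by matching on a categorical key.
dtype = pd.CategoricalDtype(["EA", "AM", "MD"], ordered=True)
left = pd.DataFrame({"period": pd.Series(["AM", "MD", "AM", "EA"], dtype=dtype)})
right = pd.DataFrame(
    {"period": pd.Series(["EA", "AM", "MD"], dtype=dtype), "logsums": [1.0, 2.0, 3.0]}
)

# Equal codes imply equal labels only when both sides share the identical
# ordered dtype, which is exactly what the PR checks before taking this path.
assert left["period"].dtype == right["period"].dtype
merged = pd.merge(
    left.assign(_code=left["period"].cat.codes),
    right.assign(_code=right["period"].cat.codes),
    on="_code",
    how="left",
)
print(merged["logsums"].tolist())  # [2.0, 3.0, 2.0, 1.0]
```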
12 changes: 8 additions & 4 deletions activitysim/abm/tables/landuse.py
@@ -23,12 +23,16 @@ def land_use(state: workflow.State):

     sharrow_enabled = state.settings.sharrow
     if sharrow_enabled:
+        err_msg = (
+            "a zero-based land_use index is required for sharrow,\n"
+            "try adding `recode_pipeline_columns: true` to your settings file."
+        )
         # when using sharrow, the land use file must be organized (either in raw
         # form or via recoding) so that the index is zero-based and contiguous
-        assert df.index.is_monotonic_increasing
-        assert df.index[0] == 0
-        assert df.index[-1] == len(df.index) - 1
-        assert df.index.dtype.kind == "i"
+        assert df.index.is_monotonic_increasing, err_msg
+        assert df.index[0] == 0, err_msg
+        assert df.index[-1] == len(df.index) - 1, err_msg
+        assert df.index.dtype.kind == "i", err_msg

     # try to make life easy for everybody by keeping everything in canonical order
     # but as long as coalesce_pipeline doesn't sort tables it coalesces, it might not stay in order
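For context, the assertions require what `recode_pipeline_columns` produces: a zero-based, contiguous, monotonic integer index. A sketch of one way a table keyed by arbitrary zone IDs can be recoded to satisfy them (table and column names are illustrative, not ActivitySim's recoding code):

```python
import pandas as pd

# Hypothetical land use table keyed by arbitrary zone IDs.
land_use = pd.DataFrame({"area": [1.5, 2.0, 0.7]}, index=[101, 205, 310])

# Recode: keep the original IDs as a column, use a fresh RangeIndex as the index.
land_use = land_use.rename_axis("TAZ").reset_index()

assert land_use.index.is_monotonic_increasing
assert land_use.index[0] == 0
assert land_use.index[-1] == len(land_use.index) - 1
assert land_use.index.dtype.kind == "i"
```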
36 changes: 29 additions & 7 deletions activitysim/core/flow.py
@@ -267,6 +267,7 @@ def skims_mapping(
     parking_col_name=None,
     zone_layer=None,
     primary_origin_col_name=None,
+    predigitized_time_periods=False,
 ):
     logger.info("loading skims_mapping")
     logger.info(f"- orig_col_name: {orig_col_name}")
@@ -337,6 +338,10 @@ def skims_mapping(
             ),
         )
     else:
+        if predigitized_time_periods:
+            time_rel = "_code ->"
+        else:
+            time_rel = " @"
         return dict(
             # TODO:SHARROW: organize dimensions.
             odt_skims=skim_dataset,
@@ -347,16 +352,16 @@ def skims_mapping(
             relationships=(
                 f"df._orig_col_name -> odt_skims.{odim}",
                 f"df._dest_col_name -> odt_skims.{ddim}",
-                "df.out_period @ odt_skims.time_period",
+                f"df.out_period{time_rel} odt_skims.time_period",
                 f"df._dest_col_name -> dot_skims.{odim}",
                 f"df._orig_col_name -> dot_skims.{ddim}",
-                "df.in_period @ dot_skims.time_period",
+                f"df.in_period{time_rel} dot_skims.time_period",
                 f"df._orig_col_name -> odr_skims.{odim}",
                 f"df._dest_col_name -> odr_skims.{ddim}",
-                "df.in_period @ odr_skims.time_period",
+                f"df.in_period{time_rel} odr_skims.time_period",
                 f"df._dest_col_name -> dor_skims.{odim}",
                 f"df._orig_col_name -> dor_skims.{ddim}",
-                "df.out_period @ dor_skims.time_period",
+                f"df.out_period{time_rel} dor_skims.time_period",
                 f"df._orig_col_name -> od_skims.{odim}",
                 f"df._dest_col_name -> od_skims.{ddim}",
             ),
@@ -525,6 +530,15 @@ def new_flow(

     cache_dir = state.filesystem.get_sharrow_cache_dir()
     logger.debug(f"flow.cache_dir: {cache_dir}")
+    predigitized_time_periods = False
+    if "out_period" in choosers and "in_period" in choosers:
+        if (
+            choosers["out_period"].dtype == "category"
+            and choosers["in_period"].dtype == "category"
+        ):
+            choosers["out_period_code"] = choosers["out_period"].cat.codes
+            choosers["in_period_code"] = choosers["in_period"].cat.codes
+            predigitized_time_periods = True
     skims_mapping_ = skims_mapping(
         state,
         orig_col_name,
@@ -534,6 +548,7 @@ def new_flow(
         parking_col_name=parking_col_name,
         zone_layer=zone_layer,
         primary_origin_col_name=primary_origin_col_name,
+        predigitized_time_periods=predigitized_time_periods,
     )
     if size_term_mapping is None:
         size_term_mapping = {}
@@ -774,6 +789,9 @@ def apply_flow(
         it ever again, but having a reference to it available later can be useful
         in debugging and tracing. Flows are cached and reused anyway, so it is
         generally not important to delete this at any point to free resources.
+    tree : sharrow.DataTree
+        The tree data used to compute the flow result. It is separate from the
+        flow to prevent it from being cached with the flow.
     """
     if sh is None:
         return None, None
@@ -800,7 +818,7 @@ def apply_flow(
             logger.error(f"error in apply_flow: {err!s}")
             if required:
                 raise
-            return None, None
+            return None, None, None
         else:
             raise
     with logtime(f"{flow.name}.load", trace_label or ""):
@@ -822,7 +840,9 @@ def apply_flow(
             logger.error(f"error in apply_flow: {err!s}")
             if required:
                 raise
-            return None, flow
+            tree = flow.tree
+            flow.tree = None
+            return None, flow, tree
         raise
     except Exception as err:
         logger.error(f"error in apply_flow: {err!s}")
@@ -833,4 +853,6 @@ def apply_flow(
         # Detecting compilation activity when in production mode is a bug
         # that should be investigated.
         tracing.timing_notes.add(f"compiled:{flow.name}")
-    return flow_result, flow
+    tree = flow.tree
+    flow.tree = None
+    return flow_result, flow, tree
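The `predigitized_time_periods` flag switches the sharrow relationship from a label lookup (`@`) to a positional one (`_code ->`), which works because the codes of a categorical whose categories match the skim's `time_period` axis are already positions on that axis. A small sketch of just the digitization step, with hypothetical labels:

```python
import pandas as pd

# Hypothetical period labels in skim order; codes then equal axis positions.
time_dtype = pd.CategoricalDtype(["EA", "AM", "MD", "PM", "EV"], ordered=True)
choosers = pd.DataFrame(
    {"out_period": pd.Series(["AM", "PM", "AM"], dtype=time_dtype)}
)

# Digitize once: the int codes can feed a positional join ("_code ->"),
# while the labels would need a per-row lookup ("@").
choosers["out_period_code"] = choosers["out_period"].cat.codes
print(choosers["out_period_code"].tolist())  # [1, 3, 1]
```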
13 changes: 8 additions & 5 deletions activitysim/core/interaction_simulate.py
@@ -171,7 +171,7 @@ def replace_in_index_level(mi, level, *repls):

         timelogger.mark("sharrow preamble", True, logger, trace_label)

-        sh_util, sh_flow = apply_flow(
+        sh_util, sh_flow, sh_tree = apply_flow(
             state,
             spec_sh,
             df,
@@ -187,10 +187,13 @@ def replace_in_index_level(mi, level, *repls):
             index=df.index if extra_data is None else None,
         )
         chunk_sizer.log_df(trace_label, "sh_util", None)  # hand off to caller
+        if sharrow_enabled != "test":
+            # if not testing sharrow, we are done with this object now.
+            del sh_util

         timelogger.mark("sharrow flow", True, logger, trace_label)
     else:
-        sh_util, sh_flow = None, None
+        sh_util, sh_flow, sh_tree = None, None, None
         timelogger.mark("sharrow flow", False)

     if (
@@ -404,9 +407,9 @@ def to_series(x):
     if sh_flow is not None and trace_rows is not None and trace_rows.any():
         assert type(trace_rows) == np.ndarray
         sh_utility_fat = sh_flow.load_dataarray(
-            # sh_flow.tree.replace_datasets(
-            #     df=df.iloc[trace_rows],
-            # ),
+            sh_tree.replace_datasets(
+                df=df.iloc[trace_rows],
+            ),
             dtype=np.float32,
         )
         sh_utility_fat = sh_utility_fat[trace_rows, :]
24 changes: 21 additions & 3 deletions activitysim/core/los.py
@@ -845,7 +845,9 @@ def get_tappairs3d(self, otap, dtap, dim3, key):

         return s.values

-    def skim_time_period_label(self, time_period, fillna=None):
+    def skim_time_period_label(
+        self, time_period, fillna=None, as_cat=False, broadcast_to=None
+    ):
         """
         convert time period times to skim time period labels (e.g. 9 -> 'AM')
@@ -873,6 +875,14 @@ def skim_time_period_label(self, time_period, fillna=None):
         assert 0 == model_time_window_min % period_minutes
         total_periods = model_time_window_min / period_minutes

+        try:
+            time_label_dtype = self.skim_dicts["taz"].time_label_dtype
+        except (KeyError, AttributeError):
+            # if the "taz" skim_dict is missing, or if using old SkimDict
+            # instead of SkimDataset, this labeling shortcut is unavailable.
+            time_label_dtype = str
+            as_cat = False
+
         # FIXME - eventually test and use np version always?
         if np.isscalar(time_period):
             bin = (
@@ -888,6 +898,12 @@ def skim_time_period_label(self, time_period, fillna=None):
                 result = self.skim_time_periods["labels"].get(bin, default=default)
             else:
                 result = self.skim_time_periods["labels"][bin]
+            if broadcast_to is not None:
+                result = pd.Series(
+                    data=result,
+                    index=broadcast_to,
+                    dtype=time_label_dtype if as_cat else str,
+                )
         else:
             result = pd.cut(
                 time_period,
@@ -898,8 +914,10 @@ def skim_time_period_label(self, time_period, fillna=None):
             if fillna is not None:
                 default = self.skim_time_periods["labels"][fillna]
                 result = result.fillna(default)
-            result = result.astype(str)
-
+        if as_cat:
+            result = result.astype(time_label_dtype)
+        else:
+            result = result.astype(str)
         return result

     def get_tazs(self, state):
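Note that in the non-scalar branch `pd.cut` already returns an ordered categorical, so `as_cat=True` essentially amounts to skipping the old unconditional `astype(str)`. A standalone sketch of that binning, with assumed breakpoints and labels (not the model's actual time periods):

```python
import numpy as np
import pandas as pd

# Assumed breakpoints (hours) and labels, for illustration only.
breaks = [0, 6, 10, 15, 19, 24]
labels = ["EA", "AM", "MD", "PM", "EV"]
depart = pd.Series(np.random.randint(0, 24, size=1_000))

# pd.cut already yields an ordered categorical; as_cat=True simply keeps it
# instead of casting every row to a string.
period = pd.cut(depart, bins=breaks, labels=labels, right=False)
print(period.dtype)  # category, ordered: EA < AM < MD < PM < EV
```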
6 changes: 3 additions & 3 deletions activitysim/core/simulate.py
@@ -536,7 +536,7 @@ def eval_utilities(
         locals_dict.update(state.get_global_constants())
         if locals_d is not None:
             locals_dict.update(locals_d)
-        sh_util, sh_flow = apply_flow(
+        sh_util, sh_flow, sh_tree = apply_flow(
             state,
             spec_sh,
             choosers,
@@ -652,7 +652,7 @@ def eval_utilities(
         if sh_flow is not None:
             try:
                 data_sh = sh_flow.load(
-                    sh_flow.tree.replace_datasets(
+                    sh_tree.replace_datasets(
                         df=choosers.iloc[offsets],
                     ),
                     dtype=np.float32,
@@ -731,7 +731,7 @@ def eval_utilities(
             )
             print(f"{sh_util.shape=}")
             print(misses)
-            _sh_flow_load = sh_flow.load()
+            _sh_flow_load = sh_flow.load(sh_tree)
             print("possible problematic expressions:")
             for expr_n, expr in enumerate(exprs):
                 closeness = np.isclose(
30 changes: 25 additions & 5 deletions activitysim/core/skim_dataset.py
@@ -33,6 +33,10 @@ def __init__(self, dataset):
         self.time_map = {
             j: i for i, j in enumerate(self.dataset.indexes["time_period"])
         }
+        self.time_label_dtype = pd.api.types.CategoricalDtype(
+            self.dataset.indexes["time_period"],
+            ordered=True,
+        )
         self.usage = set()  # track keys of skims looked up

     @property
@@ -184,6 +188,10 @@ def __init__(self, dataset, orig_key, dest_key, time_key=None, *, time_map=None):
             }
         else:
             self.time_map = time_map
+        self.time_label_dtype = pd.api.types.CategoricalDtype(
+            self.dataset.indexes["time_period"],
+            ordered=True,
+        )

     @property
     def odim(self):
@@ -246,6 +254,11 @@ def set_df(self, df):
             ):
                 logger.info(f"natural use for time_period={self.time_key}")
                 positions["time_period"] = df[self.time_key]
+            elif (
+                df[self.time_key].dtype == "category"
+                and df[self.time_key].dtype == self.time_label_dtype
+            ):
+                positions["time_period"] = df[self.time_key].cat.codes
             else:
                 logger.info(f"vectorize lookup for time_period={self.time_key}")
                 positions["time_period"] = pd.Series(
@@ -257,11 +270,18 @@ def set_df(self, df):
             self.positions = {}
             for k, v in positions.items():
                 try:
-                    self.positions[k] = v.astype(int)
-                except TypeError:
-                    # possibly some missing values that are not relevant,
-                    # fill with zeros to continue.
-                    self.positions[k] = v.fillna(0).astype(int)
+                    is_int = np.issubdtype(v.dtype, np.integer)
+                except Exception:
+                    is_int = False
+                if is_int:
+                    self.positions[k] = v
+                else:
+                    try:
+                        self.positions[k] = v.astype(int)
+                    except TypeError:
+                        # possibly some missing values that are not relevant,
+                        # fill with zeros to continue.
+                        self.positions[k] = v.fillna(0).astype(int)
         else:
             self.positions = pd.DataFrame(positions).astype(int)

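The new `elif` branch in `set_df` is the payoff for carrying `time_label_dtype` around: when the incoming column's dtype matches exactly, its codes can be used directly as positions on the skim's time axis. A sketch of the fast path versus the mapped fallback, with a hypothetical period set:

```python
import pandas as pd

# Hypothetical skim time axis and a matching chooser column.
time_periods = pd.Index(["EA", "AM", "MD", "PM", "EV"])
time_label_dtype = pd.CategoricalDtype(time_periods, ordered=True)
time_map = {label: i for i, label in enumerate(time_periods)}

df = pd.DataFrame(
    {"out_period": pd.Series(["AM", "EV", "MD"], dtype=time_label_dtype)}
)

if df["out_period"].dtype == time_label_dtype:
    positions = df["out_period"].cat.codes  # fast path: codes are axis positions
else:
    positions = df["out_period"].map(time_map)  # fallback: per-label lookup
print(positions.tolist())  # [1, 4, 2]
```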
1 change: 1 addition & 0 deletions conda-environments/github-actions-tests.yml
@@ -21,6 +21,7 @@ dependencies:
   - platformdirs = 3.2.*
   - psutil = 5.9.*
   - pyarrow = 11.*
+  - pydantic = 1.10.*
   - pypyr = 5.8.*
   - pytables >= 3.5.1,<3.7  # orca's constraint
   - pytest = 7.2.*