Commit 090b699

add monotonic check and re-interpolate for all dates
tiffanychu90 committed Feb 21, 2024
1 parent aace0fc commit 090b699
Showing 8 changed files with 162 additions and 68 deletions.
4 changes: 2 additions & 2 deletions _shared_utils/shared_utils/rt_dates.py
@@ -62,9 +62,9 @@

def get_week(month: Literal["apr2023", "oct2023"], exclude_wed: bool) -> list:
if exclude_wed:
-        return [v for k, v in DATES.items() if month in k]
-    else:
        return [v for k, v in DATES.items() if month in k and not k.endswith(month)]
+    else:
+        return [v for k, v in DATES.items() if month in k]


apr_week = get_week(month="apr2023", exclude_wed=False)
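
The branch swap above fixes get_week so that exclude_wed=True actually drops the Wednesday date. A quick sketch of the corrected behavior, assuming hypothetical DATES entries where the Wednesday key is exactly the month label and the surrounding weekdays get suffixed keys:

```python
# Hypothetical excerpt of DATES for illustration; the real dict lives in rt_dates.py.
DATES = {
    "apr2023": "2023-04-12",   # the Wednesday itself; only this key ends with "apr2023"
    "apr2023a": "2023-04-10",
    "apr2023b": "2023-04-11",
}

# exclude_wed=True drops the key that *is* the month label (the Wednesday);
# exclude_wed=False returns every date for that week.
assert get_week(month="apr2023", exclude_wed=True) == ["2023-04-10", "2023-04-11"]
assert get_week(month="apr2023", exclude_wed=False) == [
    "2023-04-12", "2023-04-10", "2023-04-11"]
```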
1 change: 0 additions & 1 deletion rt_segment_speeds/scripts/Makefile
@@ -20,7 +20,6 @@ speeds_pipeline:
make export

export:
-	python handle_common_errors.py
python average_speeds.py

download_roads:
13 changes: 9 additions & 4 deletions rt_segment_speeds/scripts/average_speeds.py
@@ -20,9 +20,6 @@

OPERATOR_COLS = [
"schedule_gtfs_dataset_key",
#"name", # from schedule
#"organization_source_record_id", "organization_name", # from dim_organizations
#"base64_url", "caltrans_district",
]

SHAPE_STOP_COLS = [
@@ -123,6 +120,9 @@ def concatenate_trip_segment_speeds(
"""
SPEED_FILE = dict_inputs["stage4"]
MAX_SPEED = dict_inputs["max_speed"]
+    METERS_CUTOFF = dict_inputs["min_meters_elapsed"]
+    MAX_TRIP_SECONDS = dict_inputs["max_trip_minutes"] * 60
+    MIN_TRIP_SECONDS = dict_inputs["min_trip_minutes"] * 60

df = pd.concat([
pd.read_parquet(
@@ -132,7 +132,12 @@
"trip_instance_key", "speed_mph",
"meters_elapsed", "sec_elapsed",
"time_of_day"]),
-            filters = [[("speed_mph", "<=", MAX_SPEED)]]
+            filters = [[
+                ("speed_mph", "<=", MAX_SPEED),
+                ("meters_elapsed", ">=", METERS_CUTOFF),
+                ("sec_elapsed", ">=", MIN_TRIP_SECONDS),
+                ("sec_elapsed", "<=", MAX_TRIP_SECONDS)
+            ]]
).assign(
service_date = pd.to_datetime(analysis_date)
) for analysis_date in analysis_date_list],
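pandas hands these filters through to pyarrow, where the outer list is an OR of AND-groups: every predicate inside one inner list must hold for a row to be read. A minimal sketch with the thresholds above (path hypothetical):

```python
import pandas as pd

# Thresholds mirroring config.yml: 80 mph cap, ~1 mile minimum distance,
# and trips between 10 and 180 minutes.
MAX_SPEED, METERS_CUTOFF = 80, 1609
MIN_TRIP_SECONDS, MAX_TRIP_SECONDS = 10 * 60, 180 * 60

df = pd.read_parquet(
    "speeds.parquet",  # hypothetical path
    filters=[[
        ("speed_mph", "<=", MAX_SPEED),
        ("meters_elapsed", ">=", METERS_CUTOFF),
        ("sec_elapsed", ">=", MIN_TRIP_SECONDS),
        ("sec_elapsed", "<=", MAX_TRIP_SECONDS),
    ]],
)
```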
5 changes: 3 additions & 2 deletions rt_segment_speeds/scripts/config.yml
@@ -11,8 +11,10 @@ stop_segments:
route_dir_multi_summary: "rollup_multiday/speeds_route_dir"
segments_file: "segment_options/shape_stop_segments"
timestamp_col: "location_timestamp_local"
-  time_min_cutoff: 10
+  min_trip_minutes: 10
+  max_trip_minutes: 180
  max_speed: 80
+  min_meters_elapsed: 1609
rt_stop_times:
stage1: "vp_usable"
stage2: "nearest/nearest_vp_rt_stop_times"
@@ -28,4 +30,3 @@ road_segments:
segments_file: "road_segments"
segment_identifier_cols: ["linearid", "mtfcc", "segment_sequence"]
timestamp_col: "location_timestamp_local"
-  time_min_cutoff: 10
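
For context, a hedged sketch of how these keys might be consumed, assuming the scripts load this YAML into a plain dict (the loading code here is illustrative, not taken from the repo):

```python
import yaml

with open("rt_segment_speeds/scripts/config.yml") as f:
    config = yaml.safe_load(f)

dict_inputs = config["stop_segments"]

# Minutes in the config, seconds at the point of use.
max_trip_seconds = dict_inputs["max_trip_minutes"] * 60  # 180 min -> 10,800 s
min_trip_seconds = dict_inputs["min_trip_minutes"] * 60  # 10 min -> 600 s
min_meters = dict_inputs["min_meters_elapsed"]           # 1,609 m, about 1 mile
```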
127 changes: 112 additions & 15 deletions rt_segment_speeds/scripts/interpolate_stop_arrival.py
@@ -10,7 +10,7 @@

from loguru import logger

-from segment_speed_utils import helpers
+from segment_speed_utils import array_utils, helpers, segment_calcs, wrangle_shapes
from segment_speed_utils.project_vars import (SEGMENT_GCS, PROJECT_CRS,
CONFIG_PATH
)
@@ -30,15 +30,105 @@ def project_points_onto_shape(
stop_meters = shape_geometry.project(stop_geometry)

points = [shapely.Point(p) for p in vp_coords_trio.coords]
-    xp = np.asarray([vp_coords_trio.project(p) for p in points])
+    projected_points = np.asarray([vp_coords_trio.project(p) for p in points])

-    yp = timestamp_arr.astype("datetime64[s]").astype("float64")

-    interpolated_arrival = np.interp(stop_position, xp, yp)
+    interpolated_arrival = wrangle_shapes.interpolate_stop_arrival_time(
+        stop_position, projected_points, timestamp_arr)

return stop_meters, interpolated_arrival
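
A small sketch of what shapely's project does here, with toy geometry (all coordinates made up): it converts a point into "meters along the line", giving both the stop position and the x-values the interpolation needs.

```python
import numpy as np
import shapely

# A straight 2 km shape and a stop sitting slightly off the line.
shape_geometry = shapely.LineString([(0, 0), (2_000, 0)])
stop_geometry = shapely.Point(400, 100)
stop_meters = shape_geometry.project(stop_geometry)  # 400.0

# Projecting the vp trio's own vertices onto itself yields their
# cumulative distances along the vehicle's path.
vp_coords_trio = shapely.LineString([(0, 0), (300, 0), (700, 0)])
points = [shapely.Point(p) for p in vp_coords_trio.coords]
projected_points = np.asarray([vp_coords_trio.project(p) for p in points])
print(stop_meters, projected_points)  # 400.0 [  0. 300. 700.]
```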


def stop_and_arrival_time_arrays_by_trip(
df: pd.DataFrame,
) -> pd.DataFrame:
"""
For stops that violated the monotonically increasing condition,
set those arrival_times to NaT again.
Now, look across stops and interpolate again, using stop_meters.
"""
# Add columns with the trip's stop_meters and arrival_times
# for only correctly interpolated values
df_arrays = (df[df.arrival_time.notna()]
.groupby("trip_instance_key")
.agg({
"stop_meters": lambda x: list(x),
"arrival_time": lambda x: list(x)
}).rename(columns = {
"stop_meters": "stop_meters_arr",
"arrival_time": "arrival_time_arr"
}).reset_index()
)

df2 = pd.merge(
df,
df_arrays,
on = "trip_instance_key",
how = "inner"
)

# Use correct values to fill in the missing arrival times
df2 = df2.assign(
arrival_time = df2.apply(
lambda x: wrangle_shapes.interpolate_stop_arrival_time(
x.stop_meters, x.stop_meters_arr, x.arrival_time_arr
), axis=1
)
).drop(columns = ["stop_meters_arr", "arrival_time_arr"])

return df2
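
A toy example of the groupby-to-arrays step above: the "good" arrivals per trip are collected into list columns, merged back onto every stop row, and handed to the interpolator.

```python
import pandas as pd

# One trip, three stops; the middle arrival was wiped to NaT upstream.
df = pd.DataFrame({
    "trip_instance_key": ["t1", "t1", "t1"],
    "stop_meters": [100.0, 250.0, 400.0],
    "arrival_time": pd.to_datetime(
        ["2024-02-21 08:00:00", None, "2024-02-21 08:04:00"]),
})

df_arrays = (df[df.arrival_time.notna()]
             .groupby("trip_instance_key")
             .agg({"stop_meters": list, "arrival_time": list})
             .reset_index()
             )
# df_arrays now holds stop_meters [100.0, 400.0] and the two good
# timestamps for t1, ready to be merged back onto all three rows.
```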


def enforce_monotonicity_and_interpolate_across_stops(
df: pd.DataFrame
) -> pd.DataFrame:
"""
Do a check to make sure stop arrivals are all monotonically increasing.
If it fails the check in a window of 3, and the center
position is not increasing, we will interpolate again using
surrounding observations.
"""
df = segment_calcs.convert_timestamp_to_seconds(
df, ["arrival_time"])

df = array_utils.rolling_window_make_array(
df,
window = 3, rolling_col = "arrival_time_sec"
)

# Subset to trips that have at least 1 obs that violates monotonicity
trips_with_one_false = (df.groupby("trip_instance_key")
.agg({"arrival_time_sec_monotonic": "min"})
.reset_index()
.query('arrival_time_sec_monotonic==0')
.trip_instance_key
)

# Set arrival times to NaT if it's not monotonically increasing
mask = df.arrival_time_sec_monotonic == False
df.loc[mask, 'arrival_time'] = np.nan

no_fix = df[~df.trip_instance_key.isin(trips_with_one_false)]
fix1 = df[df.trip_instance_key.isin(trips_with_one_false)]
fix1 = stop_and_arrival_time_arrays_by_trip(fix1)

drop_me = [
"arrival_time_sec",
"rolling_arrival_time_sec", "arrival_time_sec_monotonic"
]

fixed_df = pd.concat(
[no_fix, fix1], axis=0
).drop(
columns = drop_me
).sort_values(
["trip_instance_key", "stop_sequence"]
).reset_index(drop=True)

del no_fix, fix1, df

return fixed_df
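
A worked toy example of the repair this function performs (mirroring the helper's np.interp logic, with made-up times): a trip whose third arrival goes backwards gets that value masked, then re-interpolated from the surrounding good stops using stop_meters.

```python
import numpy as np

stop_meters = np.array([0.0, 500.0, 1_000.0, 1_500.0])
arrivals = np.array([
    "2024-02-21 08:00:00", "2024-02-21 08:05:00",
    "2024-02-21 08:03:00",  # violates monotonicity
    "2024-02-21 08:06:00",
], dtype="datetime64[s]")

good = np.array([True, True, False, True])  # the monotonic flag's verdict

fixed = np.interp(
    stop_meters[~good],              # positions needing new times
    stop_meters[good],
    arrivals[good].astype("float64"),
).astype("datetime64[s]")
print(fixed)  # ['2024-02-21T08:05:30'] -- back between its neighbors
```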


def interpolate_stop_arrivals(
analysis_date: str,
dict_inputs: dict
@@ -96,21 +186,28 @@ def interpolate_stop_arrivals(
results = gdf.assign(
stop_meters = stop_meters_series,
arrival_time = stop_arrival_series,
-    ).astype({"arrival_time": "datetime64[s]"})[
-        ["trip_instance_key", "shape_array_key",
+    )[["trip_instance_key", "shape_array_key",
        "stop_sequence", "stop_id",
-        "stop_meters",
-        "arrival_time"
-    ]]
-
-    del gdf
-
+       "stop_meters", "arrival_time"]
+    ].sort_values(
+        ["trip_instance_key", "stop_sequence"]
+    ).reset_index(drop=True)

time1 = datetime.datetime.now()
logger.info(f"get stop arrivals {analysis_date}: {time1 - start}")

results = enforce_monotonicity_and_interpolate_across_stops(results)

results.to_parquet(
f"{SEGMENT_GCS}{STOP_ARRIVALS_FILE}.parquet"
)


del gdf, results

end = datetime.datetime.now()
logger.info(f"get stop arrivals {analysis_date}: {end - start}")
logger.info(f"interpolate arrivals execution time: {analysis_date}: {end - start}")

return


if __name__ == "__main__":
36 changes: 23 additions & 13 deletions rt_segment_speeds/scripts/stop_arrivals_to_speed.py
@@ -16,7 +16,7 @@ def attach_operator_natural_identifiers(
) -> pd.DataFrame:
"""
For each gtfs_dataset_key-shape_array_key combination,
-    re-attach the natural identifiers and organizational identifiers.
+    re-attach the natural identifiers.
Return a df with all the identifiers we need during downstream
aggregations, such as by route-direction.
"""
@@ -29,13 +29,18 @@
get_pandas = True
)

-    # Get crosswalk from schedule_gtfs_dataset_key to organization
-    crosswalk = helpers.import_schedule_gtfs_key_organization_crosswalk(
-        analysis_date,
-    ).drop(columns = "itp_id")
-
    # Add time-of-day, which is associated with trip_instance_key
-    time_of_day = gtfs_schedule_wrangling.get_trip_time_buckets(analysis_date)
+    sched_time_of_day = gtfs_schedule_wrangling.get_trip_time_buckets(
+        analysis_date
+    ).rename(
+        columns = {"time_of_day": "schedule_time_of_day"})
+
+    # If trip isn't in schedule, use vp to derive
+    vp_time_of_day = gtfs_schedule_wrangling.rt_time_of_day(
+        analysis_date
+    ).rename(
+        columns = {"time_of_day": "vp_time_of_day"})


trip_used_for_shape = pd.read_parquet(
f"{SEGMENT_GCS}segment_options/"
@@ -60,16 +65,21 @@
stop_pair,
on = ["shape_array_key", "stop_sequence"]
).merge(
-        time_of_day,
+        sched_time_of_day,
        on = "trip_instance_key",
        how = "inner"
-    ).merge(
-        crosswalk,
-        on = "schedule_gtfs_dataset_key",
-        how = "left"
+    ).merge(
+        vp_time_of_day,
+        on = "trip_instance_key",
+        how = "inner"
)

-    del crosswalk, shape_identifiers, time_of_day
+    df_with_natural_ids = df_with_natural_ids.assign(
+        time_of_day = df_with_natural_ids.schedule_time_of_day.fillna(
+            df_with_natural_ids.vp_time_of_day)
+    ).drop(columns = ["schedule_time_of_day", "vp_time_of_day"])

+    del shape_identifiers, sched_time_of_day, vp_time_of_day

return df_with_natural_ids

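A toy illustration of the schedule-first, vp-fallback coalescing introduced above (bucket labels made up for the example):

```python
import pandas as pd

# Trip t2 is missing from the schedule, so its schedule-derived
# bucket is NaN and the vp-derived bucket fills the gap.
df = pd.DataFrame({
    "trip_instance_key": ["t1", "t2"],
    "schedule_time_of_day": ["AM Peak", None],
    "vp_time_of_day": ["AM Peak", "Midday"],
})

df = df.assign(
    time_of_day = df.schedule_time_of_day.fillna(df.vp_time_of_day)
).drop(columns = ["schedule_time_of_day", "vp_time_of_day"])

print(df.time_of_day.tolist())  # ['AM Peak', 'Midday']
```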
2 changes: 1 addition & 1 deletion rt_segment_speeds/segment_speed_utils/array_utils.py
@@ -13,7 +13,7 @@ def rolling_window_make_array(
) -> pd.DataFrame:
# https://stackoverflow.com/questions/47482009/pandas-rolling-window-to-return-an-array
df[f"rolling_{rolling_col}"] = [
-        np.asarray(window)for window in
+        np.asarray(window) for window in
df.groupby("trip_instance_key")[rolling_col].rolling(
window = window, center=True)
]
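The hunk above is only a spacing fix, but this helper is what feeds the new monotonicity check. For intuition, a sketch of how a centered window of 3 can flag an arrival that goes backwards; the strictly-increasing flag itself is an assumption here, since its implementation is not part of this diff:

```python
import numpy as np
import pandas as pd

arrivals = pd.DataFrame({
    "trip_instance_key": ["t1"] * 4,
    "arrival_time_sec": [0.0, 300.0, 180.0, 360.0],  # 180 goes backwards
})

# Centered windows of 3, as in rolling_window_make_array.
windows = [
    np.asarray(w) for w in
    arrivals.groupby("trip_instance_key")["arrival_time_sec"].rolling(
        window=3, center=True)
]

# Assumed flag: every step within the window strictly increases
# (shorter edge windows pass trivially in this toy example).
monotonic = [bool(np.all(np.diff(w) > 0)) for w in windows]
print(monotonic)  # [True, False, False, True]
```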
42 changes: 12 additions & 30 deletions rt_segment_speeds/segment_speed_utils/wrangle_shapes.py
@@ -10,9 +10,6 @@
* https://gis.stackexchange.com/questions/416284/splitting-multiline-or-linestring-into-equal-segments-of-particular-length-using
* https://stackoverflow.com/questions/62053253/how-to-split-a-linestring-to-segments
"""
-import dask.array as da
-import dask.dataframe as dd
-import dask_geopandas as dg
import geopandas as gpd
import numpy as np
import pandas as pd
@@ -182,32 +179,17 @@ def vp_as_gdf(
return vp_gdf


-def project_vp_onto_segment_geometry(
-    vp: pd.DataFrame,
-    segment_gdf: gpd.GeoDataFrame,
-    grouping_cols: list
-) -> pd.DataFrame:
+def interpolate_stop_arrival_time(
+    stop_position: float,
+    shape_meters_arr: np.ndarray,
+    timestamp_arr: np.ndarray
+) -> float:
    """
-    Project vp onto either shape_geometry or
-    road_geometry.
+    Interpolate the arrival time given the stop meters position.
+    Cast datetimes into floats and cast back as datetime.
    """
-    segment_gdf = segment_gdf.rename(columns = {"geometry": "line_geometry"})
-
-    vp_gdf = vp_as_gdf(vp)
-
-    gdf = pd.merge(
-        vp_gdf,
-        segment_gdf,
-        on = grouping_cols,
-        how = "inner"
-    )
-
-    gdf = gdf.assign(
-        shape_meters = gdf.line_geometry.project(gdf.geometry)
-    )
-
-    vp_projected_result = gdf[
-        ["vp_idx"] + grouping_cols + ["shape_meters"]
-    ].drop_duplicates()
-
-    return vp_projected_result
+    timestamp_arr = np.asarray(timestamp_arr).astype("datetime64[s]").astype("float64")
+
+    return np.interp(
+        stop_position, np.asarray(shape_meters_arr), timestamp_arr
+    ).astype("datetime64[s]")
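
A quick usage sketch of the new helper, with made-up inputs: the stop sits 1,400 m along the shape and the vehicle pings bracket it at 1,250 m and 1,600 m.

```python
import numpy as np

shape_meters_arr = np.array([1_000.0, 1_250.0, 1_600.0])
timestamp_arr = np.array(
    ["2024-02-21 08:00:00", "2024-02-21 08:01:00", "2024-02-21 08:02:30"],
    dtype="datetime64[s]")

arrival = interpolate_stop_arrival_time(1_400.0, shape_meters_arr, timestamp_arr)
print(arrival)  # 2024-02-21T08:01:38, linearly between the bracketing pings
```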
