From 090b6993f5d5649ce1dc32a8393389c55bdf3bfc Mon Sep 17 00:00:00 2001
From: tiffanychu90
Date: Wed, 21 Feb 2024 00:53:10 +0000
Subject: [PATCH] add monotonic check and re-interpolate for all dates

---
 _shared_utils/shared_utils/rt_dates.py      |   4 +-
 rt_segment_speeds/scripts/Makefile          |   1 -
 rt_segment_speeds/scripts/average_speeds.py |  13 +-
 rt_segment_speeds/scripts/config.yml        |   5 +-
 .../scripts/interpolate_stop_arrival.py     | 127 +++++++++++++++---
 .../scripts/stop_arrivals_to_speed.py       |  36 +++--
 .../segment_speed_utils/array_utils.py      |   2 +-
 .../segment_speed_utils/wrangle_shapes.py   |  42 ++----
 8 files changed, 162 insertions(+), 68 deletions(-)

diff --git a/_shared_utils/shared_utils/rt_dates.py b/_shared_utils/shared_utils/rt_dates.py
index 939fd3a6f4..396ce1d4d5 100644
--- a/_shared_utils/shared_utils/rt_dates.py
+++ b/_shared_utils/shared_utils/rt_dates.py
@@ -62,9 +62,9 @@ def get_week(month: Literal["apr2023", "oct2023"], exclude_wed: bool) -> list:
     if exclude_wed:
-        return [v for k, v in DATES.items() if month in k]
-    else:
         return [v for k, v in DATES.items() if month in k and not k.endswith(month)]
+    else:
+        return [v for k, v in DATES.items() if month in k]


 apr_week = get_week(month="apr2023", exclude_wed=False)
diff --git a/rt_segment_speeds/scripts/Makefile b/rt_segment_speeds/scripts/Makefile
index e8e3129862..14827b7df2 100644
--- a/rt_segment_speeds/scripts/Makefile
+++ b/rt_segment_speeds/scripts/Makefile
@@ -20,7 +20,6 @@ speeds_pipeline:
 	make export

 export:
-	python handle_common_errors.py
 	python average_speeds.py

 download_roads:
diff --git a/rt_segment_speeds/scripts/average_speeds.py b/rt_segment_speeds/scripts/average_speeds.py
index c85d2cfefe..c08434cf90 100644
--- a/rt_segment_speeds/scripts/average_speeds.py
+++ b/rt_segment_speeds/scripts/average_speeds.py
@@ -20,9 +20,6 @@
 OPERATOR_COLS = [
     "schedule_gtfs_dataset_key",
-    #"name", # from schedule
-    #"organization_source_record_id",
     "organization_name", # from dim_organizations
-    #"base64_url",
     "caltrans_district",
 ]
 SHAPE_STOP_COLS = [
@@ -123,6 +120,9 @@ def concatenate_trip_segment_speeds(
     """
     SPEED_FILE = dict_inputs["stage4"]
     MAX_SPEED = dict_inputs["max_speed"]
+    METERS_CUTOFF = dict_inputs["min_meters_elapsed"]
+    MAX_TRIP_SECONDS = dict_inputs["max_trip_minutes"] * 60
+    MIN_TRIP_SECONDS = dict_inputs["min_trip_minutes"] * 60

     df = pd.concat([
         pd.read_parquet(
@@ -132,7 +132,12 @@
             "trip_instance_key", "speed_mph",
             "meters_elapsed", "sec_elapsed",
             "time_of_day"]),
-            filters = [[("speed_mph", "<=", MAX_SPEED)]]
+            filters = [[
+                ("speed_mph", "<=", MAX_SPEED),
+                ("meters_elapsed", ">=", METERS_CUTOFF),
+                ("sec_elapsed", ">=", MIN_TRIP_SECONDS),
+                ("sec_elapsed", "<=", MAX_TRIP_SECONDS)
+            ]]
         ).assign(
             service_date = pd.to_datetime(analysis_date)
         ) for analysis_date in analysis_date_list],
diff --git a/rt_segment_speeds/scripts/config.yml b/rt_segment_speeds/scripts/config.yml
index a772fba516..6de1d3a683 100644
--- a/rt_segment_speeds/scripts/config.yml
+++ b/rt_segment_speeds/scripts/config.yml
@@ -11,8 +11,10 @@ stop_segments:
   route_dir_multi_summary: "rollup_multiday/speeds_route_dir"
   segments_file: "segment_options/shape_stop_segments"
   timestamp_col: "location_timestamp_local"
-  time_min_cutoff: 10
+  min_trip_minutes: 10
+  max_trip_minutes: 180
   max_speed: 80
+  min_meters_elapsed: 1609
 rt_stop_times:
   stage1: "vp_usable"
   stage2: "nearest/nearest_vp_rt_stop_times"
@@ -28,4 +30,3 @@ road_segments:
   segments_file: "road_segments"
   segment_identifier_cols: ["linearid", "mtfcc", "segment_sequence"]
   timestamp_col: "location_timestamp_local"
-  time_min_cutoff: 10
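Note on the config change above: the single time_min_cutoff is split into explicit trip-duration bounds, and the new keys feed straight into the parquet read as pyarrow predicates (the inner list of filters is AND-ed, so a row must pass every condition to be read in). A self-contained sketch of the same pattern, with a toy file path and toy rows invented for illustration:

    import pandas as pd

    # Toy stand-in for one date's stage4 speeds output.
    pd.DataFrame({
        "trip_instance_key": ["a", "b", "c"],
        "speed_mph": [25.0, 95.0, 40.0],
        "meters_elapsed": [2000.0, 5000.0, 300.0],
        "sec_elapsed": [1200.0, 900.0, 30.0],
    }).to_parquet("toy_speeds.parquet")

    MIN_TRIP_SECONDS = 10 * 60    # min_trip_minutes
    MAX_TRIP_SECONDS = 180 * 60   # max_trip_minutes

    # Only trip "a" survives: "b" is implausibly fast, and "c" is too
    # short in both distance and duration.
    df = pd.read_parquet(
        "toy_speeds.parquet",
        filters = [[
            ("speed_mph", "<=", 80),
            ("meters_elapsed", ">=", 1609),
            ("sec_elapsed", ">=", MIN_TRIP_SECONDS),
            ("sec_elapsed", "<=", MAX_TRIP_SECONDS)
        ]]
    )
    print(df.trip_instance_key.tolist())  # ['a']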
"segment_sequence"] timestamp_col: "location_timestamp_local" - time_min_cutoff: 10 diff --git a/rt_segment_speeds/scripts/interpolate_stop_arrival.py b/rt_segment_speeds/scripts/interpolate_stop_arrival.py index 06802d5f11..f423cc5d8b 100644 --- a/rt_segment_speeds/scripts/interpolate_stop_arrival.py +++ b/rt_segment_speeds/scripts/interpolate_stop_arrival.py @@ -10,7 +10,7 @@ from loguru import logger -from segment_speed_utils import helpers +from segment_speed_utils import array_utils, helpers, segment_calcs, wrangle_shapes from segment_speed_utils.project_vars import (SEGMENT_GCS, PROJECT_CRS, CONFIG_PATH ) @@ -30,15 +30,105 @@ def project_points_onto_shape( stop_meters = shape_geometry.project(stop_geometry) points = [shapely.Point(p) for p in vp_coords_trio.coords] - xp = np.asarray([vp_coords_trio.project(p) for p in points]) + projected_points = np.asarray([vp_coords_trio.project(p) for p in points]) - yp = timestamp_arr.astype("datetime64[s]").astype("float64") - - interpolated_arrival = np.interp(stop_position, xp, yp) + interpolated_arrival = wrangle_shapes.interpolate_stop_arrival_time( + stop_position, projected_points, timestamp_arr) return stop_meters, interpolated_arrival +def stop_and_arrival_time_arrays_by_trip( + df: pd.DataFrame, +) -> pd.DataFrame: + """ + For stops that violated the monotonically increasing condition, + set those arrival_times to NaT again. + Now, look across stops and interpolate again, using stop_meters. + """ + # Add columns with the trip's stop_meters and arrival_times + # for only correctly interpolated values + df_arrays = (df[df.arrival_time.notna()] + .groupby("trip_instance_key") + .agg({ + "stop_meters": lambda x: list(x), + "arrival_time": lambda x: list(x) + }).rename(columns = { + "stop_meters": "stop_meters_arr", + "arrival_time": "arrival_time_arr" + }).reset_index() + ) + + df2 = pd.merge( + df, + df_arrays, + on = "trip_instance_key", + how = "inner" + ) + + # Use correct values to fill in the missing arrival times + df2 = df2.assign( + arrival_time = df2.apply( + lambda x: wrangle_shapes.interpolate_stop_arrival_time( + x.stop_meters, x.stop_meters_arr, x.arrival_time_arr + ), axis=1 + ) + ).drop(columns = ["stop_meters_arr", "arrival_time_arr"]) + + return df2 + + +def enforce_monotonicity_and_interpolate_across_stops( + df: pd.DataFrame +) -> pd.DataFrame: + """ + Do a check to make sure stop arrivals are all monotonically increasing. + If it fails the check in a window of 3, and the center + position is not increasing, we will interpolate again using + surrounding observations. 
+ """ + df = segment_calcs.convert_timestamp_to_seconds( + df, ["arrival_time"]) + + df = array_utils.rolling_window_make_array( + df, + window = 3, rolling_col = "arrival_time_sec" + ) + + # Subset to trips that have at least 1 obs that violates monotonicity + trips_with_one_false = (df.groupby("trip_instance_key") + .agg({"arrival_time_sec_monotonic": "min"}) + .reset_index() + .query('arrival_time_sec_monotonic==0') + .trip_instance_key + ) + + # Set arrival times to NaT if it's not monotonically increasing + mask = df.arrival_time_sec_monotonic == False + df.loc[mask, 'arrival_time'] = np.nan + + no_fix = df[~df.trip_instance_key.isin(trips_with_one_false)] + fix1 = df[df.trip_instance_key.isin(trips_with_one_false)] + fix1 = stop_and_arrival_time_arrays_by_trip(fix1) + + drop_me = [ + "arrival_time_sec", + "rolling_arrival_time_sec", "arrival_time_sec_monotonic" + ] + + fixed_df = pd.concat( + [no_fix, fix1], axis=0 + ).drop( + columns = drop_me + ).sort_values( + ["trip_instance_key", "stop_sequence"] + ).reset_index(drop=True) + + del no_fix, fix1, df + + return fixed_df + + def interpolate_stop_arrivals( analysis_date: str, dict_inputs: dict @@ -96,21 +186,28 @@ def interpolate_stop_arrivals( results = gdf.assign( stop_meters = stop_meters_series, arrival_time = stop_arrival_series, - ).astype({"arrival_time": "datetime64[s]"})[ - ["trip_instance_key", "shape_array_key", + )[["trip_instance_key", "shape_array_key", "stop_sequence", "stop_id", - "stop_meters", - "arrival_time" - ]] - - del gdf - + "stop_meters", "arrival_time"] + ].sort_values( + ["trip_instance_key", "stop_sequence"] + ).reset_index(drop=True) + + time1 = datetime.datetime.now() + logger.info(f"get stop arrivals {analysis_date}: {time1 - start}") + + results = enforce_monotonicity_and_interpolate_across_stops(results) + results.to_parquet( f"{SEGMENT_GCS}{STOP_ARRIVALS_FILE}.parquet" ) - + + del gdf, results + end = datetime.datetime.now() - logger.info(f"get stop arrivals {analysis_date}: {end - start}") + logger.info(f"interpolate arrivals execution time: {analysis_date}: {end - start}") + + return if __name__ == "__main__": diff --git a/rt_segment_speeds/scripts/stop_arrivals_to_speed.py b/rt_segment_speeds/scripts/stop_arrivals_to_speed.py index 8be7e7a8e0..b1eb069ac7 100644 --- a/rt_segment_speeds/scripts/stop_arrivals_to_speed.py +++ b/rt_segment_speeds/scripts/stop_arrivals_to_speed.py @@ -16,7 +16,7 @@ def attach_operator_natural_identifiers( ) -> pd.DataFrame: """ For each gtfs_dataset_key-shape_array_key combination, - re-attach the natural identifiers and organizational identifiers. + re-attach the natural identifiers. Return a df with all the identifiers we need during downstream aggregations, such as by route-direction. 
""" @@ -29,13 +29,18 @@ def attach_operator_natural_identifiers( get_pandas = True ) - # Get crosswalk from schedule_gtfs_dataset_key to organization - crosswalk = helpers.import_schedule_gtfs_key_organization_crosswalk( - analysis_date, - ).drop(columns = "itp_id") - # Add time-of-day, which is associated with trip_instance_key - time_of_day = gtfs_schedule_wrangling.get_trip_time_buckets(analysis_date) + sched_time_of_day = gtfs_schedule_wrangling.get_trip_time_buckets( + analysis_date + ).rename( + columns = {"time_of_day": "schedule_time_of_day"}) + + # If trip isn't in schedule, use vp to derive + vp_time_of_day = gtfs_schedule_wrangling.rt_time_of_day( + analysis_date + ).rename( + columns = {"time_of_day": "vp_time_of_day"}) + trip_used_for_shape = pd.read_parquet( f"{SEGMENT_GCS}segment_options/" @@ -60,16 +65,21 @@ def attach_operator_natural_identifiers( stop_pair, on = ["shape_array_key", "stop_sequence"] ).merge( - time_of_day, + sched_time_of_day, on = "trip_instance_key", - how = "inner" - ).merge( - crosswalk, - on = "schedule_gtfs_dataset_key", how = "left" + ).merge( + vp_time_of_day, + on = "trip_instance_key", + how = "inner" ) - del crosswalk, shape_identifiers, time_of_day + df_with_natural_ids = df_with_natural_ids.assign( + time_of_day = df_with_natural_ids.schedule_time_of_day.fillna( + df_with_natural_ids.vp_time_of_day) + ).drop(columns = ["schedule_time_of_day", "vp_time_of_day"]) + + del shape_identifiers, time_of_day return df_with_natural_ids diff --git a/rt_segment_speeds/segment_speed_utils/array_utils.py b/rt_segment_speeds/segment_speed_utils/array_utils.py index 5900f77443..295aae47ea 100644 --- a/rt_segment_speeds/segment_speed_utils/array_utils.py +++ b/rt_segment_speeds/segment_speed_utils/array_utils.py @@ -13,7 +13,7 @@ def rolling_window_make_array( ) -> pd.DataFrame: # https://stackoverflow.com/questions/47482009/pandas-rolling-window-to-return-an-array df[f"rolling_{rolling_col}"] = [ - np.asarray(window)for window in + np.asarray(window) for window in df.groupby("trip_instance_key")[rolling_col].rolling( window = window, center=True) ] diff --git a/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py b/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py index 374ceff1f7..6becb97036 100644 --- a/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py +++ b/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py @@ -10,9 +10,6 @@ * https://gis.stackexchange.com/questions/416284/splitting-multiline-or-linestring-into-equal-segments-of-particular-length-using * https://stackoverflow.com/questions/62053253/how-to-split-a-linestring-to-segments """ -import dask.array as da -import dask.dataframe as dd -import dask_geopandas as dg import geopandas as gpd import numpy as np import pandas as pd @@ -182,32 +179,17 @@ def vp_as_gdf( return vp_gdf -def project_vp_onto_segment_geometry( - vp: pd.DataFrame, - segment_gdf: gpd.GeoDataFrame, - grouping_cols: list -) -> pd.DataFrame: +def interpolate_stop_arrival_time( + stop_position: float, + shape_meters_arr: np.ndarray, + timestamp_arr: np.ndarray +) -> float: """ - Project vp onto either shape_geometry or - road_geometry. + Interpolate the arrival time given the stop meters position. + Cast datetimes into floats and cast back as datetime. 
""" - segment_gdf = segment_gdf.rename(columns = {"geometry": "line_geometry"}) - - vp_gdf = vp_as_gdf(vp) - - gdf = pd.merge( - vp_gdf, - segment_gdf, - on = grouping_cols, - how = "inner" - ) - - gdf = gdf.assign( - shape_meters = gdf.line_geometry.project(gdf.geometry) - ) - - vp_projected_result = gdf[ - ["vp_idx"] + grouping_cols + ["shape_meters"] - ].drop_duplicates() - - return vp_projected_result \ No newline at end of file + timestamp_arr = np.asarray(timestamp_arr).astype("datetime64[s]").astype("float64") + + return np.interp( + stop_position, np.asarray(shape_meters_arr), timestamp_arr + ).astype("datetime64[s]") \ No newline at end of file