diff --git a/_shared_utils/requirements.txt b/_shared_utils/requirements.txt index b5b7faff7..d0f037083 100644 --- a/_shared_utils/requirements.txt +++ b/_shared_utils/requirements.txt @@ -1,4 +1,5 @@ -e . gtfs-segments==0.1.0 pyairtable==2.2.2 -great-tables==0.3.1 +great-tables==0.4.0 +polars==0.20.16 diff --git a/gtfs_digest/extra_cleaning.py b/gtfs_digest/extra_cleaning.py new file mode 100644 index 000000000..b27893639 --- /dev/null +++ b/gtfs_digest/extra_cleaning.py @@ -0,0 +1,28 @@ +# operators that need route names parsed +# probably just keep the long name portion +# also do titlecase? this is finicky, because some are CC (which we don't want titlecase) +operators_only_route_long_name = [ + "Antelope Valley Transit Authority Schedule", + "Bay Area 511 ACE Schedule", + "Bay Area 511 Caltrain Schedule", + "Bay Area 511 Emery Go-Round Schedule", + "Bay Area 511 Petaluma Schedule", + "Beach Cities GMV Schedule", + "Bear Schedule", + "Commerce Schedule", + "Elk Grove Schedule", + "Humboldt Schedule", + "LA DOT Schedule", + "Lawndale Beat GMV Schedule", + "Redding Schedule", + "Redwood Coast Schedule", + "Santa Maria Schedule", + "StanRTA Schedule", + "VCTC GMV Schedule", + "Victor Valley GMV Schedule", + "Visalia Schedule", + "Yolobus Schedule", +] + +# BruinBus Schedule - nothing shows up in route stats +# Why does "StanRTA Schedule" and "Tahoe Transportation District Schedule" appear to have similar route names? \ No newline at end of file diff --git a/gtfs_funnel/logs/download_data.log b/gtfs_funnel/logs/download_data.log index 9c4705a65..a08d08e94 100644 --- a/gtfs_funnel/logs/download_data.log +++ b/gtfs_funnel/logs/download_data.log @@ -318,3 +318,4 @@ 2024-03-14 11:43:06.291 | INFO | __main__:download_one_day:29 - # operators to run: 172 2024-03-14 11:43:06.291 | INFO | __main__:download_one_day:33 - *********** Download st data *********** 2024-03-14 11:44:27.599 | INFO | __main__:download_one_day:56 - execution time: 0:01:22.625555 +2024-03-19 16:50:51.742 | INFO | __main__:download_one_year:35 - execution time: 0:00:43.062868 diff --git a/gtfs_funnel/route_typologies.py b/gtfs_funnel/schedule_stats_by_route_direction.py similarity index 100% rename from gtfs_funnel/route_typologies.py rename to gtfs_funnel/schedule_stats_by_route_direction.py diff --git a/gtfs_funnel/stop_times_with_direction.py b/gtfs_funnel/stop_times_with_direction.py index e88c3a89d..63926ae65 100644 --- a/gtfs_funnel/stop_times_with_direction.py +++ b/gtfs_funnel/stop_times_with_direction.py @@ -22,15 +22,16 @@ def prep_scheduled_stop_times(analysis_date: str) -> gpd.GeoDataFrame: """ stops = helpers.import_scheduled_stops( analysis_date, - columns = ["feed_key", "stop_id", "geometry"], + columns = ["feed_key", "stop_id", "stop_name", "geometry"], crs = PROJECT_CRS, get_pandas = True ) stop_times = helpers.import_scheduled_stop_times( analysis_date, - columns = ["feed_key", "trip_id", "stop_id", "stop_sequence"] - ).compute() + columns = ["feed_key", "trip_id", "stop_id", "stop_sequence"], + get_pandas = True + ) trips = helpers.import_scheduled_trips( analysis_date, @@ -53,7 +54,7 @@ def prep_scheduled_stop_times(analysis_date: str) -> gpd.GeoDataFrame: stops, on = ["feed_key", "stop_id"], how = "inner" - ).drop(columns = ["feed_key", "trip_id"]) + ).drop(columns = ["trip_id"]) st_with_stop = gpd.GeoDataFrame( st_with_stop, geometry = "geometry", crs = PROJECT_CRS) @@ -83,13 +84,15 @@ def get_projected_stop_meters( return gdf -def find_prior_stop( +def find_prior_subseq_stop( stop_times: gpd.GeoDataFrame, trip_stop_cols: list ) -> gpd.GeoDataFrame: """ For trip-stop, find the previous stop (using stop sequence). Attach the previous stop's geometry. + This will determine the direction for the stop (it's from prior stop). + Add in subseq stop information too. """ prior_stop = stop_times[trip_stop_cols].sort_values( trip_stop_cols).reset_index(drop=True) @@ -105,6 +108,9 @@ def find_prior_stop( subseq_stop_id = (prior_stop.groupby("trip_instance_key") .stop_id .shift(-1)), + subseq_stop_name = (prior_stop.groupby("trip_instance_key") + .stop_name + .shift(-1)) ) # Merge in prior stop geom as a separate column so we can @@ -133,9 +139,11 @@ def find_prior_stop( ).astype({ "prior_stop_sequence": "Int64", "subseq_stop_sequence": "Int64" - }).fillna({"subseq_stop_id": ""}) - - + }).fillna({ + "subseq_stop_id": "", + "subseq_stop_name": "" + }) + # Create stop pair with underscores, since stop_id # can contain hyphens stop_times_with_prior_geom = stop_times_with_prior_geom.assign( @@ -143,8 +151,13 @@ def find_prior_stop( lambda x: str(x.stop_id) + "__" + str(x.subseq_stop_id), axis=1, - ) - ).drop(columns = ["subseq_stop_id"]) + ), + stop_pair_name = stop_times_with_prior_geom.apply( + lambda x: + x.stop_name + "__" + x.subseq_stop_name, + axis=1, + ), + ).drop(columns = ["subseq_stop_id", "subseq_stop_name"]) return stop_times_with_prior_geom @@ -167,9 +180,10 @@ def assemble_stop_times_with_direction( scheduled_stop_times = prep_scheduled_stop_times(analysis_date) - trip_stop_cols = ["trip_instance_key", "stop_sequence", "stop_id"] + trip_stop_cols = ["trip_instance_key", "stop_sequence", + "stop_id", "stop_name"] - scheduled_stop_times2 = find_prior_stop( + scheduled_stop_times2 = find_prior_subseq_stop( scheduled_stop_times, trip_stop_cols ) @@ -195,11 +209,6 @@ def assemble_stop_times_with_direction( rt_utils.primary_cardinal_direction)(prior_geom, current_geom) stop_distance = prior_geom.distance(current_geom) - # Create a column with normalized direction vector - # Add this because some bus can travel in southeasterly direction, - # but it's categorized as southbound or eastbound depending - # on whether south or east value is larger. - other_stops_no_geom = other_stops_no_geom.assign( stop_primary_direction = stop_direction, stop_meters = stop_distance, @@ -224,10 +233,7 @@ def assemble_stop_times_with_direction( end = datetime.datetime.now() print(f"execution time: {end - start}") - - del scheduled_stop_times, scheduled_stop_times2 - del other_stops_no_geom, scheduled_stop_times_with_direction, df - + return diff --git a/rt_segment_speeds/logs/cut_road_segments.log b/rt_segment_speeds/logs/cut_road_segments.log index e8a75dda2..fa0ce465a 100644 --- a/rt_segment_speeds/logs/cut_road_segments.log +++ b/rt_segment_speeds/logs/cut_road_segments.log @@ -1,6 +1,5 @@ -2023-12-11 18:49:41.963 | INFO | __main__::178 - execution time: 0:02:12.275031 -2023-12-12 12:38:25.692 | INFO | __main__::282 - cut local roads: 0:13:31.749643 -2023-12-12 12:38:25.740 | INFO | __main__::285 - execution time: 0:13:31.797462 -2023-12-12 12:43:57.178 | INFO | __main__::261 - cut primary/secondary roads: 0:03:51.547346 -2023-12-12 13:01:51.518 | INFO | __main__::61 - concatenate road segments: 0:00:51.244157 -2023-12-12 14:35:58.485 | INFO | __main__::62 - concatenate road segments: 0:00:45.026472 +2024-03-21 16:53:55.732 | INFO | __main__::236 - cut primary/secondary roads: 0:00:25.924757 +2024-03-21 17:03:42.871 | INFO | __main__::255 - cut local roads: 0:09:47.138963 +2024-03-21 17:03:42.871 | INFO | __main__::258 - execution time: 0:10:13.064427 +2024-03-22 14:00:01.019 | INFO | __main__::184 - execution time: 0:05:50.663597 +2024-03-22 14:19:25.092 | INFO | __main__::196 - execution time: 0:08:45.006011 diff --git a/rt_segment_speeds/scripts/cut_road_segments.py b/rt_segment_speeds/scripts/cut_road_segments.py index a5d46ea2c..1c5078221 100644 --- a/rt_segment_speeds/scripts/cut_road_segments.py +++ b/rt_segment_speeds/scripts/cut_road_segments.py @@ -1,30 +1,24 @@ """ Cut road segments. """ -import dask.dataframe as dd -import dask_geopandas as dg +#import dask.dataframe as dd import datetime import geopandas as gpd import pandas as pd import sys -from dask import delayed +from dask import delayed, compute from loguru import logger -from typing import Literal from calitp_data_analysis.sql import to_snakecase -from segment_speed_utils import helpers from segment_speed_utils.project_vars import (SHARED_GCS, PROJECT_CRS, ROAD_SEGMENT_METERS ) from calitp_data_analysis import geography_utils, utils -from shared_utils import rt_utils, geog_utils_to_add +from shared_utils import rt_utils -""" -TIGER -""" -def load_roads(filtering: tuple) -> gpd.GeoDataFrame: +def load_roads(**kwargs) -> gpd.GeoDataFrame: """ Load roads based on what you filter for MTFCC values (road types). Do some basic cleaning/dissolving. @@ -43,8 +37,8 @@ def load_roads(filtering: tuple) -> gpd.GeoDataFrame: # https://stackoverflow.com/questions/56522977/using-predicates-to-filter-rows-from-pyarrow-parquet-parquetdataset df = gpd.read_parquet( f"{SHARED_GCS}all_roads_2020_state06.parquet", - filters = filtering, columns=["LINEARID", "MTFCC", "FULLNAME", "geometry"], + **kwargs, ).to_crs(PROJECT_CRS).pipe(to_snakecase) # If a road has mutliple rows but the same @@ -92,122 +86,45 @@ def load_roads(filtering: tuple) -> gpd.GeoDataFrame: ) return df2 - - -""" -Primary/Secondary Roads -""" -def cut_primary_secondary_roads( - roads: gpd.GeoDataFrame, - segment_length_meters: int -) -> gpd.GeoDataFrame: - """ - Cut all primary and secondary roads. - Add a primary cardinal direction for the segment. - """ - roads_segmented = geography_utils.cut_segments( - roads, - group_cols = ["linearid", "mtfcc", "fullname"], - segment_distance = segment_length_meters, - ).sort_values( - ["linearid", "segment_sequence"] - ).reset_index(drop=True) - roads_segmented2 = add_segment_direction(roads_segmented) - - return roads_segmented2 - -""" -Local Roads (base) -""" def cut_segments( - gdf: gpd.GeoDataFrame, - group_cols: list, - segment_length_meters: int -) -> gpd.GeoDataFrame: - gdf["segment_geometry"] = gdf.apply( - lambda x: - geography_utils.create_segments(x.geometry, int(segment_length_meters)), - axis=1, - ) - - gdf2 = geog_utils_to_add.explode_segments( - gdf, - group_cols, - segment_col = "segment_geometry", - )[group_cols + ["segment_sequence", "geometry"] - ].set_geometry("geometry").set_crs(PROJECT_CRS) - - return gdf2 - - -def cut_local_roads( - roads: gpd.GeoDataFrame, + roads: gpd.GeoDataFrame, group_cols: list, segment_length_meters: int ) -> gpd.GeoDataFrame: - # If roads are less than our segment length - # keep these intact...they are freebies, already segmented for us (1 segment) short_roads = roads[ roads.road_length <= segment_length_meters - ].drop(columns = "road_length").reset_index(drop=True) - - short_roads = short_roads.assign( + ].drop(columns = "road_length").assign( segment_sequence = 0 - ).astype({"segment_sequence": "int16"}) - - long_roads = roads[ + ).astype({"segment_sequence": "int16"}).reset_index(drop=True) + + gdf = roads[ roads.road_length > segment_length_meters ].drop(columns = "road_length").reset_index(drop=True) - - - long_roads = long_roads.repartition(npartitions=50) - time1 = datetime.datetime.now() - - # Concatenate the short roads and the segmented roads - road_dtypes = long_roads[group_cols].dtypes.to_dict() + gdf["segment_geometry"] = gdf.apply( + lambda x: + geography_utils.create_segments(x.geometry, int(segment_length_meters)), + axis=1, + ) - segmented_long_roads = long_roads.map_partitions( - cut_segments, + gdf2 = geography_utils.explode_segments( + gdf, group_cols, - segment_length_meters, - meta = { - **road_dtypes, - "segment_sequence": "int16", - "geometry": "geometry" - }, - align_dataframes = False - ).persist() + segment_col = "segment_geometry", + )[ + group_cols + ["segment_sequence", "geometry"] + ].set_geometry("geometry").set_crs(PROJECT_CRS) - print("first persist") - print(segmented_long_roads.dtypes) - - gdf = dd.multi.concat( - [short_roads, segmented_long_roads], - axis=0 + segmented = pd.concat( + [short_roads, gdf2], axis=0 ).reset_index(drop=True) - gdf = gdf.repartition(npartitions=20) - - gdf_dtypes = gdf.dtypes.to_dict() + segmented = gpd.GeoDataFrame(segmented, geometry="geometry", crs=PROJECT_CRS) - gdf2 = gdf.map_partitions( - add_segment_direction, - meta = { - **gdf_dtypes, - "origin": "geometry", - "destination": "geometry", - "primary_direction": "object" - } - ).persist() - - time2 = datetime.datetime.now() - print(f"map partitions to cut segments and add direction: {time2 - time1}") - - return gdf2 + return segmented def add_segment_direction(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: @@ -217,6 +134,7 @@ def add_segment_direction(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: one running on other side, we need to distinguish between these 2 rows with same linearid-mtfcc-fullname. """ + df = df.dropna(subset="geometry") df = rt_utils.add_origin_destination(df) df = df.assign( @@ -228,8 +146,33 @@ def add_segment_direction(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: ) return df - + +def cut_road_segments( + group_cols: list, + segment_length_meters: int, + road_segment_str: str, + **kwargs +): + roads = delayed(load_roads)(**kwargs) + + road_segments = delayed(cut_segments)( + roads, + group_cols, + segment_length_meters + ).pipe(add_segment_direction) + + road_segments = compute(road_segments)[0] + + utils.geoparquet_gcs_export( + road_segments, + SHARED_GCS, + f"segmented_roads_{road_segment_str}_2020" + ) + + return + + if __name__ == '__main__': LOG_FILE = "../logs/cut_road_segments.log" @@ -242,42 +185,12 @@ def add_segment_direction(df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: road_cols = ["linearid", "mtfcc", "fullname"] - # Always cut all the primary and secondary roads - road_type = "primarysecondary" - road_type_values = ["S1100", "S1200"] - - roads = load_roads(filtering = [("MTFCC", "in", road_type_values)]) - primary_secondary_roads = cut_primary_secondary_roads( - roads, ROAD_SEGMENT_METERS) - - utils.geoparquet_gcs_export( - primary_secondary_roads, - SHARED_GCS, - f"segmented_roads_2020_{road_type}" - ) - - del roads, primary_secondary_roads - time1 = datetime.datetime.now() - logger.info(f"cut primary/secondary roads: {time1 - start}") - - road_type = "local" - road_type_values = ["S1400"] - roads = delayed(load_roads)(filtering = [("MTFCC", "in", road_type_values)]) - - local_gddf = dd.from_delayed(roads).repartition(npartitions=50) - - local_roads = cut_local_roads( - local_gddf, road_cols, ROAD_SEGMENT_METERS - ).compute() - - utils.geoparquet_gcs_export( - local_roads, - SHARED_GCS, - f"segmented_roads_2020_{road_type}" - ) - - time2 = datetime.datetime.now() - logger.info(f"cut local roads: {time2 - time1}") + cut_road_segments(road_cols, ROAD_SEGMENT_METERS, "onekm") + + ''' + TWO_MILES_IN_METERS = 1_609*2 + cut_road_segments(road_cols, TWO_MILES_IN_METERS, "twomile") + ''' end = datetime.datetime.now() logger.info(f"execution time: {end - start}") \ No newline at end of file diff --git a/rt_segment_speeds/scripts/nearest_vp_to_road.py b/rt_segment_speeds/scripts/nearest_vp_to_road.py index fe69029e3..b2b85f4b9 100644 --- a/rt_segment_speeds/scripts/nearest_vp_to_road.py +++ b/rt_segment_speeds/scripts/nearest_vp_to_road.py @@ -1,247 +1,273 @@ """ """ -import dask.dataframe as dd import datetime import geopandas as gpd -import numpy as np import pandas as pd +import shapely -from segment_speed_utils import helpers, wrangle_shapes -from segment_speed_utils.project_vars import (SEGMENT_GCS, - CONFIG_PATH, PROJECT_CRS) - - -def filter_long_sjoin_results( - sjoin_long: dd.DataFrame, - sjoin_wide: pd.DataFrame, -) -> dd.DataFrame: - """ - Long results contain all sjoins. - Wide is filtered down to road linearids that have at least - 2 vp_idx for that trip_instance_key on the road. - """ - keep_cols = sjoin_long.columns.tolist() - - df = dd.merge( - sjoin_long, - sjoin_wide, - on = ["trip_instance_key", "linearid", "mtfcc"], - how = "inner" - )[keep_cols].drop_duplicates().reset_index(drop=True) - - return df +from dask import delayed, compute +from segment_speed_utils import helpers, neighbor, segment_calcs, wrangle_shapes +from segment_speed_utils.project_vars import SEGMENT_GCS, SHARED_GCS, PROJECT_CRS +import interpolate_stop_arrival -def merge_vp_to_crosswalk( - analysis_date: str, -) -> dd.DataFrame: - """ - """ - sjoin_long = dd.read_parquet( - f"{SEGMENT_GCS}vp_sjoin/vp_road_segments_{analysis_date}", +road_id_cols = ["linearid", "mtfcc", "primary_direction"] +segment_identifier_cols = road_id_cols + ["segment_sequence"] +segment_identifier_cols2 = ['linearid', 'mtfcc', + 'stop_primary_direction', 'segment_sequence'] +test_shapes = [ + 'e3435a4b882913d92a12563910f7193d', + 'dfae5639b824ed6ad87ed753575f5381', + '626724031caabc3abcf3f183f4c9c718', + 'b63ae7a27ecb677ac93c0373245ea21b', + '85a03eed08934ed37f204e9aae6cbd36', + '20af512ed1ada757a3b49beb36b623ad', + 'a1999da4d09bc81548fe3e2b0fb458b4', + '1e7115aaac1fcb58c6509c7a90d9741c', + '3d437ea82b56e9827d15527ce41c716a', + '40469843deb94fbf61ef5f38bb76a137', + '04353cab33c0b31d8e9575ace6f9f6da', + 'f165d1399f8880fc0011eb75b740ba24', + 'ad15c10f6bc86dd6d3c02bfe6daf7ad9', + '71b06c07dabcecf71ddc5a8f3638ccf1', + '4e2b8555bc9936d711c8748694d67a3e', + '2e00363dba250fae30b7c14596f18907', + '2fd717cc5df1495434b8170e294137a6', + '60057141822cc9d9ac4795a83b2f25f1', + '5c0a268639c22fc58c8c842741cf68e3', + '67bc9cc93630ea8a22783cd59d11d46b', + '81e5d878737a777ffea18b0c08f58118', + 'a91351f71fc4d2c0b457e1701a3ade64', + '67af448db16c21d09812f58cf065aac5', + 'bd66e7d4ffae3bc36888cfddaf5e2e44', + 'b9b803a342d42f72bbe25042f7328385' +] + +def get_shape_road_crosswalk( + analysis_date: str, **kwargs +) -> pd.DataFrame: + + # shapes to road crosswalk + shape_road_crosswalk = pd.read_parquet( + f"{SEGMENT_GCS}roads_staging/" + f"shape_road_crosswalk_{analysis_date}.parquet", + ) + + # link shapes to trip + shape_to_trip = helpers.import_scheduled_trips( + analysis_date, + columns = ["trip_instance_key", "shape_array_key"], + **kwargs + #filters = [[("shape_array_key", "==", one_shape)]] ) - sjoin_wide = pd.read_parquet( - f"{SEGMENT_GCS}vp_sjoin/vp_road_segments_wide_{analysis_date}.parquet", + shape_road_crosswalk = shape_road_crosswalk.merge( + shape_to_trip, + on = "shape_array_key", + how = "inner" ) - # these are the vps joined to roads that we will narrow down - # for interpolation - sjoin_vp_roads = filter_long_sjoin_results(sjoin_long, sjoin_wide) + return shape_road_crosswalk + + +def make_road_stops_long( + shape_road_crosswalk: pd.DataFrame +) -> gpd.GeoDataFrame: - # Pull the vp info for ones that join to road segments - vp_usable = dd.read_parquet( - f"{SEGMENT_GCS}vp_usable_{analysis_date}", - columns = ["trip_instance_key", "vp_idx", "x", "y"], + road_segments = gpd.read_parquet( + f"{SHARED_GCS}road_segments/", + columns = segment_identifier_cols + ["geometry"], ) - vp_with_roads = dd.merge( - vp_usable, - sjoin_vp_roads, - on = ["trip_instance_key", "vp_idx"], + road_segments2 = pd.merge( + road_segments, + shape_road_crosswalk, + on = ["linearid", "mtfcc", "segment_sequence"], how = "inner" ) - return vp_with_roads + # Beginning of road segment (1st coord) + road_segments0 = road_segments2.assign( + geometry = road_segments2.apply( + lambda x: shapely.Point(x.geometry.coords[0]), + axis=1), + ).assign(stop_type=0) + # End of road segment (last coord) + road_segments1 = road_segments2.assign( + geometry = road_segments2.apply( + lambda x: shapely.Point(x.geometry.coords[-1]), + axis=1), + ).assign(stop_type=1) -def expand_road_segments( - roads: pd.DataFrame, - cols_to_unpack: list -) -> pd.DataFrame: - """ - Convert roads from wide to long. - This set up makes roads segment endpoints look like bus stops. - Once we use roads full line geometry to project, we can - drop it and get it in a long format. - """ - roads_long = (roads.drop(columns = "geometry") - .explode(cols_to_unpack) - .reset_index(drop=True) - ) - - return roads_long - - -def make_wide(vp_with_projected: pd.DataFrame): - - vp_projected_wide = (vp_with_projected - .groupby(["trip_instance_key"] + road_id_cols) - .agg({ - "vp_idx": lambda x: list(x), - "shape_meters": lambda x: list(x)}) - .reset_index() - .rename(columns = { - "vp_idx": "vp_idx_arr", - "shape_meters": "shape_meters_arr" - }) - ) - - #https://stackoverflow.com/questions/42099024/pandas-pivot-table-rename-columns - narrowed_down_roads = (vp_with_projected - .pivot_table(index = ["trip_instance_key"] + road_id_cols, - aggfunc = {"shape_meters": ["min", "max"]}) - .pipe(lambda x: - x.set_axis(map('_'.join, x), axis=1) - # if all column names are strings - # ('_'.join(map(str, c)) for c in x) - # if some column names are not strings - ) - .reset_index() - ) - - vp_projected_wide2 = pd.merge( - vp_projected_wide, - narrowed_down_roads, - on = ["trip_instance_key"] + road_id_cols, - how = "inner" - ) + # Make long, similar to how stop_times is set up + # For roads, each segment has beginning and end stop + # We want to interpolate arrival times for both + # to calculate speed + road_segments_long = pd.concat( + [road_segments0, road_segments1], + axis=0 + ).sort_values( + ["linearid", "segment_sequence", "stop_type"] + ).rename( + columns = {"primary_direction": "stop_primary_direction"} + ).reset_index(drop=True) - return vp_projected_wide2 + return road_segments_long +def merge_nn_with_shape(results2): -if __name__ == "__main__": + results2 = results2.assign( + stop_geometry = results2.stop_geometry.to_crs(PROJECT_CRS), + vp_coords_trio = results2.vp_coords_trio.to_crs(PROJECT_CRS) + ) - from segment_speed_utils.project_vars import analysis_date + shapes = helpers.import_scheduled_shapes( + analysis_date, + columns = ["shape_array_key", "geometry"], + crs = PROJECT_CRS + ).dropna(subset="geometry") + + gdf = pd.merge( + results2, + shapes.rename(columns = {"geometry": "shape_geometry"}), + on = "shape_array_key", + how = "inner" + ) - start = datetime.datetime.now() + stop_meters_series = [] + stop_arrival_series = [] - road_id_cols = ["linearid", "mtfcc"] - segment_identifier_cols = road_id_cols + ["segment_sequence"] + for row in gdf.itertuples(): + + stop_meters, interpolated_arrival = interpolate_stop_arrival.project_points_onto_shape( + getattr(row, "stop_geometry"), + getattr(row, "vp_coords_trio"), + getattr(row, "shape_geometry"), + getattr(row, "location_timestamp_local_trio") + ) + + stop_meters_series.append(stop_meters) + stop_arrival_series.append(interpolated_arrival) + + results2 = gdf.assign( + stop_meters = stop_meters_series, + arrival_time = stop_arrival_series, + )[segment_identifier_cols2 + [ + "trip_instance_key", "shape_array_key", + "stop_type", + "stop_meters", "arrival_time"] + ].sort_values( + segment_identifier_cols2 + ["trip_instance_key", "stop_type", ] + ).reset_index(drop=True) - vp = merge_vp_to_crosswalk(analysis_date) - vp = vp.repartition(npartitions=150) + return results2 + + +def quick_calculate_speeds(results2): + grouped_df = results2.groupby(segment_identifier_cols2 + + ["trip_instance_key"]) + + min_arrival = grouped_df.agg({"arrival_time": "min"}).reset_index() + max_arrival = grouped_df.agg({"arrival_time": "max"}).reset_index() - subset_roads = vp.linearid.unique().compute().tolist() + # If min/max arrival are the same, remove + # The same trio of vp is attached to the road segment's + # beginning and end + min_max_arrival = pd.merge( + min_arrival, + max_arrival, + on = segment_identifier_cols2 + ["trip_instance_key"] + ).query('arrival_time_x != arrival_time_y') - roads = gpd.read_parquet( - f"{SEGMENT_GCS}segments_staging/" - f"roads_with_cutpoints_wide_{analysis_date}.parquet", - filters = [[("linearid", "in", subset_roads)]] + results3 = pd.merge( + results2, + min_max_arrival[segment_identifier_cols2 + ["trip_instance_key"]], + on = segment_identifier_cols2 + ["trip_instance_key"], + how = "inner" ) - road_dtypes = vp[road_id_cols].dtypes.to_dict() - - vp_projected = vp.map_partitions( - wrangle_shapes.project_vp_onto_segment_geometry, - roads, - grouping_cols = road_id_cols, - meta = { - "vp_idx": "int", - **road_dtypes, - "shape_meters": "float", - }, - align_dataframes = False - ).persist() - - vp2 = vp[["vp_idx", "trip_instance_key"] + segment_identifier_cols] - - vp_with_projected = dd.merge( - vp2, - vp_projected, - on = ["vp_idx"] + road_id_cols, - how = "inner" - ).compute() + results3 = segment_calcs.convert_timestamp_to_seconds( + results3, ["arrival_time"] + ).sort_values( + segment_identifier_cols2 + ["trip_instance_key"] + ).reset_index(drop=True) + + trip_cols = segment_identifier_cols2 + ["trip_instance_key"] - vp_with_projected.to_parquet( - f"{SEGMENT_GCS}projection/vp_projected_roads_{analysis_date}.parquet" + results3 = results3.assign( + subseq_arrival_time_sec = (results3.groupby(trip_cols, + observed=True, group_keys=False) + .arrival_time_sec + .shift(-1) + ), + subseq_stop_meters = (results3.groupby(trip_cols, + observed=True, group_keys=False) + .stop_meters + .shift(-1) + ) ) - ''' - vp_projected_wide2 = make_wide(vp_with_projected) - - road_cutpoints = expand_road_segments( - roads, - ["road_meters_arr", "segment_sequence_arr", "road_direction_arr"] - ).rename(columns = { - "road_meters_arr": "road_meters", - "segment_sequence_arr": "segment_sequence", - "road_direction_arr": "primary_direction" - }) - - # Now merge road segments with each destination acting as the road's stop - # and merge on arrays of projected vp against that road - gdf = pd.merge( - road_cutpoints, - vp_projected_wide2, - on = road_id_cols, - how = "inner" - ).query( - 'road_meters >= shape_meters_min & road_meters <= shape_meters_max' - ).drop( - columns = ["shape_meters_min", "shape_meters_max"] + + speed = results3.assign( + meters_elapsed = results3.subseq_stop_meters - results3.stop_meters, + sec_elapsed = results3.subseq_arrival_time_sec - results3.arrival_time_sec, + ).pipe( + segment_calcs.derive_speed, + ("stop_meters", "subseq_stop_meters"), + ("arrival_time_sec", "subseq_arrival_time_sec") ) - nearest_vp_idx = [] - subseq_vp_idx = [] + return speed - for row in gdf.itertuples(): - this_stop_meters = getattr(row, "road_meters") - valid_shape_meters_array = getattr(row, "shape_meters_arr") - valid_vp_idx_array = np.asarray(getattr(row, "vp_idx_arr")) - - if ( - (this_stop_meters >= min(valid_shape_meters_array)) and - (this_stop_meters <= max(valid_shape_meters_array)) - ): - - idx = np.searchsorted( - valid_shape_meters_array, - this_stop_meters, - side="right" - # want our stop_meters value to be < vp_shape_meters, - # side = "left" would be stop_meters <= vp_shape_meters - ) - - # For the next value, if there's nothing to index into, - # just set it to the same position - # if we set subseq_value = getattr(row, )[idx], - # we might not get a consecutive vp - nearest_value = valid_vp_idx_array[idx-1] - subseq_value = nearest_value + 1 - - else: - nearest_value = np.nan - subseq_value = np.nan - - nearest_vp_idx.append(nearest_value) - subseq_vp_idx.append(subseq_value) - - result = gdf[segment_identifier_cols + [ - "primary_direction", "road_meters", - "trip_instance_key"]] - - # Now assign the nearest vp for each trip that's nearest to - # a given stop - # Need to find the one after the stop later - result = result.assign( - nearest_vp_idx = nearest_vp_idx, - subseq_vp_idx = subseq_vp_idx, +if __name__ == "__main__": + + #from segment_speed_utils.project_vars import analysis_date + from shared_utils import rt_dates + analysis_date = rt_dates.DATES["oct2023"] + + start = datetime.datetime.now() + + shape_road_crosswalk = get_shape_road_crosswalk( + analysis_date, + filters = [[("shape_array_key", "in", test_shapes)]] ) - result.to_parquet( - f"{SEGMENT_GCS}projection/nearest_vp_roads_{analysis_date}.parquet" + road_segments_long = make_road_stops_long(shape_road_crosswalk) + + gdf = neighbor.merge_stop_vp_for_nearest_neighbor( + road_segments_long, + analysis_date ) - ''' + + results = neighbor.add_nearest_neighbor_result(gdf, analysis_date) + #results = compute(results)[0] + + #utils.geoparquet_gcs_export( + # results, + # SEGMENT_GCS, + # f"roads_staging/nearest_{analysis_date}" + #) + + results2 = delayed(merge_nn_with_shape)(results) + #results2 = compute(results2)[0] + + #utils.geoparquet_gcs_export( + # results2, + # SEGMENT_GCS, + # f"roads_staging/interp_{analysis_date}" + #) + + speeds = delayed(quick_calculate_speeds)(results2) + + time1 = datetime.datetime.now() + print(f"delayed dfs: {time1 - start}") + + speeds = compute(speeds)[0] + + speeds.to_parquet( + f"{SEGMENT_GCS}roads_staging/" + f"test_speeds_{analysis_date}.parquet") + end = datetime.datetime.now() - print(f"execution time: {end - start}") \ No newline at end of file + print(f"test 25 shapes: {end - start}") \ No newline at end of file diff --git a/rt_segment_speeds/scripts/sjoin_vp_to_roads.py b/rt_segment_speeds/scripts/sjoin_vp_to_roads.py deleted file mode 100644 index cf909faa3..000000000 --- a/rt_segment_speeds/scripts/sjoin_vp_to_roads.py +++ /dev/null @@ -1,183 +0,0 @@ -import dask.dataframe as dd -import dask_geopandas as dg -import datetime -import geopandas as gpd -import pandas as pd - -from dask import delayed -from segment_speed_utils.project_vars import (SEGMENT_GCS, analysis_date, - SHARED_GCS, - PROJECT_CRS) -from segment_speed_utils import helpers, wrangle_shapes - - -def sjoin_vp_to_roads( - vp: dd.DataFrame, - roads: gpd.GeoDataFrame, - road_grouping_cols: list, -) -> dd.DataFrame: - - # Need to do similar thing as sjoin with shapes - # because vp can only join to certain road segments - # not all road segments that shape doesn't travel on - # https://github.com/cal-itp/data-analyses/blob/64afd3b9ed1e239a0ca4da486e2fe1e857b17dde/rt_segment_speeds/scripts/A1_sjoin_vp_segments.py - - vp_gdf = wrangle_shapes.vp_as_gdf(vp) - - results = gpd.sjoin( - vp_gdf, - roads, - how = "inner", - predicate = "within" - ).query( - "shape_array_key_left == shape_array_key_right" - )[ - ["vp_idx", "trip_instance_key"] + - road_grouping_cols - ].drop_duplicates().sort_values("vp_idx").reset_index(drop=True) - - return results - - -def make_wide( - full_results: pd.DataFrame, - road_cols: list -) -> pd.DataFrame: - results_wide = (full_results.groupby(["trip_instance_key"] + road_cols, - observed=True, group_keys=False) - .agg({ - "vp_idx": lambda x: list(set(x)), - "segment_sequence": "nunique", - }) - .reset_index() - .rename(columns = {"vp_idx": "vp_idx_arr"}) - ) - - # No point in keeping results where there are fewer than 2 vp for entire road - results_wide = results_wide.assign( - n_vp = results_wide.apply(lambda x: len(x.vp_idx_arr), axis=1) - ).query( - 'n_vp > 1 & segment_sequence > 1' - ).drop( - columns = ["n_vp", "segment_sequence"] - ).reset_index(drop=True) - - return results_wide - - -def import_vp(analysis_date: str): - vp = dd.read_parquet( - f"{SEGMENT_GCS}vp_usable_{analysis_date}", - columns = ["vp_idx", "trip_instance_key", "x", "y", - "vp_primary_direction"], - ) - - rt_trips = vp.trip_instance_key.unique().compute().tolist() - - trip_to_shape = helpers.import_scheduled_trips( - analysis_date, - columns = ["trip_instance_key", "shape_array_key"], - filters = [[("trip_instance_key", "in", rt_trips)]], - get_pandas = True - ) - - vp2 = dd.merge( - vp, - trip_to_shape, - on = "trip_instance_key", - how = "inner" - ).repartition(npartitions=25) - - return vp2 - - -def import_roads( - analysis_date: str, - road_grouping_cols: list, -) -> dg.GeoDataFrame: - - shape_road_crosswalk = pd.read_parquet( - f"{SEGMENT_GCS}shape_road_crosswalk_{analysis_date}.parquet", - ) - - road_segments = dg.read_parquet( - f"{SHARED_GCS}road_segments/", - columns = road_grouping_cols + ["primary_direction", "geometry"] - ).merge( - shape_road_crosswalk, - on = road_grouping_cols, - how = "inner" - ) - - road_segments = road_segments.assign( - geometry = road_segments.geometry.buffer(15) - ) - - road_segments = road_segments.repartition(npartitions=25) - - return road_segments - - -if __name__ == "__main__": - - start = datetime.datetime.now() - - road_cols = ["linearid", "mtfcc"] - segment_identifier_cols = road_cols + ["segment_sequence"] - - vp = import_vp(analysis_date).persist() - roads = import_roads(analysis_date, segment_identifier_cols).persist() - - vp_dfs = [ - vp[vp.vp_primary_direction == d] - for d in wrangle_shapes.ALL_DIRECTIONS - ] - - # Remove all the Unknowns (vps that aren't moving) - road_dfs = [ - roads[roads.primary_direction != - wrangle_shapes.OPPOSITE_DIRECTIONS[d] - ] for d in wrangle_shapes.ALL_DIRECTIONS - ] - - - vp_cols_dtypes = vp[["vp_idx", "trip_instance_key"]].dtypes.to_dict() - road_cols_dtypes = roads[segment_identifier_cols].dtypes.to_dict() - - results = [ - vp_subset.map_partitions( - sjoin_vp_to_roads, - road_subset, - segment_identifier_cols, - meta = { - **vp_cols_dtypes, - **road_cols_dtypes - }, - align_dataframes = False - ) for vp_subset, road_subset in zip(vp_dfs, road_dfs) - ] - print("map partitions") - - full_results = dd.multi.concat(results, axis=0).reset_index(drop=True) - full_results = full_results.repartition(npartitions=3).persist() - - time1 = datetime.datetime.now() - print(f"map partitions and persist full results: {time1 - start}") - - full_results.to_parquet( - f"{SEGMENT_GCS}vp_sjoin/vp_road_segments_{analysis_date}", - overwrite=True - ) - - full_results = delayed(pd.read_parquet)( - f"{SEGMENT_GCS}vp_sjoin/vp_road_segments_{analysis_date}" - ) - - results_wide = delayed(make_wide)(full_results, road_cols) - - results_wide.compute().to_parquet( - f"{SEGMENT_GCS}vp_sjoin/vp_road_segments_wide_{analysis_date}.parquet", - ) - - end = datetime.datetime.now() - print(f"execution time: {end - start}") \ No newline at end of file diff --git a/rt_segment_speeds/scripts/vp_on_road_segments.ipynb b/rt_segment_speeds/scripts/vp_on_road_segments.ipynb index 2d3e25dda..e31afbbca 100644 --- a/rt_segment_speeds/scripts/vp_on_road_segments.ipynb +++ b/rt_segment_speeds/scripts/vp_on_road_segments.ipynb @@ -7,1108 +7,902 @@ "metadata": {}, "outputs": [], "source": [ - "import dask.dataframe as dd\n", "import geopandas as gpd\n", "import numpy as np\n", "import pandas as pd\n", "\n", - "from segment_speed_utils import helpers, wrangle_shapes\n", - "from segment_speed_utils.project_vars import (SEGMENT_GCS, \n", - " CONFIG_PATH, PROJECT_CRS)\n", + "from shared_utils import rt_dates\n", + "from segment_speed_utils import (helpers, neighbor, \n", + " segment_calcs, wrangle_shapes\n", + " )\n", + "from segment_speed_utils.project_vars import SEGMENT_GCS, SHARED_GCS\n", "\n", - "from segment_speed_utils.project_vars import analysis_date\n", + "analysis_date = rt_dates.DATES[\"oct2023\"]\n", "\n", "road_id_cols = [\"linearid\", \"mtfcc\", \"primary_direction\"]\n", "segment_identifier_cols = road_id_cols + [\"segment_sequence\"]\n", "\n", - "test_trip = \"00139041e36b607c7e10cb7ec023e837\"" + "import nearest_vp_to_road" ] }, { "cell_type": "code", - "execution_count": null, - "id": "d6d09963-2e38-428a-a3d2-881e7725325f", + "execution_count": 2, + "id": "1716354f-0c49-4744-af8f-a49ce538ed71", "metadata": {}, "outputs": [], "source": [ - "df = pd.read_parquet(\n", - " f\"{SEGMENT_GCS}vp_sjoin/vp_road_segments_wide_{analysis_date}.parquet\",\n", - " columns = [\"trip_instance_key\"]\n", - ").drop_duplicates()" + "pd.read_parquet(\n", + " f\"{SEGMENT_GCS}roads_staging/\"\n", + " f\"shape_road_crosswalk_{analysis_date}.parquet\",\n", + ").shape_array_key.unique()[:25]" ] }, { "cell_type": "code", "execution_count": 2, - "id": "51e8ebcb-fb45-40ec-a461-352e46616627", + "id": "40082a7d-be96-490a-913e-4422162c75e6", "metadata": {}, "outputs": [], "source": [ - "test_trips = ['00062c6db9dbef9c80f5ada74b31e257',\n", - " '0009c78b48866a26d664ab00f67d1606',\n", - " '00139041e36b607c7e10cb7ec023e837',\n", - " 'ff780650b98209acf69a71a7dab2502c',\n", - " 'ff86898d6a8ff5df82912699133bf4b6',\n", - " 'ffb44943b394f891d2b2286bb3902305']" + "from dask import delayed, compute\n", + "import dask.dataframe as dd" ] }, { "cell_type": "code", "execution_count": 3, - "id": "251c4aad-b470-4ee6-b52d-f7b151dc46af", + "id": "9383ac86-71fb-4120-b97c-85ee4778ca36", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "(17079531, 5)" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "import nearest_vp_to_road\n", + "shape_road_crosswalk = nearest_vp_to_road.get_shape_road_crosswalk(\n", + " analysis_date, \n", + ")\n", "\n", - "vp = nearest_vp_to_road.merge_vp_to_crosswalk(\n", - " analysis_date,\n", - " filters = [[(\"trip_instance_key\", \"in\", test_trips)]]\n", - ")" + "shape_road_crosswalk.shape" ] }, { "cell_type": "code", "execution_count": 4, - "id": "b81717ec-0985-4fd4-8c7c-9326127851d3", + "id": "1a1e06b8-6634-4030-8e7a-5ce0e99e7906", "metadata": {}, "outputs": [], "source": [ - "subset_roads = vp.linearid.unique().compute().tolist()" + "road_segments_long = delayed(nearest_vp_to_road.make_road_stops_long)(\n", + " shape_road_crosswalk)" ] }, { "cell_type": "code", - "execution_count": 5, - "id": "e22e5754-9dac-447b-bfb5-dfb33e734552", + "execution_count": null, + "id": "aa06078a-98e7-47c5-b734-09bbd709eb15", "metadata": {}, "outputs": [], "source": [ - "road_segments = nearest_vp_to_road.expand_relevant_road_segments(\n", - " analysis_date,\n", - " segment_identifier_cols = segment_identifier_cols,\n", - " filters = [[(\"linearid\", \"in\", subset_roads)]],\n", - ")" + "road_segments_long = dd.from_delayed(road_segments_long)" ] }, { "cell_type": "code", - "execution_count": 21, - "id": "9c9bf41d-1e39-4581-9873-732a158463c3", + "execution_count": null, + "id": "8981b5ce-f106-4b77-9c61-0844ba78d3df", "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
linearidmtfccprimary_directionsegment_sequencedestinationgeometryroad_meters
011010947364633S1100Southbound0POINT (-217551.160 -31852.640)MULTILINESTRING ((-217311.864 -31424.886, -217...500.0
111010947364633S1100Southbound1POINT (-217576.341 -32351.779)MULTILINESTRING ((-217311.864 -31424.886, -217...1000.0
\n", - "
" - ], - "text/plain": [ - " linearid mtfcc primary_direction segment_sequence \\\n", - "0 11010947364633 S1100 Southbound 0 \n", - "1 11010947364633 S1100 Southbound 1 \n", - "\n", - " destination \\\n", - "0 POINT (-217551.160 -31852.640) \n", - "1 POINT (-217576.341 -32351.779) \n", - "\n", - " geometry road_meters \n", - "0 MULTILINESTRING ((-217311.864 -31424.886, -217... 500.0 \n", - "1 MULTILINESTRING ((-217311.864 -31424.886, -217... 1000.0 " - ] - }, - "execution_count": 21, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ - "road_segments.head(2)" + "road_segments_long" ] }, { "cell_type": "code", - "execution_count": 7, - "id": "5a9f7bb4-c767-4264-b24f-f4bd9689f18c", + "execution_count": null, + "id": "d60d6653-6d12-4f4f-9c52-ee9998bedca2", "metadata": {}, "outputs": [], - "source": [ - "vp = vp.repartition(npartitions=3)" - ] + "source": [] }, { "cell_type": "code", - "execution_count": 8, - "id": "fc2dc807-c97c-4a20-98da-fa97ad3835fc", + "execution_count": null, + "id": "ef152578-f434-4e47-8983-83b664284118", "metadata": {}, "outputs": [], "source": [ - "road_dtypes = vp[road_id_cols].dtypes.to_dict()\n", "\n", - "vp_projected = vp.map_partitions(\n", - " wrangle_shapes.project_vp_onto_segment_geometry,\n", - " road_segments,\n", - " grouping_cols = road_id_cols,\n", - " meta = {\n", - " \"vp_idx\": \"int64\",\n", - " **road_dtypes,\n", - " \"shape_meters\": \"float\"},\n", - " align_dataframes = False\n", - ").persist()\n", "\n", - "# Merge vp with road segment info \n", - "# with projected shape meters against the full road \n", - "df_with_projection = dd.merge(\n", - " vp,\n", - " vp_projected,\n", - " on = [\"vp_idx\"] + road_id_cols,\n", - " how = \"inner\"\n", - ").drop(columns = [\"x\", \"y\"]).compute()" + "\n", + "gdf = delayed(neighbor.merge_stop_vp_for_nearest_neighbor)(\n", + " road_segments_long, \n", + " analysis_date\n", + ")" ] }, { "cell_type": "code", - "execution_count": 23, - "id": "190949c6-8dd2-48a2-b4d9-872ef8c3a015", + "execution_count": null, + "id": "3a602d9b-e440-4f62-931f-2460da0e3f3a", "metadata": {}, "outputs": [], "source": [ - "df_with_projection_wide = (df_with_projection\n", - " .groupby([\"trip_instance_key\"] + road_id_cols)\n", - " .agg({\n", - " \"vp_idx\": lambda x: list(x),\n", - " \"shape_meters\": lambda x: list(x),\n", - " })\n", - " .reset_index()\n", - " .rename(columns = {\n", - " \"vp_idx\": \"vp_idx_arr\",\n", - " \"shape_meters\": \"shape_meters_arr\"\n", - " })\n", - " )" + "road_segments_long.shape" ] }, { "cell_type": "code", - "execution_count": 25, - "id": "c3a7a8c2-7da7-44b5-9129-0c1be76348d3", + "execution_count": null, + "id": "052f3192-2a7a-40f0-bdaf-ef60a69f0e17", "metadata": {}, "outputs": [], "source": [ - "df_with_projection_wide = df_with_projection_wide.assign(\n", - " min_shape_meters = df_with_projection_wide.apply(\n", - " lambda x: x.shape_meters_arr[0], axis=1),\n", - " max_shape_meters = df_with_projection_wide.apply(\n", - " lambda x: x.shape_meters_arr[-1], axis=1), \n", - ")" + "results = delayed(neighbor.add_nearest_neighbor_result)(\n", + " gdf, analysis_date)" ] }, { "cell_type": "code", - "execution_count": 28, - "id": "7b1cb352-a07b-4a00-8546-995801b51ef8", + "execution_count": null, + "id": "c2285565-c417-4b05-a2d3-aa9693bf21d5", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "c234ba77-c074-4d54-8ad9-a27b902e901e", "metadata": {}, "outputs": [], "source": [ - "# Now merge road segments with each destination acting as the road's stop\n", - "# and merge on arrays of projected vp against that road\n", - "gdf = pd.merge(\n", - " road_segments,\n", - " df_with_projection_wide,\n", - " on = road_id_cols,\n", - " how = \"inner\"\n", - ")" + "df = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}roads_staging/\"\n", + " f\"test_speeds_{analysis_date}.parquet\"\n", + ") " ] }, { "cell_type": "code", - "execution_count": 29, - "id": "ad4f65d9-b281-4c39-9efc-2c92f1fc719c", + "execution_count": 6, + "id": "31d8d5b1-3e6b-45e1-b271-4e6916a97a86", "metadata": {}, "outputs": [ { "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
linearidmtfccprimary_directionsegment_sequencedestinationgeometryroad_meterstrip_instance_keyvp_idx_arrshape_meters_arrmin_shape_metersmax_shape_meters
011010947364633S1100Southbound0POINT (-217551.160 -31852.640)MULTILINESTRING ((-217311.864 -31424.886, -217...500.0ffb44943b394f891d2b2286bb3902305[12751727, 12751726][339.54035083029106, 339.54035083029106]339.540351339.540351
111010947364633S1100Southbound1POINT (-217576.341 -32351.779)MULTILINESTRING ((-217311.864 -31424.886, -217...1000.0ffb44943b394f891d2b2286bb3902305[12751727, 12751726][339.54035083029106, 339.54035083029106]339.540351339.540351
\n", - "
" - ], "text/plain": [ - " linearid mtfcc primary_direction segment_sequence \\\n", - "0 11010947364633 S1100 Southbound 0 \n", - "1 11010947364633 S1100 Southbound 1 \n", - "\n", - " destination \\\n", - "0 POINT (-217551.160 -31852.640) \n", - "1 POINT (-217576.341 -32351.779) \n", - "\n", - " geometry road_meters \\\n", - "0 MULTILINESTRING ((-217311.864 -31424.886, -217... 500.0 \n", - "1 MULTILINESTRING ((-217311.864 -31424.886, -217... 1000.0 \n", - "\n", - " trip_instance_key vp_idx_arr \\\n", - "0 ffb44943b394f891d2b2286bb3902305 [12751727, 12751726] \n", - "1 ffb44943b394f891d2b2286bb3902305 [12751727, 12751726] \n", - "\n", - " shape_meters_arr min_shape_meters \\\n", - "0 [339.54035083029106, 339.54035083029106] 339.540351 \n", - "1 [339.54035083029106, 339.54035083029106] 339.540351 \n", - "\n", - " max_shape_meters \n", - "0 339.540351 \n", - "1 339.540351 " + "" ] }, - "execution_count": 29, + "execution_count": 6, "metadata": {}, "output_type": "execute_result" - } - ], - "source": [ - "gdf.head(2)" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "4b91114b-a8a9-4f2a-9c54-04a2b8606531", - "metadata": {}, - "outputs": [ + }, { "data": { + "image/png": "", "text/plain": [ - "(1000.0000000000001, [339.54035083029106, 339.54035083029106])" + "
" ] }, - "execution_count": 12, "metadata": {}, - "output_type": "execute_result" + "output_type": "display_data" } ], "source": [ - "n = 1\n", - "gdf.road_meters.iloc[n], gdf.shape_meters_arr.iloc[n]" + "df.speed_mph.hist(bins=range(0, 100, 5))" ] }, { "cell_type": "code", - "execution_count": 13, - "id": "adb3d2a4-e384-4657-8622-b6fdec81a7ff", + "execution_count": 7, + "id": "8797146e-6c3a-4dbf-93a7-7e0491d9f371", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "(5500.000000000003, [339.54035083029106, 339.54035083029106])" + "count 16549.000000\n", + "mean 62.695220\n", + "std 15.696857\n", + "min 50.008048\n", + "25% 55.952116\n", + "50% 60.287453\n", + "75% 63.596422\n", + "max 325.685493\n", + "Name: speed_mph, dtype: float64" ] }, - "execution_count": 13, + "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "n = 10\n", - "gdf.road_meters.iloc[n], gdf.shape_meters_arr.iloc[n]" + "df[(df.speed_mph >= 50) & (df.sec_elapsed >= 20)].speed_mph.describe()" ] }, { "cell_type": "code", - "execution_count": 14, - "id": "acc93032-b8c6-4d5b-930e-0ae422a250d7", + "execution_count": 12, + "id": "c6a1ec62-4d7b-4e1b-93a6-0c9549ec9810", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "(19999.999999999993, [339.54035083029106, 339.54035083029106])" + "" ] }, - "execution_count": 14, + "execution_count": 12, "metadata": {}, "output_type": "execute_result" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ - "n = 39\n", - "gdf.road_meters.iloc[n], gdf.shape_meters_arr.iloc[n]" + "df[(df.meters_elapsed >= 1609/2) & \n", + " (df.sec_elapsed >= 40)].speed_mph.hist(bins=range(0, 100, 5))" ] }, { "cell_type": "code", "execution_count": 15, - "id": "6ad917a5-1e01-404c-a975-cac502890721", + "id": "693bbf06-56ff-477e-8f12-0ab962973ddb", "metadata": {}, "outputs": [ { "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
linearidmtfccprimary_directionsegment_sequencedestinationgeometryroad_meterstrip_instance_keyvp_idx_arrshape_meters_arr
011010947364633S1100Southbound0POINT (-217551.160 -31852.640)MULTILINESTRING ((-217311.864 -31424.886, -217...500.0ffb44943b394f891d2b2286bb3902305[12751727, 12751726][339.54035083029106, 339.54035083029106]
111010947364633S1100Southbound1POINT (-217576.341 -32351.779)MULTILINESTRING ((-217311.864 -31424.886, -217...1000.0ffb44943b394f891d2b2286bb3902305[12751727, 12751726][339.54035083029106, 339.54035083029106]
211010947364633S1100Southbound2POINT (-217512.017 -32844.758)MULTILINESTRING ((-217311.864 -31424.886, -217...1500.0ffb44943b394f891d2b2286bb3902305[12751727, 12751726][339.54035083029106, 339.54035083029106]
311010947364633S1100Southbound3POINT (-217506.222 -33343.486)MULTILINESTRING ((-217311.864 -31424.886, -217...2000.0ffb44943b394f891d2b2286bb3902305[12751727, 12751726][339.54035083029106, 339.54035083029106]
411010947364633S1100Southbound4POINT (-217595.334 -33833.011)MULTILINESTRING ((-217311.864 -31424.886, -217...2500.0ffb44943b394f891d2b2286bb3902305[12751727, 12751726][339.54035083029106, 339.54035083029106]
.................................
851108475858996S1200Southbound2POINT (-212979.148 -22668.487)MULTILINESTRING ((-213174.831 -21182.164, -213...1500.000062c6db9dbef9c80f5ada74b31e257[6357586, 6357587, 6357588][189.6522603479828, 293.39996228253057, 308.90...
861108475858996S1200Southbound3POINT (-212909.560 -23163.518)MULTILINESTRING ((-213174.831 -21182.164, -213...2000.000062c6db9dbef9c80f5ada74b31e257[6357586, 6357587, 6357588][189.6522603479828, 293.39996228253057, 308.90...
871108475858996S1200Southbound4POINT (-212845.047 -23658.934)MULTILINESTRING ((-213174.831 -21182.164, -213...2500.000062c6db9dbef9c80f5ada74b31e257[6357586, 6357587, 6357588][189.6522603479828, 293.39996228253057, 308.90...
881108475858996S1200Southbound5POINT (-212774.877 -24153.648)MULTILINESTRING ((-213174.831 -21182.164, -213...3000.000062c6db9dbef9c80f5ada74b31e257[6357586, 6357587, 6357588][189.6522603479828, 293.39996228253057, 308.90...
891108475858996S1200Southbound6POINT (-212657.358 -24637.736)MULTILINESTRING ((-213174.831 -21182.164, -213...3500.000062c6db9dbef9c80f5ada74b31e257[6357586, 6357587, 6357588][189.6522603479828, 293.39996228253057, 308.90...
\n", - "

90 rows × 10 columns

\n", - "
" - ], "text/plain": [ - " linearid mtfcc primary_direction segment_sequence \\\n", - "0 11010947364633 S1100 Southbound 0 \n", - "1 11010947364633 S1100 Southbound 1 \n", - "2 11010947364633 S1100 Southbound 2 \n", - "3 11010947364633 S1100 Southbound 3 \n", - "4 11010947364633 S1100 Southbound 4 \n", - ".. ... ... ... ... \n", - "85 1108475858996 S1200 Southbound 2 \n", - "86 1108475858996 S1200 Southbound 3 \n", - "87 1108475858996 S1200 Southbound 4 \n", - "88 1108475858996 S1200 Southbound 5 \n", - "89 1108475858996 S1200 Southbound 6 \n", - "\n", - " destination \\\n", - "0 POINT (-217551.160 -31852.640) \n", - "1 POINT (-217576.341 -32351.779) \n", - "2 POINT (-217512.017 -32844.758) \n", - "3 POINT (-217506.222 -33343.486) \n", - "4 POINT (-217595.334 -33833.011) \n", - ".. ... \n", - "85 POINT (-212979.148 -22668.487) \n", - "86 POINT (-212909.560 -23163.518) \n", - "87 POINT (-212845.047 -23658.934) \n", - "88 POINT (-212774.877 -24153.648) \n", - "89 POINT (-212657.358 -24637.736) \n", - "\n", - " geometry road_meters \\\n", - "0 MULTILINESTRING ((-217311.864 -31424.886, -217... 500.0 \n", - "1 MULTILINESTRING ((-217311.864 -31424.886, -217... 1000.0 \n", - "2 MULTILINESTRING ((-217311.864 -31424.886, -217... 1500.0 \n", - "3 MULTILINESTRING ((-217311.864 -31424.886, -217... 2000.0 \n", - "4 MULTILINESTRING ((-217311.864 -31424.886, -217... 2500.0 \n", - ".. ... ... \n", - "85 MULTILINESTRING ((-213174.831 -21182.164, -213... 1500.0 \n", - "86 MULTILINESTRING ((-213174.831 -21182.164, -213... 2000.0 \n", - "87 MULTILINESTRING ((-213174.831 -21182.164, -213... 2500.0 \n", - "88 MULTILINESTRING ((-213174.831 -21182.164, -213... 3000.0 \n", - "89 MULTILINESTRING ((-213174.831 -21182.164, -213... 3500.0 \n", - "\n", - " trip_instance_key vp_idx_arr \\\n", - "0 ffb44943b394f891d2b2286bb3902305 [12751727, 12751726] \n", - "1 ffb44943b394f891d2b2286bb3902305 [12751727, 12751726] \n", - "2 ffb44943b394f891d2b2286bb3902305 [12751727, 12751726] \n", - "3 ffb44943b394f891d2b2286bb3902305 [12751727, 12751726] \n", - "4 ffb44943b394f891d2b2286bb3902305 [12751727, 12751726] \n", - ".. ... ... \n", - "85 00062c6db9dbef9c80f5ada74b31e257 [6357586, 6357587, 6357588] \n", - "86 00062c6db9dbef9c80f5ada74b31e257 [6357586, 6357587, 6357588] \n", - "87 00062c6db9dbef9c80f5ada74b31e257 [6357586, 6357587, 6357588] \n", - "88 00062c6db9dbef9c80f5ada74b31e257 [6357586, 6357587, 6357588] \n", - "89 00062c6db9dbef9c80f5ada74b31e257 [6357586, 6357587, 6357588] \n", - "\n", - " shape_meters_arr \n", - "0 [339.54035083029106, 339.54035083029106] \n", - "1 [339.54035083029106, 339.54035083029106] \n", - "2 [339.54035083029106, 339.54035083029106] \n", - "3 [339.54035083029106, 339.54035083029106] \n", - "4 [339.54035083029106, 339.54035083029106] \n", - ".. ... \n", - "85 [189.6522603479828, 293.39996228253057, 308.90... \n", - "86 [189.6522603479828, 293.39996228253057, 308.90... \n", - "87 [189.6522603479828, 293.39996228253057, 308.90... \n", - "88 [189.6522603479828, 293.39996228253057, 308.90... \n", - "89 [189.6522603479828, 293.39996228253057, 308.90... \n", - "\n", - "[90 rows x 10 columns]" + "count 34836.000000\n", + "mean 35.843176\n", + "std 24.241283\n", + "min 0.025925\n", + "25% 12.368664\n", + "50% 36.909153\n", + "75% 57.950581\n", + "max 294.128701\n", + "Name: speed_mph, dtype: float64" ] }, - "execution_count": 15, "metadata": {}, - "output_type": "execute_result" + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "AxesSubplot(0.125,0.11;0.775x0.77)\n" + ] + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ - "gdf" + "cutoff = 30\n", + "subset = df[(df.meters_elapsed >= 1609/2) & \n", + " (df.sec_elapsed >= cutoff)]\n", + "display(subset.speed_mph.describe())\n", + "subset.speed_mph.hist(bins=range(0, 100, 5))" ] }, { "cell_type": "code", "execution_count": 16, - "id": "0c81a245-6f96-41a0-9243-a63547565705", - "metadata": {}, - "outputs": [], - "source": [ - "nearest_vp_idx = []\n", - "subseq_vp_idx = []\n", - "\n", - "for row in gdf.itertuples():\n", - " \n", - " this_stop_meters = getattr(row, \"road_meters\")\n", - " valid_shape_meters_array = getattr(row, \"shape_meters_arr\")\n", - " valid_vp_idx_array = np.asarray(getattr(row, \"vp_idx_arr\"))\n", - "\n", - " idx = np.searchsorted(\n", - " valid_shape_meters_array,\n", - " this_stop_meters,\n", - " side=\"right\" \n", - " # want our stop_meters value to be < vp_shape_meters,\n", - " # side = \"left\" would be stop_meters <= vp_shape_meters\n", - " )\n", - "\n", - " # For the next value, if there's nothing to index into, \n", - " # just set it to the same position\n", - " # if we set subseq_value = getattr(row, )[idx], \n", - " # we might not get a consecutive vp\n", - " nearest_value = valid_vp_idx_array[idx-1]\n", - " subseq_value = nearest_value + 1\n", - "\n", - " nearest_vp_idx.append(nearest_value)\n", - " subseq_vp_idx.append(subseq_value)" - ] - }, - { - "cell_type": "code", - "execution_count": 17, - "id": "b4dc8169-bdc3-4a57-aca5-e4266db8a190", + "id": "d3f9212b-2656-4540-a49a-5be333299e4b", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "Index(['linearid', 'mtfcc', 'primary_direction', 'segment_sequence',\n", - " 'destination', 'geometry', 'road_meters', 'trip_instance_key',\n", - " 'vp_idx_arr', 'shape_meters_arr'],\n", - " dtype='object')" + "count 25461.000000\n", + "mean 26.101453\n", + "std 20.342335\n", + "min 0.025925\n", + "25% 9.173374\n", + "50% 18.607940\n", + "75% 44.616456\n", + "max 215.216123\n", + "Name: speed_mph, dtype: float64" ] }, - "execution_count": 17, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 16, "metadata": {}, "output_type": "execute_result" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ - "gdf.columns" + "cutoff = 40\n", + "subset = df[(df.meters_elapsed >= 1609/2) & \n", + " (df.sec_elapsed >= cutoff)]\n", + "display(subset.speed_mph.describe())\n", + "subset.speed_mph.hist(bins=range(0, 100, 5))" ] }, { "cell_type": "code", - "execution_count": 20, - "id": "a046c68d-2de8-4190-8480-2763ef7acca9", + "execution_count": 17, + "id": "f7a5b1a6-0120-4c5d-bb7b-9bf0d728c748", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "[189.6522603479828, 293.39996228253057, 308.9056048713142]" + "count 21705.000000\n", + "mean 21.146308\n", + "std 17.523701\n", + "min 0.025925\n", + "25% 7.952368\n", + "50% 15.273050\n", + "75% 31.172054\n", + "max 162.082992\n", + "Name: speed_mph, dtype: float64" ] }, - "execution_count": 20, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 17, "metadata": {}, "output_type": "execute_result" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ - "gdf[gdf.linearid==\"1108475858996\"].shape_meters_arr.iloc[0]" + "cutoff = 45\n", + "subset = df[(df.meters_elapsed >= 1609/2) & \n", + " (df.sec_elapsed >= cutoff)]\n", + "display(subset.speed_mph.describe())\n", + "subset.speed_mph.hist(bins=range(0, 100, 5))" ] }, { "cell_type": "code", "execution_count": null, - "id": "37ab1ef4-9140-4690-ac97-b07a34fd78ef", + "id": "aa5db652-bb34-43f5-bbfe-9113593b988b", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3e93b92b-373c-44db-805e-8e98ec56919b", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c1df84ba-c420-40f9-98df-4791b67ef4a4", + "metadata": {}, + "outputs": [], + "source": [ + "df.speed_mph.hist(bins=range(0, 100, 5))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0092cfd5-80e5-4f00-8ffd-89b804ddbb2d", + "metadata": {}, + "outputs": [], + "source": [ + "shape_road_crosswalk" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5e9bff1f-fb34-4016-a9e0-d37daf0f3227", "metadata": {}, "outputs": [], "source": [ - "result = gdf[segment_identifier_cols + [\n", - " \"primary_direction\", \"fullname\", \"road_meters\", \n", - " \"trip_instance_key\"]]\n", + "shape_road_crosswalk = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}roads_staging/\"\n", + " f\"shape_road_crosswalk_{analysis_date}.parquet\",\n", + " filters = [[(\"linearid\", \"in\", two_roads)]]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "196c42ef-fbb3-410a-9cf7-ef560380b9b9", + "metadata": {}, + "outputs": [], + "source": [ + "one_shape = \"2419613ed3a3e49420a24f8d7efd3a4e\"\n", + "\n", + "shape_to_trip = helpers.import_scheduled_trips(\n", + " analysis_date,\n", + " columns = [\"trip_instance_key\", \"shape_array_key\"],\n", + " filters = [[(\"shape_array_key\", \"==\", one_shape)]]\n", + ")\n", "\n", - "# Now assign the nearest vp for each trip that's nearest to\n", - "# a given stop\n", - "# Need to find the one after the stop later\n", - "result = result.assign(\n", - " nearest_vp_idx = nearest_vp_idx,\n", - " subseq_vp_idx = subseq_vp_idx,\n", + "one_trip = shape_to_trip.trip_instance_key[5]\n", + "one_trip" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11360899-7b76-4710-871f-818cf1e68082", + "metadata": {}, + "outputs": [], + "source": [ + "shape_road_crosswalk = shape_road_crosswalk.merge(\n", + " shape_to_trip,\n", + " on = \"shape_array_key\",\n", + " how = \"inner\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "f202317d-5ff2-4f97-bcf2-84384bc869c2", + "id": "d6d09963-2e38-428a-a3d2-881e7725325f", "metadata": {}, "outputs": [], "source": [ - "result.head()" + "vp_nn = gpd.read_parquet(\n", + " f\"{SEGMENT_GCS}condensed/vp_nearest_neighbor_{analysis_date}.parquet\",\n", + " filters = [[(\"trip_instance_key\", \"==\", one_trip)]]\n", + ")" ] }, { - "cell_type": "markdown", - "id": "907a3230-7e9f-4557-84fa-89c8c439077d", + "cell_type": "code", + "execution_count": null, + "id": "19ffa1f2-861c-4898-8906-dc3639221486", "metadata": {}, + "outputs": [], "source": [ - "once the sjoin is done and we know which trip_instance_keys are linked\n", - "to what road segments,\n", - "those roads should be expanded from 1 row to many rows.\n", - "go back to long for road segments, but it should be filtered down so that only the relevant cutpoints are included.\n", + "subset_shape = helpers.import_scheduled_trips(\n", + " analysis_date,\n", + " columns = [\"trip_instance_key\", \"shape_array_key\"],\n", + " filters = [[(\"trip_instance_key\", \"==\", one_trip)]],\n", + " get_pandas = True\n", + ")\n", + "\n", + "shapes = helpers.import_scheduled_shapes(\n", + " analysis_date,\n", + " columns = [\"shape_array_key\", \"geometry\"],\n", + " filters = [[(\"shape_array_key\", \"in\", subset_shape.shape_array_key)]],\n", + " crs = \"EPSG:3310\"\n", + ").merge(subset_shape)\n", "\n", - "then, for vp, that entire array that attached onto the linearid, is put in as an input for search. also leave vp primary direction in.\n", - "filter that vp array down for just the direction we want, and find the \"nearest\" vp as that array has been projected against the road." + "shapes = shapes.assign(\n", + " geometry = shapes.geometry.buffer(25)\n", + ").to_crs(\"EPSG:4326\")" ] }, { "cell_type": "code", "execution_count": null, - "id": "197b1c39-4485-4e6b-b6b8-3795c841340f", + "id": "532418d9-0bf8-4ff3-8ae9-6d0f08c2770a", "metadata": {}, "outputs": [], "source": [ - "df = pd.read_parquet(\n", - " f\"{SEGMENT_GCS}nearest_vp_roads_{analysis_date}.parquet\"\n", - ")" + "shapes" ] }, { "cell_type": "code", "execution_count": null, - "id": "4f27fc9c-120c-46da-b9ef-e7222576d4ee", + "id": "a2cb0b70-89cd-445e-ba4e-a7610681ba40", "metadata": {}, "outputs": [], "source": [ - "subset_vp = np.union1d(\n", - " df.nearest_vp_idx.unique(), \n", - " df.subseq_vp_idx.unique()\n", - " )" + "road_segments = gpd.read_parquet(\n", + " f\"{SHARED_GCS}road_segments/\",\n", + " columns = segment_identifier_cols + [\"geometry\"],\n", + " filters = [[(\"linearid\", \"in\", two_roads)]]\n", + ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "7099bef9-a2d6-46f8-b336-7bdfa40dd7ad", + "id": "475f17f6-58e0-4a2d-b91f-a493beefd8bd", "metadata": {}, "outputs": [], "source": [ - "# modify this from interpolate_stop_arrivals\n", - "def attach_vp_shape_meters_with_timestamp(\n", - " analysis_date: str, **kwargs\n", - ") -> pd.DataFrame:\n", - " \"\"\"\n", - " \"\"\"\n", - " # shape_meters is here\n", - " vp_projected = pd.read_parquet(\n", - " f\"{SEGMENT_GCS}projection/vp_projected_roads_{analysis_date}.parquet\",\n", - " **kwargs\n", - " )\n", + "# We can substitute this step when we have already generated crosswalk\n", + "def sjoin_shape_to_road(shapes, roads):\n", + " \n", + " keep_cols = [\"shape_array_key\", \"trip_instance_key\", \n", + " \"linearid\", \n", + " \"mtfcc\", \"segment_sequence\"]\n", " \n", - " # location_timestamp_local is here, and needs to be converted to seconds\n", - " vp_usable = pd.read_parquet(\n", - " f\"{SEGMENT_GCS}vp_usable_{analysis_date}/\",\n", - " columns = [\"vp_idx\", \"location_timestamp_local\"],\n", - " **kwargs,\n", - " )\n", - "\n", - " vp_info = pd.merge(\n", - " vp_projected,\n", - " vp_usable,\n", - " on = \"vp_idx\",\n", - " how = \"inner\"\n", - " )\n", + " roads = roads.to_crs(shapes.crs)\n", + " \n", + " shapes_to_roads = gpd.sjoin(\n", + " shapes,\n", + " roads,\n", + " how = \"inner\",\n", + " predicate = \"intersects\"\n", + " )[keep_cols].drop_duplicates()\n", " \n", - " return vp_info" + " return shapes_to_roads\n", + "\n", + "#road_segments_sjoin = sjoin_shape_to_road(shapes, road_segments)" ] }, { "cell_type": "code", "execution_count": null, - "id": "9d4aa630-762c-4c47-b8c0-b90e81075c2b", + "id": "dbd38536-12dd-4c80-8bb6-9cb2838a6db1", "metadata": {}, "outputs": [], "source": [ - "vp_info = attach_vp_shape_meters_with_timestamp(\n", - " analysis_date, \n", - " filters = [[(\"vp_idx\", \"in\", subset_vp)]]\n", + "road_segments2 = pd.merge(\n", + " road_segments,\n", + " shape_road_crosswalk,\n", + " on = [\"linearid\", \"mtfcc\", \"segment_sequence\"], \n", + " how = \"inner\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "e439b456-d90a-49c3-8ed5-57401b53246e", + "id": "91cee88b-c60e-4c6a-b809-4d99ff31ebcf", "metadata": {}, "outputs": [], "source": [ - "road_trip_cols = road_id_cols + [\n", - " \"primary_direction\", \"trip_instance_key\"]\n", + "road_segments2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "79a0e0e2-554f-47bf-9069-ec09a1963526", + "metadata": {}, + "outputs": [], + "source": [ + "import shapely\n", "\n", - "vp_with_nearest_info = pd.merge(\n", - " df,\n", - " vp_info.rename(columns = {\n", - " \"vp_idx\": \"nearest_vp_idx\",\n", - " \"shape_meters\": \"nearest_shape_meters\",\n", - " \"location_timestamp_local\": \"nearest_location_timestamp_local\"\n", - " }),\n", - " on = [\"nearest_vp_idx\"] + road_trip_cols,\n", - " how = \"inner\"\n", + "road_segments0 = road_segments2.assign(\n", + " geometry = road_segments2.apply(\n", + " lambda x: shapely.Point(x.geometry.coords[0]), \n", + " axis=1),\n", + ").assign(stop_type=0)\n", + "\n", + "road_segments1 = road_segments2.assign(\n", + " geometry = road_segments2.apply(\n", + " lambda x: shapely.Point(x.geometry.coords[-1]), \n", + " axis=1),\n", + ").assign(stop_type=1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "02ba0a84-5d1d-4255-ade1-fb96e1f87b69", + "metadata": {}, + "outputs": [], + "source": [ + "road_segments_long = pd.concat(\n", + " [road_segments0, road_segments1], \n", + " axis=0\n", + ").sort_values(\n", + " [\"linearid\", \"segment_sequence\", \"stop_type\"]\n", + ").rename(\n", + " columns = {\"primary_direction\": \"stop_primary_direction\"}\n", + ").reset_index(drop=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "34404271-3665-4c1c-b256-2557b0a3094a", + "metadata": {}, + "outputs": [], + "source": [ + "road_segments_long" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8cf6f7ea-015e-42e4-ade4-4ec1d59847e4", + "metadata": {}, + "outputs": [], + "source": [ + "from segment_speed_utils import neighbor\n", + "\n", + "gdf = neighbor.merge_stop_vp_for_nearest_neighbor(\n", + " road_segments_long, \n", + " analysis_date\n", ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "329c055e-7071-4247-a4b9-fe7649acdb53", + "id": "bd51a842-5b37-4aee-a664-d3ce751421be", "metadata": {}, "outputs": [], "source": [ - "df2 = pd.merge(\n", - " vp_with_nearest_info,\n", - " vp_info.rename(columns = {\n", - " \"vp_idx\": \"subseq_vp_idx\",\n", - " \"shape_meters\": \"subseq_shape_meters\",\n", - " \"location_timestamp_local\": \"subseq_location_timestamp_local\"\n", - " }),\n", - " on = [\"subseq_vp_idx\"] + road_trip_cols,\n", - " how = \"inner\"\n", - ")\n" + "results = neighbor.add_nearest_neighbor_result(gdf, analysis_date)" ] }, { "cell_type": "code", "execution_count": null, - "id": "b7de2f78-3793-401e-b836-7dc3085b92e9", + "id": "fae3db53-a575-4e5d-810e-b7a81ff9908a", "metadata": {}, "outputs": [], "source": [ - "sjoin_results = pd.read_parquet(\n", - " f\"{SEGMENT_GCS}vp_sjoin/\"\n", - " f\"vp_road_segments_wide_{analysis_date}.parquet\",\n", - " #filters = [[(\"trip_instance_key\", \"in\", test_trips)]]\n", + "results2 = results.copy()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bf01b87b-a45b-4e1e-8fbe-c7b4ad7b5237", + "metadata": {}, + "outputs": [], + "source": [ + "PROJECT_CRS = \"EPSG:3310\"\n", + "\n", + "results2 = results2.assign(\n", + " stop_geometry = results2.stop_geometry.to_crs(PROJECT_CRS),\n", + " vp_coords_trio = results2.vp_coords_trio.to_crs(PROJECT_CRS)\n", ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "8163e08f-3bee-45f1-86e2-68c84d60789a", + "id": "ff9567a4-f988-4739-9df3-3806611976f3", "metadata": {}, "outputs": [], "source": [ - "sjoin_results.trip_instance_key.nunique()" + "shapes = helpers.import_scheduled_shapes(\n", + " analysis_date,\n", + " columns = [\"shape_array_key\", \"geometry\"],\n", + " crs = PROJECT_CRS\n", + ").dropna(subset=\"geometry\")\n", + "\n", + "gdf = pd.merge(\n", + " results2,\n", + " shapes.rename(columns = {\"geometry\": \"shape_geometry\"}),\n", + " on = \"shape_array_key\",\n", + " how = \"inner\"\n", + ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "4f7dfc71-0a3d-4a86-90c8-88fd79a3ee94", + "id": "3832f8fe-cfa0-4b3d-a542-04383f3c2ec3", "metadata": {}, "outputs": [], "source": [ - "one_road_id = \"1104475175563\"\n", - "sjoin_results[sjoin_results.linearid==one_road_id]" + "results2.dtypes" ] }, { "cell_type": "code", "execution_count": null, - "id": "787acf45-8592-47d9-9290-01122be138c4", + "id": "cc950435-0bd7-42bd-88de-a9cc1e18974d", "metadata": {}, "outputs": [], "source": [ - "df2.shape" + "import interpolate_stop_arrival" ] }, { "cell_type": "code", "execution_count": null, - "id": "5f68b8a2-52ac-4676-acf9-27fea9ebb438", + "id": "9c0d9294-563a-435a-a4b8-e502bf1856c6", "metadata": {}, "outputs": [], "source": [ - "df2.columns" + "segment_identifier_cols2 = ['linearid', 'mtfcc', \n", + " 'stop_primary_direction', 'segment_sequence']" ] }, { "cell_type": "code", "execution_count": null, - "id": "00c90d27-c2b0-44c4-a959-064d2ea227bd", + "id": "931aa474-dca9-4e4b-ab6a-b576ad5b052a", "metadata": {}, "outputs": [], "source": [ - "def get_stop_arrivals(df: pd.DataFrame) -> pd.DataFrame:\n", - " \"\"\"\n", - " Apply np.interp to df.\n", - " df must be set up so that a given stop is populated with its\n", - " own stop_meters, as well as columns for nearest and subseq \n", - " shape_meters / location_timestamp_local_sec.\n", - " \"\"\"\n", - " x_col = \"shape_meters\"\n", - " y_col = \"location_timestamp_local\"\n", - " \n", - " stop_arrival_series = []\n", - " for row in df.itertuples():\n", + "stop_meters_series = []\n", + "stop_arrival_series = []\n", + "for row in gdf.itertuples():\n", "\n", - " xp = np.asarray([\n", - " getattr(row, f\"nearest_{x_col}\"), \n", - " getattr(row, f\"subseq_{x_col}\")\n", - " ])\n", + " stop_meters, interpolated_arrival = interpolate_stop_arrival.project_points_onto_shape(\n", + " getattr(row, \"stop_geometry\"),\n", + " getattr(row, \"vp_coords_trio\"),\n", + " getattr(row, \"shape_geometry\"),\n", + " getattr(row, \"location_timestamp_local_trio\")\n", + " )\n", "\n", - " yp = np.asarray([\n", - " getattr(row, f\"nearest_{y_col}\"), \n", - " getattr(row, f\"subseq_{y_col}\")\n", - " ]).astype(\"datetime64[s]\").astype(\"float64\")\n", + " stop_meters_series.append(stop_meters)\n", + " stop_arrival_series.append(interpolated_arrival)\n", "\n", - " stop_position = getattr(row, \"road_meters\")\n", - " interpolated_arrival = np.interp(stop_position, xp, yp)\n", - " stop_arrival_series.append(interpolated_arrival)\n", - " \n", - " df = df.assign(\n", - " arrival_time = stop_arrival_series,\n", - " ).astype(\n", - " {\"arrival_time\": \"datetime64[s]\"}\n", - " ).sort_values(\n", - " [\"trip_instance_key\", \n", - " \"linearid\", \"mtfcc\"]\n", - " ).reset_index(drop=True)\n", - " \n", - " return df" + "results2 = gdf.assign(\n", + " stop_meters = stop_meters_series,\n", + " arrival_time = stop_arrival_series,\n", + ")[segment_identifier_cols2 + [\n", + " \"trip_instance_key\", \"shape_array_key\", \n", + " \"stop_type\",\n", + " \"stop_meters\", \"arrival_time\"]\n", + " ].sort_values(\n", + " segment_identifier_cols2 + [\"trip_instance_key\", \"stop_type\", ]\n", + ").reset_index(drop=True)" ] }, { "cell_type": "code", "execution_count": null, - "id": "7c6c8cc7-4c3d-4269-8f40-43a856d7f8b1", + "id": "54bcd8a0-ead6-4ac5-89f2-4cb5291ccda2", "metadata": {}, "outputs": [], "source": [ - "df3 = get_stop_arrivals(df2)" + "grouped_df = results2.groupby(segment_identifier_cols2 + \n", + " [\"trip_instance_key\"])\n", + "\n", + "min_arrival = grouped_df.agg({\"arrival_time\": \"min\"}).reset_index()\n", + "max_arrival = grouped_df.agg({\"arrival_time\": \"max\"}).reset_index()\n", + "\n", + "min_max_arrival = pd.merge(\n", + " min_arrival,\n", + " max_arrival,\n", + " on = segment_identifier_cols2 + [\"trip_instance_key\"]\n", + ").query('arrival_time_x != arrival_time_y')" ] }, { "cell_type": "code", "execution_count": null, - "id": "78c0370c-8c52-4369-bc10-d0713e91f44d", + "id": "9ee89cc4-2a7d-44fd-901a-37ac301898ec", "metadata": {}, "outputs": [], "source": [ - "df3[(df3.subseq_location_timestamp_local > df3.arrival_time) & \n", - " (df3.nearest_location_timestamp_local < df3.arrival_time)].shape" + "results3 = pd.merge(\n", + " results2,\n", + " min_max_arrival[segment_identifier_cols2 + [\"trip_instance_key\"]],\n", + " on = segment_identifier_cols2 + [\"trip_instance_key\"],\n", + " how = \"inner\"\n", + ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "70e79902-c2f6-4dfe-ba6d-69411ea013e7", + "id": "975614bf-ee0b-4a3e-9e5c-02345149d584", "metadata": {}, "outputs": [], "source": [ - "# Something weird is happening here" + "from segment_speed_utils import segment_calcs\n", + "\n", + "results3 = segment_calcs.convert_timestamp_to_seconds(\n", + " results3, [\"arrival_time\"]\n", + ").sort_values(segment_identifier_cols2 + [\"trip_instance_key\"]).reset_index(drop=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a067dea3-67f5-4faf-b5ca-b2eab8ee80b9", + "metadata": {}, + "outputs": [], + "source": [ + "trip_cols = segment_identifier_cols2 + [\"trip_instance_key\"]\n", + "results3 = results3.assign(\n", + " subseq_arrival_time_sec = (results3.groupby(trip_cols, \n", + " observed=True, group_keys=False)\n", + " .arrival_time_sec\n", + " .shift(-1)\n", + " ),\n", + " subseq_stop_meters = (results3.groupby(trip_cols, \n", + " observed=True, group_keys=False)\n", + " .stop_meters\n", + " .shift(-1)\n", + " )\n", + " )" ] }, { "cell_type": "code", "execution_count": null, - "id": "5628d473-5777-4c28-933c-e8736b1a318e", + "id": "05034a5e-cf9d-411d-9293-a1b67387f336", "metadata": {}, "outputs": [], "source": [ - "df3.shape" + "speed = results3.assign(\n", + " meters_elapsed = results3.subseq_stop_meters - results3.stop_meters, \n", + " sec_elapsed = results3.subseq_arrival_time_sec - results3.arrival_time_sec,\n", + ").pipe(\n", + " segment_calcs.derive_speed, \n", + " (\"stop_meters\", \"subseq_stop_meters\"), \n", + " (\"arrival_time_sec\", \"subseq_arrival_time_sec\")\n", + ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "4fb5f201-8abd-40db-9b57-bcbc21d26e27", + "id": "530215e0-bcdc-4b04-a487-1b63ee14034a", "metadata": {}, "outputs": [], "source": [ - "df3[[\"road_meters\", \"nearest_vp_idx\", \"subseq_vp_idx\", \"nearest_shape_meters\",\n", - " \"subseq_shape_meters\",\n", - " \"nearest_location_timestamp_local\",\n", - " \"subseq_location_timestamp_local\", \"arrival_time\"]]" + "speed.dropna().query(\n", + " 'meters_elapsed > 250 & sec_elapsed > 60'\n", + ").speed_mph.describe()" ] }, { "cell_type": "code", "execution_count": null, - "id": "b5f1328b-067a-484b-bbf3-87e16ac5c12d", + "id": "7a3230f5-57d6-41f2-9d3b-8e838ffc470c", "metadata": {}, "outputs": [], "source": [] diff --git a/rt_segment_speeds/segment_speed_utils/project_vars.py b/rt_segment_speeds/segment_speed_utils/project_vars.py index 79a2fec8c..1dd548ae3 100644 --- a/rt_segment_speeds/segment_speed_utils/project_vars.py +++ b/rt_segment_speeds/segment_speed_utils/project_vars.py @@ -18,4 +18,4 @@ PROJECT_CRS = "EPSG:3310" CONFIG_PATH = "./config.yml" ROAD_SEGMENT_METERS = 1_000 -SEGMENT_TYPES = ["stop_segments", "rt_stop_times"] \ No newline at end of file +SEGMENT_TYPES = ["stop_segments", "rt_stop_times"]