diff --git a/gtfs_funnel/Makefile b/gtfs_funnel/Makefile index de9530582..a60b094da 100644 --- a/gtfs_funnel/Makefile +++ b/gtfs_funnel/Makefile @@ -1,4 +1,4 @@ -download_gtfs_data_one_day: +download_gtfs_data: # make sure to update update_vars.py for dates to download python download_trips.py python download_stops.py @@ -10,4 +10,6 @@ download_gtfs_data_one_day: preprocess: python stop_times_with_direction.py python vp_keep_usable.py - python vp_direction.py \ No newline at end of file + python vp_direction.py + python cleanup.py + \ No newline at end of file diff --git a/gtfs_funnel/README.md b/gtfs_funnel/README.md index b68eae8d1..478a80a8d 100644 --- a/gtfs_funnel/README.md +++ b/gtfs_funnel/README.md @@ -4,4 +4,4 @@ Use `update_vars` and input one or several days to download. 1. **Schedule data**: download data for [trips](./download_trips.py), [stops](./download_stops.py), [shapes](./download_shapes.py), and [stop times](./download_stop_times.py) and cache parquets in GCS 1. **Vehicle positions data**: download [RT vehicle positions](./download_vehicle_positions.py) -1. Use the `Makefile` and download schedule and RT data. In terminal: `make download_gtfs_data_one_day` \ No newline at end of file +1. Use the `Makefile` and download schedule and RT data. In terminal: `make download_gtfs_data` \ No newline at end of file diff --git a/gtfs_funnel/logs/find_vp_direction.log b/gtfs_funnel/logs/find_vp_direction.log index b37b8fa7b..68d0d5bb0 100644 --- a/gtfs_funnel/logs/find_vp_direction.log +++ b/gtfs_funnel/logs/find_vp_direction.log @@ -9,3 +9,52 @@ 2023-10-12 11:21:52.344 | INFO | __main__::176 - export vp direction: 0:05:29.299659 2023-10-12 11:23:14.557 | INFO | __main__::186 - export usable vp with direction: 0:01:22.212409 2023-10-12 11:23:14.558 | INFO | __main__::187 - execution time: 0:06:51.512068 +2023-10-19 10:58:26.750 | INFO | __main__::184 - Analysis date: 2023-09-13 +2023-10-19 11:01:08.229 | INFO | __main__:attach_prior_vp_add_direction:89 - persist vp gddf: 0:02:41.478652 +2023-10-19 11:04:05.857 | INFO | __main__:attach_prior_vp_add_direction:125 - np vectorize arrays for direction: 0:02:57.627657 +2023-10-19 11:04:25.727 | INFO | __main__::191 - export vp direction: 0:05:58.976162 +2023-10-19 11:05:26.722 | INFO | __main__::196 - export usable vp with direction: 0:01:00.995301 +2023-10-19 11:05:26.723 | INFO | __main__::197 - execution time: 0:06:59.971463 +2023-10-19 11:05:26.724 | INFO | __main__::184 - Analysis date: 2023-10-11 +2023-10-19 11:08:10.013 | INFO | __main__:attach_prior_vp_add_direction:89 - persist vp gddf: 0:02:43.288529 +2023-10-19 11:10:57.486 | INFO | __main__:attach_prior_vp_add_direction:125 - np vectorize arrays for direction: 0:02:47.473068 +2023-10-19 11:11:17.743 | INFO | __main__::191 - export vp direction: 0:05:51.017843 +2023-10-19 11:12:14.833 | INFO | __main__::196 - export usable vp with direction: 0:00:57.090460 +2023-10-19 11:12:14.834 | INFO | __main__::197 - execution time: 0:06:48.108303 +2023-10-19 11:22:48.570 | INFO | __main__::185 - Analysis date: 2023-03-15 +2023-10-19 11:44:08.820 | INFO | __main__::185 - Analysis date: 2023-03-15 +2023-10-19 11:46:41.490 | INFO | __main__:attach_prior_vp_add_direction:89 - persist vp gddf: 0:02:32.668923 +2023-10-19 11:49:18.408 | INFO | __main__:attach_prior_vp_add_direction:126 - np vectorize arrays for direction: 0:02:36.918447 +2023-10-19 11:49:36.829 | INFO | __main__::192 - export vp direction: 0:05:28.008511 +2023-10-19 11:50:34.563 | INFO | __main__::197 - export usable vp with 
direction: 0:00:57.733907 +2023-10-19 11:50:34.565 | INFO | __main__::198 - execution time: 0:06:25.742418 +2023-10-19 11:50:34.566 | INFO | __main__::185 - Analysis date: 2023-04-12 +2023-10-19 11:53:00.392 | INFO | __main__:attach_prior_vp_add_direction:89 - persist vp gddf: 0:02:25.825681 +2023-10-19 11:55:43.433 | INFO | __main__:attach_prior_vp_add_direction:126 - np vectorize arrays for direction: 0:02:43.040656 +2023-10-19 11:56:02.076 | INFO | __main__::192 - export vp direction: 0:05:27.509401 +2023-10-19 11:56:58.366 | INFO | __main__::197 - export usable vp with direction: 0:00:56.290053 +2023-10-19 11:56:58.368 | INFO | __main__::198 - execution time: 0:06:23.799454 +2023-10-19 11:56:58.368 | INFO | __main__::185 - Analysis date: 2023-05-17 +2023-10-19 11:59:23.853 | INFO | __main__:attach_prior_vp_add_direction:89 - persist vp gddf: 0:02:25.485009 +2023-10-19 12:02:10.887 | INFO | __main__:attach_prior_vp_add_direction:126 - np vectorize arrays for direction: 0:02:47.034093 +2023-10-19 12:02:28.048 | INFO | __main__::192 - export vp direction: 0:05:29.680081 +2023-10-19 12:03:24.619 | INFO | __main__::197 - export usable vp with direction: 0:00:56.570424 +2023-10-19 12:03:24.620 | INFO | __main__::198 - execution time: 0:06:26.250505 +2023-10-19 12:03:24.620 | INFO | __main__::185 - Analysis date: 2023-06-14 +2023-10-19 12:05:48.202 | INFO | __main__:attach_prior_vp_add_direction:89 - persist vp gddf: 0:02:23.581493 +2023-10-19 12:08:28.397 | INFO | __main__:attach_prior_vp_add_direction:126 - np vectorize arrays for direction: 0:02:40.195186 +2023-10-19 12:08:45.600 | INFO | __main__::192 - export vp direction: 0:05:20.979952 +2023-10-19 12:09:41.253 | INFO | __main__::197 - export usable vp with direction: 0:00:55.653037 +2023-10-19 12:09:41.254 | INFO | __main__::198 - execution time: 0:06:16.632989 +2023-10-19 12:09:41.254 | INFO | __main__::185 - Analysis date: 2023-07-12 +2023-10-19 12:12:23.972 | INFO | __main__:attach_prior_vp_add_direction:89 - persist vp gddf: 0:02:42.717672 +2023-10-19 12:15:14.864 | INFO | __main__:attach_prior_vp_add_direction:126 - np vectorize arrays for direction: 0:02:50.891639 +2023-10-19 12:15:32.063 | INFO | __main__::192 - export vp direction: 0:05:50.808333 +2023-10-19 12:16:37.518 | INFO | __main__::197 - export usable vp with direction: 0:01:05.455225 +2023-10-19 12:16:37.519 | INFO | __main__::198 - execution time: 0:06:56.263558 +2023-10-19 12:16:37.519 | INFO | __main__::185 - Analysis date: 2023-08-15 +2023-10-19 12:19:21.523 | INFO | __main__:attach_prior_vp_add_direction:89 - persist vp gddf: 0:02:44.003497 +2023-10-19 12:22:02.828 | INFO | __main__:attach_prior_vp_add_direction:126 - np vectorize arrays for direction: 0:02:41.304747 +2023-10-19 12:22:21.129 | INFO | __main__::192 - export vp direction: 0:05:43.609000 +2023-10-19 12:23:18.532 | INFO | __main__::197 - export usable vp with direction: 0:00:57.403234 +2023-10-19 12:23:18.533 | INFO | __main__::198 - execution time: 0:06:41.012234 diff --git a/gtfs_funnel/logs/usable_rt_vp.log b/gtfs_funnel/logs/usable_rt_vp.log index dc9c77395..892d0700b 100644 --- a/gtfs_funnel/logs/usable_rt_vp.log +++ b/gtfs_funnel/logs/usable_rt_vp.log @@ -18,3 +18,19 @@ 2023-10-13 10:38:23.748 | INFO | __main__:attach_prior_vp_add_direction:88 - persist vp gddf: 0:04:24.538800 2023-10-13 10:39:00.233 | INFO | __main__:attach_prior_vp_add_direction:114 - np vectorize arrays for direction: 0:00:36.484908 2023-10-13 10:39:07.270 | INFO | __main__::181 - export vp direction: 0:05:08.060546 
+2023-10-19 10:21:27.377 | INFO | __main__::161 - Analysis date: 2023-09-13
+2023-10-19 10:22:59.586 | INFO | __main__::171 - pare down vp: 0:01:32.208062
+2023-10-19 10:22:59.586 | INFO | __main__::161 - Analysis date: 2023-10-11
+2023-10-19 10:24:21.940 | INFO | __main__::171 - pare down vp: 0:02:54.562217
+2023-10-19 11:34:28.126 | INFO | __main__::161 - Analysis date: 2023-03-15
+2023-10-19 11:36:28.520 | INFO | __main__::171 - pare down vp: 0:02:00.393163
+2023-10-19 11:36:28.521 | INFO | __main__::161 - Analysis date: 2023-04-12
+2023-10-19 11:37:58.174 | INFO | __main__::171 - pare down vp: 0:03:30.047838
+2023-10-19 11:37:58.177 | INFO | __main__::161 - Analysis date: 2023-05-17
+2023-10-19 11:39:35.480 | INFO | __main__::171 - pare down vp: 0:05:07.353337
+2023-10-19 11:39:35.481 | INFO | __main__::161 - Analysis date: 2023-06-14
+2023-10-19 11:41:06.197 | INFO | __main__::171 - pare down vp: 0:06:38.070240
+2023-10-19 11:41:06.197 | INFO | __main__::161 - Analysis date: 2023-07-12
+2023-10-19 11:42:34.062 | INFO | __main__::171 - pare down vp: 0:08:05.936015
+2023-10-19 11:42:34.063 | INFO | __main__::161 - Analysis date: 2023-08-15
+2023-10-19 11:43:55.229 | INFO | __main__::171 - pare down vp: 0:09:27.102851
diff --git a/gtfs_funnel/stop_times_with_direction.py b/gtfs_funnel/stop_times_with_direction.py
index 94e75383d..57a4679fb 100644
--- a/gtfs_funnel/stop_times_with_direction.py
+++ b/gtfs_funnel/stop_times_with_direction.py
@@ -11,7 +11,7 @@
 from calitp_data_analysis import utils
 from shared_utils import rt_utils
-from segment_speed_utils import helpers
+from segment_speed_utils import helpers, wrangle_shapes
 from segment_speed_utils.project_vars import RT_SCHED_GCS, PROJECT_CRS
 
@@ -139,12 +139,25 @@ def assemble_stop_times_with_direction(analysis_date: str):
     prior_geom = other_stops.prior_geometry.compute()
     current_geom = other_stops.geometry.compute()
-    
+    
+    # Create a column with a readable direction like westbound, eastbound, etc.
     stop_direction = np.vectorize(
        rt_utils.primary_cardinal_direction)(prior_geom, current_geom)
    
+    # Create a column with the normalized direction vector.
+    # Add this because some buses can travel in a southeasterly direction,
+    # but they get categorized as southbound or eastbound depending
+    # on whether the south or east value is larger.
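+    # Illustrative example (values assumed): a bus moving 30 m east and
+    # 40 m south of the prior stop gets labeled Southbound, but its
+    # normalized vector (0.6, -0.8) still preserves the eastward component.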
+ # Keeping the normalized x/y direction allows us to distinguish a bit better later + direction_vector = wrangle_shapes.get_direction_vector(prior_geom, current_geom) + normalized_vector = wrangle_shapes.get_normalized_vector(direction_vector) + other_stops_no_geom = other_stops_no_geom.assign( - stop_primary_direction = stop_direction + stop_primary_direction = stop_direction, + # since we can't save tuples, let's assign x, y normalized direction vector + # as 2 columns + stop_dir_xnorm = normalized_vector[0], + stop_dir_ynorm = normalized_vector[1] ) scheduled_stop_times_with_direction = pd.concat( @@ -165,8 +178,6 @@ def assemble_stop_times_with_direction(analysis_date: str): f"stop_times_direction_{analysis_date}" ) - - end = datetime.datetime.now() print(f"execution time: {end - start}") diff --git a/gtfs_funnel/update_vars.py b/gtfs_funnel/update_vars.py index 54a47fdd6..7e64b39e0 100644 --- a/gtfs_funnel/update_vars.py +++ b/gtfs_funnel/update_vars.py @@ -2,7 +2,9 @@ from pathlib import Path from shared_utils import rt_dates -months = ["sep", "oct"] +months = [ + "sep", "oct" +] analysis_date_list = [ rt_dates.DATES[f"{m}2023"] for m in months diff --git a/gtfs_funnel/vp_direction.py b/gtfs_funnel/vp_direction.py index 28a5c0976..c573454ea 100644 --- a/gtfs_funnel/vp_direction.py +++ b/gtfs_funnel/vp_direction.py @@ -18,11 +18,11 @@ from loguru import logger from calitp_data_analysis.geography_utils import WGS84 -from segment_speed_utils import helpers, segment_calcs +from segment_speed_utils import helpers, segment_calcs, wrangle_shapes from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS from shared_utils import rt_utils -fs = gcsfs.GCSFileSystem() +fs = gcsfs.GCSFileSystem() def attach_prior_vp_add_direction( analysis_date: str, @@ -56,7 +56,7 @@ def attach_prior_vp_add_direction( # calculated in projected CRS vp_gddf = dg.from_dask_dataframe( vp2, - geometry = dg.points_from_xy(vp2, x="x", y="y", crs=WGS84) + geometry = dg.points_from_xy(vp2, x="x", y="y") ).set_crs(WGS84).to_crs(PROJECT_CRS) vp_ddf = vp_gddf.assign( @@ -81,34 +81,46 @@ def attach_prior_vp_add_direction( ).query('prior_vp_idx >= min_vp_idx')[ ["vp_idx", "prior_x", "prior_y", "x", "y"] ].reset_index(drop=True) - - full_df = full_df.persist() + + keep_cols = ["vp_idx", "prior_x", "prior_y", "x", "y"] + full_df = full_df[keep_cols].compute() time1 = datetime.datetime.now() logger.info(f"persist vp gddf: {time1 - time0}") - def column_into_array(df: dd.DataFrame, col: str) -> np.ndarray: - return df[col].compute().to_numpy() - - vp_indices = column_into_array(full_df, "vp_idx") - prior_geom_x = column_into_array(full_df, "prior_x") - prior_geom_y = column_into_array(full_df, "prior_y") - current_geom_x = column_into_array(full_df, "x") - current_geom_y = column_into_array(full_df, "y") + vp_indices = full_df.vp_idx.to_numpy() + distance_east = full_df.x - full_df.prior_x + distance_north = full_df.y - full_df.prior_y - distance_east = current_geom_x - prior_geom_x - distance_north = current_geom_y - prior_geom_y + # Get the normalized direction vector split into x and y columns + normalized_vector = wrangle_shapes.get_normalized_vector( + (distance_east, distance_north) + ) - direction_result = np.vectorize( - rt_utils.cardinal_definition_rules)(distance_east, distance_north) - # Stack our results and convert to df - results_array = np.column_stack((vp_indices, direction_result)) + results_array = np.column_stack(( + vp_indices, + normalized_vector[0], + normalized_vector[1] + )) vp_direction = 
pd.DataFrame( results_array, - columns = ["vp_idx", "vp_primary_direction"] - ).astype({"vp_idx": "int64"}) + columns = ["vp_idx", "vp_dir_xnorm", "vp_dir_ynorm"] + ).astype({ + "vp_idx": "int64", + "vp_dir_xnorm": "float", + "vp_dir_ynorm": "float" + }) + + # Get a readable direction (westbound, eastbound) + vp_direction = vp_direction.assign( + vp_primary_direction = vp_direction.apply( + lambda x: + rt_utils.cardinal_definition_rules(x.vp_dir_xnorm, x.vp_dir_ynorm), + axis=1 + ) + ) time2 = datetime.datetime.now() logger.info(f"np vectorize arrays for direction: {time2 - time1}") @@ -168,7 +180,6 @@ def add_direction_to_usable_vp( format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", level="INFO") - for analysis_date in analysis_date_list: logger.info(f"Analysis date: {analysis_date}") diff --git a/rt_scheduled_v_ran/01_scheduled_stop_times.ipynb b/rt_scheduled_v_ran/01_scheduled_stop_times.ipynb index fd8745210..88e5dd029 100644 --- a/rt_scheduled_v_ran/01_scheduled_stop_times.ipynb +++ b/rt_scheduled_v_ran/01_scheduled_stop_times.ipynb @@ -13,7 +13,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "id": "bf867567-ec40-47a0-8938-f0b8382fcd43", "metadata": {}, "outputs": [], @@ -31,7 +31,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "id": "fd11f4d2-2cdd-42b7-9030-f9e699b4dcfa", "metadata": {}, "outputs": [], @@ -47,22 +47,10 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "id": "bdaf55b7-13c6-4b58-b187-4cead10238dc", "metadata": {}, - "outputs": [ - { - "ename": "AssertionError", - "evalue": "", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mAssertionError\u001b[0m Traceback (most recent call last)", - "Cell \u001b[0;32mIn[3], line 6\u001b[0m\n\u001b[1;32m 3\u001b[0m n_rows \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mlen\u001b[39m(scheduled_stop_times)\n\u001b[1;32m 4\u001b[0m expected_unique_rows \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mlen\u001b[39m(scheduled_stop_times[trip_stop_cols]\u001b[38;5;241m.\u001b[39mdrop_duplicates())\n\u001b[0;32m----> 6\u001b[0m \u001b[38;5;28;01massert\u001b[39;00m n_rows \u001b[38;5;241m==\u001b[39m expected_unique_rows \n", - "\u001b[0;31mAssertionError\u001b[0m: " - ] - } - ], + "outputs": [], "source": [ "trip_stop_cols = [\"trip_instance_key\", \"stop_id\", \"stop_sequence\"]\n", "\n", @@ -82,188 +70,20 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "id": "d3bef815-dfa3-46bb-bb80-e99a6c1e2752", "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "120" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "n_rows - expected_unique_rows" ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "id": "8b2ba568-5edf-47e4-a0de-89c3e21909ad", "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
trip_instance_keystop_idstop_sequencegeometrystop_primary_direction
86407044c988a45bd7a196e569ef66b4a6b55265737160POINT (248215.002 -382920.471)Southbound
86407144c988a45bd7a196e569ef66b4a6b55265737160POINT (248215.002 -382920.471)Unknown
86407244c988a45bd7a196e569ef66b4a6b55231188751POINT (250743.139 -382426.021)Southbound
86407344c988a45bd7a196e569ef66b4a6b55231188751POINT (250743.139 -382426.021)Eastbound
86407444c988a45bd7a196e569ef66b4a6b55240286131POINT (246428.935 -422741.402)Southbound
..................
2653401d2d28bff624c982d19050356737400d065737162POINT (248215.002 -382920.471)Northbound
2653402d2d28bff624c982d19050356737400d031234902POINT (247931.788 -431248.536)Southbound
2653403d2d28bff624c982d19050356737400d031234902POINT (247931.788 -431248.536)Unknown
2653404d2d28bff624c982d19050356737400d031230823POINT (249410.572 -431475.496)Southbound
2653405d2d28bff624c982d19050356737400d031230823POINT (249410.572 -431475.496)Eastbound
\n", - "

240 rows × 5 columns

\n", - "
" - ], - "text/plain": [ - " trip_instance_key stop_id stop_sequence \\\n", - "864070 44c988a45bd7a196e569ef66b4a6b552 6573716 0 \n", - "864071 44c988a45bd7a196e569ef66b4a6b552 6573716 0 \n", - "864072 44c988a45bd7a196e569ef66b4a6b552 3118875 1 \n", - "864073 44c988a45bd7a196e569ef66b4a6b552 3118875 1 \n", - "864074 44c988a45bd7a196e569ef66b4a6b552 4028613 1 \n", - "... ... ... ... \n", - "2653401 d2d28bff624c982d19050356737400d0 6573716 2 \n", - "2653402 d2d28bff624c982d19050356737400d0 3123490 2 \n", - "2653403 d2d28bff624c982d19050356737400d0 3123490 2 \n", - "2653404 d2d28bff624c982d19050356737400d0 3123082 3 \n", - "2653405 d2d28bff624c982d19050356737400d0 3123082 3 \n", - "\n", - " geometry stop_primary_direction \n", - "864070 POINT (248215.002 -382920.471) Southbound \n", - "864071 POINT (248215.002 -382920.471) Unknown \n", - "864072 POINT (250743.139 -382426.021) Southbound \n", - "864073 POINT (250743.139 -382426.021) Eastbound \n", - "864074 POINT (246428.935 -422741.402) Southbound \n", - "... ... ... \n", - "2653401 POINT (248215.002 -382920.471) Northbound \n", - "2653402 POINT (247931.788 -431248.536) Southbound \n", - "2653403 POINT (247931.788 -431248.536) Unknown \n", - "2653404 POINT (249410.572 -431475.496) Southbound \n", - "2653405 POINT (249410.572 -431475.496) Eastbound \n", - "\n", - "[240 rows x 5 columns]" - ] - }, - "execution_count": 5, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "scheduled_stop_times[\n", " scheduled_stop_times.duplicated(subset=trip_stop_cols, \n", @@ -272,120 +92,10 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "id": "deacadab-d329-498c-bc79-560da9afc126", "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
feed_keyschedule_gtfs_dataset_keynameregional_feed_typeservice_datetrip_start_date_pacifictrip_idtrip_instance_keyroute_keyroute_id...route_descdirection_idshape_array_keyshape_idtrip_first_departure_datetime_pacifictrip_last_arrival_datetime_pacificservice_hourstrip_start_date_local_tztrip_first_departure_datetime_local_tztrip_last_arrival_datetime_local_tz
08caabd5e9263c45f86e5bb3bbfd36c128eecb796518dafd3c1b971a99f8b8252Victor Valley GMV ScheduleNone2023-09-132023-09-131510244c988a45bd7a196e569ef66b4a6b552ca911b413b9186fe599863322ca89b273220...Barstow - Victorville - San Bernardino0.0764e97bc51e230a94dac11b8f05a8b8a193122023-09-13 08:00:002023-09-13 09:54:001.92023-09-132023-09-13 08:00:002023-09-13 09:54:00
\n", - "

1 rows × 23 columns

\n", - "
" - ], - "text/plain": [ - " feed_key schedule_gtfs_dataset_key \\\n", - "0 8caabd5e9263c45f86e5bb3bbfd36c12 8eecb796518dafd3c1b971a99f8b8252 \n", - "\n", - " name regional_feed_type service_date \\\n", - "0 Victor Valley GMV Schedule None 2023-09-13 \n", - "\n", - " trip_start_date_pacific trip_id trip_instance_key \\\n", - "0 2023-09-13 15102 44c988a45bd7a196e569ef66b4a6b552 \n", - "\n", - " route_key route_id ... \\\n", - "0 ca911b413b9186fe599863322ca89b27 3220 ... \n", - "\n", - " route_desc direction_id \\\n", - "0 Barstow - Victorville - San Bernardino 0.0 \n", - "\n", - " shape_array_key shape_id \\\n", - "0 764e97bc51e230a94dac11b8f05a8b8a 19312 \n", - "\n", - " trip_first_departure_datetime_pacific trip_last_arrival_datetime_pacific \\\n", - "0 2023-09-13 08:00:00 2023-09-13 09:54:00 \n", - "\n", - " service_hours trip_start_date_local_tz \\\n", - "0 1.9 2023-09-13 \n", - "\n", - " trip_first_departure_datetime_local_tz trip_last_arrival_datetime_local_tz \n", - "0 2023-09-13 08:00:00 2023-09-13 09:54:00 \n", - "\n", - "[1 rows x 23 columns]" - ] - }, - "execution_count": 6, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "one_trip = \"44c988a45bd7a196e569ef66b4a6b552\"\n", "\n", @@ -400,7 +110,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "id": "4ad9646b-3ff7-43e2-bcbc-df2300224d43", "metadata": {}, "outputs": [], @@ -420,24 +130,10 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "id": "f5af099f-ae4f-4ba4-b699-8800acfecc90", "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
-    ...(folium map iframe output omitted)...
" - ], - "text/plain": [ - "" - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "# Plot these on a map\n", "scheduled_stop_times[\n", @@ -448,12 +144,103 @@ " )" ] }, + { + "cell_type": "markdown", + "id": "47699c33-6404-4ad6-9f3c-2c5b20293998", + "metadata": {}, + "source": [ + "## Spot checking" + ] + }, { "cell_type": "code", "execution_count": null, "id": "c13cfe7b-2a90-4f21-8791-af38f910d2e2", "metadata": {}, "outputs": [], + "source": [ + "ok_stop_times = scheduled_stop_times[~scheduled_stop_times.duplicated(\n", + " trip_stop_cols)]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e34e70d8-abaa-4ba4-bc80-246e9c27b2cf", + "metadata": {}, + "outputs": [], + "source": [ + "sample_10_trips = ok_stop_times[\n", + " [\"trip_instance_key\"]].drop_duplicates().sample(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10d8cc8a-2352-462c-b411-5ac6b6284905", + "metadata": {}, + "outputs": [], + "source": [ + "sample_10_trips.trip_instance_key.unique()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "80a1c73a-8bef-43f9-829d-4dfa321f7392", + "metadata": {}, + "outputs": [], + "source": [ + "sample_10_trips_list = ['81e873ee8252a8a0877cc983e57a6b29',\n", + " '6189b77fba24e1ecc69f7da11c643434',\n", + " '65d8444657dd5902ca05d7bda31c6922',\n", + " 'c41dc1d746e48f2b47dbdce466c0d221',\n", + " '0db09e8871638928aa84611685de44bd',\n", + " 'd276f8f018790f8bc378a785063a08ad',\n", + " '10a7d41d663609a4488f946d638281ad',\n", + " 'ffee4aee8f3d7693429e7a342296b8fc',\n", + " '18160e8844c2870cd823587a287a8b71',\n", + " 'd3f339b7bd23d62ff231a7c1107545f1']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "869992ae-371e-4a00-a3b5-81c8ca897be1", + "metadata": {}, + "outputs": [], + "source": [ + "def plot_stops_by_direction(gdf: gpd.GeoDataFrame, one_trip: str):\n", + " gdf2 = gdf[gdf.trip_instance_key==one_trip].reset_index(drop=True)\n", + " \n", + " print(f\"trip_instance_key: {one_trip}\")\n", + " \n", + " m = gdf2.explore(\n", + " \"stop_primary_direction\", \n", + " categorical = True,\n", + " tiles = \"CartoDB Positron\"\n", + " )\n", + " \n", + " display(m) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cd77f70b-2c6f-4454-914b-8beb766dea64", + "metadata": {}, + "outputs": [], + "source": [ + "for t in sample_10_trips_list:\n", + " plot_stops_by_direction(ok_stop_times, t)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "26aebd15-7660-4723-a69a-a036c2a8cd9c", + "metadata": {}, + "outputs": [], "source": [] } ], diff --git a/rt_segment_speeds/23_project_all_vp_explore.ipynb b/rt_segment_speeds/23_project_all_vp_explore.ipynb new file mode 100644 index 000000000..190bc613d --- /dev/null +++ b/rt_segment_speeds/23_project_all_vp_explore.ipynb @@ -0,0 +1,280 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "c016d7f0-6e04-4707-9d48-1a6b14bfd50c", + "metadata": {}, + "source": [ + "# Explore `projected_shape_meters`\n", + "\n", + "* Now that `map_partitions` gets us `shape_meters` in 5 min, let's use it as much as we can\n", + "* Spot check that if `loop_or_inlining==0`, we can just use this and go on\n", + "* `loop_or_inlining==1` can pose challenges, so figure out a way to bring in direction (either through normalized x, y direction vector) or readable direction\n", + "* Loop or inlining segments have proven hard to cut successfully overall, and the last bit may never be fixed. 
If we can use direction, it may be a bit more robust.\n", + "* Speed can be calculated either within a segment or using endpoints, and we should opt for a simpler, streamlined approach that's performant. \n", + "* For normal shapes, we might be able to fill in the RT `stop_times` table while we're at it." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "73ced776-d521-4467-beb4-8d67d147aea4", + "metadata": {}, + "outputs": [], + "source": [ + "import altair as alt\n", + "import dask.dataframe as dd\n", + "import geopandas as gpd\n", + "import numpy as np\n", + "import pandas as pd\n", + "\n", + "from segment_speed_utils import helpers\n", + "from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS\n", + "from shared_utils import rt_dates\n", + "\n", + "analysis_date = rt_dates.DATES[\"sep2023\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c289f689-c0fe-4ea2-9e14-4e80f77566ae", + "metadata": {}, + "outputs": [], + "source": [ + "# Get RT trips\n", + "rt_trips = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}vp_usable_{analysis_date}\",\n", + " columns = [\"trip_instance_key\"]\n", + ").drop_duplicates()\n", + "\n", + "# Find the shape_array_key for RT trips\n", + "trip_to_shape = helpers.import_scheduled_trips(\n", + " analysis_date,\n", + " columns = [\"trip_instance_key\", \"shape_array_key\"],\n", + " get_pandas = True\n", + ").merge(\n", + " rt_trips,\n", + " on = \"trip_instance_key\",\n", + " how = \"inner\"\n", + ")\n", + "\n", + "# Find whether it's loop or inlining\n", + "shapes_loop_inlining = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}stop_segments_{analysis_date}.parquet\",\n", + " columns = [\"shape_array_key\", \"loop_or_inlining\"]\n", + ").drop_duplicates().merge(\n", + " trip_to_shape,\n", + " on = \"shape_array_key\",\n", + " how = \"inner\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1b0ba620-e24e-4197-b665-c6111ad03307", + "metadata": {}, + "outputs": [], + "source": [ + "ok_trips = (shapes_loop_inlining[\n", + " shapes_loop_inlining.loop_or_inlining==0]\n", + " .sample(25).trip_instance_key.tolist()\n", + " )\n", + "\n", + "ok_trips" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b95e35da-cf34-4696-80a1-0ab3a9e99ebf", + "metadata": {}, + "outputs": [], + "source": [ + "loopy_trips = (shapes_loop_inlining[\n", + " shapes_loop_inlining.loop_or_inlining==1]\n", + " .sample(25).trip_instance_key.tolist()\n", + " )\n", + "\n", + "loopy_trips" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "276cab58-94ff-4f72-a651-88a4f1ea890a", + "metadata": {}, + "outputs": [], + "source": [ + "subset_trips = ok_trips + loopy_trips\n", + "\n", + "projected_shape_meters = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}projection/vp_projected_{analysis_date}.parquet\"\n", + ")\n", + "\n", + "vp = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}vp_usable_{analysis_date}\",\n", + " filters = [[(\"trip_instance_key\", \"in\", subset_trips)]]\n", + ").merge(\n", + " projected_shape_meters,\n", + " on = \"vp_idx\",\n", + " how = \"inner\"\n", + ").drop(columns = \"location_timestamp\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9d8f339f-f832-45f2-9f2c-f96125799a38", + "metadata": {}, + "outputs": [], + "source": [ + "def plot_shape_meters(df: pd.DataFrame, one_trip: str):\n", + " \"\"\"\n", + " Plot how the projected shape meters looks for one trip.\n", + " \n", + " vp_idx is ordered by timestamp, use as x.\n", + " \"\"\"\n", + " subset_df 
= df[df.trip_instance_key==one_trip]\n", + " \n", + " print(f\"{subset_df.gtfs_dataset_name.iloc[0]}\")\n", + " print(f\"trip_instance_key: {one_trip}, trip_id: {subset_df.trip_id.iloc[0]}\")\n", + " \n", + " chart = (alt.Chart(subset_df)\n", + " .mark_line()\n", + " .encode(\n", + " x=\"vp_idx\",\n", + " y=\"shape_meters:Q\"\n", + " )\n", + " )\n", + " \n", + " display(chart)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6aaf6652-307b-41ba-9059-72ae010b7928", + "metadata": {}, + "outputs": [], + "source": [ + "for t in ok_trips:\n", + " plot_shape_meters(vp, t)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "284fb053-ffc9-411a-ab46-82f6cfb08af1", + "metadata": {}, + "outputs": [], + "source": [ + "for t in loopy_trips:\n", + " plot_shape_meters(vp, t)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ced6dbff-008e-4ccf-b284-cc1d79d3e801", + "metadata": {}, + "outputs": [], + "source": [ + "speed = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}speeds_comparison_{analysis_date}.parquet\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8243b13f-323b-4b0f-8fff-75cd2837d165", + "metadata": {}, + "outputs": [], + "source": [ + "trip = \"10096002510743-JUNE23\"\n", + "speed[speed.trip_id==trip]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "394956b9-a02c-4b7d-b604-3775c4e55a51", + "metadata": {}, + "outputs": [], + "source": [ + "metro_trip = helpers.import_scheduled_trips(\n", + " analysis_date,\n", + " columns = [\"trip_instance_key\", \"trip_id\"],\n", + " filters = [[(\"trip_id\", \"==\", trip)]],\n", + " get_pandas = True\n", + ")\n", + "\n", + "trip_key = metro_trip.trip_instance_key.iloc[0]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5c5b8f1d-6571-4072-a9ea-dc1c36173453", + "metadata": {}, + "outputs": [], + "source": [ + "vp_pared = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}vp_pared_stops_{analysis_date}\",\n", + " filters = [[(\"trip_instance_key\", \"==\", trip_key)]])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c83f420e-99a2-40d8-b3ef-5d13f68b4226", + "metadata": {}, + "outputs": [], + "source": [ + "# 27:13, 27:56 * 29:14, 29:52, * 30:13\n", + "vp_pared[vp_pared.stop_sequence==36]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "145ac1ce-93a7-4abb-b72b-977541a98163", + "metadata": {}, + "outputs": [], + "source": [ + "metro_trip" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0a950ad3-a1bd-4f6f-b7ff-6ee95ffbf8e2", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/rt_segment_speeds/24_plot_interpolation_results.ipynb b/rt_segment_speeds/24_plot_interpolation_results.ipynb new file mode 100644 index 000000000..917f0e824 --- /dev/null +++ b/rt_segment_speeds/24_plot_interpolation_results.ipynb @@ -0,0 +1,312 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "cd8110e6-c29e-4e00-af33-367c1f9d9e63", + "metadata": {}, + "source": [ + "## Plot `nearest_vp_to_stop` and `interpolate_stop_arrival` results" + ] + 
}, + { + "cell_type": "code", + "execution_count": null, + "id": "05cdbdbe-5e9b-486f-8878-0c11819ed4eb", + "metadata": {}, + "outputs": [], + "source": [ + "import dask.dataframe as dd\n", + "import folium\n", + "import geopandas as gpd\n", + "import numpy as np\n", + "import pandas as pd\n", + "\n", + "from segment_speed_utils import helpers, segment_calcs\n", + "from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS\n", + "from shared_utils import rt_dates\n", + "\n", + "analysis_date = rt_dates.DATES[\"sep2023\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5f95a14b-a169-4183-9626-f1c1add93263", + "metadata": {}, + "outputs": [], + "source": [ + "test_trips = [\n", + " 'e23a83e19843f71f6599cb302e23ae6e',\n", + " '0adf85083a66f635dd3edcbdf0a5d8da',\n", + " '73c4533f88c0759a4817902ae45df1c0',\n", + " '70ec3122f3971fd94a50402f76b6336c',\n", + " '1a7599df4fcd547d9b9c423345c08a0f',\n", + " '2f567724fe306d15bd213c913f47027e',\n", + " 'e029d4c256171e2e476a4cad574f6685',\n", + " '9a4c7a548deb282384e63bf98ac991d7',\n", + " 'db3ce71b08df1598db06615d7ed0b77f',\n", + " '01365dc998719fc064b259ba4c1476de'\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3b8aab4-ac9b-4659-b605-fe6ab68f88c6", + "metadata": {}, + "outputs": [], + "source": [ + "stop_arrivals_interp = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}stop_arrivals_{analysis_date}.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4f4251c9-d112-440a-be6b-a26f509e3462", + "metadata": {}, + "outputs": [], + "source": [ + "stop_arrivals_interp = segment_calcs.convert_timestamp_to_seconds(\n", + " stop_arrivals_interp, \n", + " [\"arrival_time\"]\n", + ").drop(columns = [\n", + " \"nearest_location_timestamp_local\", \n", + " \"subseq_location_timestamp_local\", \n", + " \"arrival_time\"\n", + "])" + ] + }, + { + "cell_type": "markdown", + "id": "2449e28e-215d-4559-88a9-a25e56d0c325", + "metadata": {}, + "source": [ + "### Merge in interpolated stop arrivals with vp and stop geometry" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0b634687-1bf2-4a2b-8bd0-b82f02d6bdc3", + "metadata": {}, + "outputs": [], + "source": [ + "vp = pd.read_parquet(\n", + " f\"{SEGMENT_GCS}vp_usable_{analysis_date}/\",\n", + " filters = [[(\"trip_instance_key\", \"in\", test_trips)]],\n", + " columns = [\"vp_idx\", \"x\", \"y\", \"location_timestamp_local\"]\n", + ").pipe(segment_calcs.convert_timestamp_to_seconds, \n", + " [\"location_timestamp_local\"]).drop(columns = \"location_timestamp_local\")\n", + "\n", + "vp_gdf = gpd.GeoDataFrame(\n", + " vp,\n", + " geometry = gpd.points_from_xy(vp.x, vp.y),\n", + " crs = \"EPSG:4326\"\n", + ").to_crs(PROJECT_CRS).drop(columns = [\"x\", \"y\"])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47c409bd-d53a-4d95-bf4c-2753a7d52724", + "metadata": {}, + "outputs": [], + "source": [ + "stops_projected = gpd.read_parquet(\n", + " f\"{SEGMENT_GCS}stops_projected_{analysis_date}.parquet\",\n", + " columns = [\"shape_array_key\", \"stop_sequence\", \"stop_id\", \n", + " \"stop_geometry\", \n", + " \"loop_or_inlining\"]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "962795df-cf2c-4d09-81d2-6c624c254afb", + "metadata": {}, + "outputs": [], + "source": [ + "# Merge selected vp with stops_projected\n", + "gdf = pd.merge(\n", + " stops_projected,\n", + " stop_arrivals_interp,\n", + " on = [\"shape_array_key\", \"stop_sequence\", \"stop_id\"],\n", + " how 
= \"inner\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9fdd1746-d3b4-4085-a120-3a3875a7e105", + "metadata": {}, + "outputs": [], + "source": [ + "gdf2 = pd.merge(\n", + " gdf,\n", + " vp_gdf.rename(columns = {\n", + " \"vp_idx\": \"nearest_vp_idx\",\n", + " \"location_timestamp_local_sec\": \"nearest_sec\",\n", + " \"geometry\": \"nearest_vp_geometry\"\n", + " }),\n", + " on = \"nearest_vp_idx\",\n", + " how = \"inner\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "75220a58-63aa-47f4-ab2e-a7dcbb8159f6", + "metadata": {}, + "outputs": [], + "source": [ + "gdf3 = pd.merge(\n", + " gdf2,\n", + " vp_gdf.rename(columns = {\n", + " \"vp_idx\": \"subseq_vp_idx\",\n", + " \"location_timestamp_local_sec\": \"subseq_sec\",\n", + " \"geometry\": \"subseq_vp_geometry\"\n", + " }),\n", + " on = \"subseq_vp_idx\",\n", + " how = \"inner\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5436f8b8-9402-48a5-9bc7-0271dc59be5f", + "metadata": {}, + "outputs": [], + "source": [ + "def plot_one_trip(gdf: gpd.GeoDataFrame, one_trip: str):\n", + " subset_gdf = gdf[gdf.trip_instance_key==one_trip]\n", + " \n", + " m = subset_gdf.set_geometry(\"stop_geometry\").explore(\n", + " categorical = True,\n", + " legend = False,\n", + " color = \"black\",\n", + " tiles = \"CartoDB Positron\",\n", + " name=\"Stops\"\n", + " )\n", + "\n", + " m = subset_gdf.set_geometry(\"nearest_vp_geometry\").explore(\n", + " m=m, \n", + " color=\"orange\", \n", + " name=\"nearest vp\"\n", + " )\n", + "\n", + " m = subset_gdf.set_geometry(\"subseq_vp_geometry\").explore(\n", + " m=m, \n", + " color=\"yellow\", \n", + " name=\"subseq vp\"\n", + " )\n", + " # this is completely optional\n", + " folium.LayerControl().add_to(m)\n", + "\n", + " return m" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a572625-24dd-4d12-9f4a-98e433cad075", + "metadata": {}, + "outputs": [], + "source": [ + "for t in test_trips:\n", + " print(f\"trip_instance_key: {t}\")\n", + " m = plot_one_trip(gdf3, t)\n", + " display(m)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2499aa75-0bdf-41b4-82da-29e9afaea5b7", + "metadata": {}, + "outputs": [], + "source": [ + "# Select one stop to look at\n", + "test_map = plot_one_trip(gdf3[gdf3.stop_sequence==68], test_trips[0])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fa847ab8-564e-49a0-b553-8c7d29e0b91c", + "metadata": {}, + "outputs": [], + "source": [ + "test_map" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "844654fa-a002-42ad-a6b1-fe26bd2ba3c9", + "metadata": {}, + "outputs": [], + "source": [ + "def look_at_arrival_time(df, one_trip):\n", + " subset_df = df[(df.trip_instance_key==one_trip)\n", + " ].sort_values(\"stop_sequence\").reset_index()\n", + " \n", + " cols = [\"stop_sequence\", \n", + " \"nearest_vp\", \"arrival_time\", \"subseq_vp\"]\n", + " \n", + " subset_df = subset_df.assign(\n", + " nearest_vp = pd.to_datetime(subset_df.nearest_sec, unit=\"s\").dt.time,\n", + " arrival_time = pd.to_datetime(subset_df.arrival_time_sec, unit=\"s\").dt.time,\n", + " subseq_vp = pd.to_datetime(subset_df.subseq_sec, unit=\"s\").dt.time,\n", + " )\n", + " \n", + " display(subset_df[cols])\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5d0f0d52-1cfd-457b-a139-b50774b5809b", + "metadata": {}, + "outputs": [], + "source": [ + "for t in test_trips:\n", + " look_at_arrival_time(gdf3, 
t)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "38a2d09f-be2b-4c65-a8d8-0697c6a9c257", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/rt_segment_speeds/logs/interpolate_stop_arrival.log b/rt_segment_speeds/logs/interpolate_stop_arrival.log new file mode 100644 index 000000000..a105b2850 --- /dev/null +++ b/rt_segment_speeds/logs/interpolate_stop_arrival.log @@ -0,0 +1,3 @@ +2023-10-24 13:53:30.034 | INFO | __main__::122 - set up df with nearest / subseq vp info: 0:01:06.871897 +2023-10-24 13:54:09.757 | INFO | __main__::127 - interpolate stop arrival: 0:00:39.722841 +2023-10-24 13:54:17.449 | INFO | __main__::133 - execution time: 0:01:54.286578 diff --git a/rt_segment_speeds/logs/nearest_vp.log b/rt_segment_speeds/logs/nearest_vp.log new file mode 100644 index 000000000..51e6ecf91 --- /dev/null +++ b/rt_segment_speeds/logs/nearest_vp.log @@ -0,0 +1,4 @@ +2023-10-24 12:14:34.459 | INFO | __main__::203 - Analysis date: 2023-09-13 +2023-10-24 12:18:28.007 | INFO | __main__::231 - map partitions to transform vp: 0:03:53.547561 +2023-10-24 12:18:28.823 | INFO | __main__::262 - map partitions to find nearest vp to stop: 0:00:00.816213 +2023-10-24 12:20:42.144 | INFO | __main__::293 - execution time: 0:06:07.685094 diff --git a/rt_segment_speeds/logs/prep_stop_segments.log b/rt_segment_speeds/logs/prep_stop_segments.log index c425c48bf..2598c728c 100644 --- a/rt_segment_speeds/logs/prep_stop_segments.log +++ b/rt_segment_speeds/logs/prep_stop_segments.log @@ -1,30 +1,3 @@ -2023-07-25 13:30:34.630 | INFO | __main__::290 - Analysis date: 2023-05-17 -2023-07-25 13:40:12.848 | INFO | __main__::297 - Prep stop segment df: 0:09:38.217245 -2023-07-25 13:41:01.595 | INFO | __main__::307 - execution time: 0:10:26.964487 -2023-07-25 14:39:19.717 | INFO | __main__::290 - Analysis date: 2023-06-14 -2023-07-25 14:49:25.861 | INFO | __main__::297 - Prep stop segment df: 0:10:06.141854 -2023-07-25 14:50:12.947 | INFO | __main__::307 - execution time: 0:10:53.228185 -2023-07-26 12:01:47.353 | INFO | __main__::290 - Analysis date: 2023-04-12 -2023-07-26 12:19:11.448 | INFO | __main__::297 - Prep stop segment df: 0:17:24.080713 -2023-07-26 12:20:13.208 | INFO | __main__::307 - execution time: 0:18:25.839838 -2023-07-26 13:31:19.892 | INFO | __main__::290 - Analysis date: 2023-03-15 -2023-07-26 13:45:57.782 | INFO | __main__::297 - Prep stop segment df: 0:14:37.886934 -2023-07-26 13:46:51.502 | INFO | __main__::307 - execution time: 0:15:31.607489 -2023-08-18 13:30:49.419 | INFO | __main__::289 - Analysis date: 2023-08-16 -2023-08-18 13:34:25.954 | INFO | __main__::296 - Prep stop segment df: 0:03:36.533806 -2023-08-18 13:35:19.571 | INFO | __main__::306 - execution time: 0:04:30.150851 -2023-08-24 13:35:34.698 | INFO | __main__::289 - Analysis date: 2023-08-15 -2023-08-24 13:39:47.732 | INFO | __main__::296 - Prep stop segment df: 0:04:13.013586 -2023-08-24 13:40:30.563 | INFO | __main__::306 - execution time: 0:04:55.845243 -2023-09-14 11:58:08.269 | INFO | __main__::289 - Analysis date: 2023-09-13 -2023-09-14 12:03:22.186 | INFO | 
__main__::296 - Prep stop segment df: 0:05:13.901987 -2023-09-14 12:04:13.962 | INFO | __main__::306 - execution time: 0:06:05.677351 -2023-10-12 10:32:55.776 | INFO | __main__::289 - Analysis date: 2023-10-11 -2023-10-12 10:38:32.223 | INFO | __main__::296 - Prep stop segment df: 0:05:36.430784 -2023-10-12 10:39:25.490 | INFO | __main__::306 - execution time: 0:06:29.697506 -2023-10-17 12:05:56.464 | INFO | __main__::247 - Analysis date: 2023-10-11 -2023-10-17 12:06:28.249 | INFO | __main__::254 - Prep stop segment df: 0:00:31.784438 -2023-10-17 12:06:33.946 | INFO | __main__::262 - execution time: 0:00:37.481256 2023-10-17 13:14:46.275 | INFO | __main__::256 - Analysis date: 2023-03-15 2023-10-17 13:15:24.496 | INFO | __main__::263 - Prep stop segment df: 0:00:38.214426 2023-10-17 13:15:29.098 | INFO | __main__::271 - execution time: 0:00:42.816073 @@ -43,6 +16,9 @@ 2023-10-17 13:17:48.639 | INFO | __main__::256 - Analysis date: 2023-08-15 2023-10-17 13:18:36.553 | INFO | __main__::263 - Prep stop segment df: 0:00:47.913322 2023-10-17 13:18:41.395 | INFO | __main__::271 - execution time: 0:00:52.755389 -2023-10-17 13:18:41.396 | INFO | __main__::256 - Analysis date: 2023-09-13 -2023-10-17 13:19:31.686 | INFO | __main__::263 - Prep stop segment df: 0:00:50.290227 -2023-10-17 13:19:36.847 | INFO | __main__::271 - execution time: 0:00:55.451357 +2023-10-24 10:03:59.640 | INFO | __main__::252 - Analysis date: 2023-09-13 +2023-10-24 10:04:30.868 | INFO | __main__::259 - Prep stop segment df: 0:00:31.227214 +2023-10-24 10:04:36.329 | INFO | __main__::267 - execution time: 0:00:36.688195 +2023-10-24 10:04:36.330 | INFO | __main__::252 - Analysis date: 2023-10-11 +2023-10-24 10:05:12.397 | INFO | __main__::259 - Prep stop segment df: 0:00:36.067067 +2023-10-24 10:05:18.498 | INFO | __main__::267 - execution time: 0:00:42.167738 diff --git a/rt_segment_speeds/scripts/A1_sjoin_vp_segments.py b/rt_segment_speeds/scripts/A1_sjoin_vp_segments.py index a50d57e35..f5b0653cc 100644 --- a/rt_segment_speeds/scripts/A1_sjoin_vp_segments.py +++ b/rt_segment_speeds/scripts/A1_sjoin_vp_segments.py @@ -16,19 +16,10 @@ from loguru import logger from calitp_data_analysis.geography_utils import WGS84 -from segment_speed_utils import helpers +from segment_speed_utils import helpers, wrangle_shapes from segment_speed_utils.project_vars import (analysis_date, SEGMENT_GCS, CONFIG_PATH, PROJECT_CRS) -ALL_DIRECTIONS = ["Northbound", "Southbound", "Eastbound", "Westbound"] -OPPOSITE_DIRECTIONS = { - "Northbound": "Southbound", - "Southbound": "Northbound", - "Eastbound": "Westbound", - "Westbound": "Eastbound", -} - - def add_grouping_col_to_vp( vp_file_name: str, analysis_date: str, @@ -138,8 +129,8 @@ def stage_direction_results( segment_identifier_cols: list, direction: str ): - opposite = OPPOSITE_DIRECTIONS[direction] - keep_vp = [d for d in ALL_DIRECTIONS if d != opposite] + ["Unknown"] + opposite = wrangle_shapes.OPPOSITE_DIRECTIONS[direction] + keep_vp = [d for d in wrangle_shapes.ALL_DIRECTIONS if d != opposite] + ["Unknown"] # Keep all directions of vp except the ones running in opposite direction # Esp since buses make turns, a northbound segment can be @@ -229,7 +220,7 @@ def sjoin_vp_to_segments( GROUPING_COL, SEGMENT_IDENTIFIER_COLS, one_direction - ).persist() for one_direction in ALL_DIRECTIONS + ).persist() for one_direction in wrangle_shapes.ALL_DIRECTIONS ] diff --git a/rt_segment_speeds/scripts/Makefile b/rt_segment_speeds/scripts/Makefile index 6137d65e8..baf76147b 100644 --- 
a/rt_segment_speeds/scripts/Makefile +++ b/rt_segment_speeds/scripts/Makefile @@ -18,6 +18,12 @@ speeds_pipeline: python C2_triangulate_vp.py python C3_trip_route_speed.py +new_pipeline: + python prep_stop_segments.py + python shapely_project_vp.py + python shapely_interpolate.py + #python A3_valid_vehicle_positions.py + download_roads: #pip install esridump diff --git a/rt_segment_speeds/scripts/interpolate_stop_arrival.py b/rt_segment_speeds/scripts/interpolate_stop_arrival.py new file mode 100644 index 000000000..0d4aefc05 --- /dev/null +++ b/rt_segment_speeds/scripts/interpolate_stop_arrival.py @@ -0,0 +1,135 @@ +""" +Interpolate stop arrival. +""" +import dask.dataframe as dd +import datetime +import numpy as np +import pandas as pd +import sys + +from loguru import logger + +from segment_speed_utils import helpers, segment_calcs +from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS +from shared_utils import rt_dates + +analysis_date = rt_dates.DATES["sep2023"] + + +def attach_vp_shape_meters_with_timestamp( + analysis_date: str, **kwargs +) -> pd.DataFrame: + """ + """ + # shape_meters is here + vp_projected = pd.read_parquet( + f"{SEGMENT_GCS}projection/vp_projected_{analysis_date}.parquet", + **kwargs + ) + + # location_timestamp_local is here, and needs to be converted to seconds + vp_usable = pd.read_parquet( + f"{SEGMENT_GCS}vp_usable_{analysis_date}/", + columns = ["vp_idx", "location_timestamp_local"], + **kwargs, + ) + + vp_info = pd.merge( + vp_projected, + vp_usable, + on = "vp_idx", + how = "inner" + ) + + return vp_info + + +def get_stop_arrivals(df: pd.DataFrame) -> pd.DataFrame: + """ + Apply np.interp to df. + df must be set up so that a given stop is populated with its + own stop_meters, as well as columns for nearest and subseq + shape_meters / location_timestamp_local_sec. 
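+
+    Worked example (assumed values): with nearest_shape_meters = 100,
+    subseq_shape_meters = 200, and timestamps 09:00:00 and 09:00:50,
+    a stop at stop_meters = 150 sits halfway between the two vp, so
+    np.interp returns an arrival_time of 09:00:25.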
+ """ + x_col = "shape_meters" + y_col = "location_timestamp_local" + + stop_arrival_series = [] + for row in df.itertuples(): + + xp = np.asarray([ + getattr(row, f"nearest_{x_col}"), + getattr(row, f"subseq_{x_col}") + ]) + + yp = np.asarray([ + getattr(row, f"nearest_{y_col}"), + getattr(row, f"subseq_{y_col}") + ]).astype("datetime64[s]").astype("float64") + + stop_position = getattr(row, "stop_meters") + interpolated_arrival = np.interp(stop_position, xp, yp) + stop_arrival_series.append(interpolated_arrival) + + df = df.assign( + arrival_time = stop_arrival_series, + ).astype({"arrival_time": "datetime64[s]"}) + + return df + + +if __name__ == "__main__": + + LOG_FILE = "../logs/interpolate_stop_arrival.log" + logger.add(LOG_FILE, retention="3 months") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + + analysis_date = rt_dates.DATES["sep2023"] + + logger.info(f"Analysis date: {analysis_date}") + + start = datetime.datetime.now() + + vp_pared = pd.read_parquet( + f"{SEGMENT_GCS}projection/nearest_vp_normal_{analysis_date}.parquet", + ) + + subset_vp = np.union1d( + vp_pared.nearest_vp_idx.unique(), + vp_pared.subseq_vp_idx.unique() + ) + + vp_info = attach_vp_shape_meters_with_timestamp( + analysis_date, + filters = [[("vp_idx", "in", subset_vp)]] + ) + + vp_with_nearest_info = pd.merge( + vp_pared, + vp_info.add_prefix("nearest_"), + on = "nearest_vp_idx", + how = "inner" + ) + + df = pd.merge( + vp_with_nearest_info, + vp_info.add_prefix("subseq_"), + on = "subseq_vp_idx", + how = "inner" + ) + + time1 = datetime.datetime.now() + logger.info(f"set up df with nearest / subseq vp info: {time1 - start}") + + stop_arrivals_df = get_stop_arrivals(df) + + time2 = datetime.datetime.now() + logger.info(f"interpolate stop arrival: {time2 - time1}") + + stop_arrivals_df.to_parquet( + f"{SEGMENT_GCS}stop_arrivals_{analysis_date}.parquet") + + end = datetime.datetime.now() + logger.info(f"execution time: {end - start}") diff --git a/rt_segment_speeds/scripts/nearest_vp_to_stop.py b/rt_segment_speeds/scripts/nearest_vp_to_stop.py new file mode 100644 index 000000000..da00c7223 --- /dev/null +++ b/rt_segment_speeds/scripts/nearest_vp_to_stop.py @@ -0,0 +1,276 @@ +""" +Handle normal vs loopy shapes separately. + +For normal shapes, find the nearest vp_idx before a stop, +and the vp_idx after. +""" +import dask.dataframe as dd +import datetime +import numpy as np +import pandas as pd +import sys + +from loguru import logger + +from segment_speed_utils import helpers, segment_calcs +from segment_speed_utils.project_vars import SEGMENT_GCS, PROJECT_CRS +from shared_utils import rt_dates + + +def rt_trips_to_shape(analysis_date: str) -> pd.DataFrame: + """ + Filter down trip_instance_keys from schedule to + trips present in vp. + Provide shape_array_key associated with trip_instance_key. 
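+
+    Returns one row per trip_instance_key, with its shape_array_key
+    and that shape's loop_or_inlining flag attached.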
+ """ + # Get RT trips + rt_trips = pd.read_parquet( + f"{SEGMENT_GCS}vp_usable_{analysis_date}", + columns = ["trip_instance_key"] + ).drop_duplicates() + + # Find the shape_array_key for RT trips + trip_to_shape = helpers.import_scheduled_trips( + analysis_date, + columns = ["trip_instance_key", "shape_array_key"], + get_pandas = True + ).merge( + rt_trips, + on = "trip_instance_key", + how = "inner" + ) + + # Find whether it's loop or inlining + shapes_loop_inlining = pd.read_parquet( + f"{SEGMENT_GCS}stops_projected_{analysis_date}.parquet", + columns = ["shape_array_key", "loop_or_inlining"] + ).drop_duplicates().merge( + trip_to_shape, + on = "shape_array_key", + how = "inner" + ) + + return shapes_loop_inlining + + +def vp_with_shape_meters( + analysis_date: str, + subset_trips: list +) -> dd.DataFrame: + """ + Subset vp_usable down based on list of trip_instance_keys. + For these trips, attach the projected shape meters. + """ + vp = dd.read_parquet( + f"{SEGMENT_GCS}vp_usable_{analysis_date}", + filters = [[("trip_instance_key", "in", subset_trips)]], + columns = ["trip_instance_key", "vp_idx", + "location_timestamp_local"] + ) + + vp = segment_calcs.convert_timestamp_to_seconds( + vp, ["location_timestamp_local"]).drop(columns = "location_timestamp_local") + + projected_shape_meters = pd.read_parquet( + f"{SEGMENT_GCS}projection/vp_projected_{analysis_date}.parquet", + ) + + vp_with_projection = dd.merge( + vp, + projected_shape_meters, + on = "vp_idx", + how = "inner" + ) + + return vp_with_projection + + +def transform_vp(vp: dd.DataFrame) -> dd.DataFrame: + """ + For each trip, transform vp from long to wide, + so each row is one trip. + Store vp_idx and shape_meters as lists. + """ + trip_shape_cols = ["trip_instance_key", "shape_array_key"] + + trip_info = ( + vp + .groupby(trip_shape_cols, + observed=True, group_keys=False) + .agg({ + "vp_idx": lambda x: list(x), + "shape_meters": lambda x: list(x)}) + .reset_index() + .rename(columns = { + "vp_idx": "vp_idx_arr", + "shape_meters": "shape_meters_arr"}) + ) + + return trip_info + + +def find_vp_nearest_stop_position( + df: dd.DataFrame, +) -> dd.DataFrame: + """ + Once we've attached where each shape has stop cutpoints (stop_meters), + for each trip_instance_key, we want to find where the nearest + vp_idx is to that particular stop. + + We have array of vp_idx and vp_shape_meters. + Go through each row and find the nearest vp_shape_meters is + to stop_meters, and save that vp_idx value. 
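+
+    Worked example (assumed values): if shape_meters_arr = [0, 250, 600, 900]
+    and stop_meters = 500, np.searchsorted(..., side="right") returns idx = 2,
+    so the nearest vp is vp_idx_arr[2 - 1] (the vp at 250 meters) and the
+    subsequent vp is that vp_idx + 1.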
+ """ + trip_shape_cols = ["trip_instance_key", "shape_array_key"] + + nearest_vp_idx = [] + subseq_vp_idx = [] + + # https://github.com/cal-itp/data-analyses/blob/main/rt_delay/rt_analysis/rt_parser.py#L270-L271 + # Don't forget to subtract 1 for proper index + for row in df.itertuples(): + idx = np.searchsorted( + getattr(row, "shape_meters_arr"), + getattr(row, "stop_meters"), + side="right" + # want our stop_meters value to be < vp_shape_meters, + # side = "left" would be stop_meters <= vp_shape_meters + ) + + # For the next value, if there's nothing to index into, + # just set it to the same position + # if we set subseq_value = getattr(row, )[idx], we might not get a consecutive vp + nearest_value = getattr(row, "vp_idx_arr")[idx-1] + subseq_value = nearest_value + 1 + + nearest_vp_idx.append(nearest_value) + subseq_vp_idx.append(subseq_value) + + + result = df[trip_shape_cols + ["stop_sequence", "stop_id", "stop_meters"]] + + # Now assign the nearest vp for each trip that's nearest to + # a given stop + # Need to find the one after the stop later + result = result.assign( + nearest_vp_idx = nearest_vp_idx, + subseq_vp_idx = subseq_vp_idx, + ) + + return result + + +def fix_out_of_bound_results( + df: pd.DataFrame, + analysis_date: str +) -> pd.DataFrame: + + # Merge in usable bounds + usable_bounds = dd.read_parquet( + f"{SEGMENT_GCS}vp_usable_{analysis_date}" + ).pipe(segment_calcs.get_usable_vp_bounds_by_trip) + + results_with_bounds = pd.merge( + df, + usable_bounds, + on = "trip_instance_key", + how = "inner" + ) + + correct_results = results_with_bounds.query('subseq_vp_idx <= max_vp_idx') + incorrect_results = results_with_bounds.query('subseq_vp_idx > max_vp_idx') + incorrect_results = incorrect_results.assign( + subseq_vp_idx = incorrect_results.nearest_vp_idx + ) + + fixed_results = pd.concat( + [correct_results, incorrect_results], + axis=0 + ).drop(columns = ["min_vp_idx", "max_vp_idx"]).sort_index() + + return fixed_results + + +if __name__ == "__main__": + + LOG_FILE = "../logs/nearest_vp.log" + logger.add(LOG_FILE, retention="3 months") + logger.add(sys.stderr, + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", + level="INFO") + + analysis_date = rt_dates.DATES["sep2023"] + + logger.info(f"Analysis date: {analysis_date}") + + start = datetime.datetime.now() + + normal_shape_trips = rt_trips_to_shape(analysis_date).query('loop_or_inlining==0') + normal_trip_keys = normal_shape_trips.trip_instance_key.tolist() + normal_shapes = normal_shape_trips.shape_array_key.unique().tolist() + + vp = vp_with_shape_meters( + analysis_date, + normal_trip_keys + ).merge( + normal_shape_trips, + on = "trip_instance_key", + how = "inner" + ) + + vp_wide = vp.map_partitions( + transform_vp, + meta = {"trip_instance_key": "object", + "shape_array_key": "object", + "vp_idx_arr": "object", + "shape_meters_arr": "object" + }, + align_dataframes = False + ).persist() + + time1 = datetime.datetime.now() + logger.info(f"map partitions to transform vp: {time1 - start}") + + stops_projected = pd.read_parquet( + f"{SEGMENT_GCS}stops_projected_{analysis_date}.parquet", + filters = [[("shape_array_key", "in", normal_shapes)]], + columns = ["shape_array_key", "stop_sequence", "stop_id", "shape_meters"] + ).rename(columns = {"shape_meters": "stop_meters"}) + + existing_stop_cols = stops_projected[ + ["shape_array_key", "stop_sequence", "stop_id", "stop_meters"]].dtypes.to_dict() + existing_vp_cols = vp_wide[["trip_instance_key"]].dtypes.to_dict() + + vp_to_stop = dd.merge( + vp_wide, + 
+
+
+def fix_out_of_bound_results(
+    df: pd.DataFrame,
+    analysis_date: str
+) -> pd.DataFrame:
+    """
+    Where subseq_vp_idx spills past the trip's last usable vp,
+    reset it to nearest_vp_idx so both values stay within
+    the trip's vp bounds.
+    """
+    # Merge in usable bounds
+    usable_bounds = dd.read_parquet(
+        f"{SEGMENT_GCS}vp_usable_{analysis_date}"
+    ).pipe(segment_calcs.get_usable_vp_bounds_by_trip)
+
+    results_with_bounds = pd.merge(
+        df,
+        usable_bounds,
+        on = "trip_instance_key",
+        how = "inner"
+    )
+
+    correct_results = results_with_bounds.query('subseq_vp_idx <= max_vp_idx')
+    incorrect_results = results_with_bounds.query('subseq_vp_idx > max_vp_idx')
+    incorrect_results = incorrect_results.assign(
+        subseq_vp_idx = incorrect_results.nearest_vp_idx
+    )
+
+    fixed_results = pd.concat(
+        [correct_results, incorrect_results],
+        axis=0
+    ).drop(columns = ["min_vp_idx", "max_vp_idx"]).sort_index()
+
+    return fixed_results
+
+
+if __name__ == "__main__":
+
+    LOG_FILE = "../logs/nearest_vp.log"
+    logger.add(LOG_FILE, retention="3 months")
+    logger.add(sys.stderr,
+               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
+               level="INFO")
+
+    analysis_date = rt_dates.DATES["sep2023"]
+
+    logger.info(f"Analysis date: {analysis_date}")
+
+    start = datetime.datetime.now()
+
+    normal_shape_trips = rt_trips_to_shape(analysis_date).query('loop_or_inlining == 0')
+    normal_trip_keys = normal_shape_trips.trip_instance_key.tolist()
+    normal_shapes = normal_shape_trips.shape_array_key.unique().tolist()
+
+    vp = vp_with_shape_meters(
+        analysis_date,
+        normal_trip_keys
+    ).merge(
+        normal_shape_trips,
+        on = "trip_instance_key",
+        how = "inner"
+    )
+
+    vp_wide = vp.map_partitions(
+        transform_vp,
+        meta = {"trip_instance_key": "object",
+                "shape_array_key": "object",
+                "vp_idx_arr": "object",
+                "shape_meters_arr": "object"
+               },
+        align_dataframes = False
+    ).persist()
+
+    time1 = datetime.datetime.now()
+    logger.info(f"map partitions to transform vp: {time1 - start}")
+
+    stops_projected = pd.read_parquet(
+        f"{SEGMENT_GCS}stops_projected_{analysis_date}.parquet",
+        filters = [[("shape_array_key", "in", normal_shapes)]],
+        columns = ["shape_array_key", "stop_sequence", "stop_id", "shape_meters"]
+    ).rename(columns = {"shape_meters": "stop_meters"})
+
+    existing_stop_cols = stops_projected[
+        ["shape_array_key", "stop_sequence", "stop_id", "stop_meters"]
+    ].dtypes.to_dict()
+    existing_vp_cols = vp_wide[["trip_instance_key"]].dtypes.to_dict()
+
+    vp_to_stop = dd.merge(
+        vp_wide,
+        stops_projected,
+        on = "shape_array_key",
+        how = "inner"
+    )
+
+    result = vp_to_stop.map_partitions(
+        find_vp_nearest_stop_position,
+        meta = {
+            **existing_vp_cols,
+            **existing_stop_cols,
+            "nearest_vp_idx": "int64",
+            "subseq_vp_idx": "int64",
+        },
+        align_dataframes = False,
+    )
+
+    time2 = datetime.datetime.now()
+    logger.info(f"map partitions to find nearest vp to stop: {time2 - time1}")
+
+    result = result.compute()
+
+    fixed_results = fix_out_of_bound_results(result, analysis_date)
+
+    fixed_results.to_parquet(
+        f"{SEGMENT_GCS}projection/nearest_vp_normal_{analysis_date}.parquet")
+
+    end = datetime.datetime.now()
+    logger.info(f"execution time: {end - start}")
+
+    # https://stackoverflow.com/questions/10226551/whats-the-most-pythonic-way-to-calculate-percentage-changes-on-a-list-of-numbers
diff --git a/rt_segment_speeds/scripts/prep_stop_segments.py b/rt_segment_speeds/scripts/prep_stop_segments.py
index 3c84989e9..0f3b739e4 100644
--- a/rt_segment_speeds/scripts/prep_stop_segments.py
+++ b/rt_segment_speeds/scripts/prep_stop_segments.py
@@ -18,8 +18,8 @@ from typing import Union
 
 from calitp_data_analysis import utils
-from segment_speed_utils import (helpers, gtfs_schedule_wrangling,
-                                 wrangle_shapes)
+from segment_speed_utils import helpers, gtfs_schedule_wrangling
+
 from segment_speed_utils.project_vars import (SEGMENT_GCS, RT_SCHED_GCS,
                                               PROJECT_CRS)
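
The new script below leans on the geopandas `project` primitive; as a quick illustration of what it returns (toy geometry, not the cached shapes):

```python
import geopandas as gpd
from shapely.geometry import LineString, Point

# A toy shape (a vertical line 100m long) and two vp slightly off the line
shapes = gpd.GeoSeries([LineString([(0, 0), (0, 100)])] * 2)
points = gpd.GeoSeries([Point(1, 25), Point(-2, 80)])

# project() returns, element-wise, the distance along each line to the
# point on the line nearest the given point -- the "shape_meters"
# used throughout these scripts
shape_meters = shapes.project(points)
print(shape_meters.tolist())  # [25.0, 80.0]
```
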
"float64"}, + align_dataframes = False + ) + + time1 = datetime.datetime.now() + logger.info(f"map partitions: {time1 - start}") + + df = results.compute() + df.to_parquet( + f"{SEGMENT_GCS}projection/vp_projected_{analysis_date}.parquet") + + end = datetime.datetime.now() + logger.info(f"compute and export: {end - time1}") + diff --git a/rt_segment_speeds/segment_speed_utils/helpers.py b/rt_segment_speeds/segment_speed_utils/helpers.py index f85d51c28..eed71d340 100644 --- a/rt_segment_speeds/segment_speed_utils/helpers.py +++ b/rt_segment_speeds/segment_speed_utils/helpers.py @@ -10,21 +10,15 @@ import datetime import gcsfs import geopandas as gpd -import intake import pandas as pd import yaml -from pathlib import Path from typing import Literal, Union from segment_speed_utils.project_vars import (SEGMENT_GCS, COMPILED_CACHED_VIEWS, PROJECT_CRS) from calitp_data_analysis import utils -CATALOG_PATH = Path("data-analyses/_shared_utils/shared_utils/shared_data_catalog.yml") - -catalog = intake.open_catalog(f"{Path.home().joinpath(CATALOG_PATH)}") - fs = gcsfs.GCSFileSystem() def get_parameters( @@ -238,7 +232,15 @@ def remove_shapes_outside_ca( FlixBus is another like Amtrak, with far flung routes. """ - us_states = catalog.us_states.read() + # Can't get relative path working within importable segment_speed_utils + #us_states = catalog.us_states.read() + # https://github.com/cal-itp/data-analyses/blob/main/_shared_utils/shared_utils/shared_data_catalog.yml + us_states = gpd.read_file( + "https://services.arcgis.com/ue9rwulIoeLEI9bj/" + "arcgis/rest/services/US_StateBoundaries/FeatureServer/0/" + "query?outFields=*&where=1%3D1&f=geojson" + ) + border_states = ["CA", "NV", "AZ", "OR"] diff --git a/rt_segment_speeds/segment_speed_utils/project_vars.py b/rt_segment_speeds/segment_speed_utils/project_vars.py index 5da093a04..792b9ac29 100644 --- a/rt_segment_speeds/segment_speed_utils/project_vars.py +++ b/rt_segment_speeds/segment_speed_utils/project_vars.py @@ -10,6 +10,7 @@ analysis_date = rt_dates.DATES["oct2023"] analysis_date_list = [ + rt_dates.DATES["sep2023"], rt_dates.DATES["oct2023"] ] diff --git a/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py b/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py index 420afc195..a5d7f89a8 100644 --- a/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py +++ b/rt_segment_speeds/segment_speed_utils/wrangle_shapes.py @@ -23,6 +23,14 @@ from shared_utils import rt_utils from segment_speed_utils.project_vars import PROJECT_CRS +ALL_DIRECTIONS = ["Northbound", "Southbound", "Eastbound", "Westbound"] + +OPPOSITE_DIRECTIONS = { + "Northbound": "Southbound", + "Southbound": "Northbound", + "Eastbound": "Westbound", + "Westbound": "Eastbound", +} def interpolate_projected_points( shape_geometry: shapely.geometry.LineString,