2
2
Find nearest_vp_idx to the stop position
3
3
using scipy KDTree.
4
4
"""
5
+ import dask .dataframe as dd
6
+ import dask_geopandas as dg
5
7
import datetime
6
8
import geopandas as gpd
7
9
import numpy as np
16
18
from segment_speed_utils import helpers , neighbor
17
19
from segment_speed_utils .project_vars import SEGMENT_GCS
18
20
21
+ stop_time_col_order = [
22
+ 'trip_instance_key' , 'shape_array_key' ,
23
+ 'stop_sequence' , 'stop_id' , 'stop_pair' ,
24
+ 'stop_primary_direction' , 'geometry'
25
+ ]
19
26
20
27
def add_nearest_neighbor_result (
21
28
gdf : gpd .GeoDataFrame ,
@@ -31,11 +38,11 @@ def add_nearest_neighbor_result(
31
38
f"{ SEGMENT_GCS } condensed/vp_condensed_{ analysis_date } .parquet" ,
32
39
columns = ["trip_instance_key" , "vp_idx" ,
33
40
"location_timestamp_local" ,
34
- "geometry" ]
41
+ "geometry" ],
35
42
).rename (columns = {
36
43
"vp_idx" : "trip_vp_idx" ,
37
44
"geometry" : "trip_geometry"
38
- })
45
+ }). set_geometry ( "trip_geometry" ). to_crs ( WGS84 )
39
46
40
47
gdf2 = pd .merge (
41
48
gdf ,
@@ -52,48 +59,40 @@ def add_nearest_neighbor_result(
52
59
coords_trio_series = []
53
60
54
61
# Iterate through and find the nearest_vp_idx, then surrounding trio
62
+ nearest_vp_idx = np .vectorize (neighbor .add_nearest_vp_idx )(
63
+ gdf2 .vp_geometry , gdf2 .stop_geometry , gdf2 .vp_idx
64
+ )
65
+
66
+ gdf2 = gdf2 .assign (
67
+ nearest_vp_idx = nearest_vp_idx ,
68
+ ).drop (
69
+ columns = ["vp_idx" , "vp_geometry" ]
70
+ )
71
+
55
72
for row in gdf2 .itertuples ():
56
- nearest_vp = neighbor .add_nearest_vp_idx (
57
- getattr (row , "geometry" ),
58
- getattr (row , "stop_geometry" ),
59
- getattr (row , "vp_idx" )
60
- )
61
-
62
- vp_idx_arr = np .asarray (getattr (row , "trip_vp_idx" ))
63
- timestamp_arr = np .asarray (getattr (row , "location_timestamp_local" ))
64
- coords_arr = np .asarray (getattr (row , "trip_geometry" ).coords )
65
-
66
73
vp_trio , time_trio , coords_trio = neighbor .add_trio (
67
- nearest_vp ,
74
+ getattr ( row , "nearest_vp_idx" ) ,
68
75
np .asarray (getattr (row , "trip_vp_idx" )),
69
76
np .asarray (getattr (row , "location_timestamp_local" )),
70
- np .array (getattr (row , "trip_geometry" ).coords ),
77
+ np .asarray (getattr (row , "trip_geometry" ).coords ),
71
78
)
72
79
73
- nearest_vp_idx_series .append (nearest_vp )
74
- trio_line = shapely .LineString (coords_trio )
75
80
vp_trio_series .append (vp_trio )
76
81
time_trio_series .append (time_trio )
77
- coords_trio_series .append (trio_line )
78
-
79
-
80
- gdf2 = gdf2 .assign (
81
- nearest_vp_idx = nearest_vp_idx_series ,
82
- vp_idx_trio = vp_trio_series ,
83
- location_timestamp_local_trio = time_trio_series ,
84
- vp_coords_trio = gpd .GeoSeries (coords_trio_series , crs = WGS84 )
85
- )
86
-
82
+ coords_trio_series .append (shapely .LineString (coords_trio ))
83
+
87
84
drop_cols = [
88
- "vp_idx" , "geometry" ,
89
85
"location_timestamp_local" ,
90
86
"trip_vp_idx" , "trip_geometry"
91
87
]
92
88
93
- gdf2 = gdf2 .drop (columns = drop_cols )
89
+ gdf2 = gdf2 .assign (
90
+ vp_idx_trio = vp_trio_series ,
91
+ location_timestamp_local_trio = time_trio_series ,
92
+ vp_coords_trio = gpd .GeoSeries (coords_trio_series , crs = WGS84 )
93
+ ).drop (columns = drop_cols )
94
94
95
- del nearest_vp_idx_series , vp_trio_series
96
- del time_trio_series , coords_trio_series
95
+ del vp_trio_series , time_trio_series , coords_trio_series
97
96
98
97
return gdf2
99
98
@@ -111,14 +110,14 @@ def nearest_neighbor_rt_stop_times(
111
110
112
111
stop_times = helpers .import_scheduled_stop_times (
113
112
analysis_date ,
114
- columns = ["trip_instance_key" ,
113
+ columns = ["trip_instance_key" , "shape_array_key" ,
115
114
"stop_sequence" , "stop_id" , "stop_pair" ,
116
115
"stop_primary_direction" ,
117
116
"geometry" ],
118
117
with_direction = True ,
119
118
get_pandas = True ,
120
119
crs = WGS84
121
- )
120
+ ). reindex ( columns = stop_time_col_order )
122
121
123
122
gdf = neighbor .merge_stop_vp_for_nearest_neighbor (
124
123
stop_times , analysis_date )
@@ -154,45 +153,48 @@ def nearest_neighbor_shape_segments(
154
153
EXPORT_FILE = dict_inputs ["stage2" ]
155
154
SEGMENT_FILE = dict_inputs ["segments_file" ]
156
155
157
- subset_trips = pd .read_parquet (
156
+ rt_trips = helpers .import_unique_vp_trips (analysis_date )
157
+
158
+ shape_stop_combinations = pd .read_parquet (
158
159
f"{ SEGMENT_GCS } { SEGMENT_FILE } _{ analysis_date } .parquet" ,
159
- columns = ["st_trip_instance_key" ]
160
- ).st_trip_instance_key .unique ()
160
+ columns = ["trip_instance_key" ,
161
+ "stop_id1" , "stop_pair" ,
162
+ "st_trip_instance_key" ],
163
+ filters = [[("trip_instance_key" , "in" , rt_trips )]]
164
+ ).rename (columns = {"stop_id1" : "stop_id" })
165
+
166
+ subset_trips = shape_stop_combinations .st_trip_instance_key .unique ()
161
167
162
168
stops_to_use = helpers .import_scheduled_stop_times (
163
169
analysis_date ,
164
170
columns = ["trip_instance_key" , "shape_array_key" ,
165
- "stop_sequence" , "stop_id" , "stop_pair" ,
166
- "stop_primary_direction" ,
167
- "geometry" ],
171
+ "stop_sequence" , "stop_id" , "stop_pair" ,
172
+ "stop_primary_direction" , "geometry" ],
168
173
filters = [[("trip_instance_key" , "in" , subset_trips )]],
169
174
get_pandas = True ,
170
175
with_direction = True
171
176
).rename (columns = {"trip_instance_key" : "st_trip_instance_key" })
172
177
173
- all_trips = helpers .import_scheduled_stop_times (
174
- analysis_date ,
175
- columns = ["trip_instance_key" , "shape_array_key" ],
176
- get_pandas = True ,
177
- with_direction = True
178
- ).drop_duplicates ().reset_index (drop = True )
179
-
180
178
stop_times = pd .merge (
181
179
stops_to_use ,
182
- all_trips ,
183
- on = "shape_array_key" ,
180
+ shape_stop_combinations ,
181
+ on = [ "st_trip_instance_key" , "stop_id" , "stop_pair" ] ,
184
182
how = "inner"
183
+ ).drop (
184
+ columns = "st_trip_instance_key"
185
+ ).drop_duplicates ().reset_index (drop = True ).reindex (
186
+ columns = stop_time_col_order
185
187
)
186
188
189
+ del stops_to_use , shape_stop_combinations
190
+
187
191
gdf = neighbor .merge_stop_vp_for_nearest_neighbor (
188
192
stop_times , analysis_date )
189
-
190
- del stop_times , all_trips , stops_to_use
191
-
192
- results = add_nearest_neighbor_result (gdf , analysis_date )
193
193
194
- del gdf
194
+ results = add_nearest_neighbor_result ( gdf , analysis_date )
195
195
196
+ del stop_times , gdf
197
+
196
198
utils .geoparquet_gcs_export (
197
199
results ,
198
200
SEGMENT_GCS ,
@@ -206,7 +208,7 @@ def nearest_neighbor_shape_segments(
206
208
del results
207
209
208
210
return
209
-
211
+
210
212
211
213
if __name__ == "__main__" :
212
214
@@ -223,5 +225,5 @@ def nearest_neighbor_shape_segments(
223
225
224
226
for analysis_date in analysis_date_list :
225
227
nearest_neighbor_shape_segments (analysis_date , STOP_SEG_DICT )
226
- nearest_neighbor_rt_stop_times (analysis_date , RT_STOP_TIMES_DICT )
228
+ # nearest_neighbor_rt_stop_times(analysis_date, RT_STOP_TIMES_DICT)
227
229
0 commit comments