import datetime

import dask
import dask.dataframe as dd
import geopandas as gpd
import numpy as np
import pandas as pd
from calitp_data_analysis import utils
from calitp_data_analysis.geography_utils import WGS84

import vp_spatial_accuracy
from segment_speed_utils import helpers
from segment_speed_utils.project_vars import (
    GCS_FILE_PATH,
    PROJECT_CRS,
    SEGMENT_GCS,
    analysis_date,
)

# Timing / logging
import sys
from loguru import logger

# cd rt_segment_speeds && pip install -r requirements.txt && cd

# UPDATE COMPLETENESS
def pings_trip_time(vp_usable_df: pd.DataFrame) -> pd.DataFrame:
    """
    For each trip: count pings per minute, flag minutes with
    2+ pings, and estimate total trip time from the first and
    last ping.
    """
    # Find the number of pings for each minute
    df = (
        vp_usable_df.groupby(
            [
                "trip_instance_key",
                pd.Grouper(key="location_timestamp_local", freq="1Min"),
            ]
        )
        .vp_idx.count()
        .reset_index()
        .rename(columns={"vp_idx": "number_of_pings_per_minute"})
    )

    # Flag which rows (minutes) have 2+ pings per minute
    df = df.assign(
        minutes_w_atleast2_trip_updates=(
            df.number_of_pings_per_minute >= 2
        ).astype(int)
    )

    # Copy location_timestamp_local so the max time can be taken
    # alongside the min in the same aggregation
    df["max_time"] = df.location_timestamp_local

    # Copy number_of_pings_per_minute so it can be counted
    # (total minutes with GTFS data) as well as aggregated to a median
    df["total_minute_w_gtfs"] = df.number_of_pings_per_minute

    # Find the min/max time for each trip and sum up the
    # minutes with at least 2 pings per minute
    df = (
        df.groupby(["trip_instance_key"])
        .agg(
            {
                "location_timestamp_local": "min",
                "max_time": "max",
                "minutes_w_atleast2_trip_updates": "sum",
                "number_of_pings_per_minute": "median",
                "total_minute_w_gtfs": "count",
            }
        )
        .reset_index()
        .rename(
            columns={
                "location_timestamp_local": "min_time",
                "number_of_pings_per_minute": "median_pings_per_min",
            }
        )
    )

    # Total trip time in minutes; add an extra minute so the first
    # and last minutes are both counted
    df["total_trip_time"] = (df.max_time - df.min_time) / pd.Timedelta(minutes=1) + 1

    df = df.drop(columns=["min_time", "max_time"])
    return df
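
# A minimal sanity-check sketch for pings_trip_time, kept commented out in the
# same spirit as the "Keep for testing" block further down. The trip key and
# timestamps are made up purely for illustration.
#
# fake = pd.DataFrame({
#     "trip_instance_key": ["a", "a", "a"],
#     "location_timestamp_local": pd.to_datetime([
#         "2023-01-04 08:00:10", "2023-01-04 08:00:40", "2023-01-04 08:02:05",
#     ]),
#     "vp_idx": [0, 1, 2],
# })
# pings_trip_time(fake)
# # One row for trip "a": total_trip_time = 3 (08:00 through 08:02),
# # total_minute_w_gtfs = 2, minutes_w_atleast2_trip_updates = 1,
# # median_pings_per_min = 1.5 (hence the float64 meta below).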

def grab_shape_keys_in_vp(vp_usable: dd.DataFrame, analysis_date: str) -> dd.DataFrame:
    """
    Subset raw vp and find unique trip_instance_keys.
    Create crosswalk to link trip_instance_key to shape_array_key.
    """
    vp_usable = (
        vp_usable[["trip_instance_key"]].drop_duplicates().reset_index(drop=True)
    )

    trips_with_shape = helpers.import_scheduled_trips(
        analysis_date,
        columns=["trip_instance_key", "shape_array_key"],
        get_pandas=True,
    )

    # Inner merge keeps one row per trip/shape;
    # trip_instance_key and shape_array_key are the only 2 columns left
    m1 = dd.merge(vp_usable, trips_with_shape, on="trip_instance_key", how="inner")

    return m1

def buffer_shapes(
    trips_with_shape: dd.DataFrame,
    analysis_date: str,
    buffer_meters: int = 35,
) -> gpd.GeoDataFrame:
    """
    Filter scheduled shapes down to the shapes that appear in vp,
    then buffer those shapes.

    Attach the shape geometry for a subset of shapes or trips.
    """
    subset = trips_with_shape.shape_array_key.unique().compute().tolist()

    # to_crs takes awhile, so filter down to only the shapes we need
    # before reprojecting to PROJECT_CRS
    shapes = helpers.import_scheduled_shapes(
        analysis_date,
        columns=["shape_array_key", "geometry"],
        filters=[[("shape_array_key", "in", subset)]],
        crs=PROJECT_CRS,
        get_pandas=False,
    ).pipe(helpers.remove_shapes_outside_ca)

    shapes = shapes.assign(geometry=shapes.geometry.buffer(buffer_meters))

    trips_with_shape_geom = dd.merge(
        shapes, trips_with_shape, on="shape_array_key", how="inner"
    )

    trips_with_shape_geom = trips_with_shape_geom.compute()
    return trips_with_shape_geom
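
# Note: buffer_meters is interpreted in the units of PROJECT_CRS, so the
# buffer is only truly 35 meters because PROJECT_CRS is a projected,
# meter-based CRS. A hypothetical illustration (EPSG:3310 is California
# Albers, in meters):
#
# from shapely.geometry import LineString
# line = gpd.GeoSeries([LineString([(0, 0), (100, 0)])], crs="EPSG:3310")
# line.buffer(35).area  # ~100 * 70 + pi * 35**2 sq m around the line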

# SPATIAL ACCURACY
def vp_in_shape(
    vp_usable: pd.DataFrame, trips_with_buffered_shape: gpd.GeoDataFrame
) -> pd.DataFrame:
    """
    Test whether each vehicle position ping falls within its
    trip's buffered shape.
    """
    keep = ["trip_instance_key", "x", "y", "location_timestamp_local"]
    vp_usable = vp_usable[keep]

    vp_gdf = gpd.GeoDataFrame(
        vp_usable, geometry=gpd.points_from_xy(vp_usable.x, vp_usable.y), crs=WGS84
    ).to_crs(PROJECT_CRS)

    gdf = pd.merge(
        vp_gdf, trips_with_buffered_shape, on="trip_instance_key", how="inner"
    )

    # geometry_x is the vp point; geometry_y is the buffered shape
    gdf = gdf.assign(is_within=gdf.geometry_x.within(gdf.geometry_y))
    gdf = gdf[["trip_instance_key", "location_timestamp_local", "is_within"]]

    return gdf
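
# Toy illustration of the within() test above, with hypothetical coordinates:
#
# from shapely.geometry import Point, Polygon
# buffered = Polygon([(0, 0), (0, 10), (10, 10), (10, 0)])
# Point(5, 5).within(buffered)   # True  -> ping counts as inside the shape
# Point(50, 5).within(buffered)  # False -> ping falls outside the buffer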

def total_counts(result: pd.DataFrame) -> pd.DataFrame:
    """
    Compare the total number of pings per trip against the
    number of pings that fell within the buffered shape.
    """
    # Count total vps for the trip; this is done on vp_usable,
    # kept separate from vp_in_shape
    total_vp = vp_spatial_accuracy.total_vp_counts_by_trip(result)

    # Count only the pings that fell within the buffered shape
    result2 = result.loc[result.is_within].reset_index(drop=True)
    result2 = result2[["trip_instance_key", "location_timestamp_local"]]
    vps_in_shape = (
        result2.groupby("trip_instance_key", observed=True, group_keys=False)
        .agg({"location_timestamp_local": "count"})
        .reset_index()
        .rename(columns={"location_timestamp_local": "vp_in_shape"})
    )

    count_df = pd.merge(total_vp, vps_in_shape, on="trip_instance_key", how="left")

    count_df = count_df.assign(
        vp_in_shape=count_df.vp_in_shape.fillna(0).astype("int32"),
        total_vp=count_df.total_vp.fillna(0).astype("int32"),
    )

    return count_df

# SPEEDS
def load_trip_speeds(analysis_date: str) -> pd.DataFrame:
    df = pd.read_parquet(
        f"{SEGMENT_GCS}trip_summary/trip_speeds_{analysis_date}.parquet",
        columns=[
            "trip_instance_key",
            "speed_mph",
            "route_id",
            "time_of_day",
            "service_minutes",
        ],
    )

    return df

# Complete
def vp_usable_metrics(analysis_date: str) -> pd.DataFrame:
    """
    Keep for testing
    operator = "Bay Area 511 Muni VehiclePositions"
    gtfs_key = "7cc0cb1871dfd558f11a2885c145d144"

    vp_usable = dd.read_parquet(
        f"{SEGMENT_GCS}vp_usable_{analysis_date}",
        filters=[
            [
                ("gtfs_dataset_name", "==", operator),
                ("schedule_gtfs_dataset_key", "==", gtfs_key),
            ]
        ],
    )
    """
    vp_usable = dd.read_parquet(f"{SEGMENT_GCS}vp_usable_{analysis_date}")

    ## Update Completeness ##

    # Find total minutes with gtfs, total trip time,
    # and median pings per minute
    pings_trip_time_df = vp_usable.map_partitions(
        pings_trip_time,
        meta={
            "trip_instance_key": "object",
            "minutes_w_atleast2_trip_updates": "int64",
            # median of integer counts can be fractional
            "median_pings_per_min": "float64",
            "total_minute_w_gtfs": "int64",
            "total_trip_time": "float64",
        },
        align_dataframes=False,
    ).persist()

    ## Spatial accuracy ##

    # Determine which trips have shapes associated with them
    trips_with_shapes_df = grab_shape_keys_in_vp(vp_usable, analysis_date)

    # Buffer the shapes
    buffered_shapes_df = buffer_shapes(trips_with_shapes_df, analysis_date, 35)

    # Find the vps that fall within the buffered shapes
    in_shape_df = vp_usable.map_partitions(
        vp_in_shape,
        buffered_shapes_df,
        meta={
            "trip_instance_key": "object",
            "location_timestamp_local": "datetime64[ns]",
            "is_within": "bool",
        },
        align_dataframes=False,
    ).persist()

    # Compare total vps for a trip versus total vps that
    # fell in the recorded shape
    spatial_accuracy_df = in_shape_df.map_partitions(
        total_counts,
        meta={
            "trip_instance_key": "object",
            "total_vp": "int32",
            "vp_in_shape": "int32",
        },
        align_dataframes=False,
    ).persist()

    # Load trip speeds
    trip_speeds_df = load_trip_speeds(analysis_date)

    # Merges
    pings_trip_time_df = pings_trip_time_df.compute()
    spatial_accuracy_df = spatial_accuracy_df.compute()

    m1 = pings_trip_time_df.merge(
        spatial_accuracy_df, on="trip_instance_key", how="outer"
    ).merge(trip_speeds_df, on="trip_instance_key", how="outer")

    m1.to_parquet("./vp_usable_metrics.parquet")
    return m1

if __name__ == "__main__":
    start = datetime.datetime.now()

    LOG_FILE = "../logs/vp_usable_test.log"
    logger.add(LOG_FILE, retention="3 months")
    logger.add(
        sys.stderr,
        format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
        level="INFO",
    )

    logger.info(f"Analysis date: {analysis_date}, started at {start}")

    vp_usable_metrics(analysis_date)

    # Capture the end time after the work is done, not before
    end = datetime.datetime.now()
    logger.info(f"Ended at {end}, took {end - start} to run")
    print("done")