
Commit d00cf65

Merge pull request #931 from cal-itp/gtfs-funnel
Gtfs funnel
2 parents 4e2c774 + c5f962d

28 files changed: +566 −619 lines changed

gtfs_funnel/Makefile

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+download_gtfs_data_one_day:
+	# make sure to update update_vars.py for dates to download
+	python download_trips.py
+	python download_stops.py
+	python download_shapes.py
+	python download_stop_times.py
+	python download_vehicle_positions.py
+	python concatenate_vehicle_positions.py
+
+preprocess:
+	python stop_times_with_direction.py
+	python vp_keep_usable.py
+	python vp_direction.py

gtfs_funnel/README.md

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+## Download Single Day of GTFS Schedule & Real-Time Data
+
+Use `update_vars.py` to set one or several days to download.
+
+1. **Schedule data**: download data for [trips](./download_trips.py), [stops](./download_stops.py), [shapes](./download_shapes.py), and [stop times](./download_stop_times.py), and cache parquets in GCS.
+1. **Vehicle positions data**: download [RT vehicle positions](./download_vehicle_positions.py).
+1. Use the `Makefile` to download schedule and RT data. In terminal: `make download_gtfs_data_one_day`
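
The scripts in this PR import `analysis_date_list` and `CONFIG_DICT` from `update_vars`, but that file is not part of the diff. A minimal sketch of what it might export, inferred from those imports and from `config.yml` below — the date value and the yaml loading are assumptions:

# update_vars.py -- hypothetical sketch, not shown in this diff
import yaml

# One or several days to download; this date is a placeholder.
analysis_date_list = ["2023-09-13"]

# config.yml (added in this PR) supplies keys like usable_vp_file;
# loading it with pyyaml is an assumption.
with open("config.yml") as f:
    CONFIG_DICT = yaml.safe_load(f)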

gtfs_funnel/cleanup.py

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+"""
+Remove staged files.
+
+Do this in a separate script in case we don't want to run it.
+"""
+import gcsfs
+
+fs = gcsfs.GCSFileSystem()
+
+def if_exists_then_delete(filepath):
+    if fs.exists(filepath):
+        if fs.isdir(filepath):
+            fs.rm(filepath, recursive=True)
+        else:
+            fs.rm(filepath)
+
+    return
+
+
+if __name__ == "__main__":
+
+    from update_vars import analysis_date_list, CONFIG_DICT
+
+    for analysis_date in analysis_date_list:
+
+        INPUT_FILE = CONFIG_DICT["usable_vp_file"]
+        if_exists_then_delete(f"{SEGMENT_GCS}{INPUT_FILE}_{analysis_date}_stage")
+        if_exists_then_delete(f"{SEGMENT_GCS}vp_direction_{analysis_date}.parquet")
+
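
Note that, as committed, the `__main__` block references `SEGMENT_GCS` without importing it, so `python cleanup.py` would raise a NameError. A hedged sketch of the likely fix plus a one-off use of the helper — assuming the constant lives in `segment_speed_utils.project_vars`, as in the sibling scripts of this PR:

# Assumed fix: bring SEGMENT_GCS into scope before the __main__ loop runs.
from segment_speed_utils.project_vars import SEGMENT_GCS

from cleanup import if_exists_then_delete

# Hypothetical one-off cleanup; the date is a placeholder and the file stem
# matches usable_vp_file ("vp_usable") from config.yml.
if_exists_then_delete(f"{SEGMENT_GCS}vp_usable_2023-09-13_stage")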

open_data/concatenate_vehicle_positions.py renamed to gtfs_funnel/concatenate_vehicle_positions.py

Lines changed: 37 additions & 33 deletions
@@ -81,47 +81,51 @@ def remove_batched_parquets(analysis_date: str):
 
 
 if __name__ == "__main__":
+
+    from update_vars import analysis_date_list
 
     LOG_FILE = "./logs/download_vp_v2.log"
     logger.add(LOG_FILE, retention="3 months")
     logger.add(sys.stderr,
               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
               level="INFO")
 
-    logger.info(f"Analysis date: {analysis_date}")
-
-    start = datetime.datetime.now()
-
-    # Concatenate all the batches
-    concatenated_vp_df = concat_batches(analysis_date)
-
-    time1 = datetime.datetime.now()
-    logger.info(f"concat and filter batched data: {time1 - start}")
-
-    concatenated_vp_df.to_parquet(
-        f"{SEGMENT_GCS}vp_{analysis_date}_concat",
-        partition_on = "gtfs_dataset_key")
-
-    time2 = datetime.datetime.now()
-    logger.info(f"export concatenated vp: {time2 - time1}")
-
-    # Import concatenated tabular vp and make it a gdf
-    vp = pd.read_parquet(
-        f"{SEGMENT_GCS}vp_{analysis_date}_concat/"
-    ).reset_index(drop=True)
-
-    vp_gdf = vp_into_gdf(vp)
-
-    utils.geoparquet_gcs_export(
-        vp_gdf,
-        SEGMENT_GCS,
-        f"vp_{analysis_date}"
-    )
-
-    remove_batched_parquets(analysis_date)
-    logger.info(f"remove batched parquets")
-
-    end = datetime.datetime.now()
-    logger.info(f"execution time: {end - start}")
+    for analysis_date in analysis_date_list:
+
+        logger.info(f"Analysis date: {analysis_date}")
+
+        start = datetime.datetime.now()
+
+        # Concatenate all the batches
+        concatenated_vp_df = concat_batches(analysis_date)
+
+        time1 = datetime.datetime.now()
+        logger.info(f"concat and filter batched data: {time1 - start}")
+
+        concatenated_vp_df.to_parquet(
+            f"{SEGMENT_GCS}vp_{analysis_date}_concat",
+            partition_on = "gtfs_dataset_key")
+
+        time2 = datetime.datetime.now()
+        logger.info(f"export concatenated vp: {time2 - time1}")
+
+        # Import concatenated tabular vp and make it a gdf
+        vp = pd.read_parquet(
+            f"{SEGMENT_GCS}vp_{analysis_date}_concat/"
+        ).reset_index(drop=True)
+
+        vp_gdf = vp_into_gdf(vp)
+
+        utils.geoparquet_gcs_export(
+            vp_gdf,
+            SEGMENT_GCS,
+            f"vp_{analysis_date}"
+        )
+
+        remove_batched_parquets(analysis_date)
+        logger.info(f"remove batched parquets")
+
+        end = datetime.datetime.now()
+        logger.info(f"execution time: {end - start}")
 
 
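
The loop above calls `vp_into_gdf`, defined earlier in this file and outside the hunk. A sketch of what such a helper typically does — the x/y column names and the WGS84 CRS are assumptions, not taken from this diff:

import pandas as pd
import geopandas as gpd

def vp_into_gdf(vp: pd.DataFrame) -> gpd.GeoDataFrame:
    # Turn tabular vehicle positions into point geometries;
    # assumes longitude/latitude live in columns named x and y.
    return gpd.GeoDataFrame(
        vp,
        geometry=gpd.points_from_xy(vp.x, vp.y),
        crs="EPSG:4326",
    )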

gtfs_funnel/config.yml

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+raw_vp_file: "vp"
+usable_vp_file: "vp_usable"
+timestamp_col: "location_timestamp_local"
+time_min_cutoff: 10
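
A hypothetical look at how these keys are consumed, mirroring the `CONFIG_DICT["usable_vp_file"]` access in `cleanup.py`; the inline values are just the ones defined above:

from update_vars import CONFIG_DICT

RAW_FILE = CONFIG_DICT["raw_vp_file"]         # "vp"
USABLE_FILE = CONFIG_DICT["usable_vp_file"]   # "vp_usable"
TIMESTAMP_COL = CONFIG_DICT["timestamp_col"]  # "location_timestamp_local"
MIN_MINUTES = CONFIG_DICT["time_min_cutoff"]  # 10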

open_data/download_shapes.py renamed to gtfs_funnel/download_shapes.py

Lines changed: 20 additions & 9 deletions
@@ -13,15 +13,12 @@
 from download_trips import get_operators
 from shared_utils import gtfs_utils_v2
 from calitp_data_analysis import geography_utils, utils
-from update_vars import analysis_date, COMPILED_CACHED_VIEWS
+from segment_speed_utils.project_vars import COMPILED_CACHED_VIEWS
 
-if __name__ == "__main__":
-
-    logger.add("./logs/download_data.log", retention="3 months")
-    logger.add(sys.stderr,
-               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
-               level="INFO")
-
+def download_one_day(analysis_date: str):
+    """
+    Download single day for shapes.
+    """
     logger.info(f"Analysis date: {analysis_date}")
     start = dt.datetime.now()
 
@@ -64,4 +61,18 @@
 
     end = dt.datetime.now()
     logger.info(f"execution time: {end-start}")
-
+
+    return
+
+
+if __name__ == "__main__":
+
+    from update_vars import analysis_date_list
+
+    logger.add("./logs/download_data.log", retention="3 months")
+    logger.add(sys.stderr,
+               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
+               level="INFO")
+
+    for analysis_date in analysis_date_list:
+        download_one_day(analysis_date)

open_data/download_stop_times.py renamed to gtfs_funnel/download_stop_times.py

Lines changed: 21 additions & 10 deletions
@@ -12,16 +12,12 @@
 from loguru import logger
 
 from shared_utils import gtfs_utils_v2
-from update_vars import analysis_date, COMPILED_CACHED_VIEWS
+from segment_speed_utils.project_vars import COMPILED_CACHED_VIEWS
 
-
-if __name__=="__main__":
-
-    logger.add("./logs/download_data.log", retention="3 months")
-    logger.add(sys.stderr,
-               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
-               level="INFO")
-
+def download_one_day(analysis_date: str):
+    """
+    Download single day for stop_times.
+    """
     logger.info(f"Analysis date: {analysis_date}")
     start = dt.datetime.now()
 
@@ -57,4 +53,19 @@
         f"{COMPILED_CACHED_VIEWS}{dataset}_{analysis_date}.parquet")
 
     end = dt.datetime.now()
-    logger.info(f"execution time: {end-start}")
+    logger.info(f"execution time: {end-start}")
+
+    return
+
+
+if __name__=="__main__":
+
+    from update_vars import analysis_date_list
+
+    logger.add("./logs/download_data.log", retention="3 months")
+    logger.add(sys.stderr,
+               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
+               level="INFO")
+
+    for analysis_date in analysis_date_list:
+        download_one_day(analysis_date)

open_data/download_stops.py renamed to gtfs_funnel/download_stops.py

Lines changed: 22 additions & 10 deletions
@@ -13,15 +13,13 @@
 from download_trips import get_operators
 from calitp_data_analysis import geography_utils, utils
 from shared_utils import gtfs_utils_v2
-from update_vars import analysis_date, COMPILED_CACHED_VIEWS
+from segment_speed_utils.project_vars import COMPILED_CACHED_VIEWS
 
-if __name__ == "__main__":
-
-    logger.add("./logs/download_data.log", retention="3 months")
-    logger.add(sys.stderr,
-               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
-               level="INFO")
-
+
+def download_one_day(analysis_date: str):
+    """
+    Download single day for stops.
+    """
     logger.info(f"Analysis date: {analysis_date}")
     start = dt.datetime.now()
 
@@ -48,13 +46,12 @@
         "stop_event_count"
     ] + route_type_cols + ["missing_route_type"]
 
-
     stops = gtfs_utils_v2.get_stops(
         selected_date = analysis_date,
         operator_feeds = FEEDS_TO_RUN,
         stop_cols = keep_stop_cols,
         get_df = True,
-        crs = geography_utils.CA_NAD83Albers,
+        crs = geography_utils.WGS84,
     )
 
     utils.geoparquet_gcs_export(
@@ -65,3 +62,18 @@
 
     end = dt.datetime.now()
     logger.info(f"execution time: {end-start}")
+
+    return
+
+
+if __name__ == "__main__":
+
+    from update_vars import analysis_date_list
+
+    logger.add("./logs/download_data.log", retention="3 months")
+    logger.add(sys.stderr,
+               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
+               level="INFO")
+
+    for analysis_date in analysis_date_list:
+        download_one_day(analysis_date)
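
Besides the single-day-to-list refactor, note the substantive change above: stops are now cached in WGS84 rather than CA NAD83 Albers. A sketch of re-projecting downstream if planar math is needed again — the bucket path and date are placeholders, and EPSG:3310 is an assumed stand-in for `geography_utils.CA_NAD83Albers`:

import geopandas as gpd

COMPILED_CACHED_VIEWS = "gs://your-bucket/compiled_cached_views/"  # placeholder
analysis_date = "2023-09-13"                                       # placeholder

# Read the cached stops geoparquet exported above (now WGS84)...
stops = gpd.read_parquet(f"{COMPILED_CACHED_VIEWS}stops_{analysis_date}.parquet")

# ...and reproject when distances/areas in meters are needed.
stops_projected = stops.to_crs("EPSG:3310")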

open_data/download_trips.py renamed to gtfs_funnel/download_trips.py

Lines changed: 28 additions & 16 deletions
@@ -11,7 +11,7 @@
 from loguru import logger
 
 from shared_utils import gtfs_utils_v2
-from update_vars import analysis_date, COMPILED_CACHED_VIEWS
+from segment_speed_utils.project_vars import COMPILED_CACHED_VIEWS
 
 
 def get_operators(analysis_date: str):
@@ -39,22 +39,19 @@ def get_operators(analysis_date: str):
     return operators_to_include
 
 
-if __name__=="__main__":
-
-    logger.add("./logs/download_data.log", retention="3 months")
-    logger.add(sys.stderr,
-               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
-               level="INFO")
-
+def download_one_day(analysis_date: str):
+    """
+    Download single day for trips.
+    """
     logger.info(f"Analysis date: {analysis_date}")
     start = dt.datetime.now()
-
+
     operators_df = get_operators(analysis_date)
-
+
     FEEDS_TO_RUN = sorted(operators_df.feed_key.unique().tolist())
-
+
     logger.info(f"# operators to run: {len(FEEDS_TO_RUN)}")
-
+
     dataset = "trips"
     logger.info(f"*********** Download {dataset} data ***********")
 
@@ -74,16 +71,31 @@ def get_operators(analysis_date: str):
         "trip_start_date_local_tz", "trip_first_departure_datetime_local_tz",
         "trip_last_arrival_datetime_local_tz"
     ]
-
+
     trips = gtfs_utils_v2.get_trips(
         selected_date = analysis_date,
         operator_feeds = FEEDS_TO_RUN,
         trip_cols = keep_trip_cols,
        get_df = True,
     )
-
+
     trips.to_parquet(
         f"{COMPILED_CACHED_VIEWS}{dataset}_{analysis_date}.parquet")
-
+
     end = dt.datetime.now()
-    logger.info(f"execution time: {end-start}")
+    logger.info(f"execution time: {end-start}")
+
+    return
+
+
+if __name__=="__main__":
+
+    from update_vars import analysis_date_list
+
+    logger.add("./logs/download_data.log", retention="3 months")
+    logger.add(sys.stderr,
+               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
+               level="INFO")
+
+    for analysis_date in analysis_date_list:
+        download_one_day(analysis_date)
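
`download_shapes.py` and `download_stops.py` import `get_operators` from this module; the hunks show only its signature and that callers read the `feed_key` column of its result. A purely illustrative stand-in consistent with that usage (the rows here are made up):

import pandas as pd

def get_operators(analysis_date: str) -> pd.DataFrame:
    # Illustrative stand-in: one row per operator feed active on
    # analysis_date, with at least a feed_key column, since callers do
    # FEEDS_TO_RUN = sorted(operators_df.feed_key.unique().tolist()).
    operators_to_include = pd.DataFrame(
        {"feed_key": ["feed_a", "feed_b"]}
    )
    return operators_to_include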
