
Commit

stacked all crosswalk dates so operator profile is complete. upload to public gcs
amandaha8 committed Jul 5, 2024
1 parent 8d78a70 commit ea5f85f
Showing 11 changed files with 2,294 additions and 692 deletions.
772 changes: 730 additions & 42 deletions gtfs_digest/03_report.ipynb

Large diffs are not rendered by default.

1,926 changes: 1,304 additions & 622 deletions gtfs_digest/22_publish_public_data.ipynb

Large diffs are not rendered by default.

46 changes: 30 additions & 16 deletions gtfs_digest/_gtfs_digest_dataset.py
@@ -123,6 +123,8 @@ def load_operator_profiles()->pd.DataFrame:
"""
op_profiles_url = f"{GTFS_DATA_DICT.digest_tables.dir}{GTFS_DATA_DICT.digest_tables.operator_profiles}.parquet"

op_profiles_df = pd.read_parquet(op_profiles_url)

ntd_cols = [
"schedule_gtfs_dataset_key",
"counties_served",
@@ -135,25 +137,37 @@ def load_operator_profiles()->pd.DataFrame:
"reporter_type"
]

# Load NTD through the crosswalk for the most recent date
most_recent_date = rt_dates.y2024_dates[-1]

ntd_df = helpers.import_schedule_gtfs_key_organization_crosswalk(most_recent_date)[
ntd_cols]
all_dates = (rt_dates.y2024_dates + rt_dates.y2023_dates +
rt_dates.oct2023_week + rt_dates.apr2023_week +
rt_dates.apr2024_week
)

op_profiles_df = pd.read_parquet(op_profiles_url)

# Keep only the most recent row
op_profiles_df1 = (op_profiles_df
.sort_values(by = ['service_date'], ascending = False)
.drop_duplicates(subset = ["schedule_gtfs_dataset_key",
"name"])
).reset_index(drop = True)
# Add NTD data.
CROSSWALK = GTFS_DATA_DICT.schedule_tables.gtfs_key_crosswalk
crosswalk_df = (
time_series_utils.concatenate_datasets_across_dates(
SCHED_GCS,
CROSSWALK,
all_dates,
data_type="df",
columns=ntd_cols
)
.sort_values(["service_date"])
.reset_index(drop=True)
)

# Merge
op_profiles_df1 = pd.merge(op_profiles_df1, ntd_df, on = ["schedule_gtfs_dataset_key"], how = "left")

return op_profiles_df1
merge_cols = ["schedule_gtfs_dataset_key", "service_date"]
op_profiles_df1 = pd.merge(op_profiles_df,
crosswalk_df,
on = merge_cols,
how = "left")

# Drop duplicates created after merging
op_profiles_df2 = (op_profiles_df1
.drop_duplicates(subset = list(op_profiles_df1.columns))
.reset_index(drop = True))
return op_profiles_df2

if __name__ == "__main__":

188 changes: 188 additions & 0 deletions gtfs_digest/_gtfs_digest_specific_datasets.py
@@ -0,0 +1,188 @@
import datetime
import pandas as pd
import numpy as np
from segment_speed_utils import helpers, time_series_utils
from shared_utils import catalog_utils, rt_dates

GTFS_DATA_DICT = catalog_utils.get_catalog("gtfs_analytics_data")
from segment_speed_utils.project_vars import (COMPILED_CACHED_VIEWS, RT_SCHED_GCS, SCHED_GCS)

"""
Datasets that are relevant to
GTFS Digest Portfolio work only.
"""
def concatenate_trips(
date_list: list,
) -> pd.DataFrame:
"""
Concatenate scheduled trips data
for multiple days.
"""
FILE = GTFS_DATA_DICT.schedule_downloads.trips

df = (
time_series_utils.concatenate_datasets_across_dates(
COMPILED_CACHED_VIEWS,
FILE,
date_list,
data_type="df",
columns=[
"name",
"service_date",
"route_long_name",
"trip_first_departure_datetime_pacific",
"service_hours",
],
)
.sort_values(["service_date"])
.reset_index(drop=True)
)

return df

def get_day_type(date):
"""
Function to return the day type (e.g., Monday, Tuesday, etc.) from a datetime object.
"""
days_of_week = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
return days_of_week[date.weekday()]

def weekday_or_weekend(row):
"""
Tag if a day is a weekday or Saturday/Sunday
"""
if row.day_type == "Sunday":
return "Sunday"
if row.day_type == "Saturday":
return "Saturday"
else:
return "Weekday"

def total_service_hours(date_list: list) -> pd.DataFrame:
"""
Total up service hours by departure hour,
month, and day type for an operator.
"""
# Combine all the days' data for a week
df = concatenate_trips(date_list)

# Optional filter for a single operator (currently disabled)
# df = df.loc[df.name == name].reset_index(drop=True)

# Add day type aka Monday, Tuesday, Wednesday...
df['day_type'] = df['service_date'].apply(get_day_type)

# Tag if the day is a weekday, Saturday, or Sunday
df["weekend_weekday"] = df.apply(weekday_or_weekend, axis=1)

# Extract the hour of each trip's first departure
df["departure_hour"] = df.trip_first_departure_datetime_pacific.dt.hour

# Keep only year & month (YYYY-MM), dropping the specific day
df["month"] = df.service_date.astype(str).str.slice(stop=7)

df2 = (
df.groupby(["name", "month", "weekend_weekday", "departure_hour"])
.agg(
{
"service_hours": "sum",
}
)
.reset_index()
)
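# Divide by 5 because a full sampled week sums five weekdays into each
# weekday group; Saturday/Sunday groups each reflect a single day.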
df2["weekday_service_hours"] = df2.service_hours/5
df2 = df2.rename(columns = {'service_hours':'weekend_service_hours'})
return df2

def total_service_hours_all_months() -> pd.DataFrame:
"""
Find service hours across all operators for the months
where we have a full week's worth of data downloaded.
Currently that's April 2023, October 2023, and April 2024.
"""
# Grab the dataframes with a full week's worth of data.
apr_23week = rt_dates.get_week(month="apr2023", exclude_wed=False)
oct_23week = rt_dates.get_week(month="oct2023", exclude_wed=False)
apr_24week = rt_dates.get_week(month="apr2024", exclude_wed=False)

# Sum up total service_hours
apr_23df = total_service_hours(apr_23week)
oct_23df = total_service_hours(oct_23week)
apr_24df = total_service_hours(apr_24week)

# Combine everything
all_df = pd.concat([apr_23df, oct_23df, apr_24df])


return all_df

def load_operator_profiles()->pd.DataFrame:
"""
Load the operator profiles dataset for all operators and attach NTD data via the crosswalk.
"""
op_profiles_url = f"{GTFS_DATA_DICT.digest_tables.dir}{GTFS_DATA_DICT.digest_tables.operator_profiles}.parquet"

op_profiles_df = pd.read_parquet(op_profiles_url)

ntd_cols = [
"schedule_gtfs_dataset_key",
"counties_served",
"service_area_sq_miles",
"hq_city",
"uza_name",
"service_area_pop",
"organization_type",
"primary_uza",
"reporter_type"
]

all_dates = (rt_dates.y2024_dates + rt_dates.y2023_dates +
rt_dates.oct2023_week + rt_dates.apr2023_week +
rt_dates.apr2024_week
)

# Add NTD data.
CROSSWALK = GTFS_DATA_DICT.schedule_tables.gtfs_key_crosswalk
crosswalk_df = (
time_series_utils.concatenate_datasets_across_dates(
SCHED_GCS,
CROSSWALK,
all_dates,
data_type="df",
columns=ntd_cols
)
.sort_values(["service_date"])
.reset_index(drop=True)
)

# Merge
merge_cols = ["schedule_gtfs_dataset_key", "service_date"]
op_profiles_df1 = pd.merge(op_profiles_df,
crosswalk_df,
on = merge_cols,
how = "left")

# Drop duplicates created after merging
op_profiles_df2 = (op_profiles_df1
.drop_duplicates(subset = list(op_profiles_df1.columns))
.reset_index(drop = True))
return op_profiles_df2

if __name__ == "__main__":

# Save to GCS
OP_PROFILE_EXPORT = f"{GTFS_DATA_DICT.digest_tables.dir}{GTFS_DATA_DICT.digest_tables.operator_profile_portfolio_view}.parquet"
SERVICE_EXPORT = f"{GTFS_DATA_DICT.digest_tables.dir}{GTFS_DATA_DICT.digest_tables.scheduled_service_hours}.parquet"
start = datetime.datetime.now()

# Save operator profiles with NTD
operator_profiles = load_operator_profiles()
operator_profiles.to_parquet(OP_PROFILE_EXPORT)

# Save service hours
service_hours = total_service_hours_all_months()
service_hours.to_parquet(SERVICE_EXPORT)

end = datetime.datetime.now()
print(f"GTFS Digest Datasets: {end - start}")
13 changes: 7 additions & 6 deletions gtfs_digest/_portfolio_notes_todo.md
@@ -8,18 +8,19 @@
* Portfolio:
* Figure out Makefile situation.

### 7/4/2024
* Upload datasets to the public GCS.
* Add a new script for running all the operators and saving that file to the public GCS. Currently, my functions run on operator level.
### 7/5/2024
* <s>Fix `operator_profiles` so all the dates & operators are included.</s>
* <s>Upload datasets to the public GCS.
* Add this to the make file.
* Rerun the entire portfolio?
* Make sure datasets have a CSV and parquet version.</s>
* Rerun a subset of operators... This didn't work on 7/3.

### 7/3/2024 Goals
* <s>Switch color palette to colorblind friendly one.</s>
* Switch NTD info to crosswalk. Read in crosswalk file when I load in `gtfs_digest/_section2_utils/operator_profiles`.
* <s>Switch NTD info to crosswalk. Read in crosswalk file when I load in `gtfs_digest/_section2_utils/operator_profiles`.</s>
* Question: Do I need to upload this specific operator_profile view with all the NTD stuff to the public GCS?
* <s>Move Monthly Services data to its own file in `gtfs_digest`</s>
* Rerun a subset of operators for the GTFS Digest test site.
* <b>Rerun a subset of operators for the GTFS Digest test site. AH: this isn't working, posted on data_office_hours</b>

### 7/2/2024 Notes
* <s>Cardinal Direction
3 changes: 2 additions & 1 deletion gtfs_digest/_section1_utils.py
@@ -77,6 +77,7 @@ def load_operator_ntd_profile(organization_name:str)->pd.DataFrame:
op_profiles_url,
filters=[[("organization_name", "==", organization_name)]])

# Keep only the most recent row
op_profiles_df1 = op_profiles_df.sort_values(by = ['service_date'], ascending = False).head(1)

# Rename dataframe
@@ -85,7 +86,7 @@

def load_operator_service_hours(name:str)->pd.DataFrame:

url = f"{GTFS_DATA_DICT.digest_tables.dir}{GTFS_DATA_DICT.digest_tables.operator_profile_portfolio_view}.parquet"
url = f"{GTFS_DATA_DICT.digest_tables.dir}{GTFS_DATA_DICT.digest_tables.scheduled_service_hours}.parquet"

df = pd.read_parquet(url,
filters=[[(("name", "==", name))]])
2 changes: 1 addition & 1 deletion gtfs_digest/_section2_utils.py
@@ -585,7 +585,7 @@ def frequency_chart(

chart = (
alt.Chart(df)
.mark_bar(size=7, clip=True)
.mark_bar(size=3, clip=True)
.encode(
y=alt.Y(
"yearmonthdate(Date):O",
6 changes: 4 additions & 2 deletions gtfs_funnel/crosswalk_gtfs_dataset_key_to_organization.py
@@ -57,6 +57,7 @@ def create_gtfs_dataset_key_to_organization_crosswalk(
def load_ntd(year: int) -> pd.DataFrame:
"""
Load NTD Data stored in our warehouse.
Select certain columns.
"""
df = (
tbls.mart_ntd.dim_annual_ntd_agency_information()
@@ -146,7 +147,8 @@ def merge_ntd_mobility(year:int)->pd.DataFrame:
analysis_date
)

# Add NTD
# Add some NTD data: if I want to delete this, simply take out
# ntd_df and the crosswalk_df merge.
ntd_df = merge_ntd_mobility(ntd_latest_year)

crosswalk_df = pd.merge(df,
@@ -156,7 +158,7 @@ def merge_ntd_mobility(year:int)->pd.DataFrame:
how = "left")

# Drop ntd_id from ntd_df to avoid confusion
crosswalk_df = crosswalk_df.drop(columns = ["ntd_id"])
crosswalk_df = crosswalk_df.drop(columns = ["ntd_id", "agency_name"])

crosswalk_df.to_parquet(
f"{SCHED_GCS}{EXPORT}_{analysis_date}.parquet"
25 changes: 25 additions & 0 deletions portfolio/gtfs_digest_testing/README.md
@@ -0,0 +1,25 @@
# GTFS Digest
This portfolio houses performance metrics from GTFS schedule and vehicle positions time-series data for all transit operators by route.

To download the full processed data that powers this portfolio, please navigate to the folder titled `gtfs_digest` [here](https://console.cloud.google.com/storage/browser/calitp-publish-data-analysis). You will find the most recent datasets in `.parquet`, `.csv`, and `.geojson` formats. Match the [readable column names](https://github.com/cal-itp/data-analyses/blob/main/gtfs_digest/readable.yml) to the table names. The data pulled from the Federal Transit Administration's National Transit Database is located [here](https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information).
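
Below is a minimal sketch of how a published file might be read with the readable column names applied. The object name (`operator_profiles.parquet`) and the assumption that `readable.yml` is a flat mapping of column names to display names are illustrative only; check the bucket listing and the YAML file for the actual names.

```python
# Sketch: read a published parquet and apply the readable column names.
# The object path and the structure of readable.yml are assumptions.
import pandas as pd
import requests
import yaml

READABLE_YML = (
    "https://raw.githubusercontent.com/cal-itp/data-analyses/"
    "main/gtfs_digest/readable.yml"
)
# Hypothetical object path inside the public bucket.
PARQUET_PATH = (
    "gs://calitp-publish-data-analysis/gtfs_digest/operator_profiles.parquet"
)

readable_cols = yaml.safe_load(requests.get(READABLE_YML).text)

df = pd.read_parquet(PARQUET_PATH)  # reading gs:// paths requires gcsfs
df = df.rename(columns={k: v for k, v in readable_cols.items() if k in df.columns})
```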
## Common Questions
<b>To read about the methodology, please visit [here](https://github.com/cal-itp/data-analyses/blob/main/gtfs_digest/methodology.md).</b><br>
**Why does the time-series table sample single days?**

GTFS provides us with extremely detailed information, such as the time a bus is scheduled to arrive at a stop, and the GPS coordinates of a bus at a given timestamp. When working with granular data like this, a single day statewide can be a very large table.

For context, on our sampled date in January 2024 there were 100k+ trips and 3.6 million+ stop arrivals, and that's just scheduled data. Our vehicle positions table, after deduplicating in our warehouse, had 15 million+ rows. On top of that, each operator can have a quartet of GTFS data (1 schedule table + 3 real-time tables).

Getting our pipeline right is fairly complex for a single day. Our warehouse has a set of internal keys to ensure we're matching trip for trip across quartets. If you factor in the fact that operators can update their GTFS feeds at any time in the month, there are a lot of things that are changing!

We do have monthly aggregations on our roadmap, but for now, we're building out our own time-series tables of processed data and working through the kinks of tracking the same route over time (as feeds get updated, identifiers change, etc.). We will be starting with schedule data to figure out how to produce monthly aggregations in a scalable way.

**How does GTFS Digest fit into SB 125 performance metrics?**

[SB 125](https://calsta.ca.gov/subject-areas/sb125-transit-program), which created the Transit Transformation Task Force, includes a section on creating performance metrics for transit operators statewide. Dive into the [legislative bill](https://legiscan.com/CA/text/SB125/id/2831757).

The Caltrans Division of Data & Digital Services has been ingesting and collecting GTFS data in our warehouse since 2021. Our own internal effort has been to create data pipelines so that the rich and comprehensive data we collect can be processed and made available for public consumption.

This overlaps with the goals of SB 125. There is a set of performance metrics that could be of interest to the task force, the public, and us! However, GTFS Digest is a **GTFS** digest, which means its primary focus is on metrics that can be derived purely from GTFS, computed statewide so we can understand transit operator performance. We based a lot of our metrics on the papers by [Professor Gregory Newmark](https://www.morgan.edu/sap/gregory-newmark), which gave us a roadmap of metrics derived solely from GTFS that allow comparisons of transit operators regardless of size, service area, and density.

GTFS Digest will continue to evolve as we dive into our own warehouse!
1 change: 1 addition & 0 deletions portfolio/gtfs_digest_testing/district_04-oakland.md
@@ -0,0 +1 @@
# District 04 - Oakland
Git LFS file not shown
