From 8f350f5be90499c3f8fde918d5c3186b54b2f7a7 Mon Sep 17 00:00:00 2001 From: Erika Pacheco Date: Fri, 22 Nov 2024 11:13:15 -0800 Subject: [PATCH 01/13] Include 2023 on year test for staging NTD service tables [#3523] --- warehouse/models/staging/ntd_annual_data_tables/_src.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/warehouse/models/staging/ntd_annual_data_tables/_src.yml b/warehouse/models/staging/ntd_annual_data_tables/_src.yml index 383dabc05d..1bb8da7080 100644 --- a/warehouse/models/staging/ntd_annual_data_tables/_src.yml +++ b/warehouse/models/staging/ntd_annual_data_tables/_src.yml @@ -41,7 +41,7 @@ sources: description: The year for which the data was reported. tests: - accepted_values: - values: [2022] + values: [2022, 2023] - name: _5_digit_ntd_id description: A five-digit identifying number for each agency used in the current NTD system. - name: agency @@ -247,7 +247,7 @@ sources: description: The year for which the data was reported. tests: - accepted_values: - values: [2022] + values: [2022, 2023] - name: _5_digit_ntd_id description: A five-digit identifying number for each agency used in the current NTD system. - name: type_of_service @@ -275,7 +275,7 @@ sources: description: The year for which the data was reported. tests: - accepted_values: - values: [2022] + values: [2022, 2023] - name: _5_digit_ntd_id description: A five-digit identifying number for each agency used in the current NTD system. - name: agency From 4a6c167da83b5aa7d2a9568b56e03b05dc1395a7 Mon Sep 17 00:00:00 2001 From: Erika Pacheco Date: Tue, 12 Nov 2024 16:27:45 -0800 Subject: [PATCH 02/13] Change NTD monthly schedule to run every Monday instead of every day. There is no need to run every day since it is a monthly file, but having the process running once a week gives us a better window to add and process new files once they are released.
[#3519] --- airflow/dags/sync_ntd_data_xlsx/METADATA.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/dags/sync_ntd_data_xlsx/METADATA.yml b/airflow/dags/sync_ntd_data_xlsx/METADATA.yml index b25e179385..f42903c3b5 100644 --- a/airflow/dags/sync_ntd_data_xlsx/METADATA.yml +++ b/airflow/dags/sync_ntd_data_xlsx/METADATA.yml @@ -1,5 +1,5 @@ -description: "Scrape tables from DOT Ridership XLSX file daily" -schedule_interval: "0 10 * * *" # 10am UTC every day +description: "Scrape tables from DOT Ridership XLSX file weekly" +schedule_interval: "0 10 * * 1" # 10am UTC every Monday tags: - all_gusty_features default_args: From 078bd261b6bd37a92484bbd3c450011054ba87b4 Mon Sep 17 00:00:00 2001 From: Charlie Costanzo Date: Fri, 22 Nov 2024 17:45:54 -0500 Subject: [PATCH 03/13] expose all ntd annual reporting staging tables as mart (#3550) * expose the ntd annual staging tables as marts available in metabase * test commit * add most recent dbt mart schemas to dbt profile * fixed mart model sql to correctly reference staging tables, remove unreferenced config * removed unused common fields in yml * list models in alphabetical order in yml --- warehouse/dbt_project.yml | 2 + .../ntd_fct_annual/_mart_ntd_fct_annual.yml | 37 +++++++++++++++++++ .../fct_ntd_annual_data__breakdowns.sql | 11 ++++++ ..._ntd_annual_data__breakdowns_by_agency.sql | 11 ++++++ ..._data__capital_expenses_by_capital_use.sql | 11 ++++++ ..._annual_data__capital_expenses_by_mode.sql | 11 ++++++ ..._capital_expenses_for_existing_service.sql | 11 ++++++ ...ital_expenses_for_expansion_of_service.sql | 11 ++++++ ...t_ntd_annual_data__employees_by_agency.sql | 11 ++++++ ...fct_ntd_annual_data__employees_by_mode.sql | 11 ++++++ ...a__employees_by_mode_and_employee_type.sql | 11 ++++++ .../fct_ntd_annual_data__fuel_and_energy.sql | 11 ++++++ ...annual_data__fuel_and_energy_by_agency.sql | 11 ++++++ ..._data__funding_sources_by_expense_type.sql | 11 ++++++ ...ta__funding_sources_directly_generated.sql | 11 ++++++ ...d_annual_data__funding_sources_federal.sql | 11 ++++++ ...ntd_annual_data__funding_sources_local.sql | 11 ++++++ ...ntd_annual_data__funding_sources_state.sql | 11 ++++++ ...funding_sources_taxes_levied_by_agency.sql | 11 ++++++ ...td_annual_data__maintenance_facilities.sql | 11 ++++++ ...data__maintenance_facilities_by_agency.sql | 11 ++++++ .../fct_ntd_annual_data__metrics.sql | 11 ++++++ ...l_data__operating_expenses_by_function.sql | 11 ++++++ ...rating_expenses_by_function_and_agency.sql | 11 ++++++ ...nnual_data__operating_expenses_by_type.sql | 11 ++++++ ..._operating_expenses_by_type_and_agency.sql | 11 ++++++ ...fct_ntd_annual_data__service_by_agency.sql | 11 ++++++ .../fct_ntd_annual_data__service_by_mode.sql | 11 ++++++ ..._data__service_by_mode_and_time_period.sql | 11 ++++++ ...facilities_by_agency_and_facility_type.sql | 11 ++++++ ..._annual_data__stations_by_mode_and_age.sql | 11 ++++++ ...nual_data__track_and_roadway_by_agency.sql | 11 ++++++ ...annual_data__track_and_roadway_by_mode.sql | 11 ++++++ ..._and_roadway_guideway_age_distribution.sql | 11 ++++++ ...annual_data__vehicles_age_distribution.sql | 11 ++++++ ...al_data__vehicles_type_count_by_agency.sql | 11 ++++++ 36 files changed, 413 insertions(+) create mode 100644 warehouse/models/mart/ntd_fct_annual/_mart_ntd_fct_annual.yml create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns_by_agency.sql create
mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_capital_use.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_mode.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_existing_service.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_expansion_of_service.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_agency.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode_and_employee_type.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy_by_agency.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_by_expense_type.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_directly_generated.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_federal.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_local.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_state.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_taxes_levied_by_agency.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities_by_agency.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__metrics.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function_and_agency.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type_and_agency.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_agency.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode_and_time_period.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_by_mode_and_age.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_agency.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_mode.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_guideway_age_distribution.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_age_distribution.sql create mode 100644 warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_type_count_by_agency.sql diff --git a/warehouse/dbt_project.yml 
b/warehouse/dbt_project.yml index be4c05713f..74bb12a0c6 100644 --- a/warehouse/dbt_project.yml +++ b/warehouse/dbt_project.yml @@ -69,3 +69,5 @@ models: schema: mart_benefits ntd_validation: schema: mart_ntd_validation + ntd_fct_annual: + schema: mart_ntd_fct_annual diff --git a/warehouse/models/mart/ntd_fct_annual/_mart_ntd_fct_annual.yml b/warehouse/models/mart/ntd_fct_annual/_mart_ntd_fct_annual.yml new file mode 100644 index 0000000000..21dec5a337 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/_mart_ntd_fct_annual.yml @@ -0,0 +1,37 @@ +version: 2 + +models: + - name: fct_ntd_annual_data__breakdowns + - name: fct_ntd_annual_data__breakdowns_by_agency + - name: fct_ntd_annual_data__capital_expenses_by_capital_use + - name: fct_ntd_annual_data__capital_expenses_by_mode + - name: fct_ntd_annual_data__capital_expenses_for_existing_service + - name: fct_ntd_annual_data__capital_expenses_for_expansion_of_service + - name: fct_ntd_annual_data__employees_by_agency + - name: fct_ntd_annual_data__employees_by_mode + - name: fct_ntd_annual_data__employees_by_mode_and_employee_type + - name: fct_ntd_annual_data__fuel_and_energy + - name: fct_ntd_annual_data__fuel_and_energy_by_agency + - name: fct_ntd_annual_data__funding_sources_by_expense_type + - name: fct_ntd_annual_data__funding_sources_directly_generated + - name: fct_ntd_annual_data__funding_sources_federal + - name: fct_ntd_annual_data__funding_sources_local + - name: fct_ntd_annual_data__funding_sources_state + - name: fct_ntd_annual_data__funding_sources_taxes_levied_by_agency + - name: fct_ntd_annual_data__maintenance_facilities + - name: fct_ntd_annual_data__maintenance_facilities_by_agency + - name: fct_ntd_annual_data__metrics + - name: fct_ntd_annual_data__operating_expenses_by_function + - name: fct_ntd_annual_data__operating_expenses_by_function_and_agency + - name: fct_ntd_annual_data__operating_expenses_by_type + - name: fct_ntd_annual_data__operating_expenses_by_type_and_agency + - name: fct_ntd_annual_data__service_by_agency + - name: fct_ntd_annual_data__service_by_mode + - name: fct_ntd_annual_data__service_by_mode_and_time_period + - name: fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type + - name: fct_ntd_annual_data__stations_by_mode_and_age + - name: fct_ntd_annual_data__track_and_roadway_by_agency + - name: fct_ntd_annual_data__track_and_roadway_by_mode + - name: fct_ntd_annual_data__track_and_roadway_guideway_age_distribution + - name: fct_ntd_annual_data__vehicles_age_distribution + - name: fct_ntd_annual_data__vehicles_type_count_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns.sql new file mode 100644 index 0000000000..a860034665 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns.sql @@ -0,0 +1,11 @@ +WITH staging_breakdowns AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__breakdowns') }} +), + +fct_ntd_annual_data__breakdowns AS ( + SELECT * + FROM staging_breakdowns +) + +SELECT * FROM fct_ntd_annual_data__breakdowns diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns_by_agency.sql new file mode 100644 index 0000000000..ffdf747d28 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__breakdowns_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_breakdowns_by_agency AS ( + SELECT * + FROM {{ 
ref('stg_ntd_annual_data__breakdowns_by_agency') }} +), + +fct_ntd_annual_data__breakdowns_by_agency AS ( + SELECT * + FROM staging_breakdowns_by_agency +) + +SELECT * FROM fct_ntd_annual_data__breakdowns_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_capital_use.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_capital_use.sql new file mode 100644 index 0000000000..82882ea9f1 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_capital_use.sql @@ -0,0 +1,11 @@ +WITH staging_capital_expenses_by_capital_use AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__capital_expenses_by_capital_use') }} +), + +fct_ntd_annual_data__capital_expenses_by_capital_use AS ( + SELECT * + FROM staging_capital_expenses_by_capital_use +) + +SELECT * FROM fct_ntd_annual_data__capital_expenses_by_capital_use diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_mode.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_mode.sql new file mode 100644 index 0000000000..96457a6c8b --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_by_mode.sql @@ -0,0 +1,11 @@ +WITH staging_capital_expenses_by_mode AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__capital_expenses_by_mode') }} +), + +fct_ntd_annual_data__capital_expenses_by_mode AS ( + SELECT * + FROM staging_capital_expenses_by_mode +) + +SELECT * FROM fct_ntd_annual_data__capital_expenses_by_mode diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_existing_service.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_existing_service.sql new file mode 100644 index 0000000000..d219716996 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_existing_service.sql @@ -0,0 +1,11 @@ +WITH staging_capital_expenses_for_existing_service AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__capital_expenses_for_existing_service') }} +), + +fct_ntd_annual_data__capital_expenses_for_existing_service AS ( + SELECT * + FROM staging_capital_expenses_for_existing_service +) + +SELECT * FROM fct_ntd_annual_data__capital_expenses_for_existing_service diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_expansion_of_service.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_expansion_of_service.sql new file mode 100644 index 0000000000..ca2a7b72be --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__capital_expenses_for_expansion_of_service.sql @@ -0,0 +1,11 @@ +WITH staging_capital_expenses_for_expansion_of_service AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__capital_expenses_for_expansion_of_service') }} +), + +fct_ntd_annual_data__capital_expenses_for_expansion_of_service AS ( + SELECT * + FROM staging_capital_expenses_for_expansion_of_service +) + +SELECT * FROM fct_ntd_annual_data__capital_expenses_for_expansion_of_service diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_agency.sql new file mode 100644 index 0000000000..1b74d3bf79 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_employees_by_agency AS ( + SELECT * + FROM {{ 
ref('stg_ntd_annual_data__employees_by_agency') }} +), + +fct_ntd_annual_data__employees_by_agency AS ( + SELECT * + FROM staging_employees_by_agency +) + +SELECT * FROM fct_ntd_annual_data__employees_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode.sql new file mode 100644 index 0000000000..fac6d6aa76 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode.sql @@ -0,0 +1,11 @@ +WITH staging_employees_by_mode AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__employees_by_mode') }} +), + +fct_ntd_annual_data__employees_by_mode AS ( + SELECT * + FROM staging_employees_by_mode +) + +SELECT * FROM fct_ntd_annual_data__employees_by_mode diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode_and_employee_type.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode_and_employee_type.sql new file mode 100644 index 0000000000..01efa4bcaa --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__employees_by_mode_and_employee_type.sql @@ -0,0 +1,11 @@ +WITH staging_employees_by_mode_and_employee_type AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__employees_by_mode_and_employee_type') }} +), + +fct_ntd_annual_data__employees_by_mode_and_employee_type AS ( + SELECT * + FROM staging_employees_by_mode_and_employee_type +) + +SELECT * FROM fct_ntd_annual_data__employees_by_mode_and_employee_type diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy.sql new file mode 100644 index 0000000000..fbb795d0ae --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy.sql @@ -0,0 +1,11 @@ +WITH staging_fuel_and_energy AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__fuel_and_energy') }} +), + +fct_ntd_annual_data__fuel_and_energy AS ( + SELECT * + FROM staging_fuel_and_energy +) + +SELECT * FROM fct_ntd_annual_data__fuel_and_energy diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy_by_agency.sql new file mode 100644 index 0000000000..79f1f6e099 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__fuel_and_energy_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_fuel_and_energy_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__fuel_and_energy_by_agency') }} +), + +fct_ntd_annual_data__fuel_and_energy_by_agency AS ( + SELECT * + FROM staging_fuel_and_energy_by_agency +) + +SELECT * FROM fct_ntd_annual_data__fuel_and_energy_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_by_expense_type.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_by_expense_type.sql new file mode 100644 index 0000000000..0943e513ef --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_by_expense_type.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_by_expense_type AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_by_expense_type') }} +), + +fct_ntd_annual_data__funding_sources_by_expense_type AS ( + SELECT * + FROM staging_funding_sources_by_expense_type +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_by_expense_type diff --git 
a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_directly_generated.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_directly_generated.sql new file mode 100644 index 0000000000..93c704f122 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_directly_generated.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_directly_generated AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_directly_generated') }} +), + +fct_ntd_annual_data__funding_sources_directly_generated AS ( + SELECT * + FROM staging_funding_sources_directly_generated +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_directly_generated diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_federal.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_federal.sql new file mode 100644 index 0000000000..26b07b825d --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_federal.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_federal AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_federal') }} +), + +fct_ntd_annual_data__funding_sources_federal AS ( + SELECT * + FROM staging_funding_sources_federal +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_federal diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_local.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_local.sql new file mode 100644 index 0000000000..fbab02f91e --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_local.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_local AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_local') }} +), + +fct_ntd_annual_data__funding_sources_local AS ( + SELECT * + FROM staging_funding_sources_local +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_local diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_state.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_state.sql new file mode 100644 index 0000000000..128a3e581c --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_state.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_state AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_state') }} +), + +fct_ntd_annual_data__funding_sources_state AS ( + SELECT * + FROM staging_funding_sources_state +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_state diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_taxes_levied_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_taxes_levied_by_agency.sql new file mode 100644 index 0000000000..4585ac93f8 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__funding_sources_taxes_levied_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_funding_sources_taxes_levied_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__funding_sources_taxes_levied_by_agency') }} +), + +fct_ntd_annual_data__funding_sources_taxes_levied_by_agency AS ( + SELECT * + FROM staging_funding_sources_taxes_levied_by_agency +) + +SELECT * FROM fct_ntd_annual_data__funding_sources_taxes_levied_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities.sql 
b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities.sql new file mode 100644 index 0000000000..cba0055eb4 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities.sql @@ -0,0 +1,11 @@ +WITH staging_maintenance_facilities AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__maintenance_facilities') }} +), + +fct_ntd_annual_data__maintenance_facilities AS ( + SELECT * + FROM staging_maintenance_facilities +) + +SELECT * FROM fct_ntd_annual_data__maintenance_facilities diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities_by_agency.sql new file mode 100644 index 0000000000..bd119a2e74 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__maintenance_facilities_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_maintenance_facilities_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__maintenance_facilities_by_agency') }} +), + +fct_ntd_annual_data__maintenance_facilities_by_agency AS ( + SELECT * + FROM staging_maintenance_facilities_by_agency +) + +SELECT * FROM fct_ntd_annual_data__maintenance_facilities_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__metrics.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__metrics.sql new file mode 100644 index 0000000000..580cddef53 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__metrics.sql @@ -0,0 +1,11 @@ +WITH staging_metrics AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__metrics') }} +), + +fct_ntd_annual_data__metrics AS ( + SELECT * + FROM staging_metrics +) + +SELECT * FROM fct_ntd_annual_data__metrics diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function.sql new file mode 100644 index 0000000000..7966560391 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function.sql @@ -0,0 +1,11 @@ +WITH staging_operating_expenses_by_function AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__operating_expenses_by_function') }} +), + +fct_ntd_annual_data__operating_expenses_by_function AS ( + SELECT * + FROM staging_operating_expenses_by_function +) + +SELECT * FROM fct_ntd_annual_data__operating_expenses_by_function diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function_and_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function_and_agency.sql new file mode 100644 index 0000000000..694367bcbe --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_function_and_agency.sql @@ -0,0 +1,11 @@ +WITH staging_operating_expenses_by_function_and_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__operating_expenses_by_function_and_agency') }} +), + +fct_ntd_annual_data__operating_expenses_by_function_and_agency AS ( + SELECT * + FROM staging_operating_expenses_by_function_and_agency +) + +SELECT * FROM fct_ntd_annual_data__operating_expenses_by_function_and_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type.sql new file mode 100644 index 0000000000..8a8beb8831 --- /dev/null +++ 
b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type.sql @@ -0,0 +1,11 @@ +WITH staging_operating_expenses_by_type AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__operating_expenses_by_type') }} +), + +fct_ntd_annual_data__operating_expenses_by_type AS ( + SELECT * + FROM staging_operating_expenses_by_type +) + +SELECT * FROM fct_ntd_annual_data__operating_expenses_by_type diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type_and_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type_and_agency.sql new file mode 100644 index 0000000000..6400320d13 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__operating_expenses_by_type_and_agency.sql @@ -0,0 +1,11 @@ +WITH staging_operating_expenses_by_type_and_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__operating_expenses_by_type_and_agency') }} +), + +fct_ntd_annual_data__operating_expenses_by_type_and_agency AS ( + SELECT * + FROM staging_operating_expenses_by_type_and_agency +) + +SELECT * FROM fct_ntd_annual_data__operating_expenses_by_type_and_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_agency.sql new file mode 100644 index 0000000000..4de6f4c445 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_service_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__service_by_agency') }} +), + +fct_ntd_annual_data__service_by_agency AS ( + SELECT * + FROM staging_service_by_agency +) + +SELECT * FROM fct_ntd_annual_data__service_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode.sql new file mode 100644 index 0000000000..a1775331a8 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode.sql @@ -0,0 +1,11 @@ +WITH staging_service_by_mode AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__service_by_mode') }} +), + +fct_ntd_annual_data__service_by_mode AS ( + SELECT * + FROM staging_service_by_mode +) + +SELECT * FROM fct_ntd_annual_data__service_by_mode diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode_and_time_period.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode_and_time_period.sql new file mode 100644 index 0000000000..bfe4a47d9f --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__service_by_mode_and_time_period.sql @@ -0,0 +1,11 @@ +WITH staging_service_by_mode_and_time_period AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__service_by_mode_and_time_period') }} +), + +fct_ntd_annual_data__service_by_mode_and_time_period AS ( + SELECT * + FROM staging_service_by_mode_and_time_period +) + +SELECT * FROM fct_ntd_annual_data__service_by_mode_and_time_period diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type.sql new file mode 100644 index 0000000000..ec613e72dc --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type.sql @@ -0,0 +1,11 @@ +WITH 
staging_stations_and_facilities_by_agency_and_facility_type AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type') }} +), + +fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type AS ( + SELECT * + FROM staging_stations_and_facilities_by_agency_and_facility_type +) + +SELECT * FROM fct_ntd_annual_data__stations_and_facilities_by_agency_and_facility_type diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_by_mode_and_age.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_by_mode_and_age.sql new file mode 100644 index 0000000000..3c84a410a9 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__stations_by_mode_and_age.sql @@ -0,0 +1,11 @@ +WITH staging_stations_by_mode_and_age AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__stations_by_mode_and_age') }} +), + +fct_ntd_annual_data__stations_by_mode_and_age AS ( + SELECT * + FROM staging_stations_by_mode_and_age +) + +SELECT * FROM fct_ntd_annual_data__stations_by_mode_and_age diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_agency.sql new file mode 100644 index 0000000000..d1bb760745 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_track_and_roadway_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__track_and_roadway_by_agency') }} +), + +fct_ntd_annual_data__track_and_roadway_by_agency AS ( + SELECT * + FROM staging_track_and_roadway_by_agency +) + +SELECT * FROM fct_ntd_annual_data__track_and_roadway_by_agency diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_mode.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_mode.sql new file mode 100644 index 0000000000..2f60b8bfe2 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_by_mode.sql @@ -0,0 +1,11 @@ +WITH staging_track_and_roadway_by_mode AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__track_and_roadway_by_mode') }} +), + +fct_ntd_annual_data__track_and_roadway_by_mode AS ( + SELECT * + FROM staging_track_and_roadway_by_mode +) + +SELECT * FROM fct_ntd_annual_data__track_and_roadway_by_mode diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_guideway_age_distribution.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_guideway_age_distribution.sql new file mode 100644 index 0000000000..5c1a4af5cf --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__track_and_roadway_guideway_age_distribution.sql @@ -0,0 +1,11 @@ +WITH staging_track_and_roadway_guideway_age_distribution AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__track_and_roadway_guideway_age_distribution') }} +), + +fct_ntd_annual_data__track_and_roadway_guideway_age_distribution AS ( + SELECT * + FROM staging_track_and_roadway_guideway_age_distribution +) + +SELECT * FROM fct_ntd_annual_data__track_and_roadway_guideway_age_distribution diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_age_distribution.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_age_distribution.sql new file mode 100644 index 0000000000..7af715712f --- /dev/null +++ 
b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_age_distribution.sql @@ -0,0 +1,11 @@ +WITH staging_vehicles_age_distribution AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__vehicles_age_distribution') }} +), + +fct_ntd_annual_data__vehicles_age_distribution AS ( + SELECT * + FROM staging_vehicles_age_distribution +) + +SELECT * FROM fct_ntd_annual_data__vehicles_age_distribution diff --git a/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_type_count_by_agency.sql b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_type_count_by_agency.sql new file mode 100644 index 0000000000..dd93cce541 --- /dev/null +++ b/warehouse/models/mart/ntd_fct_annual/fct_ntd_annual_data__vehicles_type_count_by_agency.sql @@ -0,0 +1,11 @@ +WITH staging_vehicles_type_count_by_agency AS ( + SELECT * + FROM {{ ref('stg_ntd_annual_data__vehicles_type_count_by_agency') }} +), + +fct_ntd_annual_data__vehicles_type_count_by_agency AS ( + SELECT * + FROM staging_vehicles_type_count_by_agency +) + +SELECT * FROM fct_ntd_annual_data__vehicles_type_count_by_agency From 4f07ab55d69040f2fa2c040c2d6e3a2bdb1b9035 Mon Sep 17 00:00:00 2001 From: Erika Date: Mon, 25 Nov 2024 12:20:11 -0800 Subject: [PATCH 04/13] Fix Errno2 on GTFS RT validation (#3553) * Extract hourly GTFS-RT validation results to a value object * Extract GTFS-RT query for files to be processed * Remove progress bar from GTFS-RT validator script * Combine method bodies in GTFS-RT validator * Remove flow control exception from GTFS-RT validator script * Rename hour to aggregation in GTFS-RT validator script * Extract object for aggregation operations in GTFS-RT validator * GTFS-RT validator properly skips duplicate extracts * Extract hourly GTFS-RT processing logic to classes * Remove conditional from GTFS-RT validator * Make download location optional for GTFS-RT validator downloads --------- Signed-off-by: Erika Pacheco Co-authored-by: Doc Ritezel --- jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py | 1103 +++++++++-------- .../tests/test_gtfs_rt_parser.py | 27 +- 2 files changed, 582 insertions(+), 548 deletions(-) diff --git a/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py b/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py index 3c272b5785..62597461df 100644 --- a/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py +++ b/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py @@ -10,30 +10,22 @@ import json import os import subprocess -import sys import tempfile import traceback from collections import defaultdict from concurrent.futures import Future, ThreadPoolExecutor from enum import Enum from functools import lru_cache -from pathlib import Path -from typing import Any, ClassVar, Dict, List, Optional, Sequence, Tuple, Union +from itertools import islice +from typing import Any, ClassVar, Dict, List, Optional, Tuple -import backoff # type: ignore import gcsfs # type: ignore import pendulum import sentry_sdk import typer -from aiohttp.client_exceptions import ( - ClientOSError, - ClientResponseError, - ServerDisconnectedError, -) from calitp_data_infra.storage import ( # type: ignore JSONL_GZIP_EXTENSION, GTFSDownloadConfig, - GTFSFeedExtract, GTFSFeedType, GTFSRTFeedExtract, GTFSScheduleFeedExtract, @@ -48,14 +40,8 @@ from google.protobuf.message import DecodeError from google.transit import gtfs_realtime_pb2 # type: ignore from pydantic import BaseModel, Field, validator -from tqdm import tqdm - -RT_VALIDATOR_JAR_LOCATION_ENV_KEY = "GTFS_RT_VALIDATOR_JAR" -JAR_DEFAULT = typer.Option( - os.environ.get(RT_VALIDATOR_JAR_LOCATION_ENV_KEY), - 
help="Path to the GTFS RT Validator JAR", -) +JAR_DEFAULT = os.environ["GTFS_RT_VALIDATOR_JAR"] RT_PARSED_BUCKET = os.environ["CALITP_BUCKET__GTFS_RT_PARSED"] RT_VALIDATION_BUCKET = os.environ["CALITP_BUCKET__GTFS_RT_VALIDATION"] GTFS_RT_VALIDATOR_VERSION = os.environ["GTFS_RT_VALIDATOR_VERSION"] @@ -67,22 +53,6 @@ sentry_sdk.init() -def make_dict_bq_safe(d: Dict[str, Any]) -> Dict[str, Any]: - return { - make_name_bq_safe(key): make_dict_bq_safe(value) - if isinstance(value, dict) - else value - for key, value in d.items() - } - - -def make_pydantic_model_bq_safe(model: BaseModel) -> Dict[str, Any]: - """ - This is ugly but I think it's the best option until https://github.com/pydantic/pydantic/issues/1409 - """ - return make_dict_bq_safe(json.loads(model.json())) - - class MissingMetadata(Exception): pass @@ -107,45 +77,6 @@ class RTValidationMetadata(BaseModel): gtfs_validator_version: str -def log(*args, err=False, fg=None, pbar=None, **kwargs): - # capture fg so we don't pass it to pbar - if pbar: - pbar.write(*args, **kwargs, file=sys.stderr if err else None) - else: - typer.secho(*args, err=err, fg=fg, **kwargs) - - -def upload_if_records( - fs, - tmp_dir: str, - artifact: PartitionedGCSArtifact, - records: Sequence[Union[Dict, BaseModel]], - pbar=None, -): - # BigQuery fails when trying to parse empty files, so shouldn't write them - if not records: - log( - f"WARNING: no records found for {artifact.path}, skipping upload", - fg=typer.colors.YELLOW, - pbar=pbar, - ) - return - - log( - f"writing {len(records)} lines to {artifact.path}", - pbar=pbar, - ) - with tempfile.NamedTemporaryFile(mode="wb", delete=False, dir=tmp_dir) as f: - gzipfile = gzip.GzipFile(mode="wb", fileobj=f) - encoded = ( - r.json() if isinstance(r, BaseModel) else json.dumps(r) for r in records - ) - gzipfile.write("\n".join(encoded).encode("utf-8")) - gzipfile.close() - - put_with_retry(fs, f.name, artifact.path) - - class NoScheduleDataSpecified(Exception): pass @@ -154,25 +85,6 @@ class ScheduleDataNotFound(Exception): pass -@lru_cache -def get_schedule_extracts_for_day( - dt: pendulum.Date, -) -> Dict[str, GTFSScheduleFeedExtract]: - extracts: List[GTFSScheduleFeedExtract] - extracts, missing, invalid = fetch_all_in_partition( - cls=GTFSScheduleFeedExtract, - partitions={ - "dt": dt, - }, - ) - - # Explicitly put extracts in timestamp order so dict construction below sets - # values to the most recent extract for a given base64_url - extracts.sort(key=lambda extract: extract.ts) - - return {extract.base64_url: extract for extract in extracts} - - class RTHourlyAggregation(PartitionedGCSArtifact): partition_names: ClassVar[List[str]] = ["dt", "hour", "base64_url"] step: RTProcessingStep @@ -299,440 +211,609 @@ def dt(self) -> pendulum.Date: return self.hour.date() -def save_job_result(fs: gcsfs.GCSFileSystem, result: GTFSRTJobResult): - typer.secho( - f"saving {len(result.outcomes)} outcomes to {result.path}", - fg=typer.colors.GREEN, - ) - # TODO: I dislike having to exclude the records here - # I need to figure out the best way to have a single type represent the "metadata" of - # the content as well as the content itself - result.save_content( - fs=fs, - content="\n".join( - (json.dumps(make_pydantic_model_bq_safe(o)) for o in result.outcomes) - ).encode(), - exclude={"outcomes"}, - ) +class RtValidator: + def __init__(self, jar_path: str = JAR_DEFAULT): + self.jar_path = jar_path + + def execute(self, gtfs_file: str, rt_path: str): + typer.secho(f"validating {rt_path} with {gtfs_file}", 
fg=typer.colors.MAGENTA) + args = [ + "java", + "-jar", + str(self.jar_path), + "-gtfs", + gtfs_file, + "-gtfsRealtimePath", + rt_path, + "-sort", + "name", + ] + typer.secho(f"executing rt_validator: {' '.join(args)}") + subprocess.run( + args, + capture_output=True, + check=True, + ) -def fatal_code(e): - return isinstance(e, ClientResponseError) and e.status == 404 +class DailyScheduleExtracts: + def __init__(self, extracts: Dict[str, GTFSScheduleFeedExtract]): + self.extracts = extracts -@backoff.on_exception( - backoff.expo, - exception=(ClientOSError, ClientResponseError, ServerDisconnectedError), - max_tries=3, - giveup=fatal_code, -) -def get_with_retry(fs, *args, **kwargs): - return fs.get(*args, **kwargs) + def get_url_schedule(self, base64_url: str) -> GTFSScheduleFeedExtract: + return self.extracts[base64_url] -@backoff.on_exception( - backoff.expo, - exception=(ClientOSError, ClientResponseError, ServerDisconnectedError), - max_tries=3, -) -def put_with_retry(fs, *args, **kwargs): - return fs.put(*args, **kwargs) - - -def download_gtfs_schedule_zip( - fs, - schedule_extract: GTFSFeedExtract, - dst_dir: str, - pbar=None, -) -> str: - # fetch and zip gtfs schedule - full_dst_path = "/".join([dst_dir, schedule_extract.filename]) - log( - f"Fetching gtfs schedule data from {schedule_extract.path} to {full_dst_path}", - pbar=pbar, - ) - get_with_retry(fs, schedule_extract.path, full_dst_path) +class ScheduleStorage: + @lru_cache + def get_day(self, dt: pendulum.Date) -> DailyScheduleExtracts: + extracts, _, _ = fetch_all_in_partition( + cls=GTFSScheduleFeedExtract, + partitions={"dt": dt}, + verbose=True, + ) + # Explicitly put extracts in timestamp order so dict construction below sets + # values to the most recent extract for a given base64_url + extracts.sort(key=lambda extract: extract.ts) - # https://github.com/MobilityData/gtfs-realtime-validator/issues/92 - # try: - # os.remove(os.path.join(dst_path, "areas.txt")) - # except FileNotFoundError: - # pass + extract_dict = {extract.base64_url: extract for extract in extracts} - return full_dst_path + return DailyScheduleExtracts(extract_dict) -def execute_rt_validator( - gtfs_file: str, rt_path: str, jar_path: Path, verbose=False, pbar=None -): - log(f"validating {rt_path} with {gtfs_file}", fg=typer.colors.MAGENTA, pbar=pbar) - - args = [ - "java", - "-jar", - str(jar_path), - "-gtfs", - gtfs_file, - "-gtfsRealtimePath", - rt_path, - "-sort", - "name", - ] +class MostRecentSchedule: + def __init__(self, fs: gcsfs.GCSFileSystem, path: str, base64_validation_url: str): + self.fs = fs + self.path = path + self.base64_validation_url = base64_validation_url - log(f"executing rt_validator: {' '.join(args)}", pbar=pbar) - subprocess.run( - args, - capture_output=True, - check=True, - ) + def download(self, date: datetime.datetime) -> Optional[str]: + for day in reversed(list(date - date.subtract(days=7))): + try: + schedule_extract = ( + ScheduleStorage() + .get_day(day) + .get_url_schedule(self.base64_validation_url) + ) + except KeyError: + print( + f"no schedule data found for {self.base64_validation_url} on day {day}" + ) + continue + try: + gtfs_zip = "/".join([self.path, schedule_extract.filename]) + self.fs.get(schedule_extract.path, gtfs_zip) + return gtfs_zip + except FileNotFoundError: + print( + f"no schedule file found for {self.base64_validation_url} on day {day}" + ) + continue + return None -def validate_and_upload( - fs, - jar_path: Path, - dst_path_rt: str, - tmp_dir: str, - hour: RTHourlyAggregation, - gtfs_zip: str, 
- verbose: bool = False, - pbar=None, -) -> List[RTFileProcessingOutcome]: - execute_rt_validator( - gtfs_zip, - dst_path_rt, - jar_path=jar_path, - verbose=verbose, - pbar=pbar, - ) - records_to_upload = [] - outcomes = [] - for local_path, extract in hour.local_paths_to_extract(dst_path_rt).items(): - results_path = local_path + ".results.json" - try: - with open(results_path) as f: - records = json.load(f) - except FileNotFoundError as e: - # This exception was previously generating the error "[Errno 2] No such file or directory" - msg = f"WARNING: no validation output file found in {results_path} for {extract.path}" - if verbose: - log( - msg, - fg=typer.colors.YELLOW, - pbar=pbar, - ) - outcomes.append( +class AggregationExtract: + def __init__(self, path: str, extract: GTFSRTFeedExtract): + self.path = path + self.extract = extract + + def get_local_path(self) -> str: + return os.path.join(self.path, self.extract.timestamped_filename) + + def get_results_path(self) -> str: + return os.path.join( + self.path, f"{self.extract.timestamped_filename}.results.json" + ) + + def hash(self) -> bytes: + with open( + os.path.join(self.path, self.extract.timestamped_filename), "rb" + ) as f: + file_hash = hashlib.md5() + while chunk := f.read(8192): + file_hash.update(chunk) + return file_hash.digest() + + def get_results(self) -> Dict[str, str]: + with open(self.get_results_path()) as f: + return json.load(f) + + def has_results(self) -> bool: + return os.path.exists(self.get_results_path()) + + +class AggregationExtracts: + def __init__( + self, fs: gcsfs.GCSFileSystem, path: str, aggregation: RTHourlyAggregation + ): + self.fs = fs + self.path = path + self.aggregation = aggregation + + def get_path(self): + return f"{self.path}/rt_{self.aggregation.name_hash}/" + + def get_extracts(self) -> List[AggregationExtract]: + return [ + AggregationExtract(self.get_path(), e) for e in self.aggregation.extracts + ] + + def get_local_paths(self) -> Dict[str, GTFSRTFeedExtract]: + return {e.get_local_path(): e.extract for e in self.get_extracts()} + + def get_results_paths(self) -> Dict[str, GTFSRTFeedExtract]: + return {e.get_results_path(): e.extract for e in self.get_extracts()} + + def get_hashed_results(self): + hashed = {} + for e in self.get_extracts(): + if e.has_results(): + hashed[e.hash()] = e.get_results() + return hashed + + def get_hashes(self) -> Dict[bytes, List[GTFSRTFeedExtract]]: + hashed: Dict[bytes, List[GTFSRTFeedExtract]] = defaultdict(list) + for e in self.get_extracts(): + hashed[e.hash()].append(e.extract) + return hashed + + def download(self): + self.fs.get( + rpath=[extract.path for extract in self.get_local_paths().values()], + lpath=list(self.get_local_paths().keys()), + ) + + def download_most_recent_schedule(self) -> Optional[str]: + first_extract = self.aggregation.extracts[0] + schedule = MostRecentSchedule( + self.fs, self.path, first_extract.config.base64_validation_url + ) + return schedule.download(first_extract.dt) + + +class HourlyFeedQuery: + def __init__( + self, + step: RTProcessingStep, + feed_type: GTFSFeedType, + files: List[GTFSRTFeedExtract], + limit: int = 0, + base64_url: Optional[str] = None, + ): + self.step = step + self.feed_type = feed_type + self.files = files + self.limit = limit + self.base64_url = base64_url + + def set_limit(self, limit: int): + return HourlyFeedQuery( + self.step, self.feed_type, self.files, limit, self.base64_url + ) + + def where_base64url(self, base64_url: Optional[str]): + return HourlyFeedQuery( + self.step, 
self.feed_type, self.files, self.limit, base64_url + ) + + def get_aggregates( + self, + ) -> List[RTHourlyAggregation]: + aggregates: Dict[ + Tuple[pendulum.DateTime, str], List[GTFSRTFeedExtract] + ] = defaultdict(list) + + for file in self.files: + if self.base64_url is None or file.base64_url == self.base64_url: + aggregates[(file.hour, file.base64_url)].append(file) + + if self.limit > 0: + aggregates = dict(islice(aggregates.items(), self.limit)) + + return [ + RTHourlyAggregation( + step=self.step, + filename=f"{self.feed_type}{JSONL_GZIP_EXTENSION}", + first_extract=entries[0], + extracts=entries, + ) + for (hour, base64_url), entries in aggregates.items() + ] + + def total(self) -> int: + return sum(len(agg.extracts) for agg in self.get_aggregates()) + + +class HourlyFeedFiles: + def __init__( + self, + files: List[GTFSRTFeedExtract], + files_missing_metadata: List[Blob], + files_invalid_metadata: List[Blob], + ): + self.files = files + self.files_missing_metadata = files_missing_metadata + self.files_invalid_metadata = files_invalid_metadata + + def total(self) -> int: + return ( + len(self.files) + + len(self.files_missing_metadata) + + len(self.files_invalid_metadata) + ) + + def valid(self) -> bool: + return not self.files or len(self.files) / self.total() > 0.99 + + def get_query( + self, step: RTProcessingStep, feed_type: GTFSFeedType + ) -> HourlyFeedQuery: + return HourlyFeedQuery(step, feed_type, self.files) + + +class FeedStorage: + def __init__(self, feed_type: GTFSFeedType): + self.feed_type = feed_type + + @lru_cache + def get_hour(self, hour: datetime.datetime) -> HourlyFeedFiles: + pendulum_hour = pendulum.instance(hour, tz="Etc/UTC") + files, files_missing_metadata, files_invalid_metadata = fetch_all_in_partition( + cls=GTFSRTFeedExtract, + partitions={ + "dt": pendulum_hour.date(), + "hour": pendulum_hour, + }, + table=self.feed_type, + verbose=True, + ) + return HourlyFeedFiles(files, files_missing_metadata, files_invalid_metadata) + + +class ValidationProcessor: + def __init__( + self, + aggregation: RTHourlyAggregation, + verbose: bool = False, + ): + self.aggregation = aggregation + self.verbose = verbose + + def validator(self): + return RtValidator() + + def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]: + outcomes: List[RTFileProcessingOutcome] = [] + fs = get_fs() + + if not self.aggregation.extracts[0].config.schedule_url_for_validation: + outcomes = [ RTFileProcessingOutcome( - step=hour.step, + step=self.aggregation.step, success=False, - exception=e, extract=extract, + exception=NoScheduleDataSpecified(), ) - ) - continue - - records_to_upload.extend( - [ - { - "metadata": json.loads( - RTValidationMetadata( - extract_ts=extract.ts, - extract_config=extract.config, - gtfs_validator_version=GTFS_RT_VALIDATOR_VERSION, - ).json() - ), - **record, - } - for record in records + for extract in self.aggregation.extracts ] - ) - outcomes.append( - RTFileProcessingOutcome( - step=hour.step, - success=True, - extract=extract, - aggregation=hour, + aggregation_extracts = AggregationExtracts(fs, tmp_dir, self.aggregation) + aggregation_extracts.download() + gtfs_zip = aggregation_extracts.download_most_recent_schedule() + + if not gtfs_zip: + e = ScheduleDataNotFound( + f"no recent schedule data found for {self.aggregation.extracts[0].path}" ) - ) + print(e) - upload_if_records( - fs, - tmp_dir, - artifact=hour, - records=records_to_upload, - pbar=pbar, - ) + scope.fingerprint = [ + type(e), + # convert back to url manually, I don't want to 
mess around with the hourly class + base64.urlsafe_b64decode(self.aggregation.base64_url.encode()).decode(), + ] + sentry_sdk.capture_exception(e, scope=scope) - return outcomes + outcomes = [ + RTFileProcessingOutcome( + step=self.aggregation.step, + success=False, + extract=extract, + exception=e, + ) + for extract in self.aggregation.extracts + ] + if not outcomes: + try: + self.validator().execute(gtfs_zip, aggregation_extracts.get_path()) -def parse_and_upload( - fs, - dst_path_rt, - tmp_dir, - hour: RTHourlyAggregation, - verbose=False, - pbar=None, -) -> List[RTFileProcessingOutcome]: - written = 0 - outcomes = [] - gzip_fname = str(tmp_dir + hour.unique_filename) + # these are the only two types of errors we expect; let any others bubble up + except subprocess.CalledProcessError as e: + stderr = e.stderr.decode("utf-8") - # ParseFromString() seems to not release memory well, so manually handle - # writing to the gzip and cleaning up after ourselves + fingerprint: List[Any] = [ + type(e), + # convert back to url manually, I don't want to mess around with the hourly class + base64.urlsafe_b64decode( + self.aggregation.base64_url.encode() + ).decode(), + ] + fingerprint.append(e.returncode) - with gzip.open(gzip_fname, "w") as gzipfile: - for extract in hour.extracts: - feed = gtfs_realtime_pb2.FeedMessage() + # we could also use a custom exception for this + if "Unexpected end of ZLIB input stream" in stderr: + fingerprint.append("Unexpected end of ZLIB input stream") - try: - with open( - os.path.join(dst_path_rt, extract.timestamped_filename), "rb" - ) as f: - feed.ParseFromString(f.read()) - parsed = json_format.MessageToDict(feed) - except DecodeError as e: - if verbose: - log( - f"WARNING: DecodeError for {str(extract.path)}", - fg=typer.colors.YELLOW, - pbar=pbar, - ) - outcomes.append( + scope.fingerprint = fingerprint + + # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above + scope.set_context("Process", {"stderr": stderr[-2000:]}) + + sentry_sdk.capture_exception(e, scope=scope) + + outcomes = [ RTFileProcessingOutcome( - step=RTProcessingStep.parse, + step=self.aggregation.step, success=False, - exception=e, extract=extract, + exception=e, + process_stderr=stderr, ) + for extract in self.aggregation.extracts + ] + + if not outcomes: + records_to_upload = [] + hashed_results = aggregation_extracts.get_hashed_results() + for hash, extracts in aggregation_extracts.get_hashes().items(): + try: + records = hashed_results[hash] + except KeyError as e: + if self.verbose: + paths = ", ".join(e.path for e in extracts) + typer.secho( + f"WARNING: no results found for {paths}", + fg=typer.colors.YELLOW, + ) + + for extract in extracts: + outcomes.append( + RTFileProcessingOutcome( + step=self.aggregation.step, + success=False, + exception=e, + extract=extract, + ) + ) + continue + + records_to_upload.extend( + [ + { + "metadata": json.loads( + RTValidationMetadata( + extract_ts=extracts[0].ts, + extract_config=extracts[0].config, + gtfs_validator_version=GTFS_RT_VALIDATOR_VERSION, + ).json() + ), + **record, + } + for record in records + ] ) - continue - if not parsed: - msg = f"WARNING: no parsed dictionary found in {str(extract.path)}" - if verbose: - log( - msg, - fg=typer.colors.YELLOW, - pbar=pbar, + for extract in extracts: + outcomes.append( + RTFileProcessingOutcome( + step=self.aggregation.step, + success=True, + extract=extract, + aggregation=self.aggregation, + ) ) - outcomes.append( - RTFileProcessingOutcome( - step=RTProcessingStep.parse, - 
success=False, - exception=ValueError(msg), - extract=extract, + + # BigQuery fails when trying to parse empty files, so shouldn't write them + if records_to_upload: + typer.secho( + f"writing {len(records_to_upload)} lines to {self.aggregation.path}", + ) + with tempfile.NamedTemporaryFile( + mode="wb", delete=False, dir=tmp_dir + ) as f: + gzipfile = gzip.GzipFile(mode="wb", fileobj=f) + encoded = ( + r.json() if isinstance(r, BaseModel) else json.dumps(r) + for r in records_to_upload ) + gzipfile.write("\n".join(encoded).encode("utf-8")) + gzipfile.close() + + fs.put(f.name, self.aggregation.path) + else: + typer.secho( + f"WARNING: no records found for {self.aggregation.path}, skipping upload", + fg=typer.colors.YELLOW, ) - continue - if "entity" not in parsed: - msg = f"WARNING: no parsed entity found in {str(extract.path)}" - if verbose: - log( - msg, - fg=typer.colors.YELLOW, - pbar=pbar, + return outcomes + + +class ParseProcessor: + def __init__(self, aggregation: RTHourlyAggregation, verbose: bool = False): + self.aggregation = aggregation + self.verbose = verbose + + def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]: + outcomes: List[RTFileProcessingOutcome] = [] + fs = get_fs() + dst_path_rt = f"{tmp_dir}/rt_{self.aggregation.name_hash}/" + fs.get( + rpath=[ + extract.path + for extract in self.aggregation.local_paths_to_extract( + dst_path_rt + ).values() + ], + lpath=list(self.aggregation.local_paths_to_extract(dst_path_rt).keys()), + ) + + written = 0 + gzip_fname = str(tmp_dir + self.aggregation.unique_filename) + + # ParseFromString() seems to not release memory well, so manually handle + # writing to the gzip and cleaning up after ourselves + + with gzip.open(gzip_fname, "w") as gzipfile: + for extract in self.aggregation.extracts: + feed = gtfs_realtime_pb2.FeedMessage() + + try: + with open( + os.path.join(dst_path_rt, extract.timestamped_filename), "rb" + ) as f: + feed.ParseFromString(f.read()) + parsed = json_format.MessageToDict(feed) + except DecodeError as e: + if self.verbose: + typer.secho( + f"WARNING: DecodeError for {str(extract.path)}", + fg=typer.colors.YELLOW, + ) + outcomes.append( + RTFileProcessingOutcome( + step=RTProcessingStep.parse, + success=False, + exception=e, + extract=extract, + ) + ) + continue + + if not parsed: + msg = f"WARNING: no parsed dictionary found in {str(extract.path)}" + if self.verbose: + typer.secho( + msg, + fg=typer.colors.YELLOW, + ) + outcomes.append( + RTFileProcessingOutcome( + step=RTProcessingStep.parse, + success=False, + exception=ValueError(msg), + extract=extract, + ) + ) + continue + + if "entity" not in parsed: + msg = f"WARNING: no parsed entity found in {str(extract.path)}" + if self.verbose: + typer.secho( + msg, + fg=typer.colors.YELLOW, + ) + outcomes.append( + RTFileProcessingOutcome( + step=RTProcessingStep.parse, + success=True, + extract=extract, + header=parsed["header"], + ) ) + continue + + for record in parsed["entity"]: + gzipfile.write( + ( + json.dumps( + { + "header": parsed["header"], + # back and forth so we use pydantic serialization + "metadata": json.loads( + RTParsingMetadata( + extract_ts=extract.ts, + extract_config=extract.config, + ).json() + ), + **copy.deepcopy(record), + } + ) + + "\n" + ).encode("utf-8") + ) + written += 1 outcomes.append( RTFileProcessingOutcome( step=RTProcessingStep.parse, success=True, extract=extract, + aggregation=self.aggregation, header=parsed["header"], ) ) - continue + del parsed - for record in parsed["entity"]: - gzipfile.write( - ( 
- json.dumps( - { - "header": parsed["header"], - # back and forth so we use pydantic serialization - "metadata": json.loads( - RTParsingMetadata( - extract_ts=extract.ts, - extract_config=extract.config, - ).json() - ), - **copy.deepcopy(record), - } - ) - + "\n" - ).encode("utf-8") - ) - written += 1 - outcomes.append( - RTFileProcessingOutcome( - step=RTProcessingStep.parse, - success=True, - extract=extract, - aggregation=hour, - header=parsed["header"], - ) + if written: + typer.secho( + f"writing {written} lines to {self.aggregation.path}", + ) + fs.put(gzip_fname, self.aggregation.path) + else: + typer.secho( + f"WARNING: no records at all for {self.aggregation.path}", + fg=typer.colors.YELLOW, ) - del parsed - - if written: - log( - f"writing {written} lines to {hour.path}", - pbar=pbar, - ) - put_with_retry(fs, gzip_fname, hour.path) - else: - log( - f"WARNING: no records at all for {hour.path}", - fg=typer.colors.YELLOW, - pbar=pbar, - ) - return outcomes + return outcomes # Originally this whole function was retried, but tmpdir flakiness will throw # exceptions in backoff's context, which ruins things def parse_and_validate( - hour: RTHourlyAggregation, - jar_path: Path, + aggregation: RTHourlyAggregation, verbose: bool = False, - pbar=None, ) -> List[RTFileProcessingOutcome]: with tempfile.TemporaryDirectory() as tmp_dir: with sentry_sdk.push_scope() as scope: - scope.set_tag("config_feed_type", hour.first_extract.config.feed_type) - scope.set_tag("config_name", hour.first_extract.config.name) - scope.set_tag("config_url", hour.first_extract.config.url) - scope.set_context("RT Hourly Aggregation", json.loads(hour.json())) - - fs = get_fs() - dst_path_rt = f"{tmp_dir}/rt_{hour.name_hash}/" - get_with_retry( - fs, - rpath=[ - extract.path - for extract in hour.local_paths_to_extract(dst_path_rt).values() - ], - lpath=list(hour.local_paths_to_extract(dst_path_rt).keys()), + scope.set_tag( + "config_feed_type", aggregation.first_extract.config.feed_type ) + scope.set_tag("config_name", aggregation.first_extract.config.name) + scope.set_tag("config_url", aggregation.first_extract.config.url) + scope.set_context("RT Hourly Aggregation", json.loads(aggregation.json())) - if hour.step == RTProcessingStep.validate: - if not hour.extracts[0].config.schedule_url_for_validation: - return [ - RTFileProcessingOutcome( - step=hour.step, - success=False, - extract=extract, - exception=NoScheduleDataSpecified(), - ) - for extract in hour.extracts - ] - - try: - first_extract = hour.extracts[0] - extract_day = first_extract.dt - for target_date in reversed( - list(extract_day - extract_day.subtract(days=7)) - ): # Fall back to most recent available schedule within 7 days - try: - schedule_extract = get_schedule_extracts_for_day( - target_date - )[first_extract.config.base64_validation_url] - - scope.set_context( - "Schedule Extract", json.loads(schedule_extract.json()) - ) + if ( + aggregation.step != RTProcessingStep.validate + and aggregation.step != RTProcessingStep.parse + ): + raise RuntimeError("we should not be here") - gtfs_zip = download_gtfs_schedule_zip( - fs, - schedule_extract=schedule_extract, - dst_dir=tmp_dir, - pbar=pbar, - ) + if aggregation.step == RTProcessingStep.validate: + return ValidationProcessor(aggregation, verbose).process(tmp_dir, scope) - break - except (KeyError, FileNotFoundError): - print( - f"no schedule data found for {first_extract.path} and day {target_date}" - ) - else: - raise ScheduleDataNotFound( - f"no recent schedule data found for 
{first_extract.path}" - ) + if aggregation.step == RTProcessingStep.parse: + return ParseProcessor(aggregation, verbose).process(tmp_dir, scope) - return validate_and_upload( - fs=fs, - jar_path=jar_path, - dst_path_rt=dst_path_rt, - tmp_dir=tmp_dir, - hour=hour, - gtfs_zip=gtfs_zip, - verbose=verbose, - pbar=pbar, - ) - - # these are the only two types of errors we expect; let any others bubble up - except (ScheduleDataNotFound, subprocess.CalledProcessError) as e: - stderr = None - - fingerprint: List[Any] = [ - type(e), - # convert back to url manually, I don't want to mess around with the hourly class - base64.urlsafe_b64decode(hour.base64_url.encode()).decode(), - ] - if isinstance(e, subprocess.CalledProcessError): - fingerprint.append(e.returncode) - stderr = e.stderr.decode("utf-8") - # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above - scope.set_context( - "Process", {"stderr": e.stderr.decode("utf-8")[-2000:]} - ) - - # we could also use a custom exception for this - if "Unexpected end of ZLIB input stream" in stderr: - fingerprint.append("Unexpected end of ZLIB input stream") - - scope.fingerprint = fingerprint - sentry_sdk.capture_exception(e, scope=scope) - - if verbose: - log( - f"{str(e)} thrown for {hour.path}", - fg=typer.colors.RED, - pbar=pbar, - ) - if isinstance(e, subprocess.CalledProcessError): - log( - e.stderr.decode("utf-8"), - fg=typer.colors.YELLOW, - pbar=pbar, - ) - - return [ - RTFileProcessingOutcome( - step=hour.step, - success=False, - extract=extract, - exception=e, - process_stderr=stderr, - ) - for extract in hour.extracts - ] +def make_dict_bq_safe(d: Dict[str, Any]) -> Dict[str, Any]: + return { + make_name_bq_safe(key): make_dict_bq_safe(value) + if isinstance(value, dict) + else value + for key, value in d.items() + } - if hour.step == RTProcessingStep.parse: - return parse_and_upload( - fs=fs, - dst_path_rt=dst_path_rt, - tmp_dir=tmp_dir, - hour=hour, - verbose=verbose, - pbar=pbar, - ) - raise RuntimeError("we should not be here") +def make_pydantic_model_bq_safe(model: BaseModel) -> Dict[str, Any]: + """ + This is ugly but I think it's the best option until https://github.com/pydantic/pydantic/issues/1409 + """ + return make_dict_bq_safe(json.loads(model.json())) @app.command() @@ -741,56 +822,25 @@ def main( feed_type: GTFSFeedType, hour: datetime.datetime, limit: int = 0, - progress: bool = typer.Option( - False, - help="If true, display progress bar; useful for development but not in production.", - ), threads: int = 4, - jar_path: Path = JAR_DEFAULT, verbose: bool = False, base64url: Optional[str] = None, ): - pendulum_hour = pendulum.instance(hour, tz="Etc/UTC") - files: List[GTFSRTFeedExtract] - files_missing_metadata: List[Blob] - files_invalid_metadata: List[Blob] - files, files_missing_metadata, files_invalid_metadata = fetch_all_in_partition( - cls=GTFSRTFeedExtract, - partitions={ - "dt": pendulum_hour.date(), - "hour": pendulum_hour, - }, - table=feed_type, - verbose=True, - ) - - total = len(files) + len(files_missing_metadata) + len(files_invalid_metadata) - if files and len(files) / total < 0.99: - typer.secho(f"missing: {files_missing_metadata}") - typer.secho(f"invalid: {files_invalid_metadata}") + hourly_feed_files = FeedStorage(feed_type).get_hour(hour) + if not hourly_feed_files.valid(): + typer.secho(f"missing: {hourly_feed_files.files_missing_metadata}") + typer.secho(f"invalid: {hourly_feed_files.files_invalid_metadata}") + error_count = hourly_feed_files.total() - len(hourly_feed_files.files) 
raise RuntimeError( - f"too many files have missing/invalid metadata; {total - len(files)} of {total}" # noqa: E702 + f"too many files have missing/invalid metadata; {error_count} of {hourly_feed_files.total()}" # noqa: E702 ) - - rt_aggs: Dict[Tuple[pendulum.DateTime, str], List[GTFSRTFeedExtract]] = defaultdict( - list + aggregated_feed = hourly_feed_files.get_query(step, feed_type) + aggregations_to_process = ( + aggregated_feed.where_base64url(base64url).set_limit(limit).get_aggregates() ) - for file in files: - rt_aggs[(file.hour, file.base64_url)].append(file) - - aggregations_to_process = [ - RTHourlyAggregation( - step=step, - filename=f"{feed_type}{JSONL_GZIP_EXTENSION}", - first_extract=files[0], - extracts=files, - ) - for (hour, base64url), files in rt_aggs.items() - ] - typer.secho( - f"found {len(files)} {feed_type} files in {len(aggregations_to_process)} aggregations to process", + f"found {len(hourly_feed_files.files)} {feed_type} files in {len(aggregated_feed.get_aggregates())} aggregations to process", fg=typer.colors.MAGENTA, ) @@ -798,18 +848,9 @@ def main( typer.secho( f"url filter applied, only processing {base64url}", fg=typer.colors.YELLOW ) - aggregations_to_process = [ - agg for agg in aggregations_to_process if agg.base64_url == base64url - ] if limit: typer.secho(f"limit of {limit} feeds was set", fg=typer.colors.YELLOW) - aggregations_to_process = list( - sorted(aggregations_to_process, key=lambda feed: feed.path) - )[:limit] - - aggregated_total = sum(len(agg.extracts) for agg in aggregations_to_process) - pbar = tqdm(total=len(aggregations_to_process)) if progress else None outcomes: List[RTFileProcessingOutcome] = [ RTFileProcessingOutcome( @@ -818,7 +859,7 @@ def main( blob_path=blob.path, exception=MissingMetadata(), ) - for blob in files_missing_metadata + for blob in hourly_feed_files.files_missing_metadata ] + [ RTFileProcessingOutcome( step=step.value, @@ -826,7 +867,7 @@ def main( blob_path=blob.path, exception=InvalidMetadata(), ) - for blob in files_invalid_metadata + for blob in hourly_feed_files.files_invalid_metadata ] exceptions = [] @@ -840,34 +881,26 @@ def main( futures: Dict[Future, RTHourlyAggregation] = { pool.submit( parse_and_validate, - hour=hour, - jar_path=jar_path, + aggregation=aggregation, verbose=verbose, - pbar=pbar, - ): hour - for hour in aggregations_to_process + ): aggregation + for aggregation in aggregations_to_process } for future in concurrent.futures.as_completed(futures): - hour = futures[future] - if pbar: - pbar.update(1) + aggregation = futures[future] try: outcomes.extend(future.result()) except KeyboardInterrupt: raise except Exception as e: - log( - f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {hour.path}\n{traceback.format_exc()}", + typer.secho( + f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {aggregation.path}\n{traceback.format_exc()}", err=True, fg=typer.colors.RED, - pbar=pbar, ) sentry_sdk.capture_exception(e) - exceptions.append((e, hour.path, traceback.format_exc())) - - if pbar: - del pbar + exceptions.append((e, aggregation.path, traceback.format_exc())) if aggregations_to_process: result = GTFSRTJobResult( @@ -878,11 +911,25 @@ def main( feed_type=feed_type, outcomes=outcomes, ) - save_job_result(get_fs(), result) + typer.secho( + f"saving {len(result.outcomes)} outcomes to {result.path}", + fg=typer.colors.GREEN, + ) + # TODO: I dislike having to exclude the records here + # I need to figure out the best way to have a single type represent the "metadata" of + # the 
content as well as the content itself + result.save_content( + fs=get_fs(), + content="\n".join( + (json.dumps(make_pydantic_model_bq_safe(o)) for o in result.outcomes) + ).encode(), + exclude={"outcomes"}, + ) assert ( - len(outcomes) == aggregated_total - ), f"we ended up with {len(outcomes)} outcomes from {aggregated_total}" + len(outcomes) + == aggregated_feed.where_base64url(base64url).set_limit(limit).total() + ), f"we ended up with {len(outcomes)} outcomes from {aggregated_feed.where_base64url(base64url).set_limit(limit).total()}" if exceptions: exc_str = "\n".join(str(tup) for tup in exceptions) diff --git a/jobs/gtfs-rt-parser-v2/tests/test_gtfs_rt_parser.py b/jobs/gtfs-rt-parser-v2/tests/test_gtfs_rt_parser.py index 467f2f5c34..09c3e56789 100644 --- a/jobs/gtfs-rt-parser-v2/tests/test_gtfs_rt_parser.py +++ b/jobs/gtfs-rt-parser-v2/tests/test_gtfs_rt_parser.py @@ -54,11 +54,7 @@ def test_rt_file_processing_outcome_construction() -> None: @pytest.mark.skipif("not config.getoption('--gcs')", reason="requires GCS credentials") def test_vehicle_positions(): - result = runner.invoke( - app, - ["parse", "vehicle_positions", "1999-10-22T18:00:00"], - catch_exceptions=False, - ) + result = runner.invoke(app, ["parse", "vehicle_positions", "1999-10-22T18:00:00"]) assert result.exit_code == 0 assert ( "test-calitp-gtfs-rt-raw-v2/vehicle_positions/dt=1999-10-22/hour=1999-10-22T18:00:00+00:00" @@ -79,7 +75,6 @@ def test_no_vehicle_positions_for_date(): result = runner.invoke( app, ["parse", "vehicle_positions", "2022-09-14T18:00:00", "--base64url", base64url], - catch_exceptions=False, ) assert result.exit_code == 0 assert "0 vehicle_positions files in 0 aggregations" in result.stdout @@ -92,7 +87,6 @@ def test_no_vehicle_positions_for_url(): result = runner.invoke( app, ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", "nope"], - catch_exceptions=False, ) assert result.exit_code == 0 assert "found 5158 vehicle_positions files in 136 aggregations" in result.stdout @@ -108,7 +102,6 @@ def test_no_records_for_url_vehicle_positions_on_date(): result = runner.invoke( app, ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", base64url], - catch_exceptions=False, ) assert result.exit_code == 0 assert "found 5158 vehicle_positions files in 136 aggregations" in result.stdout @@ -121,9 +114,7 @@ def test_no_records_for_url_vehicle_positions_on_date(): def test_trip_updates(): base64url = "aHR0cHM6Ly9hcGkuNTExLm9yZy90cmFuc2l0L3RyaXB1cGRhdGVzP2FnZW5jeT1TQQ==" result = runner.invoke( - app, - ["parse", "trip_updates", "2024-10-22T18:00:00", "--base64url", base64url], - catch_exceptions=False, + app, ["parse", "trip_updates", "2024-10-22T18:00:00", "--base64url", base64url] ) assert result.exit_code == 0 assert ( @@ -144,7 +135,6 @@ def test_service_alerts(): result = runner.invoke( app, ["parse", "service_alerts", "2024-10-22T18:00:00", "--base64url", base64url], - catch_exceptions=False, ) assert result.exit_code == 0 assert ( @@ -165,7 +155,6 @@ def test_validation(): result = runner.invoke( app, ["validate", "trip_updates", "2024-08-28T19:00:00", "--base64url", base64url], - catch_exceptions=False, ) assert result.exit_code == 0 assert ( @@ -173,7 +162,6 @@ def test_validation(): in result.stdout ) assert "3269 trip_updates files in 125 aggregations" in result.stdout - assert "Fetching gtfs schedule data" in result.stdout assert "validating" in result.stdout assert "executing rt_validator" in result.stdout assert "writing 50 lines" in result.stdout @@ -194,7 +182,6 
@@ def test_no_recent_schedule_for_vehicle_positions_on_validation(): "--base64url", base64url, ], - catch_exceptions=True, ) assert result.exit_code == 0 assert ( @@ -204,6 +191,7 @@ def test_no_recent_schedule_for_vehicle_positions_on_validation(): assert "5158 vehicle_positions files in 136 aggregations" in result.stdout assert f"url filter applied, only processing {base64url}" in result.stdout assert "no schedule data found" in result.stdout + assert "no recent schedule data found" in result.stdout assert "test-calitp-gtfs-rt-validation" in result.stdout assert "saving 38 outcomes" in result.stdout @@ -220,9 +208,7 @@ def test_no_output_file_for_vehicle_positions_on_validation(): 3, "--verbose", ], - catch_exceptions=True, ) - print(result.stdout) assert result.exit_code == 0 assert ( "test-calitp-gtfs-rt-raw-v2/vehicle_positions/dt=2024-10-17/hour=2024-10-17T00:00:00+00:00" @@ -230,6 +216,7 @@ def test_no_output_file_for_vehicle_positions_on_validation(): ) assert "5487 vehicle_positions files in 139 aggregations" in result.stdout assert "limit of 3 feeds was set" in result.stdout - # "WARNING: no validation output file found" was previously generating the error "[Errno 2] No such file or directory" - assert "WARNING: no validation output file found" in result.stdout - assert "saving 122 outcomes" in result.stdout + assert "validating" in result.stdout + assert "executing rt_validator" in result.stdout + assert "writing 69 lines" in result.stdout + assert "saving 114 outcomes" in result.stdout From 4a6334233ee34fa2d82eadfdc36452386595d638 Mon Sep 17 00:00:00 2001 From: Charlie Costanzo Date: Tue, 26 Nov 2024 10:34:09 -0500 Subject: [PATCH 05/13] dag to scrape and save the current ridership URL from the NTD portal (#3545) * dag to scrape the current ridership URL from the NTD portal * fix naming and add some descriptions * reconfigured airflow dag setup for dependencies and special handling * test storing variables in xcoms * cleaned up imports * rebase * remove and reorganize some lingering and unnecessary code and test * linter not working * refactor lambda for flake8 * flake8 config change * flake8 config change again * create function of url finder * add comment for flake8 suppression * accidentally pushed copy file * suppress whitespace after colon error * last pass at configuration changes * suppress whitespace after colon error * remove testing comments, clean up changed files --- .pre-commit-config.yaml | 4 +- airflow/dags/sync_ntd_data_xlsx/METADATA.yml | 1 + ...dership_with_adjustments_and_estimates.yml | 6 ++- .../scrape_ntd_ridership_xlsx_url.py | 42 +++++++++++++++++++ airflow/plugins/operators/scrape_ntd_xlsx.py | 35 ++++++++++++---- 5 files changed, 77 insertions(+), 11 deletions(-) create mode 100644 airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 78df7f10af..0f497dc163 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,9 +15,11 @@ repos: rev: 6.0.0 hooks: - id: flake8 - args: ["--ignore=E501,W503"] # line too long and line before binary operator (black is ok with these) + args: ["--ignore=E501,W503,E231"] # line too long and line before binary operator (black is ok with these) and explicitly ignore the whitespace after colon error types: - python + # Suppress SyntaxWarning about invalid escape sequence from calitp-data-infra dependency without modifying source + entry: env PYTHONWARNINGS="ignore::SyntaxWarning" flake8 - repo: https://github.com/psf/black rev: 
23.1.0 hooks: diff --git a/airflow/dags/sync_ntd_data_xlsx/METADATA.yml b/airflow/dags/sync_ntd_data_xlsx/METADATA.yml index f42903c3b5..b6de58e9b7 100644 --- a/airflow/dags/sync_ntd_data_xlsx/METADATA.yml +++ b/airflow/dags/sync_ntd_data_xlsx/METADATA.yml @@ -15,5 +15,6 @@ default_args: retry_delay: !timedelta 'minutes: 2' concurrency: 50 #sla: !timedelta 'hours: 2' + provide_context: True wait_for_defaults: timeout: 3600 diff --git a/airflow/dags/sync_ntd_data_xlsx/ridership_historical/complete_monthly_ridership_with_adjustments_and_estimates.yml b/airflow/dags/sync_ntd_data_xlsx/ridership_historical/complete_monthly_ridership_with_adjustments_and_estimates.yml index 6099b20ca3..042891b0d8 100644 --- a/airflow/dags/sync_ntd_data_xlsx/ridership_historical/complete_monthly_ridership_with_adjustments_and_estimates.yml +++ b/airflow/dags/sync_ntd_data_xlsx/ridership_historical/complete_monthly_ridership_with_adjustments_and_estimates.yml @@ -1,5 +1,7 @@ operator: operators.NtdDataProductXLSXOperator product: 'complete_monthly_ridership_with_adjustments_and_estimates' -xlsx_file_url: 'https://www.transit.dot.gov/sites/fta.dot.gov/files/2024-11/September%202024%20Complete%20Monthly%20Ridership%20%28with%20adjustments%20and%20estimates%29_241101.xlsx' -year: 'historical' +xlsx_file_url: 'https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release' # placeholder for scraped url from scrape_ntd_ridership_url task +year: 'historical' # one of: 'historical' (long history), 'mutli-year' (select history), or a specific year (ex: 2022) +dependencies: + - scrape_ntd_ridership_xlsx_url diff --git a/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py new file mode 100644 index 0000000000..95b4d1e486 --- /dev/null +++ b/airflow/dags/sync_ntd_data_xlsx/scrape_ntd_ridership_xlsx_url.py @@ -0,0 +1,42 @@ +# --- +# python_callable: scrape_ntd_ridership_xlsx_url +# provide_context: true +# --- +import logging + +import requests +from bs4 import BeautifulSoup +from pydantic import HttpUrl, parse_obj_as + + +# pushes the scraped URL value to XCom +def push_url_to_xcom(scraped_url, context): + task_instance = context["ti"] + task_instance.xcom_push(key="current_url", value=scraped_url) + + +# Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/' +def href_matcher(href): + return ( + href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx") + ) + + +def scrape_ntd_ridership_xlsx_url(**context): + # page to find download URL + url = "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release" + req = requests.get(url) + soup = BeautifulSoup(req.text, "html.parser") + + link = soup.find("a", href=href_matcher) + + # Extract the href if the link is found + file_link = link["href"] if link else None + + updated_url = f"https://www.transit.dot.gov{file_link}" + + validated_url = parse_obj_as(HttpUrl, updated_url) + + logging.info(f"Validated URL: {validated_url}.") + + push_url_to_xcom(scraped_url=validated_url, context=context) diff --git a/airflow/plugins/operators/scrape_ntd_xlsx.py b/airflow/plugins/operators/scrape_ntd_xlsx.py index ead796952b..ecdd2052ba 100644 --- a/airflow/plugins/operators/scrape_ntd_xlsx.py +++ b/airflow/plugins/operators/scrape_ntd_xlsx.py @@ -20,6 +20,16 @@ CLEAN_XLSX_BUCKET = os.environ["CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN"] +# pulls the URL from XCom +def pull_url_from_xcom(context): + task_instance 
= context["ti"] + pulled_value = task_instance.xcom_pull( + task_ids="scrape_ntd_ridership_xlsx_url", key="current_url" + ) + print(f"Pulled value from XCom: {pulled_value}") + return pulled_value + + class NtdDataProductXLSXExtract(PartitionedGCSArtifact): bucket: ClassVar[str] year: str @@ -41,6 +51,11 @@ class Config: arbitrary_types_allowed = True def fetch_from_ntd_xlsx(self, file_url): + # As of now, the only file that we are downloading is for complete_monthly_ridership_with_adjustments_and_estimates + # and the download link changes every time they update the date, so we have special handling for that here, which is dependent + # another dag task called scrape_ntd_ridership_xlsx_url.py. if we look to download other xlsx files from the DOT portal and they + # also change the file name every time they publish, they we will have to add the same handling for all of these files and make it programmatic + validated_url = parse_obj_as(HttpUrl, file_url) logging.info(f"reading file from url {validated_url}") @@ -84,6 +99,7 @@ def __init__( product: str, xlsx_file_url, year: int, + *args, **kwargs, ): self.year = year @@ -98,15 +114,18 @@ def __init__( filename=f"{self.year}__{self.product}_raw.xlsx", ) - super().__init__(**kwargs) + super().__init__(*args, **kwargs) - def execute(self, **kwargs): - excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx( - self.raw_excel_extract.file_url - ) - logging.info( - f"file url is {self.xlsx_file_url} and file type is {type(self.xlsx_file_url)}" - ) + def execute(self, context, *args, **kwargs): + download_url = self.raw_excel_extract.file_url + + if self.product == "complete_monthly_ridership_with_adjustments_and_estimates": + download_url = pull_url_from_xcom(context=context) + + # see what is returned + logging.info(f"reading ridership url as {download_url}") + + excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url) self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content) From 82c74e836d3cb9cbaa66be6bb001096a7db93bf0 Mon Sep 17 00:00:00 2001 From: Charlie Costanzo Date: Tue, 26 Nov 2024 11:43:57 -0500 Subject: [PATCH 06/13] add requirement necessary for recently merged pr 3545 (#3560) --- airflow/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/requirements.txt b/airflow/requirements.txt index 769ccfcaa0..5d99a6f713 100644 --- a/airflow/requirements.txt +++ b/airflow/requirements.txt @@ -7,3 +7,4 @@ sentry-sdk==1.17.0 platformdirs<3,>=2.5 boto3==1.26.87 openpyxl==3.1.5 +beautifulsoup4==4.12.3 From 0dfe3b65e7912a37b0be77ff8f4ad02610085963 Mon Sep 17 00:00:00 2001 From: Charlie Costanzo Date: Tue, 26 Nov 2024 12:38:13 -0500 Subject: [PATCH 07/13] Scrape geoportal for shn, add SHN flag on stops table (#3529) * scrape geoportal for shn, add flag on stops table * successful initial scrape of geoportal endpoint * functioning json format and external table creation, need to modify coordinates * shn external tables * add initial source, staging, and docs for state geoportal scrape * add flag for shn to dim_stops_latest * readd templating for buckets and remove hardcode schema * refactor operator to be more agnostic for data inputs * add handling for state_highway_network table * fix comment * remove local variables and comments to allow for a production merge when ready * use stop_id for calculations * Apply suggestions from code review Co-authored-by: Mjumbe Poe * restore source table * remove hardcoded source, remove old comments, add comment decribing buffer distance * remove hardcoded 
bucket * refactor how we keep final columns, dynamically rename columns * restore _gtfs_key over stop_id in shn geo calculation * checking before refactor * revisions based on Mjumbe's review --------- Co-authored-by: Mjumbe Poe --- .../state_geoportal/state_highway_network.yml | 29 +++ .../dags/scrape_state_geoportal/METADATA.yml | 19 ++ .../state_highway_network.yml | 7 + airflow/plugins/operators/__init__.py | 1 + .../operators/scrape_state_geoportal.py | 217 ++++++++++++++++++ .../gtfs_schedule_latest/dim_stops_latest.sql | 45 +++- .../models/staging/state_geoportal/_src.yml | 9 + .../state_geoportal/_stg_state_geoportal.yml | 4 + ...geoportal__state_highway_network_stops.sql | 14 ++ 9 files changed, 344 insertions(+), 1 deletion(-) create mode 100644 airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml create mode 100644 airflow/dags/scrape_state_geoportal/METADATA.yml create mode 100644 airflow/dags/scrape_state_geoportal/state_highway_network.yml create mode 100644 airflow/plugins/operators/scrape_state_geoportal.py create mode 100644 warehouse/models/staging/state_geoportal/_src.yml create mode 100644 warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml create mode 100644 warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql diff --git a/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml new file mode 100644 index 0000000000..9dd396fcdf --- /dev/null +++ b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml @@ -0,0 +1,29 @@ +operator: operators.ExternalTable +bucket: gs://calitp-state-geoportal-scrape +source_objects: + - "state_highway_network_geodata/*.jsonl.gz" +source_format: NEWLINE_DELIMITED_JSON +use_bq_client: true +hive_options: + mode: CUSTOM + require_partition_filter: false + source_uri_prefix: "state_highway_network_geodata/{dt:DATE}/{execution_ts:TIMESTAMP}/" +destination_project_dataset_table: "external_state_geoportal.state_highway_network" +prefix_bucket: false +post_hook: | + SELECT * + FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network + LIMIT 1; +schema_fields: + - name: Route + type: INTEGER + - name: County + type: STRING + - name: District + type: INTEGER + - name: RouteType + type: STRING + - name: Direction + type: STRING + - name: wkt_coordinates + type: GEOGRAPHY diff --git a/airflow/dags/scrape_state_geoportal/METADATA.yml b/airflow/dags/scrape_state_geoportal/METADATA.yml new file mode 100644 index 0000000000..95ec8d3742 --- /dev/null +++ b/airflow/dags/scrape_state_geoportal/METADATA.yml @@ -0,0 +1,19 @@ +description: "Scrape State Highway Network from State Geoportal" +schedule_interval: "0 4 1 * *" # 4am UTC first day of every month +tags: + - all_gusty_features +default_args: + owner: airflow + depends_on_past: False + catchup: False + start_date: "2024-09-15" + email: + - "hello@calitp.org" + email_on_failure: True + email_on_retry: False + retries: 1 + retry_delay: !timedelta 'minutes: 2' + concurrency: 50 + #sla: !timedelta 'hours: 2' +wait_for_defaults: + timeout: 3600 diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml new file mode 100644 index 0000000000..d5be284196 --- /dev/null +++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml @@ -0,0 +1,7 @@ +operator: operators.StateGeoportalAPIOperator + +root_url: 
'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/' +service: "CHhighway/SHN_Lines" +layer: "0" +product: 'state_highway_network' +resultRecordCount: 2000 diff --git a/airflow/plugins/operators/__init__.py b/airflow/plugins/operators/__init__.py index 209be114fd..78a7178186 100644 --- a/airflow/plugins/operators/__init__.py +++ b/airflow/plugins/operators/__init__.py @@ -9,3 +9,4 @@ from operators.pod_operator import PodOperator from operators.scrape_ntd_api import NtdDataProductAPIOperator from operators.scrape_ntd_xlsx import NtdDataProductXLSXOperator +from operators.scrape_state_geoportal import StateGeoportalAPIOperator diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py new file mode 100644 index 0000000000..a538df5495 --- /dev/null +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -0,0 +1,217 @@ +import gzip +import logging +import os +from typing import ClassVar, List + +import pandas as pd # type: ignore +import pendulum +import requests +from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs # type: ignore +from pydantic import HttpUrl, parse_obj_as + +from airflow.models import BaseOperator # type: ignore + +API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] + + +class StateGeoportalAPIExtract(PartitionedGCSArtifact): + bucket: ClassVar[str] + execution_ts: pendulum.DateTime = pendulum.now() + dt: pendulum.Date = execution_ts.date() + partition_names: ClassVar[List[str]] = ["dt", "execution_ts"] + + # The name to be used in the data warehouse to refer to the data + # product. + product: str + + # The root of the ArcGIS services. As of Nov 2024, this should + # be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/". + root_url: str + + # The name of the service being requested. In the feature service's + # URL, this will be everything between the root and "/FeatureServer". + # Don't include a leading or trailing slash. + service: str + + # The layer to query. This will usually be "0", so that is the + # default. + layer: str = "0" + + # The query filter. By default, all rows will be returned from the + # service. Refer to the ArcGIS documentation for syntax: + # https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters + where: str = "1=1" + + # A comma-separated list of fields to include in the results. Use + # "*" (the default) to include all fields. + outFields: str = "*" + + # The number of records to request for each API call (the operator + # will request all data from the layer in batches of this size). 
+ resultRecordCount: int + + @property + def table(self) -> str: + return self.product + + @property + def filename(self) -> str: + return self.table + + class Config: + arbitrary_types_allowed = True + + def fetch_from_state_geoportal(self): + """ """ + + logging.info(f"Downloading state geoportal data for {self.product}.") + + try: + # Set up the parameters for the request + url = f"{self.root_url}/{self.service}/FeatureServer/{self.layer}/query" + validated_url = parse_obj_as(HttpUrl, url) + + params = { + "where": self.where, + "outFields": self.outFields, + "f": "geojson", + "resultRecordCount": self.resultRecordCount, + } + + all_features = [] # To store all retrieved rows + offset = 0 + + while True: + # Update the resultOffset for each request + params["resultOffset"] = offset + + # Make the request + response = requests.get(validated_url, params=params) + response.raise_for_status() + data = response.json() + + # Break the loop if there are no more features + if "features" not in data or not data["features"]: + break + + # Append the retrieved features + all_features.extend(data["features"]) + + # Increment the offset + offset += params["resultRecordCount"] + + if all_features is None or len(all_features) == 0: + logging.info( + f"There is no data to download for {self.product}. Ending pipeline." + ) + + pass + else: + logging.info( + f"Downloaded {self.product} data with {len(all_features)} rows!" + ) + + return all_features + + except requests.exceptions.RequestException as e: + logging.info(f"An error occurred: {e}") + + raise + + +# # Function to convert coordinates to WKT format +def to_wkt(geometry_type, coordinates): + if geometry_type == "LineString": + # Format as a LineString + coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates]) + return f"LINESTRING({coords_str})" + elif geometry_type == "MultiLineString": + # Format as a MultiLineString + multiline_coords_str = ", ".join( + f"({', '.join([f'{lng} {lat}' for lng, lat in line])})" + for line in coordinates + ) + return f"MULTILINESTRING({multiline_coords_str})" + else: + return None + + +class JSONExtract(StateGeoportalAPIExtract): + bucket = API_BUCKET + + +class StateGeoportalAPIOperator(BaseOperator): + template_fields = ( + "product", + "root_url", + "service", + "layer", + "resultRecordCount", + ) + + def __init__( + self, + product, + root_url, + service, + layer, + resultRecordCount, + **kwargs, + ): + self.product = product + self.root_url = root_url + self.service = service + self.layer = layer + self.resultRecordCount = resultRecordCount + + """An operator that extracts and saves JSON data from the State Geoportal + and saves it as one JSONL file, hive-partitioned by date in Google Cloud + """ + + # Save JSONL files to the bucket + self.extract = JSONExtract( + root_url=self.root_url, + service=self.service, + product=f"{self.product}_geodata", + layer=self.layer, + resultRecordCount=self.resultRecordCount, + filename=f"{self.product}_geodata.jsonl.gz", + ) + + super().__init__(**kwargs) + + def execute(self, **kwargs): + api_content = self.extract.fetch_from_state_geoportal() + + df = pd.json_normalize(api_content) + + if self.product == "state_highway_network": + # Select columns to keep, have to be explicit before renaming because there are duplicate values after normalizing + df = df[ + [ + "properties.Route", + "properties.County", + "properties.District", + "properties.RouteType", + "properties.Direction", + "geometry.type", + "geometry.coordinates", + ] + ] + + # Dynamically create a 
mapping by removing known prefixes + columns = {col: col.split(".")[-1] for col in df.columns} + + # Rename columns using the dynamically created mapping + df = df.rename(columns=columns) + + # Create new column with WKT format + df["wkt_coordinates"] = df.apply( + lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 + ) + + # Compress the DataFrame content and save it + self.gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) + self.extract.save_content(fs=get_fs(), content=self.gzipped_content) diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index 8c964acb31..58b648a57b 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -4,6 +4,49 @@ dim_stops_latest AS ( table_name = ref('dim_stops'), clean_table_name = 'dim_stops' ) }} +), + +stg_state_geoportal__state_highway_network_stops AS ( + SELECT * + FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }} +), + + +buffer_geometry_table AS ( + SELECT + -- equal to 100ft, as requested by Uriel + ST_BUFFER(wkt_coordinates, + 30.48) AS buffer_geometry + FROM stg_state_geoportal__state_highway_network_stops +), + +current_stops AS ( + SELECT + pt_geom, + _gtfs_key + FROM dim_stops_latest +), + + +stops_on_shn AS ( + SELECT + current_stops.* + FROM buffer_geometry_table, current_stops + WHERE ST_DWITHIN( + buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0) +), + +dim_stops_latest_with_shn_boolean AS ( + +SELECT + dim_stops_latest.*, + IF(stops_on_shn._gtfs_key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest +FROM + dim_stops_latest +LEFT JOIN + stops_on_shn +ON + dim_stops_latest._gtfs_key = stops_on_shn._gtfs_key ) -SELECT * FROM dim_stops_latest +SELECT * FROM dim_stops_latest_with_shn_boolean diff --git a/warehouse/models/staging/state_geoportal/_src.yml b/warehouse/models/staging/state_geoportal/_src.yml new file mode 100644 index 0000000000..2e2d8dc2de --- /dev/null +++ b/warehouse/models/staging/state_geoportal/_src.yml @@ -0,0 +1,9 @@ +version: 2 + +sources: + - name: external_state_geoportal + description: Data tables scraped from state geoportal. 
+ database: "{{ env_var('DBT_SOURCE_DATABASE', var('SOURCE_DATABASE')) }}" + schema: external_state_geoportal + tables: + - name: state_highway_network diff --git a/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml b/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml new file mode 100644 index 0000000000..7e12ae93f5 --- /dev/null +++ b/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml @@ -0,0 +1,4 @@ +version: 2 + +models: + - name: stg_state_geoportal__state_highway_network_stops diff --git a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql new file mode 100644 index 0000000000..a89a1075ee --- /dev/null +++ b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql @@ -0,0 +1,14 @@ +WITH external_state_geoportal__state_highway_network AS ( + SELECT * + FROM {{ source('external_state_geoportal', 'state_highway_network') }} +), + +stg_state_geoportal__state_highway_network_stops AS( + + SELECT * + FROM external_state_geoportal__state_highway_network + -- we pull the whole table every month in the pipeline, so this gets only the latest extract + QUALIFY DENSE_RANK() OVER (ORDER BY execution_ts DESC) = 1 +) + +SELECT * FROM stg_state_geoportal__state_highway_network_stops From 04d9328abd6945f56e778e687466d98326ddfcd7 Mon Sep 17 00:00:00 2001 From: Erika Pacheco Date: Fri, 1 Nov 2024 13:37:38 -0700 Subject: [PATCH 08/13] Review NTD Annual Agency Information scrape process. Two new columns added on 2022 and 2023 files: - division_department - state_parent_ntd_id [#3497] --- .../annual_database_agency_information.yml | 83 +++++-------------- warehouse/models/mart/ntd/_mart_ntd.yml | 2 +- .../ntd/dim_annual_ntd_agency_information.sql | 72 ++++++++-------- warehouse/models/staging/ntd/_stg_ntd.yml | 1 + ...td__annual_database_agency_information.sql | 74 +++++++++-------- 5 files changed, 100 insertions(+), 132 deletions(-) diff --git a/airflow/dags/create_external_tables/ntd_data_products/annual_database_agency_information.yml b/airflow/dags/create_external_tables/ntd_data_products/annual_database_agency_information.yml index 7aaf092756..e4054fc24c 100644 --- a/airflow/dags/create_external_tables/ntd_data_products/annual_database_agency_information.yml +++ b/airflow/dags/create_external_tables/ntd_data_products/annual_database_agency_information.yml @@ -16,125 +16,88 @@ hive_options: source_uri_prefix: "annual-database-agency-information/{dt:DATE}/{ts:TIMESTAMP}/{year:INTEGER}/" schema_fields: - name: number_of_state_counties - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: tam_tier type: STRING - mode: NULLABLE - name: personal_vehicles - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: density type: FLOAT - mode: NULLABLE - name: uza_name type: STRING - mode: NULLABLE - name: tribal_area_name type: STRING - mode: NULLABLE - name: service_area_sq_miles - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: total_voms - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: city type: STRING - mode: NULLABLE - name: fta_recipient_id - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: region - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: state_admin_funds_expended - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: zip_code_ext - type: FLOAT - mode: NULLABLE + type: STRING - name: zip_code - type: FLOAT - mode: NULLABLE + type: STRING - name: ueid type: 
STRING - mode: NULLABLE - name: address_line_2 type: STRING - mode: NULLABLE - name: number_of_counties_with_service - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: reporter_acronym type: STRING - mode: NULLABLE - name: original_due_date - type: INTEGER - mode: NULLABLE + type: STRING - name: sq_miles - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: address_line_1 type: STRING - mode: NULLABLE - name: p_o__box type: STRING - mode: NULLABLE - name: fy_end_date - type: INTEGER - mode: NULLABLE + type: STRING - name: reported_by_ntd_id type: STRING - mode: NULLABLE - name: population - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: reporting_module type: STRING - mode: NULLABLE - name: service_area_pop - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: subrecipient_type type: STRING - mode: NULLABLE - name: state type: STRING - mode: NULLABLE - name: volunteer_drivers - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: primary_uza - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: doing_business_as type: STRING - mode: NULLABLE - name: reporter_type type: STRING - mode: NULLABLE - name: legacy_ntd_id type: STRING - mode: NULLABLE - name: voms_do - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: url type: STRING - mode: NULLABLE - name: reported_by_name type: STRING - mode: NULLABLE - name: voms_pt - type: FLOAT - mode: NULLABLE + type: NUMERIC - name: organization_type type: STRING - mode: NULLABLE - name: agency_name type: STRING - mode: NULLABLE - name: ntd_id type: STRING - mode: NULLABLE + - name: division_department + type: STRING + - name: state_parent_ntd_id + type: STRING diff --git a/warehouse/models/mart/ntd/_mart_ntd.yml b/warehouse/models/mart/ntd/_mart_ntd.yml index 9d5040e991..b4feb9e8fa 100644 --- a/warehouse/models/mart/ntd/_mart_ntd.yml +++ b/warehouse/models/mart/ntd/_mart_ntd.yml @@ -118,7 +118,7 @@ models: - dbt_utils.mutually_exclusive_ranges: lower_bound_column: _valid_from upper_bound_column: _valid_to - partition_by: CONCAT(year, '_', ntd_id) + partition_by: CONCAT(year, '_', ntd_id, '_', COALESCE(state_parent_ntd_id, '')) gaps: required columns: - name: key diff --git a/warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql b/warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql index 250667c5d4..66ca2eca2b 100644 --- a/warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql +++ b/warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql @@ -2,55 +2,57 @@ WITH stg_ntd__annual_database_agency_information AS ( SELECT *, -- TODO: this does not handle deletes - LEAD(ts) OVER (PARTITION BY year, ntd_id ORDER BY ts ASC) AS next_ts, + LEAD(ts) OVER (PARTITION BY year, ntd_id, state_parent_ntd_id ORDER BY ts ASC) AS next_ts, FROM {{ ref('stg_ntd__annual_database_agency_information') }} ), dim_annual_ntd_agency_information AS ( SELECT - {{ dbt_utils.generate_surrogate_key(['year', 'ntd_id', 'ts']) }} as key, + {{ dbt_utils.generate_surrogate_key(['year', 'ntd_id', 'state_parent_ntd_id', 'ts']) }} AS key, year, ntd_id, - number_of_state_counties, - tam_tier, - personal_vehicles, - density, - uza_name, - tribal_area_name, - service_area_sq_miles, - total_voms, - city, - fta_recipient_id, - region, - state_admin_funds_expended, - zip_code_ext, - zip_code, - ueid, - address_line_2, - number_of_counties_with_service, + state_parent_ntd_id, + agency_name, reporter_acronym, - original_due_date, - sq_miles, - address_line_1, - p_o__box, - fy_end_date, + doing_business_as, + division_department, + legacy_ntd_id, 
reported_by_ntd_id, - population, + reported_by_name, + reporter_type, reporting_module, - service_area_pop, + organization_type, subrecipient_type, + fy_end_date, + original_due_date, + address_line_1, + address_line_2, + p_o__box, + city, state, - volunteer_drivers, - primary_uza, - doing_business_as, - reporter_type, - legacy_ntd_id, - voms_do, + zip_code, + zip_code_ext, + region, url, - reported_by_name, + fta_recipient_id, + ueid, + service_area_sq_miles, + service_area_pop, + primary_uza_code, + primary_uza_name, + tribal_area_name, + population, + density, + sq_miles, + voms_do, voms_pt, - organization_type, - agency_name, + total_voms, + volunteer_drivers, + personal_vehicles, + tam_tier, + number_of_state_counties, + number_of_counties_with_service, + state_admin_funds_expended, ts AS _valid_from, {{ make_end_of_valid_range('COALESCE(next_ts, CAST("2099-01-01" AS TIMESTAMP))') }} AS _valid_to, next_ts IS NULL AS _is_current, diff --git a/warehouse/models/staging/ntd/_stg_ntd.yml b/warehouse/models/staging/ntd/_stg_ntd.yml index 87ff1d007e..ab9b0b5413 100644 --- a/warehouse/models/staging/ntd/_stg_ntd.yml +++ b/warehouse/models/staging/ntd/_stg_ntd.yml @@ -8,6 +8,7 @@ models: - ts - year - ntd_id + - state_parent_ntd_id columns: - name: ntd_id tests: diff --git a/warehouse/models/staging/ntd/stg_ntd__annual_database_agency_information.sql b/warehouse/models/staging/ntd/stg_ntd__annual_database_agency_information.sql index 61c3eed407..0112e9bee2 100644 --- a/warehouse/models/staging/ntd/stg_ntd__annual_database_agency_information.sql +++ b/warehouse/models/staging/ntd/stg_ntd__annual_database_agency_information.sql @@ -4,50 +4,52 @@ WITH source AS ( stg_ntd__annual_database_agency_information AS ( SELECT - number_of_state_counties, - tam_tier, - personal_vehicles, - density, - uza_name, - tribal_area_name, - service_area_sq_miles, - total_voms, - city, - fta_recipient_id, - region, - state_admin_funds_expended, - zip_code_ext, - zip_code, - ueid, - address_line_2, - number_of_counties_with_service, + year, + ntd_id, + state_parent_ntd_id, + agency_name, reporter_acronym, - original_due_date, - sq_miles, - address_line_1, - p_o__box, - fy_end_date, + doing_business_as, + division_department, + legacy_ntd_id, reported_by_ntd_id, - population, + reported_by_name, + reporter_type, reporting_module, - service_area_pop, + organization_type, subrecipient_type, + fy_end_date, + original_due_date, + address_line_1, + address_line_2, + p_o__box, + city, state, - volunteer_drivers, - primary_uza, - doing_business_as, - reporter_type, - legacy_ntd_id, - voms_do, + zip_code, + zip_code_ext, + region, url, - reported_by_name, + fta_recipient_id, + ueid, + service_area_sq_miles, + service_area_pop, + primary_uza AS primary_uza_code, + uza_name AS primary_uza_name, + tribal_area_name, + population, + density, + sq_miles, + voms_do, voms_pt, - organization_type, - agency_name, - ntd_id, + total_voms, + volunteer_drivers, + personal_vehicles, + tam_tier, + number_of_state_counties, + number_of_counties_with_service, + state_admin_funds_expended, dt, - ts, - year, + ts FROM source ) From de7fcec096ae134c20cb872807131f500de40f46 Mon Sep 17 00:00:00 2001 From: Erika Pacheco Date: Mon, 25 Nov 2024 12:14:37 -0800 Subject: [PATCH 09/13] Add documentation to NTD Annual Agency Information [#3497] --- warehouse/models/docs/_docs_ntd.md | 5 + warehouse/models/mart/ntd/_mart_ntd.yml | 132 ++++++++++++++++++++-- warehouse/models/staging/ntd/_src.yml | 6 + warehouse/models/staging/ntd/_stg_ntd.yml | 6 + 4 files 
changed, 140 insertions(+), 9 deletions(-) diff --git a/warehouse/models/docs/_docs_ntd.md b/warehouse/models/docs/_docs_ntd.md index 4ca3df7fc6..6ce0a129e6 100644 --- a/warehouse/models/docs/_docs_ntd.md +++ b/warehouse/models/docs/_docs_ntd.md @@ -2,6 +2,10 @@ Docs for NTD models; {% docs ntd_id %} A five-digit identifying number for each agency used in the current NTD system. +FTA assigns each reporter a unique five-digit NTD Identification Number. +The first digit of the NTD ID corresponds to the FTA Region where the reporter is located (e.g., 9#### indicates Region IX). +The code will have a four-to-five digit prefix for any entity submitting the report on behalf of the reporter. +For example, State Departments of Transportation (usually indicated as #R##) submit on behalf of their subrecipients. {% enddocs %} {% docs ntd_legacy_id %} @@ -40,6 +44,7 @@ The state in which the agency is headquartered. {% enddocs %} {% docs ntd_primary_uza_code %} +The primary urbanized area served by the transit agency. UACE Code remains consistent across census years. {% enddocs %} diff --git a/warehouse/models/mart/ntd/_mart_ntd.yml b/warehouse/models/mart/ntd/_mart_ntd.yml index b4feb9e8fa..2ad4804c94 100644 --- a/warehouse/models/mart/ntd/_mart_ntd.yml +++ b/warehouse/models/mart/ntd/_mart_ntd.yml @@ -104,16 +104,17 @@ x-common-fields: models: - name: dim_annual_ntd_agency_information description: > - Versioned extracts of the NTD Annual Database Agency Information. + Contains basic contact and agency information for each NTD reporter. - The versioning is bitemporal, so records are versioned at the year + The dataset can be found at: + https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information + * For other years, just replace 2023 by the desired year. - and ntd_id level. This means you must join based on - _valid_from/_valid_from + The versioning is bitemporal, so records are versioned at the year, ntd_id, and state_parent_ntd_id level. + This means you must join based on _valid_from/_valid_from to get the records for a given ntd_id and state_parent_ntd_id, + and then choose which year to look up. - to get the records for a given ntd_id, and then choose which year to - - look up. + Use _is_current to find the latest version for each set of year, ntd_id, and state_parent_ntd_id. tests: - dbt_utils.mutually_exclusive_ranges: lower_bound_column: _valid_from @@ -125,10 +126,11 @@ models: tests: - not_null - unique - - name: year + - <<: *report_year + name: year tests: - not_null - - name: ntd_id + - <<: *ntd_id tests: - not_null - name: _valid_from @@ -138,8 +140,120 @@ models: tests: - not_null - name: _is_current + description: Indicates the latest report version for each year, ntd_id, and state_parent_ntd_id. tests: - not_null + - name: state_parent_ntd_id + description: | + Indicates the ID number of the transit agency reporting to the database on behalf of the transit agency. + - name: agency_name + description: | + The agency name is the full legal name of the agency. + If reporting is required under an FTA grant program, this must reflect the legal name of the funding recipient. + - name: doing_business_as + description: The name under which the reporting agency is doing business. + - name: address_line_1 + description: First line of the agency's mailing address. + - name: address_line_2 + description: Second line of the agency's mailing address (if applicable). + - name: p_o__box + description: The PO Box of the agency (if applicable). 
+ - name: city + description: City of the agency's mailing address. + - name: state + description: State of the agency's mailing address. + - name: zip_code + description: Zip Code of the agency's mailing address. + - name: zip_code_ext + description: Zip Code Extension of the agency's mailing address. + - name: region + description: The FTA region in which the reporter is located. + - name: density + description: The population density of the Primary UZA of the agency, if one exists. + - name: ueid + description: | + The UEID is a number or other identifier used to identify a specific commercial, nonprofit, or Government entity. + This is now reported in place of DUNS number for each unique transit agency reporting to the NTD. + See the U.S. General Services Administration UEID web page for more information. + - name: fta_recipient_id + description: | + The four-digit number assigned to a transit agency for the Federal Transit Administration (FTA) electronic grant making system — TrAMS (Transportation Award Management System). + - name: original_due_date + description: The date on which the 2020 NTD Report was due to FTA. + - name: fy_end_date + description: Calendar selection for the last day of an agency's fiscal year. + - name: number_of_counties_with_service + description: | + States report the total number of counties in the state that are currently served, in whole or in part, by Formula Grants for Rural Areas (§5311)-funded operators. + - name: number_of_state_counties + description: The number of Counties in given State (for State Departments of Transportation). + - *organization_type + - name: personal_vehicles + description: | + Vehicles that are used by the transit provider to transport passengers in revenue service but are owned by private individuals, typically an employee of the agency or a volunteer driver. + - name: population + description: The population of the Primary UZA of the agency, if one exists. + - *primary_uza_code + - *primary_uza_name + - name: reported_by_name + description: The NTD ID of the entity reporting on behalf of another entity. + - name: reported_by_ntd_id + description: | + The entity, usually a State, submitting an NTD report on behalf of another entity, usually a subrecipient of the State. + - name: reporter_acronym + description: The acronym used by the reporting agency. + - name: reporter_type + description: | + Reporter Type will be based on where they operate and the reporting requirements associated with their agency. + Agencies that receive Chapter 53 funds and own, operate, or manage capital assets in public transportation are also required to file an annual report, even if they do not receive §5307 or §5311 funds. + Agencies that do not receive or benefit from FTA funding may elect to submit their data to the NTD as Voluntary Reporter but are still assigned a reporter type. + Current types are: + `Building Reporter`, + `Full Reporter`, + `Group Plan Sponsor`, + `Planning Reporter`, + `Reduced Asset Reporter`, + `Reduced Reporter`, + `Rural Reporter`, + `Separate Service`, + `State Reporter`. + - name: reporting_module + description: | + A general classification that will determine which, if any, FTA formula programs will use the NTD data. + For example, Tribes and Native Villages will have data included in the §5311j Tribal Transit Program.
+ Reporters receiving Chapter 53 funds but not receiving or benefiting from §5307 and §5311 + AND not electing to report service data are classified as Asset due to the requirement to report asset inventory data. + These agencies are not presently included in formula program datasets. + - name: service_area_pop + description: | + A measure of access to transit service in terms of population served and area coverage (square miles). + The reporting transit agency determines the service area boundaries and population for most transit services using the definitions contained in the Americans with Disabilities Act of 1990 (ADA), + i.e. a corridor surrounding the routes 3/4 of a mile on either side, or for rail, a series of circles of radius 3/4 mile centered on each station. Transit agency reporters are required to submit service area information. + - name: sq_miles + description: The square miles of the Primary UZA of the agency, if one exists. + - *service_area_sq_miles + - name: state_admin_funds_expended + description: | + States report the §5311 revenues they expended as a result of administering the program. + Since the §5311 program operates on a reimbursement basis, revenues expended during the report year will be expended during the same year. + Report the operating revenue expended during the report year from FTA §5311 Formula Grants for Rural Areas funds. + - name: subrecipient_type + description: Reflects the type of Rural Formula Grant funding received by the subrecipient. + - name: tam_tier + description: | + Defines whether the agency is a Tier I agency required to produce their own Transit Asset Management plan (and, in parentheses, on what basis) + or a Tier II operator eligible to be in a group TAM Plan. + N/A indicates that the requirement does not apply. + - name: total_voms + description: | + The Vehicles Operated in Maximum Service ("peak service level") across the entire fiscal year for the given agency. + - name: tribal_area_name + description: The tribal land, determined by US Census data, on which tribes operate. + - name: url + description: Agency's transit website. + - name: volunteer_drivers + description: | + Individuals who drive vehicles in revenue service to transport passengers for the transit provider but are not employees of the transit provider and are not compensated for their labor. - name: dim_annual_funding_sources description: >- diff --git a/warehouse/models/staging/ntd/_src.yml b/warehouse/models/staging/ntd/_src.yml index 099e5fa62f..24737376d7 100644 --- a/warehouse/models/staging/ntd/_src.yml +++ b/warehouse/models/staging/ntd/_src.yml @@ -7,3 +7,9 @@ sources: schema: external_ntd_data_products tables: - name: annual_database_agency_information + description: | + Contains basic contact and agency information for each NTD reporter. + + The dataset can be found at: + https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information + * For other years, just replace 2023 by the desired year. diff --git a/warehouse/models/staging/ntd/_stg_ntd.yml b/warehouse/models/staging/ntd/_stg_ntd.yml index ab9b0b5413..61c9194dfa 100644 --- a/warehouse/models/staging/ntd/_stg_ntd.yml +++ b/warehouse/models/staging/ntd/_stg_ntd.yml @@ -2,6 +2,12 @@ version: 2 models: - name: stg_ntd__annual_database_agency_information + description: | + Contains basic contact and agency information for each NTD reporter.
+ + The dataset can be found at: + https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information + * For other years, just replace 2023 by the desired year. tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: From bed2f488a468eb15ea03d8bffee81a08923e172b Mon Sep 17 00:00:00 2001 From: Erika Pacheco Date: Mon, 25 Nov 2024 16:59:17 -0800 Subject: [PATCH 10/13] Rename NTD mart table dim_annual_ntd_agency_information to dim_annual_agency_information [#3497] --- docs/warehouse/warehouse_starter_kit.md | 2 +- warehouse/models/docs/_docs_transit_database.md | 2 +- warehouse/models/mart/ntd/_mart_ntd.yml | 2 +- ...ency_information.sql => dim_annual_agency_information.sql} | 4 ++-- .../mart/transit_database/dim_mobility_mart_providers.sql | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) rename warehouse/models/mart/ntd/{dim_annual_ntd_agency_information.sql => dim_annual_agency_information.sql} (95%) diff --git a/docs/warehouse/warehouse_starter_kit.md b/docs/warehouse/warehouse_starter_kit.md index 893b089c77..6ac226eb0f 100644 --- a/docs/warehouse/warehouse_starter_kit.md +++ b/docs/warehouse/warehouse_starter_kit.md @@ -65,7 +65,7 @@ For a given day: ### Other -- [dim_annual_ntd_agency_information](https://dbt-docs.calitp.org/#!/model/model.calitp_warehouse.dim_annual_database_agency_information) +- [dim_annual_agency_information](https://dbt-docs.calitp.org/#!/model/model.calitp_warehouse.dim_annual_database_agency_information) - View some of the data produced by the [US Department of Transportation](https://www.transit.dot.gov/ntd) for the National Transit Database. - Information from 2018-2021 are available. diff --git a/warehouse/models/docs/_docs_transit_database.md b/warehouse/models/docs/_docs_transit_database.md index 2fa85cefcb..34cb0e36d3 100644 --- a/warehouse/models/docs/_docs_transit_database.md +++ b/warehouse/models/docs/_docs_transit_database.md @@ -200,7 +200,7 @@ are implemented for future schema consistency, but historical data has not yet b {% docs ntd_agency_info_table %} -DEPRECATED: Please use mart_ntd.dim_annual_ntd_agency_information going forward. +DEPRECATED: Please use mart_ntd.dim_annual_agency_information going forward. 2018 NTD Agency Info Table Imported 10/6/2021 from fta.gov diff --git a/warehouse/models/mart/ntd/_mart_ntd.yml b/warehouse/models/mart/ntd/_mart_ntd.yml index 2ad4804c94..9b7c360f99 100644 --- a/warehouse/models/mart/ntd/_mart_ntd.yml +++ b/warehouse/models/mart/ntd/_mart_ntd.yml @@ -102,7 +102,7 @@ x-common-fields: description: '{{ doc("ntd_xlsx_execution_ts") }}' models: - - name: dim_annual_ntd_agency_information + - name: dim_annual_agency_information description: > Contains basic contact and agency information for each NTD reporter. 
diff --git a/warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql b/warehouse/models/mart/ntd/dim_annual_agency_information.sql
similarity index 95%
rename from warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql
rename to warehouse/models/mart/ntd/dim_annual_agency_information.sql
index 66ca2eca2b..772aa18532 100644
--- a/warehouse/models/mart/ntd/dim_annual_ntd_agency_information.sql
+++ b/warehouse/models/mart/ntd/dim_annual_agency_information.sql
@@ -6,7 +6,7 @@ WITH stg_ntd__annual_database_agency_information AS (
 FROM {{ ref('stg_ntd__annual_database_agency_information') }}
 ),

-dim_annual_ntd_agency_information AS (
+dim_annual_agency_information AS (
 SELECT
 {{ dbt_utils.generate_surrogate_key(['year', 'ntd_id', 'state_parent_ntd_id', 'ts']) }} AS key,
 year,
@@ -59,4 +59,4 @@ dim_annual_ntd_agency_information AS (
 FROM stg_ntd__annual_database_agency_information
 )

-SELECT * FROM dim_annual_ntd_agency_information
+SELECT * FROM dim_annual_agency_information
diff --git a/warehouse/models/mart/transit_database/dim_mobility_mart_providers.sql b/warehouse/models/mart/transit_database/dim_mobility_mart_providers.sql
index 5c5b7e8947..600331d1d1 100644
--- a/warehouse/models/mart/transit_database/dim_mobility_mart_providers.sql
+++ b/warehouse/models/mart/transit_database/dim_mobility_mart_providers.sql
@@ -77,7 +77,7 @@ funding_by_org AS (
 -- We cannot use `_is_current` here because every year is marked as "current"
 -- since it's the "current" record for the respective year.
 annual_ntd AS (
- SELECT * FROM {{ ref('dim_annual_ntd_agency_information') }}
+ SELECT * FROM {{ ref('dim_annual_agency_information') }}
 WHERE state = "CA"

 -- We only want data from the latest data from NTD. In the rare edge case
From 81fe56d99abc28489697971daf5ecc1240736796 Mon Sep 17 00:00:00 2001
From: Erika Pacheco
Date: Mon, 25 Nov 2024 17:51:32 -0800
Subject: [PATCH 11/13] Bump upload-artifact version to v3 in order to fix the deprecation error on build-docs.

`This request has been automatically failed because it uses a deprecated version of `actions/upload-artifact: v2`.
Learn more: https://github.blog/changelog/2024-02-13-deprecation-notice-v1-and-v2-of-the-artifact-actions/` --- .github/workflows/publish-docs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish-docs.yml b/.github/workflows/publish-docs.yml index 09ae0976b6..323fc12e43 100644 --- a/.github/workflows/publish-docs.yml +++ b/.github/workflows/publish-docs.yml @@ -29,7 +29,7 @@ jobs: - name: Build jupyter book run: jb build docs --warningiserror --keep-going # set doc to fail on any sphinx warning - - uses: actions/upload-artifact@v2 + - uses: actions/upload-artifact@v3 if: always() with: name: docs-build From adb083f096b488cd7eccd91d92fe52b5848ddf66 Mon Sep 17 00:00:00 2001 From: Charlie Costanzo Date: Tue, 26 Nov 2024 14:48:36 -0500 Subject: [PATCH 12/13] revise name of misleading state highway network flag on dim_stops_latest (#3564) --- warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index 58b648a57b..c39c5d803d 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -40,7 +40,7 @@ dim_stops_latest_with_shn_boolean AS ( SELECT dim_stops_latest.*, - IF(stops_on_shn._gtfs_key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest + IF(stops_on_shn._gtfs_key IS NOT NULL, TRUE, FALSE) AS on_state_highway_network FROM dim_stops_latest LEFT JOIN From a069890a0a707a3c8ce14b99692712f02c6aae0c Mon Sep 17 00:00:00 2001 From: Doc Ritezel Date: Tue, 26 Nov 2024 12:15:13 -0800 Subject: [PATCH 13/13] Provide a better message when GTFS-RT validator jar skips a file Signed-off-by: Erika Pacheco --- jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py | 54 ++++++++++++------------ jobs/gtfs-rt-parser-v2/poetry.lock | 53 ++++++++++++++++++++++- jobs/gtfs-rt-parser-v2/pyproject.toml | 3 +- 3 files changed, 80 insertions(+), 30 deletions(-) diff --git a/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py b/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py index 62597461df..e4b792540e 100644 --- a/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py +++ b/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py @@ -61,6 +61,18 @@ class InvalidMetadata(Exception): pass +class NoScheduleDataSpecified(Exception): + pass + + +class ScheduleDataNotFound(Exception): + pass + + +class NoValidatorResults(Exception): + pass + + class RTProcessingStep(str, Enum): parse = "parse" validate = "validate" @@ -77,14 +89,6 @@ class RTValidationMetadata(BaseModel): gtfs_validator_version: str -class NoScheduleDataSpecified(Exception): - pass - - -class ScheduleDataNotFound(Exception): - pass - - class RTHourlyAggregation(PartitionedGCSArtifact): partition_names: ClassVar[List[str]] = ["dt", "hour", "base64_url"] step: RTProcessingStep @@ -277,7 +281,7 @@ def download(self, date: datetime.datetime) -> Optional[str]: .get_url_schedule(self.base64_validation_url) ) except KeyError: - print( + typer.secho( f"no schedule data found for {self.base64_validation_url} on day {day}" ) continue @@ -287,7 +291,7 @@ def download(self, date: datetime.datetime) -> Optional[str]: self.fs.get(schedule_extract.path, gtfs_zip) return gtfs_zip except FileNotFoundError: - print( + typer.secho( f"no schedule file found for {self.base64_validation_url} on day {day}" ) continue @@ -346,17 +350,17 @@ def get_local_paths(self) -> Dict[str, GTFSRTFeedExtract]: def 
get_results_paths(self) -> Dict[str, GTFSRTFeedExtract]: return {e.get_results_path(): e.extract for e in self.get_extracts()} - def get_hashed_results(self): + def get_hashed_results(self) -> Dict[str, Any]: hashed = {} for e in self.get_extracts(): if e.has_results(): - hashed[e.hash()] = e.get_results() + hashed[e.hash().hex()] = e.get_results() return hashed - def get_hashes(self) -> Dict[bytes, List[GTFSRTFeedExtract]]: - hashed: Dict[bytes, List[GTFSRTFeedExtract]] = defaultdict(list) + def get_hashes(self) -> Dict[str, List[GTFSRTFeedExtract]]: + hashed: Dict[str, List[GTFSRTFeedExtract]] = defaultdict(list) for e in self.get_extracts(): - hashed[e.hash()].append(e.extract) + hashed[e.hash().hex()].append(e.extract) return hashed def download(self): @@ -507,7 +511,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]: e = ScheduleDataNotFound( f"no recent schedule data found for {self.aggregation.extracts[0].path}" ) - print(e) + typer.secho(e) scope.fingerprint = [ type(e), @@ -571,11 +575,11 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]: for hash, extracts in aggregation_extracts.get_hashes().items(): try: records = hashed_results[hash] - except KeyError as e: + except KeyError: if self.verbose: paths = ", ".join(e.path for e in extracts) typer.secho( - f"WARNING: no results found for {paths}", + f"WARNING: validator did not produce results for {paths}", fg=typer.colors.YELLOW, ) @@ -584,7 +588,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]: RTFileProcessingOutcome( step=self.aggregation.step, success=False, - exception=e, + exception=NoValidatorResults("No validator output"), extract=extract, ) ) @@ -680,7 +684,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]: except DecodeError as e: if self.verbose: typer.secho( - f"WARNING: DecodeError for {str(extract.path)}", + f'DecodeError: "{str(e)}" thrown when decoding {str(extract.path)}', fg=typer.colors.YELLOW, ) outcomes.append( @@ -918,13 +922,9 @@ def main( # TODO: I dislike having to exclude the records here # I need to figure out the best way to have a single type represent the "metadata" of # the content as well as the content itself - result.save_content( - fs=get_fs(), - content="\n".join( - (json.dumps(make_pydantic_model_bq_safe(o)) for o in result.outcomes) - ).encode(), - exclude={"outcomes"}, - ) + raw = [json.dumps(make_pydantic_model_bq_safe(o)) for o in result.outcomes] + content = "\n".join(raw).encode("utf-8") + result.save_content(fs=get_fs(), content=content, exclude={"outcomes"}) assert ( len(outcomes) diff --git a/jobs/gtfs-rt-parser-v2/poetry.lock b/jobs/gtfs-rt-parser-v2/poetry.lock index 2d2c695f4d..fb12c1b018 100644 --- a/jobs/gtfs-rt-parser-v2/poetry.lock +++ b/jobs/gtfs-rt-parser-v2/poetry.lock @@ -507,6 +507,22 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "flake8" +version = "7.1.1" +description = "the modular source code checker: pep8 pyflakes and co" +optional = false +python-versions = ">=3.8.1" +files = [ + {file = "flake8-7.1.1-py2.py3-none-any.whl", hash = "sha256:597477df7860daa5aa0fdd84bf5208a043ab96b8e96ab708770ae0364dd03213"}, + {file = "flake8-7.1.1.tar.gz", hash = "sha256:049d058491e228e03e67b390f311bbf88fce2dbaa8fa673e7aea87b7198b8d38"}, +] + +[package.dependencies] +mccabe = ">=0.7.0,<0.8.0" +pycodestyle = ">=2.12.0,<2.13.0" +pyflakes = ">=3.2.0,<3.3.0" + [[package]] name = "fonttools" version = "4.54.1" @@ -1357,6 +1373,17 @@ pillow = ">=6.2.0" 
pyparsing = ">=2.3.1" python-dateutil = ">=2.7" +[[package]] +name = "mccabe" +version = "0.7.0" +description = "McCabe checker, plugin for flake8" +optional = false +python-versions = ">=3.6" +files = [ + {file = "mccabe-0.7.0-py2.py3-none-any.whl", hash = "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e"}, + {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, +] + [[package]] name = "memory-profiler" version = "0.60.0" @@ -2007,6 +2034,17 @@ files = [ [package.dependencies] pyasn1 = ">=0.4.6,<0.7.0" +[[package]] +name = "pycodestyle" +version = "2.12.1" +description = "Python style guide checker" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pycodestyle-2.12.1-py2.py3-none-any.whl", hash = "sha256:46f0fb92069a7c28ab7bb558f05bfc0110dac69a0cd23c61ea0040283a9d78b3"}, + {file = "pycodestyle-2.12.1.tar.gz", hash = "sha256:6838eae08bbce4f6accd5d5572075c63626a15ee3e6f842df996bf62f6d73521"}, +] + [[package]] name = "pydantic" version = "1.9.2" @@ -2058,6 +2096,17 @@ typing-extensions = ">=3.7.4.3" dotenv = ["python-dotenv (>=0.10.4)"] email = ["email-validator (>=1.0.3)"] +[[package]] +name = "pyflakes" +version = "3.2.0" +description = "passive checker of Python programs" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pyflakes-3.2.0-py2.py3-none-any.whl", hash = "sha256:84b5be138a2dfbb40689ca07e2152deb896a65c3a3e24c251c5c62489568074a"}, + {file = "pyflakes-3.2.0.tar.gz", hash = "sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f"}, +] + [[package]] name = "pyparsing" version = "3.1.4" @@ -2583,5 +2632,5 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" -python-versions = ">=3.8,<3.10" -content-hash = "2ff33638394c0014c2e5df03ad30b6e2e57ec5f048c5087b75a2546e0e0bd9fa" +python-versions = ">=3.8.1,<3.10" +content-hash = "51e6481ee50e162cc336f8581791ba5a2864a56bf00d722091837931f1a75f0f" diff --git a/jobs/gtfs-rt-parser-v2/pyproject.toml b/jobs/gtfs-rt-parser-v2/pyproject.toml index aca94cbc89..7c94bdd1f5 100644 --- a/jobs/gtfs-rt-parser-v2/pyproject.toml +++ b/jobs/gtfs-rt-parser-v2/pyproject.toml @@ -5,7 +5,7 @@ description = "" authors = ["Andrew Vaccaro "] [tool.poetry.dependencies] -python = ">=3.8,<3.10" +python = ">=3.8.1,<3.10" gtfs-realtime-bindings = "0.0.7" google-auth = "1.32.1" pathy = {extras = ["gcs"], version = "^0.6.1"} @@ -26,6 +26,7 @@ types-protobuf = "^5.28.0.20240924" types-tqdm = "^4.66.0.20240417" isort = "^5.13.2" pytest-env = "^1.1.5" +flake8 = "^7.1.1" [build-system] requires = ["poetry-core>=1.0.0"]