From d868c2bd43fb0bd006099f83990e27601f445c91 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Tue, 14 Oct 2025 00:10:19 +0530 Subject: [PATCH 1/5] add skip and fail safes to loader --- warehouse/metrics_tools/local/loader.py | 35 ++++++++++++++++++++----- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/warehouse/metrics_tools/local/loader.py b/warehouse/metrics_tools/local/loader.py index cd19cff9e7..3ba9f30140 100644 --- a/warehouse/metrics_tools/local/loader.py +++ b/warehouse/metrics_tools/local/loader.py @@ -25,6 +25,8 @@ from pyiceberg.typedef import Identifier from sqlglot import exp from sqlmesh.core.dialect import parse_one +from google.api_core.exceptions import DeadlineExceeded + logger = logging.getLogger(__name__) @@ -168,10 +170,16 @@ def bq_try_read_with_options( increment = timedelta(days=1) # Exponential increments for reading from bigquery, in case the initial # restriction is too small - while result is None: - result = bq_read_with_options(start, end, source_table, dest, project_id) - start = start - increment - increment = increment * 2 + # while result is None: + # result = bq_read_with_options(start, end, source_table, dest, project_id) + # start = start - increment + # increment = increment * 2 + + result = bq_read_with_options(start, end, source_table, dest, project_id) + if result is None: + logger.warning(f"No results for {source_table} in {start} → {end}. Skipping.") + return None + return result.slice( 0, @@ -247,7 +255,16 @@ def load_from_bq( config = self._config # Load the schema from bigqouery - table_schema = self._bqclient.get_table(source_table).schema + # table_schema = self._bqclient.get_table(source_table).schema + # Load the schema from BigQuery + try: + table_schema = self._bqclient.get_table(source_table).schema + except Exception as e: + # Gracefully skip if table or project doesn't exist + if "Not found" in str(e) or "404" in str(e) or "Project" in str(e): + logger.warning(f"Skipping {source_table.path} — table not found in BigQuery") + return + raise if self.destination_table_exists(rewritten_destination): if self.has_schema_changed(rewritten_destination, table_schema): @@ -298,7 +315,13 @@ def load_from_bq( table_as_arrow = rows.to_arrow( create_bqstorage_client=True ) # noqa: F841 - + # ✅ GUARD: skip committing if no rows were read + if table_as_arrow is None: + logger.warning( + f"Skipping {source_name} -> {destination.table}: " + "no rows for the selected window/restrictions." + ) + return # Load the table self.commit_table( source_name, From b7003dc3eac60537566681d14f171d76086a4cee Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Tue, 14 Oct 2025 00:13:12 +0530 Subject: [PATCH 2/5] add pnl mart model and add quickstart file to gitignore --- .gitignore | 1 + .../project_pnl_quarterly_from_metrics_v1.sql | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql diff --git a/.gitignore b/.gitignore index 5cee0d88c3..0792de5113 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ logs/ dev-tests/ coverage.json dev-tests/ +warehouse/oso_sqlmesh/models/intermediate/int_my_first_model.sql # typescript *.tsbuildinfo diff --git a/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql b/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql new file mode 100644 index 0000000000..ab2ca3d31a --- /dev/null +++ b/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql @@ -0,0 +1,53 @@ +MODEL ( + name oso.project_pnl_quarterly_from_metrics_v1, + description 'Quarterly P&L per project using metrics layer: income=funding_received, expense=funding_awarded.', + dialect trino, + kind FULL, + partitioned_by year(quarter_start), + tags ( + 'model_stage=mart', + 'entity_category=project' + ), + audits (HAS_AT_LEAST_N_ROWS(threshold := 0)) +); + + +WITH src AS ( + SELECT + t.sample_date AS sample_date, + CAST(t.project_id AS VARCHAR) AS project_id, + LOWER(COALESCE(p.project_name, 'unknown')) AS project_name, + LOWER(m.metric) AS metric_name, + TRY_CAST(t.amount AS DOUBLE) AS amount_usd + FROM oso.timeseries_metrics_by_project_v0 t + JOIN oso.metrics_v0 m + ON m.metric_id = t.metric_id + LEFT JOIN oso.projects_v1 p + ON p.project_id = t.project_id + WHERE LOWER(m.metric) IN ('funding_received', 'funding_awarded') +), + +q AS ( + SELECT + DATE_TRUNC('quarter', CAST(sample_date AS TIMESTAMP)) AS quarter_start, + YEAR(CAST(sample_date AS TIMESTAMP)) AS year, + CONCAT('Q', CAST(QUARTER(CAST(sample_date AS TIMESTAMP)) AS VARCHAR)) AS quarter_label, + project_id, + project_name, + SUM(CASE WHEN metric_name = 'funding_received' THEN amount_usd ELSE 0 END) AS income_usd, + SUM(CASE WHEN metric_name = 'funding_awarded' THEN amount_usd ELSE 0 END) AS expense_usd + FROM src + GROUP BY 1,2,3,4,5 +) + +SELECT + quarter_start, + year, + quarter_label, + project_id, + project_name, + income_usd, + expense_usd, + (COALESCE(income_usd,0) - COALESCE(expense_usd,0)) AS net_usd +FROM q +ORDER BY quarter_start, project_name; From c941cf6805466da51a06445a708981ca2f700781 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Tue, 14 Oct 2025 00:53:56 +0530 Subject: [PATCH 3/5] ruff check and fix --- warehouse/metrics_tools/local/loader.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/warehouse/metrics_tools/local/loader.py b/warehouse/metrics_tools/local/loader.py index 3ba9f30140..de90a887b5 100644 --- a/warehouse/metrics_tools/local/loader.py +++ b/warehouse/metrics_tools/local/loader.py @@ -3,7 +3,7 @@ import os import re import typing as t -from datetime import datetime, timedelta +from datetime import datetime import duckdb import pyarrow as pa @@ -25,7 +25,6 @@ from pyiceberg.typedef import Identifier from sqlglot import exp from sqlmesh.core.dialect import parse_one -from google.api_core.exceptions import DeadlineExceeded logger = logging.getLogger(__name__) @@ -167,7 +166,7 @@ def bq_try_read_with_options( max_results_per_query: int, ): result = None - increment = timedelta(days=1) + # increment = timedelta(days=1) # Exponential increments for reading from bigquery, in case the initial # restriction is too small # while result is None: From 11efb712c0ef03ec817dc41efb24e5a28d9f91f3 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Tue, 14 Oct 2025 01:06:38 +0530 Subject: [PATCH 4/5] fix metric name --- .../finance/project_pnl_quarterly_from_metrics_v1.sql | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql b/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql index ab2ca3d31a..6a9cf61b38 100644 --- a/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql +++ b/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql @@ -4,10 +4,6 @@ MODEL ( dialect trino, kind FULL, partitioned_by year(quarter_start), - tags ( - 'model_stage=mart', - 'entity_category=project' - ), audits (HAS_AT_LEAST_N_ROWS(threshold := 0)) ); @@ -17,14 +13,14 @@ WITH src AS ( t.sample_date AS sample_date, CAST(t.project_id AS VARCHAR) AS project_id, LOWER(COALESCE(p.project_name, 'unknown')) AS project_name, - LOWER(m.metric) AS metric_name, + LOWER(m.metric_name) AS metric_name, TRY_CAST(t.amount AS DOUBLE) AS amount_usd FROM oso.timeseries_metrics_by_project_v0 t JOIN oso.metrics_v0 m ON m.metric_id = t.metric_id LEFT JOIN oso.projects_v1 p ON p.project_id = t.project_id - WHERE LOWER(m.metric) IN ('funding_received', 'funding_awarded') + WHERE LOWER(m.metric_name) IN ('funding_received', 'funding_awarded') ), q AS ( From 483984fdafc3a48204fdc7b1d11ec82cc6a45fdb Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Tue, 14 Oct 2025 01:19:56 +0530 Subject: [PATCH 5/5] add tags --- .../marts/finance/project_pnl_quarterly_from_metrics_v1.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql b/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql index 6a9cf61b38..e0b955fda6 100644 --- a/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql +++ b/warehouse/oso_sqlmesh/models/marts/finance/project_pnl_quarterly_from_metrics_v1.sql @@ -4,6 +4,7 @@ MODEL ( dialect trino, kind FULL, partitioned_by year(quarter_start), + tags ('model_stage=mart', 'entity_category=project'), audits (HAS_AT_LEAST_N_ROWS(threshold := 0)) );