From b8d330bf4b8c9a2d4e630c62ba8ddbd02a819578 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 18 Apr 2024 17:10:20 -0400 Subject: [PATCH] TableLastModifiedMetadataBatch capability (#744) * TableLastModifiedMetadataBatch capability * pin ddtrace * open connection before calculate_freshness_from_metadata_batch * remove calculate_freshness_from_metadata_batch override * changelog entry * TestGetLastRelationModifiedBatch * restore dev-requirements.txt --- .../unreleased/Features-20240404-171441.yaml | 6 + dbt/adapters/redshift/impl.py | 1 + .../test_get_relation_last_modified.py | 128 ++++++++++++++++-- 3 files changed, 125 insertions(+), 10 deletions(-) create mode 100644 .changes/unreleased/Features-20240404-171441.yaml diff --git a/.changes/unreleased/Features-20240404-171441.yaml b/.changes/unreleased/Features-20240404-171441.yaml new file mode 100644 index 000000000..f1ac623c0 --- /dev/null +++ b/.changes/unreleased/Features-20240404-171441.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Support TableLastModifiedMetadataBatch capability +time: 2024-04-04T17:14:41.313087-07:00 +custom: + Author: michelleark + Issue: "755" diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index a77601895..18faee48c 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -58,6 +58,7 @@ class RedshiftAdapter(SQLAdapter): { Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full), Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full), + Capability.TableLastModifiedMetadataBatch: CapabilitySupport(support=Support.Full), } ) diff --git a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py index 6a77d22ae..c31e9ac61 100644 --- a/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py +++ b/tests/functional/adapter/sources_freshness_tests/test_get_relation_last_modified.py @@ -1,12 +1,24 @@ import os +import pytest +from unittest import mock +from dbt.adapters.redshift.impl import RedshiftAdapter +from dbt.adapters.capability import Capability, CapabilityDict +from dbt.cli.main import dbtRunner from dbt.tests.util import run_dbt -import pytest from tests.functional.adapter.sources_freshness_tests import files -class TestGetLastRelationModified: +class SetupGetLastRelationModified: + @pytest.fixture(scope="class", autouse=True) + def set_env_vars(self, project): + os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema + yield + del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + + +class TestGetLastRelationModified(SetupGetLastRelationModified): @pytest.fixture(scope="class") def seeds(self): return { @@ -18,14 +30,6 @@ def seeds(self): def models(self): return {"schema.yml": files.SCHEMA_YML} - @pytest.fixture(scope="class", autouse=True) - def setup(self, project): - # we need the schema name for the sources section - os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema - run_dbt(["seed"]) - yield - del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] - @pytest.mark.parametrize( "source,status,expect_pass", [ @@ -34,9 +38,113 @@ def setup(self, project): ], ) def test_get_last_relation_modified(self, project, source, status, expect_pass): + run_dbt(["seed"]) + results = run_dbt( ["source", "freshness", "--select", f"source:{source}"], expect_pass=expect_pass ) assert len(results) == 1 result = results[0] assert result.status == status + + +freshness_metadata_schema_batch_yml = """ +sources: + - name: test_source + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}" + tables: + - name: test_table + - name: test_table2 + - name: test_table_with_loaded_at_field + loaded_at_field: my_loaded_at_field +""" + + +class TestGetLastRelationModifiedBatch(SetupGetLastRelationModified): + @pytest.fixture(scope="class") + def custom_schema(self, project, set_env_vars): + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] + ) + project.adapter.drop_schema(relation) + project.adapter.create_schema(relation) + + yield relation.schema + + with project.adapter.connection_named("__test"): + project.adapter.drop_schema(relation) + + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": freshness_metadata_schema_batch_yml} + + def get_freshness_result_for_table(self, table_name, results): + for result in results: + if result.node.name == table_name: + return result + return None + + def test_get_last_relation_modified_batch(self, project, custom_schema): + project.run_sql( + f"create table {custom_schema}.test_table as (select 1 as id, 'test' as name);" + ) + project.run_sql( + f"create table {custom_schema}.test_table2 as (select 1 as id, 'test' as name);" + ) + project.run_sql( + f"create table {custom_schema}.test_table_with_loaded_at_field as (select 1 as id, timestamp '2009-09-15 10:59:43' as my_loaded_at_field);" + ) + + runner = dbtRunner() + freshness_results_batch = runner.invoke(["source", "freshness"]).result + + assert len(freshness_results_batch) == 3 + test_table_batch_result = self.get_freshness_result_for_table( + "test_table", freshness_results_batch + ) + test_table2_batch_result = self.get_freshness_result_for_table( + "test_table2", freshness_results_batch + ) + test_table_with_loaded_at_field_batch_result = self.get_freshness_result_for_table( + "test_table_with_loaded_at_field", freshness_results_batch + ) + + # Remove TableLastModifiedMetadataBatch and run freshness on same input without batch strategy + capabilities_no_batch = CapabilityDict( + { + capability: support + for capability, support in RedshiftAdapter.capabilities().items() + if capability != Capability.TableLastModifiedMetadataBatch + } + ) + with mock.patch.object( + RedshiftAdapter, "capabilities", return_value=capabilities_no_batch + ): + freshness_results = runner.invoke(["source", "freshness"]).result + + assert len(freshness_results) == 3 + test_table_result = self.get_freshness_result_for_table("test_table", freshness_results) + test_table2_result = self.get_freshness_result_for_table("test_table2", freshness_results) + test_table_with_loaded_at_field_result = self.get_freshness_result_for_table( + "test_table_with_loaded_at_field", freshness_results + ) + + # assert results between batch vs non-batch freshness strategy are equivalent + assert test_table_result.status == test_table_batch_result.status + assert test_table_result.max_loaded_at == test_table_batch_result.max_loaded_at + + assert test_table2_result.status == test_table2_batch_result.status + assert test_table2_result.max_loaded_at == test_table2_batch_result.max_loaded_at + + assert ( + test_table_with_loaded_at_field_batch_result.status + == test_table_with_loaded_at_field_result.status + ) + assert ( + test_table_with_loaded_at_field_batch_result.max_loaded_at + == test_table_with_loaded_at_field_result.max_loaded_at + )