Skip to content

Commit

Permalink
TableLastModifiedMetadataBatch capability (#744)
Browse files Browse the repository at this point in the history
* TableLastModifiedMetadataBatch capability

* pin ddtrace

* open connection before calculate_freshness_from_metadata_batch

* remove calculate_freshness_from_metadata_batch override

* changelog entry

* TestGetLastRelationModifiedBatch

* restore dev-requirements.txt
  • Loading branch information
MichelleArk authored Apr 18, 2024
1 parent 663b8ed commit b8d330b
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240404-171441.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Support TableLastModifiedMetadataBatch capability
time: 2024-04-04T17:14:41.313087-07:00
custom:
Author: michelleark
Issue: "755"
1 change: 1 addition & 0 deletions dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class RedshiftAdapter(SQLAdapter):
{
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadataBatch: CapabilitySupport(support=Support.Full),
}
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
import os
import pytest
from unittest import mock

from dbt.adapters.redshift.impl import RedshiftAdapter
from dbt.adapters.capability import Capability, CapabilityDict
from dbt.cli.main import dbtRunner
from dbt.tests.util import run_dbt
import pytest

from tests.functional.adapter.sources_freshness_tests import files


class TestGetLastRelationModified:
class SetupGetLastRelationModified:
@pytest.fixture(scope="class", autouse=True)
def set_env_vars(self, project):
os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
yield
del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]


class TestGetLastRelationModified(SetupGetLastRelationModified):
@pytest.fixture(scope="class")
def seeds(self):
return {
Expand All @@ -18,14 +30,6 @@ def seeds(self):
def models(self):
return {"schema.yml": files.SCHEMA_YML}

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
# we need the schema name for the sources section
os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
run_dbt(["seed"])
yield
del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

@pytest.mark.parametrize(
"source,status,expect_pass",
[
Expand All @@ -34,9 +38,113 @@ def setup(self, project):
],
)
def test_get_last_relation_modified(self, project, source, status, expect_pass):
run_dbt(["seed"])

results = run_dbt(
["source", "freshness", "--select", f"source:{source}"], expect_pass=expect_pass
)
assert len(results) == 1
result = results[0]
assert result.status == status


freshness_metadata_schema_batch_yml = """
sources:
- name: test_source
freshness:
warn_after: {count: 10, period: hour}
error_after: {count: 1, period: day}
schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
tables:
- name: test_table
- name: test_table2
- name: test_table_with_loaded_at_field
loaded_at_field: my_loaded_at_field
"""


class TestGetLastRelationModifiedBatch(SetupGetLastRelationModified):
@pytest.fixture(scope="class")
def custom_schema(self, project, set_env_vars):
with project.adapter.connection_named("__test"):
relation = project.adapter.Relation.create(
database=project.database, schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]
)
project.adapter.drop_schema(relation)
project.adapter.create_schema(relation)

yield relation.schema

with project.adapter.connection_named("__test"):
project.adapter.drop_schema(relation)

@pytest.fixture(scope="class")
def models(self):
return {"schema.yml": freshness_metadata_schema_batch_yml}

def get_freshness_result_for_table(self, table_name, results):
for result in results:
if result.node.name == table_name:
return result
return None

def test_get_last_relation_modified_batch(self, project, custom_schema):
project.run_sql(
f"create table {custom_schema}.test_table as (select 1 as id, 'test' as name);"
)
project.run_sql(
f"create table {custom_schema}.test_table2 as (select 1 as id, 'test' as name);"
)
project.run_sql(
f"create table {custom_schema}.test_table_with_loaded_at_field as (select 1 as id, timestamp '2009-09-15 10:59:43' as my_loaded_at_field);"
)

runner = dbtRunner()
freshness_results_batch = runner.invoke(["source", "freshness"]).result

assert len(freshness_results_batch) == 3
test_table_batch_result = self.get_freshness_result_for_table(
"test_table", freshness_results_batch
)
test_table2_batch_result = self.get_freshness_result_for_table(
"test_table2", freshness_results_batch
)
test_table_with_loaded_at_field_batch_result = self.get_freshness_result_for_table(
"test_table_with_loaded_at_field", freshness_results_batch
)

# Remove TableLastModifiedMetadataBatch and run freshness on same input without batch strategy
capabilities_no_batch = CapabilityDict(
{
capability: support
for capability, support in RedshiftAdapter.capabilities().items()
if capability != Capability.TableLastModifiedMetadataBatch
}
)
with mock.patch.object(
RedshiftAdapter, "capabilities", return_value=capabilities_no_batch
):
freshness_results = runner.invoke(["source", "freshness"]).result

assert len(freshness_results) == 3
test_table_result = self.get_freshness_result_for_table("test_table", freshness_results)
test_table2_result = self.get_freshness_result_for_table("test_table2", freshness_results)
test_table_with_loaded_at_field_result = self.get_freshness_result_for_table(
"test_table_with_loaded_at_field", freshness_results
)

# assert results between batch vs non-batch freshness strategy are equivalent
assert test_table_result.status == test_table_batch_result.status
assert test_table_result.max_loaded_at == test_table_batch_result.max_loaded_at

assert test_table2_result.status == test_table2_batch_result.status
assert test_table2_result.max_loaded_at == test_table2_batch_result.max_loaded_at

assert (
test_table_with_loaded_at_field_batch_result.status
== test_table_with_loaded_at_field_result.status
)
assert (
test_table_with_loaded_at_field_batch_result.max_loaded_at
== test_table_with_loaded_at_field_result.max_loaded_at
)

0 comments on commit b8d330b

Please sign in to comment.