Skip to content

Commit 49ced46

Browse files
Supper filters in asset factory and decorator
1 parent 9591d5f commit 49ced46

File tree

3 files changed

+20
-5
lines changed

3 files changed

+20
-5
lines changed

python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from dagster import AssetsDefinition, multi_asset
44
from dagster._annotations import beta
55

6-
from dagster_fivetran.resources import FivetranWorkspace
6+
from dagster_fivetran.resources import FivetranFilter, FivetranWorkspace
77
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet
88

99

@@ -15,6 +15,7 @@ def fivetran_assets(
1515
name: Optional[str] = None,
1616
group_name: Optional[str] = None,
1717
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
18+
fivetran_filter: Optional[FivetranFilter] = None,
1819
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
1920
"""Create a definition for how to sync the tables of a given Fivetran connector.
2021
@@ -27,6 +28,7 @@ def fivetran_assets(
2728
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
2829
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
2930
Defaults to :py:class:`DagsterFivetranTranslator`.
31+
fivetran_filter (Optional[FivetranFilter]): Filters the set of Fivetran objects to fetch.
3032
3133
Examples:
3234
Sync the tables of a Fivetran connector:
@@ -102,6 +104,7 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
102104
103105
"""
104106
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()
107+
fivetran_filter = fivetran_filter or FivetranFilter(connector_ids=frozenset({connector_id}))
105108

106109
return multi_asset(
107110
name=name,
@@ -110,7 +113,8 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
110113
specs=[
111114
spec
112115
for spec in workspace.load_asset_specs(
113-
dagster_fivetran_translator=dagster_fivetran_translator
116+
dagster_fivetran_translator=dagster_fivetran_translator,
117+
fivetran_filter=fivetran_filter,
114118
)
115119
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
116120
],

python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@
3232
from dagster._utils.log import get_dagster_logger
3333

3434
from dagster_fivetran.asset_decorator import fivetran_assets
35-
from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource, FivetranWorkspace
35+
from dagster_fivetran.resources import (
36+
DEFAULT_POLL_INTERVAL,
37+
FivetranFilter,
38+
FivetranResource,
39+
FivetranWorkspace,
40+
)
3641
from dagster_fivetran.translator import (
3742
DagsterFivetranTranslator,
3843
FivetranConnectorTableProps,
@@ -733,6 +738,7 @@ def build_fivetran_assets_definitions(
733738
*,
734739
workspace: FivetranWorkspace,
735740
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
741+
fivetran_filter: Optional[FivetranFilter] = None,
736742
) -> Sequence[AssetsDefinition]:
737743
"""The list of AssetsDefinition for all connectors in the Fivetran workspace.
738744
@@ -741,6 +747,7 @@ def build_fivetran_assets_definitions(
741747
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
742748
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
743749
Defaults to :py:class:`DagsterFivetranTranslator`.
750+
fivetran_filter (Optional[FivetranFilter]): Filters the set of Fivetran objects to fetch.
744751
745752
Returns:
746753
List[AssetsDefinition]: The list of AssetsDefinition for all connectors in the Fivetran workspace.
@@ -807,9 +814,10 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
807814
808815
"""
809816
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()
817+
fivetran_filter = fivetran_filter or FivetranFilter()
810818

811819
all_asset_specs = workspace.load_asset_specs(
812-
dagster_fivetran_translator=dagster_fivetran_translator
820+
dagster_fivetran_translator=dagster_fivetran_translator, fivetran_filter=fivetran_filter
813821
)
814822

815823
connector_ids = {
@@ -826,6 +834,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
826834
name=connector_id,
827835
group_name=connector_id,
828836
dagster_fivetran_translator=dagster_fivetran_translator,
837+
fivetran_filter=fivetran_filter,
829838
)
830839
def _asset_fn(context: AssetExecutionContext, fivetran: FivetranWorkspace):
831840
yield from fivetran.sync_and_poll(context=context)

python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,9 @@ def fetch_fivetran_workspace_data(
921921
fivetran_filter: Optional[FivetranFilter] = None,
922922
) -> FivetranWorkspaceData:
923923
"""Retrieves all Fivetran content from the workspace and returns it as a FivetranWorkspaceData object.
924-
fivetran_filter (Optional[FivetranFilter]): Filters the set of Fivetran objects to fetch.
924+
925+
Args:
926+
fivetran_filter (Optional[FivetranFilter]): Filters the set of Fivetran objects to fetch.
925927
926928
Returns:
927929
FivetranWorkspaceData: A snapshot of the Fivetran workspace's content.

0 commit comments

Comments
 (0)