diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml
index d500355bc6..1f66ffeec9 100644
--- a/.github/workflows/test_common.yml
+++ b/.github/workflows/test_common.yml
@@ -136,7 +136,7 @@ jobs:
      - name: Run extract and pipeline tests
        run: |
-          pytest tests/extract tests/pipeline tests/libs tests/destinations tests/sources ${{ matrix.pytest_args }}
+          pytest tests/extract tests/pipeline tests/libs tests/destinations tests/dataset tests/sources ${{ matrix.pytest_args }}
        if: matrix.python-version != '3.14'

      # here we upgrade sql alchemy to 2 and run the sql_database tests again
diff --git a/dlt/common/libs/ibis.py b/dlt/common/libs/ibis.py
index b47a584957..5aaa07ceac 100644
--- a/dlt/common/libs/ibis.py
+++ b/dlt/common/libs/ibis.py
@@ -19,7 +19,7 @@
 try:
     # ibis imports follow the convention used in the ibis source code
     import ibis
-    from ibis import util as ibis_util
+    from ibis import util as ibis_util, options as ibis_options
     import ibis.expr.datatypes as dt
     import ibis.expr.operations as ops
     import ibis.expr.schema as sch
@@ -150,7 +150,10 @@ def compile(  # noqa
         # expr
         r_ = self._dataset.query(expr)
         if limit:
-            r_ = r_.limit(int(limit))
+            if limit == "default":
+                limit = ibis_options.sql.default_limit
+            if limit:
+                r_ = r_.limit(int(limit))
         sql = r_.to_sql(pretty=pretty)
         self._log(sql)
         # TODO: allow for `params`
diff --git a/dlt/common/libs/pyarrow.py b/dlt/common/libs/pyarrow.py
index 5693baa3d2..34831d54fa 100644
--- a/dlt/common/libs/pyarrow.py
+++ b/dlt/common/libs/pyarrow.py
@@ -1356,7 +1356,7 @@ def __init__(self, reason: str) -> None:

 def add_arrow_metadata(
     item: Union[pyarrow.Table, pyarrow.RecordBatch], metadata: dict[str, Any]
-) -> pyarrow.Table:
+) -> Union[pyarrow.Table, pyarrow.RecordBatch]:
     # Get current metadata or initialize empty
     schema = item.schema
     current = schema.metadata or {}
diff --git a/dlt/dataset/relation.py b/dlt/dataset/relation.py
index 1d770f7211..6341abd662 100644
--- a/dlt/dataset/relation.py
+++ b/dlt/dataset/relation.py
@@ -1,33 +1,45 @@
 from __future__ import annotations

-from typing import overload, Union, Any, Generator, Optional, Sequence, Type, TYPE_CHECKING
+from typing import (
+    Iterator,
+    overload,
+    Union,
+    Any,
+    Generator,
+    Optional,
+    Sequence,
+    Type,
+    TYPE_CHECKING,
+)
 from textwrap import indent
 from contextlib import contextmanager

-from dlt.common.utils import simple_repr, without_none
 from sqlglot import maybe_parse
 from sqlglot.optimizer.merge_subqueries import merge_subqueries
 from sqlglot.expressions import ExpOrStr as SqlglotExprOrStr
-
 import sqlglot.expressions as sge

 import dlt
+from dlt.common import json
 from dlt.common.destination.dataset import TFilterOperation
 from dlt.common.libs.sqlglot import to_sqlglot_type, build_typed_literal, TSqlGlotDialect
 from dlt.common.libs.utils import is_instance_lib
 from dlt.common.schema.typing import TTableSchema, TTableSchemaColumns
 from dlt.common.typing import Self, TSortOrder
 from dlt.common.exceptions import ValueErrorWithKnownValues
+from dlt.common.destination.dataset import SupportsDataAccess
+from dlt.common.utils import simple_repr, without_none
+
 from dlt.dataset import lineage
+from dlt.extract.wrappers import wrap_additional_type, should_wrap_additional_type_in_list
 from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
 from dlt.destinations.queries import _normalize_query, build_select_expr
-from dlt.common.exceptions import MissingDependencyException
-from dlt.common.destination.dataset import SupportsDataAccess

 if TYPE_CHECKING:
     from ibis import ir

     from dlt.helpers.ibis import Expr as IbisExpr
+    from pyarrow import Table, RecordBatch


 _FILTER_OP_MAP = {
@@ -564,3 +576,30 @@ def _get_relation_output_columns_schema(
         allow_partial=allow_partial,
     )
     return columns_schema, normalized_query
+
+
+def iterate_relation_as_arrow(
+    relation: Relation, buffer_max_items: int
+) -> Iterator[Union[Table, RecordBatch]]:
+    """Materializes and yields data from the relation as Arrow batches. Preserves the relation
+    schema in Arrow metadata.
+    """
+    from dlt.common.libs.pyarrow import add_arrow_metadata
+    from dlt.extract.hints import DLT_HINTS_METADATA_KEY
+
+    serialized_hints = json.dumps(relation.schema)
+    for chunk in relation.iter_arrow(chunk_size=buffer_max_items):
+        yield add_arrow_metadata(chunk, {DLT_HINTS_METADATA_KEY: serialized_hints})
+
+
+@wrap_additional_type.register(Relation)
+def _(data: Any) -> Any:
+    gen_ = iterate_relation_as_arrow(data, buffer_max_items=50000)
+    # add a name to the generator so the resource can pick it up as its own name
+    gen_.__name__ = data._table_name or "query_relation"  # type: ignore[attr-defined]
+    return gen_
+
+
+@should_wrap_additional_type_in_list.register(Relation)
+def _(data: Any) -> bool:
+    return True
diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py
index e2f6da0a2c..a3c7669384 100644
--- a/dlt/extract/decorators.py
+++ b/dlt/extract/decorators.py
@@ -496,7 +496,7 @@ def resource(

 @overload
 def resource(
-    data: Union[List[Any], Iterator[Any]],
+    data: Any,
     /,
     name: str = None,
     table_name: TTableHintTemplate[str] = None,
diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py
index 60e3886206..12c5e555cb 100644
--- a/dlt/extract/extract.py
+++ b/dlt/extract/extract.py
@@ -58,6 +58,7 @@
 from dlt.extract.extractors import ObjectExtractor, ArrowExtractor, Extractor, ModelExtractor
 from dlt.extract.state import reset_resource_state
 from dlt.extract.utils import get_data_item_format, make_schema_with_default_name
+from dlt.extract.wrappers import should_wrap_additional_type_in_list


 def select_schema(pipeline: SupportsPipeline) -> Schema:
@@ -133,8 +134,11 @@ def append_data(data_item: Any) -> None:
         )

     if isinstance(data, C_Sequence) and len(data) > 0:
-        # if first element is source or resource
-        if isinstance(data[0], (DltResource, DltSource)):
+        # if the first element is a source or resource; assumes the list holds a single type
+        first_elem = data[0]
+        if isinstance(first_elem, (DltResource, DltSource)) or should_wrap_additional_type_in_list(
+            first_elem
+        ):
             for item in data:
                 append_data(item)
         else:
diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py
index ac771bcf07..0e4152eefa 100644
--- a/dlt/extract/hints.py
+++ b/dlt/extract/hints.py
@@ -7,9 +7,9 @@
     Sequence,
     Mapping,
     List,
-    NamedTuple,
 )
 from typing_extensions import Self
+import sqlglot

 from dlt.common import logger
 from dlt.common.schema.typing import (
@@ -40,7 +40,6 @@
     migrate_complex_types,
     new_column,
     new_table,
-    merge_table,
 )
 from dlt.common.typing import TAny, TDataItem, TColumnNames
 from dlt.common.time import ensure_pendulum_datetime_utc
@@ -57,8 +56,6 @@
 from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
 from dlt.extract.validation import create_item_validator

-import sqlglot
-

 DLT_HINTS_METADATA_KEY = "dlt_resource_hints"
diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py
index b5782137f2..11997e94c9 100644
--- a/dlt/extract/resource.py
+++ b/dlt/extract/resource.py
@@ -173,6 +173,9 @@ def from_data(
         if inject_config:
             r_._inject_config()
         else:
+            # wrap additional types
+            data = wrap_additional_type(data)
+
         if callable(data):
             name = name or get_callable_name(data)
@@ -184,9 +187,6 @@
         if not name:
             raise ResourceNameMissing()

-        # wrap additional types
-        data = wrap_additional_type(data)
-
         # several iterable types are not allowed and must be excluded right away
         if isinstance(data, (str, dict)):
             raise InvalidResourceDataTypeBasic(name, data, type(data))
diff --git a/dlt/extract/wrappers.py b/dlt/extract/wrappers.py
index e761fcdeab..e00bbfc479 100644
--- a/dlt/extract/wrappers.py
+++ b/dlt/extract/wrappers.py
@@ -1,4 +1,5 @@
 from typing import Any
+from functools import singledispatch

 from dlt.common.typing import NoneType
 from dlt.common.exceptions import MissingDependencyException
@@ -19,9 +20,12 @@
     ArrowTable, ArrowRecords = NoneType, NoneType


+@singledispatch
 def wrap_additional_type(data: Any) -> Any:
     """Wraps any known additional type so it is accepted by DltResource"""
     # pass through None: if optional deps are not defined, they fallback to None type
+    # NOTE: arrow/pandas should be registered in their respective helpers via single dispatch
+    # but that creates a circular dep that I do not want to resolve right now
     if data is None:
         return data

@@ -29,3 +33,11 @@
         return [data]

     return data
+
+
+@singledispatch
+def should_wrap_additional_type_in_list(data: Any) -> bool:
+    """Tells if a list element is a wrappable type for extraction. Used by the `extract` step
+    to wrap individual list elements.
+    """
+    return False
diff --git a/dlt/sources/sql_database/__init__.py b/dlt/sources/sql_database/__init__.py
index 1c1ead4eba..f1569df5f0 100644
--- a/dlt/sources/sql_database/__init__.py
+++ b/dlt/sources/sql_database/__init__.py
@@ -25,6 +25,7 @@
     table_to_resource_hints,
     ReflectionLevel,
     TTypeAdapter,
+    TReflectedHints,
 )


@@ -260,6 +261,7 @@ def sql_table(
     if table_obj is None and not defer_table_reflect:
         table_obj = Table(table, metadata, autoload_with=engine, resolve_fks=resolve_foreign_keys)

+    hints: TReflectedHints
     if table_obj is not None:
         if not defer_table_reflect:
             table_obj = _execute_table_adapter(
@@ -280,7 +282,7 @@
         # may be from what is found in the reflection, so it is set explicitly
         hints["primary_key"] = [primary_key] if isinstance(primary_key, str) else list(primary_key)

-    return decorators.resource(
+    return decorators.resource(  # type: ignore[no-any-return]
         table_rows,
         name=str(table),
         write_disposition=write_disposition,
diff --git a/tests/dataset/conftest.py b/tests/dataset/conftest.py
new file mode 100644
index 0000000000..3df00184b1
--- /dev/null
+++ b/tests/dataset/conftest.py
@@ -0,0 +1,6 @@
+from tests.utils import (
+    preserve_environ,
+    autouse_test_storage,
+    auto_test_run_context,
+    deactivate_pipeline,
+)
diff --git a/tests/dataset/test_relation.py b/tests/dataset/test_relation.py
index b53dde17a7..040fe80432 100644
--- a/tests/dataset/test_relation.py
+++ b/tests/dataset/test_relation.py
@@ -3,6 +3,7 @@
 import pytest

 import dlt
+from dlt.common import Decimal

 # TODO move destination-independent tests from `test_read_interfaces.py` to this module
@@ -17,8 +18,15 @@ def purchases():
             {"id": 3, "name": "charlie", "city": "barcelona"},
         )

+    @dlt.resource
+    def items():
+        yield from (
+            {"id": 1, "name": "labubu", "price": Decimal("23.1")},
+            {"id": 2, "name": "ububal", "price": Decimal("13.2")},
+        )
+
     pipeline = dlt.pipeline("_relation_to_ibis", destination="duckdb")
-    pipeline.run([purchases])
+    pipeline.run([purchases, items])
     return pipeline.dataset()
@@ -72,3 +80,43 @@ def test_transformed_relation_to_ibis_(purchases: dlt.Relation) -> None:
     assert isinstance(table, ir.Table)
     # executes without error
     table.execute()
+
+
+def test_relation_extraction(purchases: dlt.Relation) -> None:
+    from dlt.extract.hints import DLT_HINTS_METADATA_KEY
+
+    r_ = dlt.resource(purchases)
+    assert r_.name == "purchases"
+    results = list(r_)
+    assert len(results) == 1
+    assert len(results[0]) == 3
+    # assert column names are the same as the relation columns
+    arrow_table = results[0]
+    assert list(arrow_table.column_names) == purchases.columns
+    # assert that DLT_HINTS_METADATA_KEY is present in the arrow metadata
+    assert DLT_HINTS_METADATA_KEY.encode("utf-8") in arrow_table.schema.metadata
+
+
+def test_relation_list_extraction(dataset: dlt.Dataset) -> None:
+    purchases = dataset.table("purchases")
+
+    # note that list elements passed to the decorator are not separately wrapped
+    r_ = dlt.resource([purchases], name="fail")
+    assert list(r_)[0] is purchases
+
+    # but in a pipeline they will be
+    pipeline = dlt.pipeline("double_relation", destination="duckdb", dataset_name="_data")
+    pipeline.run([purchases])
+    table = pipeline.dataset().table("purchases").arrow()
+    # assert column names are the same as the relation columns
+    assert list(table.column_names) == purchases.columns
+    assert len(table) == 3
+
+    items = dataset.table("items")
+    pipeline.run([purchases, items])
+    table = pipeline.dataset().table("purchases").arrow()
+    assert list(table.column_names) == purchases.columns
+    assert len(table) == 6
+    table = pipeline.dataset().table("items").arrow()
+    assert list(table.column_names) == items.columns
+    assert len(table) == 2
diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py
index f4b45506f7..e13e0969be 100644
--- a/tests/extract/test_decorators.py
+++ b/tests/extract/test_decorators.py
@@ -1027,7 +1027,7 @@ def top_level_resource(secret=dlt.secrets.value, config=dlt.config.value, opt: s

 def test_spec_generation() -> None:
     # outer resource does not take default params
-    SPEC = get_fun_spec(top_level_resource._pipe.gen)  # type: ignore[arg-type]
+    SPEC = get_fun_spec(top_level_resource._pipe.gen)
     fields = SPEC.get_resolvable_fields()
     assert len(fields) == 3
@@ -1039,7 +1039,7 @@
     def inner_resource(secret=dlt.secrets.value, config=dlt.config.value, opt: str = "A"):
         yield 1

-    SPEC = get_fun_spec(inner_resource("TS", "CFG")._pipe.gen)  # type: ignore[arg-type]
+    SPEC = get_fun_spec(inner_resource("TS", "CFG")._pipe.gen)
     fields = SPEC.get_resolvable_fields()
     # resources inject full signature now
     assert len(fields) == 3
@@ -1559,8 +1559,8 @@ def will_decorate(secret: str = dlt.secrets.value, opt=1) -> List[int]:
         return [1, 2, 3]

     # get signature for all wrappers
-    sig_bottom = inspect.signature(will_decorate.__wrapped__.__wrapped__)  # type: ignore[attr-defined]
-    sig_middle = inspect.signature(will_decorate.__wrapped__)  # type: ignore[attr-defined]
+    sig_bottom = inspect.signature(will_decorate.__wrapped__.__wrapped__)
+    sig_middle = inspect.signature(will_decorate.__wrapped__)
     sig_top = inspect.signature(will_decorate, follow_wrapped=False)

     print("SIG YTOP SIG", will_decorate.__signature__)
@@ -1648,15 +1648,15 @@ def some_gen():

     resource = dlt.resource(some_gen, parallelized=True)
     # Generator func is wrapped with parallelized gen that yields callables
-    gen = resource._pipe.gen()  # type: ignore
-    result = next(gen)  # type: ignore[arg-type]
+    gen = resource._pipe.gen()
+    result = next(gen)
     assert result() == 1
     assert list(resource) == [1, 2, 3]

     # Same but wrapping generator directly
     resource = dlt.resource(some_gen(), parallelized=True)
-    result = next(resource._pipe.gen)  # type: ignore
+    result = next(resource._pipe.gen)
     assert result() == 1
     # get remaining items
     assert list(resource) == [2, 3]
@@ -1704,8 +1704,8 @@ async def some_data():

     resource = dlt.resource(gen_orig, parallelized=True)
     gen = resource._pipe.gen
-    next(gen)  # type: ignore
-    gen.close()  # type: ignore
+    next(gen)
+    gen.close()

     with pytest.raises(StopIteration):
         # Inner generator is also closed
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 9be3d27d32..911088b3e1 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -2601,9 +2601,7 @@ def test_type_3(updated_at: dlt.sources.incremental[int]):
         data = [{"updated_at": d} for d in [1, 2, 3]]
         yield data_to_item_format(item_type, data)

-    r = test_type_3(
-        dlt.sources.incremental[float]("updated_at", allow_external_schedulers=True)  # type: ignore[arg-type]
-    )
+    r = test_type_3(dlt.sources.incremental[float]("updated_at", allow_external_schedulers=True))
     list(r)
     assert r.incremental.incremental.get_incremental_value_type() is float
diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py
index 039466cc49..aff8b7753d 100644
--- a/tests/extract/test_sources.py
+++ b/tests/extract/test_sources.py
@@ -979,7 +979,7 @@ def test_limit_infinite_counter() -> None:

 @pytest.mark.parametrize("limit", (None, -1, 0, 10))
 def test_limit_edge_cases(limit: int) -> None:
-    r = dlt.resource(range(20), name="resource").add_limit(limit)  # type: ignore[call-overload]
+    r = dlt.resource(range(20), name="resource").add_limit(limit)

     @dlt.resource()
     async def r_async():
@@ -1313,7 +1313,7 @@ def test_source():
     assert s.test_resource.state == {}

     with Container().injectable_context(StateInjectableContext(state={})) as state:
-        r.state["direct"] = True  # type: ignore[index]
+        r.state["direct"] = True
         s.test_resource.state["in-source"] = True  # type: ignore[index]
         # resource section is current module
         print(state.state)
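
A minimal usage sketch of what this change set enables: a `dlt.Relation` can now be passed directly to `dlt.resource` or inside a list to `pipeline.run`. The new single-dispatch registration wraps the relation into an Arrow generator (50k-row batches, named after the relation's table) that carries the relation schema in Arrow metadata under `DLT_HINTS_METADATA_KEY`. Pipeline and dataset names below are illustrative; the flow mirrors `test_relation_list_extraction` above.

    import dlt

    # load a small "purchases" table, then read it back as a Relation
    src = dlt.pipeline("relation_demo", destination="duckdb", dataset_name="_src")
    src.run([{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}], table_name="purchases")
    purchases = src.dataset().table("purchases")

    # the Relation is accepted as resource data; list elements are wrapped
    # individually during the extract step
    copy = dlt.pipeline("relation_copy", destination="duckdb", dataset_name="_data")
    copy.run([purchases])

    assert len(copy.dataset().table("purchases").arrow()) == 2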