Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ jobs:

- name: Run extract and pipeline tests
run: |
pytest tests/extract tests/pipeline tests/libs tests/destinations tests/sources ${{ matrix.pytest_args }}
pytest tests/extract tests/pipeline tests/libs tests/destinations tests/dataset tests/sources ${{ matrix.pytest_args }}
if: matrix.python-version != '3.14'

# here we upgrade sql alchemy to 2 and run the sql_database tests again
Expand Down
7 changes: 5 additions & 2 deletions dlt/common/libs/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
try:
# ibis imports follow the convention used in the ibis source code
import ibis
from ibis import util as ibis_util
from ibis import util as ibis_util, options as ibis_options
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
Expand Down Expand Up @@ -150,7 +150,10 @@ def compile( # noqa
# expr
r_ = self._dataset.query(expr)
if limit:
r_ = r_.limit(int(limit))
if limit == "default":
limit = ibis_options.sql.default_limit
if limit:
r_ = r_.limit(int(limit))
sql = r_.to_sql(pretty=pretty)
self._log(sql)
# TODO: allow for `params`
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/libs/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,7 @@ def __init__(self, reason: str) -> None:

def add_arrow_metadata(
item: Union[pyarrow.Table, pyarrow.RecordBatch], metadata: dict[str, Any]
) -> pyarrow.Table:
) -> Union[pyarrow.Table, pyarrow.RecordBatch]:
# Get current metadata or initialize empty
schema = item.schema
current = schema.metadata or {}
Expand Down
49 changes: 44 additions & 5 deletions dlt/dataset/relation.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
from __future__ import annotations

from typing import overload, Union, Any, Generator, Optional, Sequence, Type, TYPE_CHECKING
from typing import (
Iterator,
overload,
Union,
Any,
Generator,
Optional,
Sequence,
Type,
TYPE_CHECKING,
)
from textwrap import indent
from contextlib import contextmanager
from dlt.common.utils import simple_repr, without_none

from sqlglot import maybe_parse
from sqlglot.optimizer.merge_subqueries import merge_subqueries
from sqlglot.expressions import ExpOrStr as SqlglotExprOrStr

import sqlglot.expressions as sge

import dlt
from dlt.common import json
from dlt.common.destination.dataset import TFilterOperation
from dlt.common.libs.sqlglot import to_sqlglot_type, build_typed_literal, TSqlGlotDialect
from dlt.common.libs.utils import is_instance_lib
from dlt.common.schema.typing import TTableSchema, TTableSchemaColumns
from dlt.common.typing import Self, TSortOrder
from dlt.common.exceptions import ValueErrorWithKnownValues
from dlt.common.destination.dataset import SupportsDataAccess
from dlt.common.utils import simple_repr, without_none

from dlt.dataset import lineage
from dlt.extract.wrappers import wrap_additional_type, should_wrap_additional_type_in_list
from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
from dlt.destinations.queries import _normalize_query, build_select_expr
from dlt.common.exceptions import MissingDependencyException
from dlt.common.destination.dataset import SupportsDataAccess


if TYPE_CHECKING:
from ibis import ir
from dlt.helpers.ibis import Expr as IbisExpr
from pyarrow import Table, RecordBatch


_FILTER_OP_MAP = {
Expand Down Expand Up @@ -564,3 +576,30 @@ def _get_relation_output_columns_schema(
allow_partial=allow_partial,
)
return columns_schema, normalized_query


def iterate_relation_as_arrow(
    relation: Relation, buffer_max_items: int
) -> Iterator[Union[Table, RecordBatch]]:
    """Yields data of `relation` as Arrow chunks tagged with the relation schema.

    The relation schema is serialized to JSON once, before iteration starts, and
    attached to the Arrow metadata of every chunk under `DLT_HINTS_METADATA_KEY`.

    Args:
        relation: Relation to read the data from.
        buffer_max_items: Forwarded to `relation.iter_arrow` as `chunk_size`.

    Yields:
        Each chunk produced by `relation.iter_arrow`, passed through
        `add_arrow_metadata`.
    """
    # imported inside the function, not at module level
    from dlt.common.libs.pyarrow import add_arrow_metadata
    from dlt.extract.hints import DLT_HINTS_METADATA_KEY

    hints_json = json.dumps(relation.schema)
    arrow_chunks = relation.iter_arrow(chunk_size=buffer_max_items)
    for arrow_chunk in arrow_chunks:
        # a fresh metadata dict is built for every chunk
        chunk_metadata = {DLT_HINTS_METADATA_KEY: hints_json}
        yield add_arrow_metadata(arrow_chunk, chunk_metadata)


@wrap_additional_type.register(Relation)
def _(data: Any) -> Any:
    """Wraps a `Relation` into a generator of Arrow chunks (50000 items max per chunk)."""
    arrow_gen = iterate_relation_as_arrow(data, buffer_max_items=50000)
    # name the generator after the relation table (or "query_relation" when the table
    # name is empty) so the resource can pick it up as its own name
    gen_name = data._table_name or "query_relation"
    arrow_gen.__name__ = gen_name  # type: ignore[attr-defined]
    return arrow_gen


@should_wrap_additional_type_in_list.register(Relation)
def _(data: Any) -> bool:
    """Marks `Relation` list elements as types that must be wrapped individually."""
    return True
2 changes: 1 addition & 1 deletion dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def resource(

@overload
def resource(
data: Union[List[Any], Iterator[Any]],
data: Any,
/,
name: str = None,
table_name: TTableHintTemplate[str] = None,
Expand Down
8 changes: 6 additions & 2 deletions dlt/extract/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from dlt.extract.extractors import ObjectExtractor, ArrowExtractor, Extractor, ModelExtractor
from dlt.extract.state import reset_resource_state
from dlt.extract.utils import get_data_item_format, make_schema_with_default_name
from dlt.extract.wrappers import should_wrap_additional_type_in_list


def select_schema(pipeline: SupportsPipeline) -> Schema:
Expand Down Expand Up @@ -133,8 +134,11 @@ def append_data(data_item: Any) -> None:
)

if isinstance(data, C_Sequence) and len(data) > 0:
# if first element is source or resource
if isinstance(data[0], (DltResource, DltSource)):
# if first element is source or resource, assumes that lists are of single type
first_elem = data[0]
if isinstance(first_elem, (DltResource, DltSource)) or should_wrap_additional_type_in_list(
first_elem
):
for item in data:
append_data(item)
else:
Expand Down
5 changes: 1 addition & 4 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
Sequence,
Mapping,
List,
NamedTuple,
)
from typing_extensions import Self
import sqlglot

from dlt.common import logger
from dlt.common.schema.typing import (
Expand Down Expand Up @@ -40,7 +40,6 @@
migrate_complex_types,
new_column,
new_table,
merge_table,
)
from dlt.common.typing import TAny, TDataItem, TColumnNames
from dlt.common.time import ensure_pendulum_datetime_utc
Expand All @@ -57,8 +56,6 @@
from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint
from dlt.extract.validation import create_item_validator

import sqlglot


DLT_HINTS_METADATA_KEY = "dlt_resource_hints"

Expand Down
6 changes: 3 additions & 3 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def from_data(
if inject_config:
r_._inject_config()
else:
# wrap additional types
data = wrap_additional_type(data)

if callable(data):
name = name or get_callable_name(data)

Expand All @@ -184,9 +187,6 @@ def from_data(
if not name:
raise ResourceNameMissing()

# wrap additional types
data = wrap_additional_type(data)

# several iterable types are not allowed and must be excluded right away
if isinstance(data, (str, dict)):
raise InvalidResourceDataTypeBasic(name, data, type(data))
Expand Down
12 changes: 12 additions & 0 deletions dlt/extract/wrappers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any
from functools import singledispatch

from dlt.common.typing import NoneType
from dlt.common.exceptions import MissingDependencyException
Expand All @@ -19,13 +20,24 @@
ArrowTable, ArrowRecords = NoneType, NoneType


@singledispatch
def wrap_additional_type(data: Any) -> Any:
    """Wraps any known additional type so it is accepted by DltResource

    Instances of `PandaFrame`, `ArrowTable` and `ArrowRecords` are returned as a
    single-element list. `None` and every other value are returned unchanged.
    """
    # NOTE: arrow/pandas should be registered in their respective helpers via single dispatch
    # but that creates circular dep that I do not want to resolve right now

    # None must never be wrapped: if optional deps are not defined, their types fallback
    # to None type, so the `is not None` guard short-circuits before the isinstance check
    if data is not None and isinstance(data, (PandaFrame, ArrowTable, ArrowRecords)):
        return [data]
    return data


@singledispatch
def should_wrap_additional_type_in_list(data: Any) -> bool:
    """Tells if a list element is a wrappable type for extraction.

    Used by the `extract` step to wrap individual list elements. This default
    implementation answers `False`; types that need wrapping register their own
    implementation via single dispatch.
    """
    return False
4 changes: 3 additions & 1 deletion dlt/sources/sql_database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
table_to_resource_hints,
ReflectionLevel,
TTypeAdapter,
TReflectedHints,
)


Expand Down Expand Up @@ -260,6 +261,7 @@ def sql_table(
if table_obj is None and not defer_table_reflect:
table_obj = Table(table, metadata, autoload_with=engine, resolve_fks=resolve_foreign_keys)

hints: TReflectedHints
if table_obj is not None:
if not defer_table_reflect:
table_obj = _execute_table_adapter(
Expand All @@ -280,7 +282,7 @@ def sql_table(
# may be from what is found in the reflection, so it is set explicitly
hints["primary_key"] = [primary_key] if isinstance(primary_key, str) else list(primary_key)

return decorators.resource(
return decorators.resource( # type: ignore[no-any-return]
table_rows,
name=str(table),
write_disposition=write_disposition,
Expand Down
6 changes: 6 additions & 0 deletions tests/dataset/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from tests.utils import (
preserve_environ,
autouse_test_storage,
auto_test_run_context,
deactivate_pipeline,
)
50 changes: 49 additions & 1 deletion tests/dataset/test_relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

import dlt
from dlt.common import Decimal

# TODO move destination-independent tests from `test_read_interfaces.py` to this module

Expand All @@ -17,8 +18,15 @@ def purchases():
{"id": 3, "name": "charlie", "city": "barcelona"},
)

@dlt.resource
def items():
yield from (
{"id": 1, "name": "labubu", "price": Decimal("23.1")},
{"id": 2, "name": "ububal", "price": Decimal("13.2")},
)

pipeline = dlt.pipeline("_relation_to_ibis", destination="duckdb")
pipeline.run([purchases])
pipeline.run([purchases, items])
return pipeline.dataset()


Expand Down Expand Up @@ -72,3 +80,43 @@ def test_transformed_relation_to_ibis_(purchases: dlt.Relation) -> None:
assert isinstance(table, ir.Table)
# executes without error
table.execute()


def test_relation_extraction(purchases: dlt.Relation) -> None:
    """Checks that a relation passed to `dlt.resource` is extracted as a single Arrow item."""
    from dlt.extract.hints import DLT_HINTS_METADATA_KEY

    resource = dlt.resource(purchases)
    assert resource.name == "purchases"

    extracted = list(resource)
    assert len(extracted) == 1
    arrow_table = extracted[0]
    assert len(arrow_table) == 3
    # column names must be the same as the relation columns
    assert list(arrow_table.column_names) == purchases.columns
    # DLT_HINTS_METADATA_KEY must be present in the arrow metadata
    hints_key = DLT_HINTS_METADATA_KEY.encode("utf-8")
    assert hints_key in arrow_table.schema.metadata


def test_relation_list_extraction(dataset: dlt.Dataset) -> None:
    """Checks how relations placed in a list behave in `dlt.resource` and in `pipeline.run`."""
    purchases = dataset.table("purchases")

    # note that list elements passed to the decorator are not wrapped separately
    list_resource = dlt.resource([purchases], name="fail")
    assert list(list_resource)[0] is purchases

    # but in the pipeline they are
    pipeline = dlt.pipeline("double_relation", destination="duckdb", dataset_name="_data")
    pipeline.run([purchases])
    loaded_purchases = pipeline.dataset().table("purchases").arrow()
    # column names must be the same as the relation columns
    assert list(loaded_purchases.column_names) == purchases.columns
    assert len(loaded_purchases) == 3

    # second run with two relations in the list
    items = dataset.table("items")
    pipeline.run([purchases, items])
    loaded_purchases = pipeline.dataset().table("purchases").arrow()
    assert list(loaded_purchases.column_names) == purchases.columns
    assert len(loaded_purchases) == 6
    loaded_items = pipeline.dataset().table("items").arrow()
    assert list(loaded_items.column_names) == items.columns
    assert len(loaded_items) == 2
18 changes: 9 additions & 9 deletions tests/extract/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ def top_level_resource(secret=dlt.secrets.value, config=dlt.config.value, opt: s

def test_spec_generation() -> None:
# outer resource does not take default params
SPEC = get_fun_spec(top_level_resource._pipe.gen) # type: ignore[arg-type]
SPEC = get_fun_spec(top_level_resource._pipe.gen)
fields = SPEC.get_resolvable_fields()

assert len(fields) == 3
Expand All @@ -1039,7 +1039,7 @@ def test_spec_generation() -> None:
def inner_resource(secret=dlt.secrets.value, config=dlt.config.value, opt: str = "A"):
yield 1

SPEC = get_fun_spec(inner_resource("TS", "CFG")._pipe.gen) # type: ignore[arg-type]
SPEC = get_fun_spec(inner_resource("TS", "CFG")._pipe.gen)
fields = SPEC.get_resolvable_fields()
# resources inject full signature now
assert len(fields) == 3
Expand Down Expand Up @@ -1559,8 +1559,8 @@ def will_decorate(secret: str = dlt.secrets.value, opt=1) -> List[int]:
return [1, 2, 3]

# get signature for all wrappers
sig_bottom = inspect.signature(will_decorate.__wrapped__.__wrapped__) # type: ignore[attr-defined]
sig_middle = inspect.signature(will_decorate.__wrapped__) # type: ignore[attr-defined]
sig_bottom = inspect.signature(will_decorate.__wrapped__.__wrapped__)
sig_middle = inspect.signature(will_decorate.__wrapped__)
sig_top = inspect.signature(will_decorate, follow_wrapped=False)
print("SIG YTOP SIG", will_decorate.__signature__)

Expand Down Expand Up @@ -1648,15 +1648,15 @@ def some_gen():
resource = dlt.resource(some_gen, parallelized=True)

# Generator func is wrapped with parallelized gen that yields callables
gen = resource._pipe.gen() # type: ignore
result = next(gen) # type: ignore[arg-type]
gen = resource._pipe.gen()
result = next(gen)
assert result() == 1
assert list(resource) == [1, 2, 3]

# Same but wrapping generator directly
resource = dlt.resource(some_gen(), parallelized=True)

result = next(resource._pipe.gen) # type: ignore
result = next(resource._pipe.gen)
assert result() == 1
# get remaining items
assert list(resource) == [2, 3]
Expand Down Expand Up @@ -1704,8 +1704,8 @@ async def some_data():
resource = dlt.resource(gen_orig, parallelized=True)
gen = resource._pipe.gen

next(gen) # type: ignore
gen.close() # type: ignore
next(gen)
gen.close()

with pytest.raises(StopIteration):
# Inner generator is also closed
Expand Down
Loading
Loading