allows to pass Relation to run/extract method in Pipeline #3272
Conversation
For our follow-up discussion, I think this is roughly the algorithm. This may seem like a lot, but the details will be particularly important if we want to execute the transformation DAG without round trips between memory and the backend:

```mermaid
flowchart TD
    Start([Pipeline Execution Starts]) --> Execution
    Execution[Execute on-backend] --> CheckDataset{relation.dataset == pipeline.dataset}
    CheckDataset -->|No| ReadTable[Read table]
    CheckDataset -->|Yes| CheckExec{Relation method called}
    CheckExec -->|.arrow| ReadTable
    CheckExec -->|.iter_arrow| ReadBatch[Read batch]
    CheckExec -->|None| Write
    ReadBatch --> WriteBatch{Configured batch write}
    ReadTable --> WriteBatch
    WriteBatch -->|No| WriteTable[Write table]
    WriteBatch -->|Yes| WriteBatches[Write batches]
    WriteTable --> Write[Write data to dataset]
    WriteBatches --> Write
    Write --> End([Load completed])
```
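A rough Python sketch of the same decision flow may help. All helper functions here are hypothetical placeholders that only mark which branch runs; they are not dlt internals.

```py
# Rough sketch of the decision flow above. The helpers are hypothetical
# stand-ins, not dlt internals; they only mark which branch is taken.

def read_table(relation):
    print("read full table into memory as an Arrow table")

def read_batches(relation):
    print("read the table in batches (Arrow record batches)")

def write_table():
    print("write a single table")

def write_batches():
    print("write in batches")

def write_to_dataset():
    print("write data to the pipeline dataset")

def run_relation(relation, pipeline, method=None, batch_write_configured=False):
    """`method` is None, "arrow" or "iter_arrow", mirroring the flowchart."""
    if relation.dataset != pipeline.dataset:
        # different dataset: data must travel through memory
        read_table(relation)
    elif method is None:
        # same dataset, no explicit materialization: stay on the backend
        write_to_dataset()
        return
    elif method == "arrow":
        read_table(relation)
    else:  # "iter_arrow"
        read_batches(relation)

    # data went through memory, so decide how it is written back
    if batch_write_configured:
        write_batches()
    else:
        write_table()
    write_to_dataset()
```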
**Scenario 1**

Param:

```py
pipeline = dlt.pipeline("ingest")
pipeline.run(rest_source())
dataset = pipeline.dataset()
relation = dataset.table("foo").select("a", "b", "c")
pipeline.run(relation, table_name="rel_table")
```

**Scenario 2**

Same as Scenario 1, but the user wants to force in-memory execution (there's no good reason to do that here). This must be done explicitly in code by calling `.arrow()`:
```py
# ibidem
relation = dataset.table("foo").select("a", "b", "c")
pipeline.run(relation.arrow(), table_name="rel_table")
```

**Scenario 3**

Same as Scenario 1, but we use a different write pipeline and dataset for namespacing. Same destination.
```py
destination = dlt.destinations.duckdb("duck.duckdb")
pipeline = dlt.pipeline("ingest", destination=destination)
pipeline.run(rest_source())
dataset = pipeline.dataset()
relation = dataset.table("foo").select("a", "b", "c")
other_pipeline = dlt.pipeline("transform", destination=destination)
other_pipeline.run(relation, table_name="rel_table")
```

**Scenario 4**

Now our second pipeline writes to a different destination and dataset. Data must be loaded into memory.
```py
destination = dlt.destinations.duckdb("duck.duckdb")
pipeline = dlt.pipeline("ingest", destination=destination)
pipeline.run(rest_source())
dataset = pipeline.dataset()
relation = dataset.table("foo").select("a", "b", "c")
other_destination = dlt.destinations.bigquery()
other_pipeline = dlt.pipeline("transform", destination=other_destination)
other_pipeline.run(relation, table_name="rel_table")
```

**Scenario 5**

In scenarios where data is loaded into memory, the choice between batch write and full table write is configured via dlt config (`config.toml`).

Input:
```py
# ibidem
# because of config.toml, relation will read the full table and write in batches
other_pipeline.run(relation, table_name="rel_table")
```

**Scenario 6**

In scenarios where data is loaded into memory, it can be desirable to read in batches instead of loading the full table. This must be done explicitly in Python by calling `.iter_arrow()`. Reusing Scenario 3:
```py
# ibidem
relation = dataset.table("foo").select("a", "b", "c")
pipeline.run(relation.iter_arrow(), table_name="rel_table")
```
Description
A Relation can now be passed like any other data to the pipeline and to `dlt.resource`. Relation is also accepted by `dlt.resource` in its typing (which was not the case for a long time).
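As a sketch of what the description enables, assuming the API shown in the scenarios above (the sample rows and table names here are made up for illustration):

```py
import dlt

# minimal setup: ingest a small table so there is something to select from
pipeline = dlt.pipeline("ingest", destination=dlt.destinations.duckdb("duck.duckdb"))
pipeline.run([{"a": 1, "b": 2, "c": 3}], table_name="foo")

dataset = pipeline.dataset()
relation = dataset.table("foo").select("a", "b")

# the relation is accepted directly by pipeline.run ...
pipeline.run(relation, table_name="rel_table")

# ... and can be wrapped by dlt.resource like any other data
rel_resource = dlt.resource(relation, name="rel_table_2")
pipeline.run(rel_resource)
```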