Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
90fa8fd
Fix rendering of template fields with start from trigger
dabla Feb 2, 2026
5f7306d
refactor: Check if TaskInstance exists or not in BaseTrigger
dabla Feb 2, 2026
2fddf07
Revert "refactor: Check if TaskInstance exists or not in BaseTrigger"
dabla Feb 2, 2026
a21201f
refactor: Changed return type of task_instance property in BaseTrigger
dabla Feb 2, 2026
937a379
refactor: Make sure default values for start from trigger can be over…
dabla Feb 6, 2026
6819e63
refactor: Remove assert on start_date of TaskInstance
dabla Feb 6, 2026
f5dd331
refactor: Make sure to check if dag_data is not None in workloads bef…
dabla Feb 6, 2026
ed0594b
refactor: Only pass serialized dag model to workload if trigger conta…
dabla Feb 6, 2026
ec34481
refactor: Don't invoke _read_dag twice in get_dag method of DBDagBag …
dabla Feb 6, 2026
df75780
refactor: Don't invoke _read_dag twice in get_dag method of DBDagBag …
dabla Feb 6, 2026
23d7aea
refactor: Make _version_from_dag_run method of DBDagBag failsafe for …
dabla Feb 6, 2026
6684575
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 6, 2026
6c28707
refactor: Moved None check on start_state together with the task in o…
dabla Feb 6, 2026
96d1893
Revert "refactor: Make _version_from_dag_run method of DBDagBag fails…
dabla Feb 6, 2026
4ab221e
refactor: Fixed test_get_dag_model
dabla Feb 6, 2026
1da2ca1
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 6, 2026
0b4a36a
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 7, 2026
67bfa1e
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 7, 2026
bb80bfc
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 7, 2026
558c88a
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 9, 2026
d5d339f
refactor: Only pass serialized Dag model data to RunTrigger if start_…
dabla Feb 9, 2026
c7ce525
refactor: Added docstrings for start_from_trigger and start_trigger_args
dabla Feb 9, 2026
f631b37
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 9, 2026
499f463
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 10, 2026
259ae7b
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
a7e7c69
refactor: Templated field must be checked on task of task instance
dabla Feb 11, 2026
a019832
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
b18e942
refactor: Added start_from_trigger property on Trigger
dabla Feb 11, 2026
2dc4f4b
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
337b22f
refactor: Reformatted trigger unit test
dabla Feb 11, 2026
c2de568
refactor: Only the RuntimeTaskInstance has the task attribute, the ge…
dabla Feb 11, 2026
02e3413
refactor: Reformatted test trigger
dabla Feb 11, 2026
064e59e
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
9010f1c
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 11, 2026
a3a9964
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 12, 2026
af9553d
Update airflow-core/src/airflow/serialization/definitions/mappedopera…
dabla Feb 13, 2026
40b9b2d
refactor: Removed obsolete run method from TaskInstance
dabla Feb 13, 2026
5042c3c
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 13, 2026
3e92f47
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 23, 2026
f3ca48a
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Feb 24, 2026
85ac14d
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
3041c11
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
56f8cec
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
cc9b2cc
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
f3ffb51
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 3, 2026
c2c98b7
refactor: Added dag_data field to RunTrigger and made ti field optional
dabla Mar 4, 2026
1a85868
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 4, 2026
76ee5a6
refactor: Reformatted RunTrigger
dabla Mar 4, 2026
2834afc
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
d19f690
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
399c94b
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
6790e65
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
5aace8f
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 10, 2026
5331992
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 12, 2026
39f0782
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 12, 2026
75dfc99
refactor: We cannot detect if a Trigger has a task associated with a …
dabla Mar 12, 2026
1436811
refactor: Re-added check on start_from_trigger from serialized Dag
dabla Mar 12, 2026
97b4d45
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 17, 2026
bac2d24
refactor: Fixed call to dag_bag in get_dag_for_run_or_latest_version …
dabla Mar 17, 2026
4796335
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 17, 2026
8e38dcf
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 17, 2026
19f975b
refactor: Extracted _do_render_template_fields method into Template s…
dabla Mar 19, 2026
869ba37
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 19, 2026
1d0c6fd
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 19, 2026
2684561
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 19, 2026
c2f8271
refactor: task_id should be an instance field instead of property
dabla Mar 20, 2026
8717575
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
f7fa0b0
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
cd23b0c
refactor: Added tests for _do_render_template_fields method in TestTe…
dabla Mar 20, 2026
66b94f5
refactor: Fixed templater unit tests
dabla Mar 20, 2026
f5e3289
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
2bfafa1
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
0f564af
Merge branch 'main' into fix/render-templated-fields-start-from-trigg…
dabla Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/.pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ repos:
^src/airflow/timetables/assets\.py$|
^src/airflow/timetables/base\.py$|
^src/airflow/timetables/simple\.py$|
^src/airflow/triggers/base\.py$|
^src/airflow/utils/cli\.py$|
^src/airflow/utils/context\.py$|
^src/airflow/utils/dag_cycle_tester\.py$|
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api_fastapi/common/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def get_dag_for_run_or_latest_version(
dag: SerializedDAG | None = None
if dag_run:
if dag_run.created_dag_version_id:
dag = dag_bag._get_dag(dag_run.created_dag_version_id, session=session)
dag = dag_bag.get_dag(dag_run.created_dag_version_id, session=session)
if not dag:
dag = dag_bag.get_dag_for_run(dag_run, session=session)
elif dag_id:
Expand Down
5 changes: 4 additions & 1 deletion airflow-core/src/airflow/executors/workloads/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ class RunTrigger(BaseModel):
"""

id: int
ti: TaskInstanceDTO | None # Could be none for asset-based triggers.
classpath: str # Dot-separated name of the module+fn to import and run this workload.
encrypted_kwargs: str
ti: TaskInstanceDTO | None = None # Could be none for asset-based triggers.
timeout_after: datetime | None = None
type: Literal["RunTrigger"] = Field(init=False, default="RunTrigger")
dag_data: dict | None = (
None # Serialized Dag model in dict format so it can be deserialized in trigger subprocess.
)
197 changes: 136 additions & 61 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from airflow.executors.workloads.task import TaskInstanceDTO
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.dagbag import DBDagBag
from airflow.models.trigger import Trigger
from airflow.observability.metrics import stats_utils
from airflow.sdk.api.datamodels._generated import HITLDetailResponse
Expand Down Expand Up @@ -81,10 +82,12 @@
_RequestFrame,
)
from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.serialization.serialized_objects import DagSerialization
from airflow.triggers.base import BaseEventTrigger, BaseTrigger, DiscrimatedTriggerEvent, TriggerEvent
from airflow.utils.helpers import log_filename_template_renderer
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.session import NEW_SESSION, create_session, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session
Expand All @@ -93,6 +96,7 @@
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
from airflow.jobs.job import Job
from airflow.sdk.api.client import Client
from airflow.sdk.definitions.context import Context
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -626,6 +630,66 @@ def emit_metrics(self):
extra_tags={"hostname": self.job.hostname},
)

@provide_session
def create_workload(
self,
trigger: Trigger,
dag_bag: DBDagBag,
render_log_fname=log_filename_template_renderer(),
session: Session = NEW_SESSION,
) -> workloads.RunTrigger | None:
if trigger.task_instance is None:
return workloads.RunTrigger(
id=trigger.id,
classpath=trigger.classpath,
encrypted_kwargs=trigger.encrypted_kwargs,
)

if not trigger.task_instance.dag_version_id:
# This is to handle 2 to 3 upgrade where TI.dag_version_id can be none
log.warning(
"TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
ti_id=trigger.task_instance.id,
)
return None

log_path = render_log_fname(ti=trigger.task_instance)
ser_ti = TaskInstanceDTO.model_validate(trigger.task_instance, from_attributes=True)

# When producing logs from TIs, include the job id producing the logs to disambiguate it.
self.logger_cache[trigger.id] = TriggerLoggingFactory(
log_path=f"{log_path}.trigger.{self.job.id}.log",
ti=ser_ti, # type: ignore
)

serialized_dag_model = dag_bag.get_dag_model(
version_id=trigger.task_instance.dag_version_id,
session=session,
)

if serialized_dag_model:
task = serialized_dag_model.dag.get_task(trigger.task_instance.task_id)

# When a TaskInstance of a Trigger contains a task with start_from_trigger enabled,
# it means we need to load the SerializedDagModel so we can build a RuntimeTaskInstance later on which
# will allow us to build a context on which we will render the templated fields.
if task.start_from_trigger:
return workloads.RunTrigger(
id=trigger.id,
classpath=trigger.classpath,
encrypted_kwargs=trigger.encrypted_kwargs,
ti=ser_ti,
timeout_after=trigger.task_instance.trigger_timeout,
dag_data=serialized_dag_model.data,
)
return workloads.RunTrigger(
id=trigger.id,
classpath=trigger.classpath,
encrypted_kwargs=trigger.encrypted_kwargs,
ti=ser_ti,
timeout_after=trigger.task_instance.trigger_timeout,
)

def update_triggers(self, requested_trigger_ids: set[int]):
"""
Request that we update what triggers we're running.
Expand All @@ -634,8 +698,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
adds them to the dequeues so the subprocess can actually mutate the running
trigger set.
"""
render_log_fname = log_filename_template_renderer()

dag_bag = DBDagBag()
known_trigger_ids = (
self.running_triggers.union(x[0] for x in self.events)
.union(self.cancelling_triggers)
Expand All @@ -646,60 +709,45 @@ def update_triggers(self, requested_trigger_ids: set[int]):
new_trigger_ids = requested_trigger_ids - known_trigger_ids
cancel_trigger_ids = self.running_triggers - requested_trigger_ids
# Bulk-fetch new trigger records
new_triggers = Trigger.bulk_fetch(new_trigger_ids)
trigger_ids_with_non_task_associations = Trigger.fetch_trigger_ids_with_non_task_associations()
to_create: list[workloads.RunTrigger] = []
# Add in new triggers
for new_id in new_trigger_ids:
# Check it didn't vanish in the meantime
if new_id not in new_triggers:
log.warning("Trigger disappeared before we could start it", id=new_id)
continue

new_trigger_orm = new_triggers[new_id]

# If the trigger is not associated to a task, an asset, or a callback, this means the TaskInstance
# row was updated by either Trigger.submit_event or Trigger.submit_failure
# and can happen when a single trigger Job is being run on multiple TriggerRunners
# in a High-Availability setup.
if new_trigger_orm.task_instance is None and new_id not in trigger_ids_with_non_task_associations:
log.info(
(
"TaskInstance Trigger is None. It was likely updated by another trigger job. "
"Skipping trigger instantiation."
),
id=new_id,
)
continue

workload = workloads.RunTrigger(
classpath=new_trigger_orm.classpath,
id=new_id,
encrypted_kwargs=new_trigger_orm.encrypted_kwargs,
ti=None,
with create_session() as session:
# Bulk-fetch new trigger records
new_triggers = Trigger.bulk_fetch(new_trigger_ids, session=session)
trigger_ids_with_non_task_associations = Trigger.fetch_trigger_ids_with_non_task_associations(
session=session
)
if new_trigger_orm.task_instance:
log_path = render_log_fname(ti=new_trigger_orm.task_instance)
if not new_trigger_orm.task_instance.dag_version_id:
# This is to handle 2 to 3 upgrade where TI.dag_version_id can be none
log.warning(
"TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
ti_id=new_trigger_orm.task_instance.id,
)
to_create: list[workloads.RunTrigger] = []
# Add in new triggers
for new_trigger_id in new_trigger_ids:
# Check it didn't vanish in the meantime
if new_trigger_id not in new_triggers:
log.warning("Trigger disappeared before we could start it", id=new_trigger_id)
continue
ser_ti = TaskInstanceDTO.model_validate(new_trigger_orm.task_instance, from_attributes=True)
# When producing logs from TIs, include the job id producing the logs to disambiguate it.
self.logger_cache[new_id] = TriggerLoggingFactory(
log_path=f"{log_path}.trigger.{self.job.id}.log",
ti=ser_ti, # type: ignore
)

workload.ti = ser_ti
workload.timeout_after = new_trigger_orm.task_instance.trigger_timeout
new_trigger_orm = new_triggers[new_trigger_id]

# If the trigger is not associated to a task, an asset, or a callback, this means the TaskInstance
# row was updated by either Trigger.submit_event or Trigger.submit_failure
# and can happen when a single trigger Job is being run on multiple TriggerRunners
# in a High-Availability setup.
if (
new_trigger_orm.task_instance is None
and new_trigger_id not in trigger_ids_with_non_task_associations
):
log.info(
(
"TaskInstance of Trigger is None. It was likely updated by another trigger job. "
"Skipping trigger instantiation."
),
id=new_trigger_id,
)
continue

to_create.append(workload)
if workload := self.create_workload(
trigger=new_trigger_orm, dag_bag=dag_bag, session=session
):
to_create.append(workload)

self.creating_triggers.extend(to_create)
self.creating_triggers.extend(to_create)

if cancel_trigger_ids:
# Enqueue orphaned triggers for cancellation
Expand Down Expand Up @@ -954,9 +1002,19 @@ async def init_comms(self):
raise RuntimeError(f"Required first message to be a messages.StartTriggerer, it was {msg}")

async def create_triggers(self):
def create_runtime_ti(encoded_dag: dict) -> RuntimeTaskInstance:
task = DagSerialization.from_dict(encoded_dag).get_task(workload.ti.task_id)

# I need to recreate a TaskInstance from task_runner before invoking get_template_context (airflow.executors.workloads.TaskInstance)
return RuntimeTaskInstance.model_construct(
**workload.ti.model_dump(exclude_unset=True),
task=task,
)

"""Drain the to_create queue and create all new triggers that have been requested in the DB."""
while self.to_create:
await asyncio.sleep(0)
context: Context | None = None
workload = self.to_create.popleft()
trigger_id = workload.id
if trigger_id in self.triggers:
Expand Down Expand Up @@ -984,24 +1042,32 @@ async def create_triggers(self):
# that could cause None values in collections.
kw = Trigger._decrypt_kwargs(workload.encrypted_kwargs)
deserialised_kwargs = {k: smart_decode_trigger_kwargs(v) for k, v in kw.items()}
trigger_instance = trigger_class(**deserialised_kwargs)

if ti := workload.ti:
trigger_name = f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} (ID {trigger_id})"
trigger_instance = trigger_class(**deserialised_kwargs)

if workload.dag_data:
runtime_ti = create_runtime_ti(workload.dag_data)
context = runtime_ti.get_template_context()
trigger_instance.task_instance = runtime_ti
else:
trigger_instance.task_instance = ti
else:
trigger_name = f"ID {trigger_id}"
trigger_instance = trigger_class(**deserialised_kwargs)
except TypeError as err:
self.log.error("Trigger failed to inflate", error=err)
self.failed_triggers.append((trigger_id, err))
continue
trigger_instance.trigger_id = trigger_id
trigger_instance.triggerer_job_id = self.job_id
trigger_instance.task_instance = ti = workload.ti
trigger_instance.timeout_after = workload.timeout_after

trigger_name = (
f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} (ID {trigger_id})"
if ti
else f"ID {trigger_id}"
)
self.triggers[trigger_id] = {
"task": asyncio.create_task(
self.run_trigger(trigger_id, trigger_instance, workload.timeout_after), name=trigger_name
self.run_trigger(trigger_id, trigger_instance, workload.timeout_after, context),
name=trigger_name,
),
"is_watcher": isinstance(trigger_instance, BaseEventTrigger),
"name": trigger_name,
Expand Down Expand Up @@ -1168,7 +1234,13 @@ async def block_watchdog(self):
)
Stats.incr("triggers.blocked_main_thread")

async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after: datetime | None = None):
async def run_trigger(
self,
trigger_id: int,
trigger: BaseTrigger,
timeout_after: datetime | None = None,
context: Context | None = None,
):
"""Run a trigger (they are async generators) and push their events into our outbound event deque."""
if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
import greenback
Expand All @@ -1180,6 +1252,9 @@ async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after
name = self.triggers[trigger_id]["name"]
self.log.info("trigger %s starting", name)
try:
if context:
trigger.render_template_fields(context=context)

async for event in trigger.run():
await self.log.ainfo(
"Trigger fired event", name=self.triggers[trigger_id]["name"], result=event
Expand Down
41 changes: 22 additions & 19 deletions airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,27 @@ class DBDagBag:
"""

def __init__(self, load_op_links: bool = True) -> None:
self._dags: dict[UUID, SerializedDAG] = {} # dag_version_id to dag
self._dags: dict[UUID, SerializedDagModel] = {} # dag_version_id to dag
self.load_op_links = load_op_links

def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
serdag.load_op_links = self.load_op_links
if dag := serdag.dag:
self._dags[serdag.dag_version_id] = dag
def _read_dag(self, serialized_dag_model: SerializedDagModel) -> SerializedDAG | None:
serialized_dag_model.load_op_links = self.load_op_links
if dag := serialized_dag_model.dag:
self._dags[serialized_dag_model.dag_version_id] = serialized_dag_model
return dag

def _get_dag(self, version_id: UUID, session: Session) -> SerializedDAG | None:
if dag := self._dags.get(version_id):
return dag
dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
if not dag_version:
return None
if not (serdag := dag_version.serialized_dag):
return None
return self._read_dag(serdag)
def get_dag_model(self, version_id: UUID, session: Session) -> SerializedDagModel | None:
if not (serialized_dag_model := self._dags.get(version_id)):
dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
if not dag_version or not (serialized_dag_model := dag_version.serialized_dag):
return None
self._read_dag(serialized_dag_model)
return serialized_dag_model

def get_dag(self, version_id: UUID, session: Session) -> SerializedDAG | None:
if serialized_dag_model := self.get_dag_model(version_id=version_id, session=session):
return serialized_dag_model.dag
return None

@staticmethod
def _version_from_dag_run(dag_run: DagRun, *, session: Session) -> UUID | None:
Expand All @@ -74,24 +77,24 @@ def _version_from_dag_run(dag_run: DagRun, *, session: Session) -> UUID | None:

def get_dag_for_run(self, dag_run: DagRun, session: Session) -> SerializedDAG | None:
if version_id := self._version_from_dag_run(dag_run=dag_run, session=session):
return self._get_dag(version_id=version_id, session=session)
return self.get_dag(version_id=version_id, session=session)
return None

def iter_all_latest_version_dags(self, *, session: Session) -> Generator[SerializedDAG, None, None]:
"""Walk through all latest version dags available in the database."""
from airflow.models.serialized_dag import SerializedDagModel

for sdm in session.scalars(select(SerializedDagModel)):
if dag := self._read_dag(sdm):
for serialized_dag_model in session.scalars(select(SerializedDagModel)):
if dag := self._read_dag(serialized_dag_model):
yield dag

def get_latest_version_of_dag(self, dag_id: str, *, session: Session) -> SerializedDAG | None:
"""Get the latest version of a dag by its id."""
from airflow.models.serialized_dag import SerializedDagModel

if not (serdag := SerializedDagModel.get(dag_id, session=session)):
if not (serialized_dag_model := SerializedDagModel.get(dag_id, session=session)):
return None
return self._read_dag(serdag)
return self._read_dag(serialized_dag_model)


def generate_md5_hash(context):
Expand Down
Loading
Loading