Skip to content

Commit 501d436

Browse files
committed
Optimize sync call for MVP executor
(cherry picked from commit c2672ed2ac356de6f925f1be9474d4fba678ae48)
1 parent 6d49cac commit 501d436

File tree

1 file changed

+15
-4
lines changed

1 file changed

+15
-4
lines changed

airflow/providers/edge/executors/edge_executor.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,18 @@ def execute_async(
8989
@provide_session
9090
def sync(self, session: Session = NEW_SESSION) -> None:
9191
"""Sync will get called periodically by the heartbeat method."""
92-
jobs: list[EdgeJobModel] = session.query(EdgeJobModel).all()
92+
purged_marker = False
93+
job_success_purge = conf.getint("edge", "job_success_purge")
94+
job_fail_purge = conf.getint("edge", "job_fail_purge")
95+
jobs: list[EdgeJobModel] = (
96+
session.query(EdgeJobModel)
97+
.filter(
98+
EdgeJobModel.state.in_(
99+
[TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS, TaskInstanceState.FAILED]
100+
)
101+
)
102+
.all()
103+
)
93104
for job in jobs:
94105
if job.key in self.running:
95106
if job.state == TaskInstanceState.RUNNING:
@@ -109,8 +120,6 @@ def sync(self, session: Session = NEW_SESSION) -> None:
109120
self.fail(job.key)
110121
else:
111122
self.last_reported_state[job.key] = job.state
112-
job_success_purge = conf.getint("edge", "job_success_purge")
113-
job_fail_purge = conf.getint("edge", "job_fail_purge")
114123
if (
115124
job.state == TaskInstanceState.SUCCESS
116125
and job.last_update_t < (datetime.now() - timedelta(minutes=job_success_purge)).timestamp()
@@ -120,6 +129,7 @@ def sync(self, session: Session = NEW_SESSION) -> None:
120129
):
121130
if job.key in self.last_reported_state:
122131
del self.last_reported_state[job.key]
132+
purged_marker = True
123133
session.delete(job)
124134
session.execute(
125135
delete(EdgeLogsModel).where(
@@ -130,7 +140,8 @@ def sync(self, session: Session = NEW_SESSION) -> None:
130140
EdgeLogsModel.try_number == job.try_number,
131141
)
132142
)
133-
session.commit()
143+
if purged_marker:
144+
session.commit()
134145

135146
def end(self) -> None:
136147
"""End the executor."""

0 commit comments

Comments
 (0)