Skip to content

Commit 593888a

Browse files
committed
initial prototype
1 parent dfc072f commit 593888a

File tree

8 files changed

+2404
-5
lines changed

8 files changed

+2404
-5
lines changed

src/airflow_provider_aiida/aiida_core/engine/__init__.py

Whitespace-only changes.

src/airflow_provider_aiida/aiida_core/engine/calcjob/__init__.py

Whitespace-only changes.

src/airflow_provider_aiida/aiida_core/engine/calcjob/tasks.py

Lines changed: 767 additions & 0 deletions
Large diffs are not rendered by default.

src/airflow_provider_aiida/aiida_core/transport.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,18 @@ def get_authinfo_from_airflow_connection(conn_id: str):
105105

106106

107107
def get_transport_queue() -> TransportQueue:
108-
"""Return a per-process shared TransportQueue instance."""
109-
global _TRANSPORT_QUEUE
110-
if _TRANSPORT_QUEUE is None:
111-
_TRANSPORT_QUEUE = TransportQueue()
112-
return _TRANSPORT_QUEUE
108+
"""Return a TransportQueue instance using the current event loop.
109+
110+
Note: Always creates a new TransportQueue to ensure it uses the current
111+
event loop. This is necessary because Airflow triggers run in different
112+
async contexts with different event loops.
113+
"""
114+
import asyncio
115+
try:
116+
loop = asyncio.get_running_loop()
117+
except RuntimeError:
118+
loop = asyncio.get_event_loop()
119+
return TransportQueue(loop=loop)
113120

114121

115122
def get_authinfo_cached(conn_id: str):

0 commit comments

Comments
 (0)