Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
airflow.cfg
storage
local_workdir
remote_workdir
logs
*.db
simple_auth_manager_passwords.json.generated
airflow.db-shm
airflow.db-wal
aiida-quantumespresso
scripts/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class


# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST


# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc

80 changes: 80 additions & 0 deletions AUTO_DAG_SYSTEM.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Automatic DAG Generation System

## Overview

System that generates **physical DAG files** for all AiiDA plugins during installation.

**Result**: 30+ AiiDA plugins → 30+ physical DAG `.py` files

## Installation & Generation

### Generate DAG Files

```bash
# Run this after installing new AiiDA plugins
python3 scripts/generate_dags.py
```

This creates physical `.py` files in: `src/airflow_provider_aiida/auto_generated_dags/`

### Install Package

```bash
pip install -e .
```

## Files

### Generated (30+ files)
```
src/airflow_provider_aiida/auto_generated_dags/
├── __init__.py
├── aiida_calcjob_core_arithmetic_add.py
├── aiida_calcjob_core_stash.py
├── aiida_workchain_core_arithmetic_multiply_add.py
└── ... (27+ more)
```

### Source Files
```
scripts/generate_dags.py # Generator script
pyproject.toml # Entry point configuration
```

## Usage

### After Installing New AiiDA Plugin

```bash
# 1. Install new plugin
pip install aiida-new-plugin

# 2. Regenerate DAG files
python3 scripts/generate_dags.py

# 3. Restart Airflow
```

### Trigger a DAG

```python
from aiida import load_profile
from aiida.orm import load_code, Int
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation

load_profile()
code = load_code('bash@localhost')
inputs = {'code': code, 'x': Int(5), 'y': Int(3)}
process = ArithmeticAddCalculation(inputs=inputs)
process._save_checkpoint()

# Trigger: airflow dags trigger aiida_calcjob_core_arithmetic_add \
# --conf '{"node_pk": <process.node.pk>}'
```

## Benefits

✅ **Fast**: DAGs pre-generated, no startup penalty
✅ **Persistent**: Physical files on disk
✅ **Maintainable**: Regenerate when needed
✅ **Standard**: Normal Python files, standard Airflow workflow
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ provider_info = "airflow_provider_aiida.__init__:get_provider_info"

[project.entry-points."aiida.dags"]
aiida-standard = "airflow_provider_aiida.example_dags"
aiida-auto = "airflow_provider_aiida.auto_generated_dags"

[tool.hatch.version]
path = "src/airflow_provider_aiida/__init__.py"
Expand Down
142 changes: 142 additions & 0 deletions scripts/generate_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Generate physical DAG files for all registered AiiDA plugins.
Run once during installation to create DAG files.
"""
import os
from pathlib import Path
from importlib_metadata import entry_points


def generate_calcjob_dag_file(entry_point_name: str, class_path: str, output_dir: Path):
"""Generate a DAG file for a CalcJob."""
dag_id = f"aiida_calcjob_{entry_point_name.replace('.', '_')}"
filename = f"{dag_id}.py"

content = f'''"""Auto-generated DAG for AiiDA CalcJob: {entry_point_name}"""
from airflow import DAG
from airflow.models.param import Param
from airflow_provider_aiida.taskgroups.calcjob import CalcJobTaskGroup
from {class_path.rsplit('.', 1)[0]} import {class_path.rsplit('.', 1)[1]}

with DAG(
dag_id='{dag_id}',
description='Auto-generated DAG for AiiDA CalcJob: {entry_point_name}',
params={{
"node_pk": Param("", type="integer", description="AiiDA node PK to resume execution")
}},
render_template_as_native_obj=True,
tags=['aiida', 'auto-generated', 'calcjob', '{entry_point_name}'],
catchup=False,
) as dag:
task = CalcJobTaskGroup(
process_class={class_path.rsplit('.', 1)[1]},
node_pk="{{{{ params.node_pk }}}}",
)
'''

filepath = output_dir / filename
filepath.write_text(content)
return filename


def generate_workchain_dag_file(entry_point_name: str, class_path: str, output_dir: Path):
"""Generate a DAG file for a WorkChain."""
dag_id = f"aiida_workchain_{entry_point_name.replace('.', '_')}"
filename = f"{dag_id}.py"

content = f'''"""Auto-generated DAG for AiiDA WorkChain: {entry_point_name}"""
from airflow import DAG
from airflow.models.param import Param
from airflow_provider_aiida.taskgroups.workchain import WorkChainTaskGroup
from {class_path.rsplit('.', 1)[0]} import {class_path.rsplit('.', 1)[1]}

with DAG(
dag_id='{dag_id}',
description='Auto-generated DAG for AiiDA WorkChain: {entry_point_name}',
params={{
"node_pk": Param("", type="integer", description="AiiDA node PK to resume execution")
}},
render_template_as_native_obj=True,
tags=['aiida', 'auto-generated', 'workchain', '{entry_point_name}'],
catchup=False,
) as dag:
task = WorkChainTaskGroup(
process_class={class_path.rsplit('.', 1)[1]},
node_pk="{{{{ params.node_pk }}}}",
)
'''

filepath = output_dir / filename
filepath.write_text(content)
return filename


def main():
"""Generate all DAG files."""
# Output directory
package_root = Path(__file__).parent.parent
output_dir = package_root / "src" / "airflow_provider_aiida" / "auto_generated_dags"
output_dir.mkdir(exist_ok=True)

print(f"Generating DAG files in: {output_dir}")

generated_files = []

# Discover and generate CalcJob DAGs
eps = entry_points()
calc_eps = eps.select(group='aiida.calculations')

for ep in calc_eps:
try:
plugin_class = ep.load()
# Skip if not a class (some are functions)
if not isinstance(plugin_class, type):
continue

class_path = f"{plugin_class.__module__}.{plugin_class.__name__}"
filename = generate_calcjob_dag_file(ep.name, class_path, output_dir)
generated_files.append(filename)
print(f" ✓ Generated: {filename}")
except Exception as e:
print(f" ✗ Failed to generate DAG for {ep.name}: {e}")

# Discover and generate WorkChain DAGs
workflow_eps = eps.select(group='aiida.workflows')

for ep in workflow_eps:
try:
plugin_class = ep.load()
# Skip if not a class (some are functions)
if not isinstance(plugin_class, type):
continue

class_path = f"{plugin_class.__module__}.{plugin_class.__name__}"
filename = generate_workchain_dag_file(ep.name, class_path, output_dir)
generated_files.append(filename)
print(f" ✓ Generated: {filename}")
except Exception as e:
print(f" ✗ Failed to generate DAG for {ep.name}: {e}")

# Generate __init__.py to import all DAGs
init_content = '"""Auto-generated DAGs for AiiDA plugins."""\n\n'
init_content += "# Import all generated DAGs\n"
for filename in sorted(generated_files):
module_name = filename.replace('.py', '')
init_content += f"from .{module_name} import dag as {module_name}\n"

init_content += "\n# Export all DAGs\n__all__ = [\n"
for filename in sorted(generated_files):
module_name = filename.replace('.py', '')
init_content += f" '{module_name}',\n"
init_content += "]\n"

(output_dir / "__init__.py").write_text(init_content)
print(f"\n✅ Generated {len(generated_files)} DAG files + __init__.py")
return len(generated_files)


if __name__ == "__main__":
import sys
count = main()
sys.exit(0 if count > 0 else 1)
36 changes: 36 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Setup script to generate DAG files during installation."""
from setuptools import setup
from setuptools.command.install import install
from setuptools.command.develop import develop
import subprocess
import sys


class PostInstallCommand(install):
"""Post-installation for installation mode."""
def run(self):
install.run(self)
print("\n" + "="*60)
print("Generating AiiDA DAG files...")
print("="*60)
subprocess.check_call([sys.executable, "scripts/generate_dags.py"])
print("="*60 + "\n")


class PostDevelopCommand(develop):
"""Post-installation for development mode."""
def run(self):
develop.run(self)
print("\n" + "="*60)
print("Generating AiiDA DAG files...")
print("="*60)
subprocess.check_call([sys.executable, "scripts/generate_dags.py"])
print("="*60 + "\n")


setup(
cmdclass={
'install': PostInstallCommand,
'develop': PostDevelopCommand,
},
)
Loading