diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ab17e67 --- /dev/null +++ b/.gitignore @@ -0,0 +1,78 @@ +airflow.cfg +storage +local_workdir +remote_workdir +logs +*.db +simple_auth_manager_passwords.json.generated +airflow.db-shm +airflow.db-wal +aiida-quantumespresso +scripts/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + diff --git a/AUTO_DAG_SYSTEM.md b/AUTO_DAG_SYSTEM.md new file mode 100644 index 0000000..e48be0e --- /dev/null +++ b/AUTO_DAG_SYSTEM.md @@ -0,0 +1,80 @@ +# Automatic DAG Generation System + +## Overview + +System that generates **physical DAG files** for all AiiDA plugins during installation. + +**Result**: 30+ AiiDA plugins → 30+ physical DAG `.py` files + +## Installation & Generation + +### Generate DAG Files + +```bash +# Run this after installing new AiiDA plugins +python3 scripts/generate_dags.py +``` + +This creates physical `.py` files in: `src/airflow_provider_aiida/auto_generated_dags/` + +### Install Package + +```bash +pip install -e . +``` + +## Files + +### Generated (30+ files) +``` +src/airflow_provider_aiida/auto_generated_dags/ +├── __init__.py +├── aiida_calcjob_core_arithmetic_add.py +├── aiida_calcjob_core_stash.py +├── aiida_workchain_core_arithmetic_multiply_add.py +└── ... 
(27+ more) +``` + +### Source Files +``` +scripts/generate_dags.py # Generator script +pyproject.toml # Entry point configuration +``` + +## Usage + +### After Installing New AiiDA Plugin + +```bash +# 1. Install new plugin +pip install aiida-new-plugin + +# 2. Regenerate DAG files +python3 scripts/generate_dags.py + +# 3. Restart Airflow +``` + +### Trigger a DAG + +```python +from aiida import load_profile +from aiida.orm import load_code, Int +from aiida.calculations.arithmetic.add import ArithmeticAddCalculation + +load_profile() +code = load_code('bash@localhost') +inputs = {'code': code, 'x': Int(5), 'y': Int(3)} +process = ArithmeticAddCalculation(inputs=inputs) +process._save_checkpoint() + +# Trigger (replace 123 with the PK of the saved process node): airflow dags trigger aiida_calcjob_core_arithmetic_add \ +# --conf '{"node_pk": 123}' +``` + +## Benefits + +✅ **Fast**: DAGs pre-generated, no startup penalty +✅ **Persistent**: Physical files on disk +✅ **Maintainable**: Regenerate when needed +✅ **Standard**: Normal Python files, standard Airflow workflow diff --git a/pyproject.toml b/pyproject.toml index a053b9c..dc73aef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ provider_info = "airflow_provider_aiida.__init__:get_provider_info" [project.entry-points."aiida.dags"] aiida-standard = "airflow_provider_aiida.example_dags" +aiida-auto = "airflow_provider_aiida.auto_generated_dags" [tool.hatch.version] path = "src/airflow_provider_aiida/__init__.py" diff --git a/scripts/generate_dags.py b/scripts/generate_dags.py new file mode 100755 index 0000000..0809a45 --- /dev/null +++ b/scripts/generate_dags.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +Generate physical DAG files for all registered AiiDA plugins. +Run once during installation to create DAG files. 
import os
import re
import sys
from pathlib import Path

if sys.version_info >= (3, 10):
    # ``entry_points().select(group=...)`` is part of the stdlib from 3.10 on,
    # so the third-party backport is only needed on older interpreters.
    from importlib.metadata import entry_points
else:
    from importlib_metadata import entry_points


def _dag_id_for(kind_tag: str, entry_point_name: str) -> str:
    """Return ``aiida_<kind_tag>_<name>`` as a valid Python identifier.

    Every character of *entry_point_name* that is not an ASCII letter, digit
    or underscore is replaced by ``_``. The result is used both as the DAG id
    and as the module name of the generated file, and the package
    ``__init__.py`` imports it with ``from .<name> import dag``, so it has to
    be importable. Dotted names give the same result as a plain
    ``replace('.', '_')``.
    """
    safe_name = re.sub(r"[^0-9A-Za-z_]", "_", entry_point_name)
    return f"aiida_{kind_tag}_{safe_name}"


def _write_dag_file(
    *,
    kind_label: str,
    kind_tag: str,
    taskgroup_module: str,
    taskgroup_class: str,
    entry_point_name: str,
    class_path: str,
    output_dir: Path,
) -> str:
    """Render one DAG module from the shared template and write it to disk.

    Args:
        kind_label: Human-readable process kind used in the docstring and
            description of the generated module (``CalcJob`` / ``WorkChain``).
        kind_tag: Lower-case kind used in the DAG id and in the tag list.
        taskgroup_module: Dotted module the task-group class is imported from.
        taskgroup_class: Name of the task-group class instantiated in the DAG.
        entry_point_name: Name of the plugin entry point.
        class_path: Fully qualified ``package.module.ClassName`` of the plugin.
        output_dir: Existing directory the file is written into.

    Returns:
        The file name (not the full path) of the written module.

    Raises:
        ValueError: If *class_path* contains no ``.`` separating module and class.
    """
    module_path, separator, class_name = class_path.rpartition(".")
    if not separator or not module_path or not class_name:
        raise ValueError(f"Expected 'module.ClassName', got {class_path!r}")

    dag_id = _dag_id_for(kind_tag, entry_point_name)
    filename = f"{dag_id}.py"
    summary = f"Auto-generated DAG for AiiDA {kind_label}: {entry_point_name}"

    # ``!r`` emits correctly quoted/escaped literals; for ordinary names the
    # output is the same single-quoted text a hand-written literal would be.
    content = f'''"""{summary}"""
from airflow import DAG
from airflow.models.param import Param
from {taskgroup_module} import {taskgroup_class}
from {module_path} import {class_name}

with DAG(
    dag_id='{dag_id}',
    description={summary!r},
    params={{
        "node_pk": Param("", type="integer", description="AiiDA node PK to resume execution")
    }},
    render_template_as_native_obj=True,
    tags=['aiida', 'auto-generated', '{kind_tag}', {entry_point_name!r}],
    catchup=False,
) as dag:
    task = {taskgroup_class}(
        process_class={class_name},
        node_pk="{{{{ params.node_pk }}}}",
    )
'''

    (output_dir / filename).write_text(content, encoding="utf-8")
    return filename


def generate_calcjob_dag_file(entry_point_name: str, class_path: str, output_dir: Path) -> str:
    """Generate a DAG file for a CalcJob.

    Writes ``aiida_calcjob_<entry point name>.py`` into *output_dir* and
    returns that file name.
    """
    return _write_dag_file(
        kind_label="CalcJob",
        kind_tag="calcjob",
        taskgroup_module="airflow_provider_aiida.taskgroups.calcjob",
        taskgroup_class="CalcJobTaskGroup",
        entry_point_name=entry_point_name,
        class_path=class_path,
        output_dir=output_dir,
    )


def generate_workchain_dag_file(entry_point_name: str, class_path: str, output_dir: Path) -> str:
    """Generate a DAG file for a WorkChain.

    Writes ``aiida_workchain_<entry point name>.py`` into *output_dir* and
    returns that file name.
    """
    return _write_dag_file(
        kind_label="WorkChain",
        kind_tag="workchain",
        taskgroup_module="airflow_provider_aiida.taskgroups.workchain",
        taskgroup_class="WorkChainTaskGroup",
        entry_point_name=entry_point_name,
        class_path=class_path,
        output_dir=output_dir,
    )


def _build_init_source(generated_files) -> str:
    """Return the source of the package ``__init__.py`` importing every DAG."""
    # ``Path.stem`` drops only the trailing suffix, unlike ``str.replace``
    # which would also strip a ``.py`` occurring inside the name.
    module_names = sorted(Path(filename).stem for filename in generated_files)
    lines = ['"""Auto-generated DAGs for AiiDA plugins."""', "", "# Import all generated DAGs"]
    lines += [f"from .{name} import dag as {name}" for name in module_names]
    lines += ["", "# Export all DAGs", "__all__ = ["]
    lines += [f"    '{name}'," for name in module_names]
    lines.append("]")
    return "\n".join(lines) + "\n"


def main():
    """Generate all DAG files.

    Loads every entry point of the ``aiida.calculations`` and
    ``aiida.workflows`` groups, writes one DAG module per plugin class into
    ``src/airflow_provider_aiida/auto_generated_dags`` and finally writes an
    ``__init__.py`` importing all of them.

    Returns:
        The number of DAG files generated (``__init__.py`` not counted).
    """
    # Output directory
    package_root = Path(__file__).resolve().parent.parent
    output_dir = package_root / "src" / "airflow_provider_aiida" / "auto_generated_dags"
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"Generating DAG files in: {output_dir}")

    generated_files = []
    eps = entry_points()
    generators = (
        ("aiida.calculations", generate_calcjob_dag_file),
        ("aiida.workflows", generate_workchain_dag_file),
    )

    for group, generate in generators:
        for ep in eps.select(group=group):
            # Best effort: one plugin that fails to load or render must not
            # prevent the remaining DAG files from being generated.
            try:
                plugin_class = ep.load()
                # Skip if not a class (some are functions)
                if not isinstance(plugin_class, type):
                    continue

                class_path = f"{plugin_class.__module__}.{plugin_class.__name__}"
                filename = generate(ep.name, class_path, output_dir)
                generated_files.append(filename)
                print(f"  ✓ Generated: {filename}")
            except Exception as e:
                print(f"  ✗ Failed to generate DAG for {ep.name}: {e}")

    # Generate __init__.py to import all DAGs
    (output_dir / "__init__.py").write_text(_build_init_source(generated_files), encoding="utf-8")
    print(f"\n✅ Generated {len(generated_files)} DAG files + __init__.py")
    return len(generated_files)


if __name__ == "__main__":
    count = main()
    # A run that produced no DAG file at all is reported as a failure.
    sys.exit(0 if count > 0 else 1)
"""Setup script to generate DAG files during installation."""
import subprocess
import sys
from pathlib import Path

from setuptools import setup
from setuptools.command.develop import develop
from setuptools.command.install import install

# Resolved against this file rather than the current working directory, so
# the hook finds the generator no matter where the install is started from.
_GENERATOR_SCRIPT = Path(__file__).resolve().parent / "scripts" / "generate_dags.py"


def _generate_dag_files():
    """Run the DAG generator script with the interpreter performing the install.

    Raises:
        subprocess.CalledProcessError: If the generator exits with a non-zero
            status.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("Generating AiiDA DAG files...")
    print(banner)
    subprocess.check_call([sys.executable, str(_GENERATOR_SCRIPT)])
    print(banner + "\n")


class PostInstallCommand(install):
    """Post-installation for installation mode."""

    def run(self):
        """Run the regular ``install`` command, then generate the DAG files."""
        install.run(self)
        _generate_dag_files()


class PostDevelopCommand(develop):
    """Post-installation for development mode."""

    def run(self):
        """Run the regular ``develop`` command, then generate the DAG files."""
        develop.run(self)
        _generate_dag_files()


setup(
    cmdclass={
        'install': PostInstallCommand,
        'develop': PostDevelopCommand,
    },
)
.aiida_calcjob_core_stash import dag as aiida_calcjob_core_stash +from .aiida_calcjob_core_templatereplacer import dag as aiida_calcjob_core_templatereplacer +from .aiida_calcjob_core_transfer import dag as aiida_calcjob_core_transfer +from .aiida_calcjob_core_unstash import dag as aiida_calcjob_core_unstash +from .aiida_calcjob_quantumespresso_bands import dag as aiida_calcjob_quantumespresso_bands +from .aiida_calcjob_quantumespresso_cp import dag as aiida_calcjob_quantumespresso_cp +from .aiida_calcjob_quantumespresso_dos import dag as aiida_calcjob_quantumespresso_dos +from .aiida_calcjob_quantumespresso_matdyn import dag as aiida_calcjob_quantumespresso_matdyn +from .aiida_calcjob_quantumespresso_namelists import dag as aiida_calcjob_quantumespresso_namelists +from .aiida_calcjob_quantumespresso_neb import dag as aiida_calcjob_quantumespresso_neb +from .aiida_calcjob_quantumespresso_open_grid import dag as aiida_calcjob_quantumespresso_open_grid +from .aiida_calcjob_quantumespresso_ph import dag as aiida_calcjob_quantumespresso_ph +from .aiida_calcjob_quantumespresso_pp import dag as aiida_calcjob_quantumespresso_pp +from .aiida_calcjob_quantumespresso_projwfc import dag as aiida_calcjob_quantumespresso_projwfc +from .aiida_calcjob_quantumespresso_pw import dag as aiida_calcjob_quantumespresso_pw +from .aiida_calcjob_quantumespresso_pw2gw import dag as aiida_calcjob_quantumespresso_pw2gw +from .aiida_calcjob_quantumespresso_pw2wannier90 import dag as aiida_calcjob_quantumespresso_pw2wannier90 +from .aiida_calcjob_quantumespresso_pwimmigrant import dag as aiida_calcjob_quantumespresso_pwimmigrant +from .aiida_calcjob_quantumespresso_q2r import dag as aiida_calcjob_quantumespresso_q2r +from .aiida_workchain_core_arithmetic_multiply_add import dag as aiida_workchain_core_arithmetic_multiply_add +from .aiida_workchain_quantumespresso_bands_base import dag as aiida_workchain_quantumespresso_bands_base +from .aiida_workchain_quantumespresso_matdyn_base import dag as 
aiida_workchain_quantumespresso_matdyn_base +from .aiida_workchain_quantumespresso_neb_base import dag as aiida_workchain_quantumespresso_neb_base +from .aiida_workchain_quantumespresso_pdos import dag as aiida_workchain_quantumespresso_pdos +from .aiida_workchain_quantumespresso_ph_base import dag as aiida_workchain_quantumespresso_ph_base +from .aiida_workchain_quantumespresso_pw_bands import dag as aiida_workchain_quantumespresso_pw_bands +from .aiida_workchain_quantumespresso_pw_base import dag as aiida_workchain_quantumespresso_pw_base +from .aiida_workchain_quantumespresso_pw_relax import dag as aiida_workchain_quantumespresso_pw_relax +from .aiida_workchain_quantumespresso_q2r_base import dag as aiida_workchain_quantumespresso_q2r_base + +# Export all DAGs +__all__ = [ + 'aiida_calcjob_core_arithmetic_add', + 'aiida_calcjob_core_stash', + 'aiida_calcjob_core_templatereplacer', + 'aiida_calcjob_core_transfer', + 'aiida_calcjob_core_unstash', + 'aiida_calcjob_quantumespresso_bands', + 'aiida_calcjob_quantumespresso_cp', + 'aiida_calcjob_quantumespresso_dos', + 'aiida_calcjob_quantumespresso_matdyn', + 'aiida_calcjob_quantumespresso_namelists', + 'aiida_calcjob_quantumespresso_neb', + 'aiida_calcjob_quantumespresso_open_grid', + 'aiida_calcjob_quantumespresso_ph', + 'aiida_calcjob_quantumespresso_pp', + 'aiida_calcjob_quantumespresso_projwfc', + 'aiida_calcjob_quantumespresso_pw', + 'aiida_calcjob_quantumespresso_pw2gw', + 'aiida_calcjob_quantumespresso_pw2wannier90', + 'aiida_calcjob_quantumespresso_pwimmigrant', + 'aiida_calcjob_quantumespresso_q2r', + 'aiida_workchain_core_arithmetic_multiply_add', + 'aiida_workchain_quantumespresso_bands_base', + 'aiida_workchain_quantumespresso_matdyn_base', + 'aiida_workchain_quantumespresso_neb_base', + 'aiida_workchain_quantumespresso_pdos', + 'aiida_workchain_quantumespresso_ph_base', + 'aiida_workchain_quantumespresso_pw_bands', + 'aiida_workchain_quantumespresso_pw_base', + 'aiida_workchain_quantumespresso_pw_relax', 
+ 'aiida_workchain_quantumespresso_q2r_base', +]