From 57def3be258a32de01ff362c02b00fba2416b130 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Thu, 2 Oct 2025 14:37:36 +0200 Subject: [PATCH 1/2] Add asynchronous version of calcjob taskgroup --- .../example_dags/async_arithmetic_add.py | 249 ++++++++++++++++++ .../operators/async_calcjob.py | 24 +- .../taskgroups/async_calcjob.py | 111 ++++++++ .../triggers/async_calcjob.py | 37 ++- 4 files changed, 414 insertions(+), 7 deletions(-) create mode 100644 src/airflow_provider_aiida/example_dags/async_arithmetic_add.py create mode 100644 src/airflow_provider_aiida/taskgroups/async_calcjob.py diff --git a/src/airflow_provider_aiida/example_dags/async_arithmetic_add.py b/src/airflow_provider_aiida/example_dags/async_arithmetic_add.py new file mode 100644 index 0000000..839f826 --- /dev/null +++ b/src/airflow_provider_aiida/example_dags/async_arithmetic_add.py @@ -0,0 +1,249 @@ +from pathlib import Path +from datetime import datetime +from typing import Dict, Any +from airflow import DAG +from airflow.decorators import task +from airflow.models.param import Param +from airflow_provider_aiida.taskgroups.async_calcjob import AsyncCalcJobTaskGroup + + +class AsyncAddJobTaskGroup(AsyncCalcJobTaskGroup): + """Async Addition job task group - uses deferrable operators""" + + def __init__(self, group_id: str, machine: str, local_workdir: str, remote_workdir: str, + x: int, y: int, sleep: int, **kwargs): + self.x = x + self.y = y + self.sleep = sleep + super().__init__(group_id, machine, local_workdir, remote_workdir, **kwargs) + + def prepare(self, **context) -> Dict[str, Any]: + """Prepare addition job inputs""" + to_upload_files = {} + + # Get rendered values from params + params = context.get('params', {}) + x = params.get('add_x', self.x) + y = params.get('add_y', self.y) + + submission_script = f""" +sleep {self.sleep} +echo "$(({x}+{y}))" > result.out + """ + + to_receive_files = {"result.out": "addition_result.txt"} + + # Push to XCom for the async calcjob operators to use + context['task_instance'].xcom_push(key='to_upload_files', value=to_upload_files) + context['task_instance'].xcom_push(key='submission_script', value=submission_script) + context['task_instance'].xcom_push(key='to_receive_files', value=to_receive_files) + + return { + "to_upload_files": to_upload_files, + "submission_script": submission_script, + "to_receive_files": to_receive_files + } + + def parse(self, local_workdir: str, **context) -> tuple[int, Dict[str, Any]]: + """Parse addition job results""" + to_receive_files = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.prepare', + key='to_receive_files' + ) + + results = {} + exit_status = 0 # Start with success + + try: + for file_key, received_file in to_receive_files.items(): + file_path = Path(local_workdir) / Path(received_file) + if not file_path.exists(): + print(f"ERROR: Expected file {received_file} not found") + exit_status = 1 + continue + + result_content = file_path.read_text().strip() + print(f"Addition result ({self.x} + {self.y}): {result_content}") + results[file_key] = int(result_content) + + except Exception as e: + print(f"ERROR parsing results: {e}") + exit_status = 2 + + # Store both exit status and results in XCom + final_result = (exit_status, results) + context['task_instance'].xcom_push(key='final_result', value=final_result) + return final_result + + +class AsyncMultiplyJobTaskGroup(AsyncCalcJobTaskGroup): + """Async Multiplication job task group - uses deferrable operators""" + + def __init__(self, group_id: str, machine: 
str, local_workdir: str, remote_workdir: str, + x: int, y: int, sleep: int, **kwargs): + self.x = x + self.y = y + self.sleep = sleep + super().__init__(group_id, machine, local_workdir, remote_workdir, **kwargs) + + def prepare(self, **context) -> Dict[str, Any]: + """Prepare multiplication job inputs""" + to_upload_files = {} + + # Get rendered values from params + params = context.get('params', {}) + x = params.get('multiply_x', self.x) + y = params.get('multiply_y', self.y) + + submission_script = f""" +sleep {self.sleep} +echo "$(({x}*{y}))" > multiply_result.out +echo "Operation: {x} * {y}" > operation.log + """ + + to_receive_files = { + "multiply_result.out": "multiply_result.txt", + "operation.log": "operation.log" + } + + # Push to XCom + context['task_instance'].xcom_push(key='to_upload_files', value=to_upload_files) + context['task_instance'].xcom_push(key='submission_script', value=submission_script) + context['task_instance'].xcom_push(key='to_receive_files', value=to_receive_files) + + return { + "to_upload_files": to_upload_files, + "submission_script": submission_script, + "to_receive_files": to_receive_files + } + + def parse(self, local_workdir: str, **context) -> tuple[int, Dict[str, Any]]: + """Parse multiplication job results""" + to_receive_files = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.prepare', + key='to_receive_files' + ) + + results = {} + exit_status = 0 # Start with success + + try: + for file_key, received_file in to_receive_files.items(): + file_path = Path(local_workdir) / Path(received_file) + if not file_path.exists(): + print(f"ERROR: Expected file {received_file} not found") + exit_status = 1 + continue + + content = file_path.read_text().strip() + print(f"File {file_key}: {content}") + if file_key == "multiply_result.out": + results['result'] = int(content) + else: + results['log'] = content + + if 'result' not in results: + print("ERROR: No multiplication result found") + exit_status = 1 + + print(f"Multiplication result ({self.x} * {self.y}): {results.get('result', 'N/A')}") + + except Exception as e: + print(f"ERROR parsing results: {e}") + exit_status = 2 + + # Store both exit status and results in XCom + final_result = (exit_status, results) + context['task_instance'].xcom_push(key='final_result', value=final_result) + return final_result + + +# Create DAG +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2025, 1, 1), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 0, +} + +with DAG( + 'async_arithmetic_add_multiply', + default_args=default_args, + description='Async CalcJob TaskGroup for ArithmeticAddMultiply using deferrable operators', + schedule=None, + catchup=False, + tags=['arithmetics', 'calcjob', 'async', 'taskgroup'], + params={ + "machine": Param("localhost", type="string"), + "local_workdir": Param("/tmp/airflow/local_workdir", type="string"), + "remote_workdir": Param("/tmp/airflow/remote_workdir", type="string"), + "add_x": Param(10, type="integer", description="First operand for addition"), + "add_y": Param(5, type="integer", description="Second operand for addition"), + "multiply_x": Param(7, type="integer", description="First operand for multiplication"), + "multiply_y": Param(8, type="integer", description="Second operand for multiplication"), + } +) as dag: + + # Create async task groups - these will use deferrable operators + async_add_job = AsyncAddJobTaskGroup( + group_id="async_addition_job", + machine="{{ params.machine }}", + 
local_workdir="{{ params.local_workdir }}/async_addition_job", + remote_workdir="{{ params.remote_workdir }}/async_addition_job", + x="{{ params.add_x }}", + y="{{ params.add_y }}", + sleep=2, + ) + + async_multiply_job = AsyncMultiplyJobTaskGroup( + group_id="async_multiplication_job", + machine="{{ params.machine }}", + local_workdir="{{ params.local_workdir }}/async_multiplication_job", + remote_workdir="{{ params.remote_workdir }}/async_multiplication_job", + x="{{ params.multiply_x }}", + y="{{ params.multiply_y }}", + sleep=1, + ) + + @task + def combine_results(): + """Combine results from both async job types""" + from airflow.sdk import get_current_context + context = get_current_context() + task_instance = context['task_instance'] + + add_result = task_instance.xcom_pull( + task_ids='async_addition_job.parse', + key='final_result' + ) + multiply_result = task_instance.xcom_pull( + task_ids='async_multiplication_job.parse', + key='final_result' + ) + + # Unpack tuples (exit_status, results) + add_exit_status, add_data = add_result + multiply_exit_status, multiply_data = multiply_result + + combined = { + 'addition': { + 'exit_status': add_exit_status, + 'success': add_exit_status == 0, + 'data': add_data + }, + 'multiplication': { + 'exit_status': multiply_exit_status, + 'success': multiply_exit_status == 0, + 'data': multiply_data + }, + 'overall_success': add_exit_status == 0 and multiply_exit_status == 0 + } + + print(f"Combined async results: {combined}") + return combined + + # Direct usage - async task groups ARE TaskGroups! + combine_task = combine_results() + [async_add_job, async_multiply_job] >> combine_task diff --git a/src/airflow_provider_aiida/operators/async_calcjob.py b/src/airflow_provider_aiida/operators/async_calcjob.py index 2bc847a..4ddecef 100644 --- a/src/airflow_provider_aiida/operators/async_calcjob.py +++ b/src/airflow_provider_aiida/operators/async_calcjob.py @@ -27,7 +27,13 @@ def execute(self, context: Context): # Pull to_upload_files from XCom if it's empty to_upload_files = self.to_upload_files if not to_upload_files: - to_upload_files = context['task_instance'].xcom_pull(task_ids='prepare', key='to_upload_files') + # Get the task group ID to construct the correct task_id + task_group_id = self.task_group.group_id if self.task_group else None + prepare_task_id = f'{task_group_id}.prepare' if task_group_id else 'prepare' + to_upload_files = context['task_instance'].xcom_pull(task_ids=prepare_task_id, key='to_upload_files') + + if to_upload_files is None: + raise ValueError("to_upload_files cannot be None. Either provide it as a parameter or ensure it's available in XCom.") self.defer( trigger=UploadTrigger( @@ -59,7 +65,13 @@ def execute(self, context: Context): # Pull submission_script from XCom if it's empty submission_script = self.submission_script if not submission_script: - submission_script = context['task_instance'].xcom_pull(task_ids='prepare', key='submission_script') + # Get the task group ID to construct the correct task_id + task_group_id = self.task_group.group_id if self.task_group else None + prepare_task_id = f'{task_group_id}.prepare' if task_group_id else 'prepare' + submission_script = context['task_instance'].xcom_pull(task_ids=prepare_task_id, key='submission_script') + + if submission_script is None: + raise ValueError("submission_script cannot be None. 
Either provide it as a parameter or ensure it's available in XCom.") self.defer( trigger=SubmitTrigger( @@ -123,7 +135,13 @@ def execute(self, context: Context): # Pull to_receive_files from XCom if it's empty to_receive_files = self.to_receive_files if not to_receive_files: - to_receive_files = context['task_instance'].xcom_pull(task_ids='prepare', key='to_receive_files') + # Get the task group ID to construct the correct task_id + task_group_id = self.task_group.group_id if self.task_group else None + prepare_task_id = f'{task_group_id}.prepare' if task_group_id else 'prepare' + to_receive_files = context['task_instance'].xcom_pull(task_ids=prepare_task_id, key='to_receive_files') + + if to_receive_files is None: + raise ValueError("to_receive_files cannot be None. Either provide it as a parameter or ensure it's available in XCom.") self.defer( trigger=ReceiveTrigger( diff --git a/src/airflow_provider_aiida/taskgroups/async_calcjob.py b/src/airflow_provider_aiida/taskgroups/async_calcjob.py new file mode 100644 index 0000000..959d115 --- /dev/null +++ b/src/airflow_provider_aiida/taskgroups/async_calcjob.py @@ -0,0 +1,111 @@ +""" +Async CalcJob TaskGroup using Deferrable Operators + +This demonstrates using deferrable operators with triggers for non-blocking execution. +Tasks defer to the triggerer process instead of blocking worker threads. +""" + +from abc import ABC, abstractmethod +from typing import Dict, Any + +from airflow.utils.task_group import TaskGroup +from airflow.operators.python import PythonOperator + +from airflow_provider_aiida.operators.async_calcjob import ( + AsyncUploadOperator, + AsyncSubmitOperator, + AsyncUpdateOperator, + AsyncReceiveOperator, +) + + +class AsyncCalcJobTaskGroup(TaskGroup, ABC): + """ + Abstract TaskGroup for async CalcJob workflows using deferrable operators. + + This version uses triggers to avoid blocking worker threads, allowing + better resource utilization for long-running jobs. + + Subclasses must implement prepare() and parse() methods. 
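+
+    A minimal sketch of a concrete subclass (class name, script, and filenames
+    are illustrative; the pattern mirrors the async_arithmetic_add example DAG
+    and assumes `from pathlib import Path`). The three XCom keys are the ones
+    the async upload/submit/receive operators pull when their parameters are
+    left empty:
+
+        class EchoJobTaskGroup(AsyncCalcJobTaskGroup):
+            def prepare(self, **context) -> Dict[str, Any]:
+                ti = context['task_instance']
+                ti.xcom_push(key='to_upload_files', value={})
+                ti.xcom_push(key='submission_script', value='echo hello > out.txt')
+                ti.xcom_push(key='to_receive_files', value={'out.txt': 'out.txt'})
+                return {}
+
+            def parse(self, local_workdir: str, **context) -> tuple[int, Dict[str, Any]]:
+                content = (Path(local_workdir) / 'out.txt').read_text().strip()
+                return (0, {'result': content})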
+ """ + + def __init__(self, group_id: str, machine: str, local_workdir: str, remote_workdir: str, **kwargs): + super().__init__(group_id=group_id, **kwargs) + self.machine = machine + self.local_workdir = local_workdir + self.remote_workdir = remote_workdir + + # Build the task group when instantiated + self._build_tasks() + + def _build_tasks(self): + """Build all tasks within this task group""" + + # Create prepare task using the abstract method + prepare_task = PythonOperator( + task_id='prepare', + python_callable=self.prepare, + task_group=self, + ) + + # Create the async calcjob workflow using deferrable operators + upload_op = AsyncUploadOperator( + task_id="upload", + machine=self.machine, + local_workdir=self.local_workdir, + remote_workdir=self.remote_workdir, + to_upload_files={}, # Will be pulled from XCom + task_group=self, + ) + + submit_op = AsyncSubmitOperator( + task_id="submit", + machine=self.machine, + local_workdir=self.local_workdir, + remote_workdir=self.remote_workdir, + submission_script="", # Will be pulled from XCom + task_group=self, + ) + + update_op = AsyncUpdateOperator( + task_id="update", + machine=self.machine, + job_id=submit_op.output, + task_group=self, + ) + + receive_op = AsyncReceiveOperator( + task_id="receive", + machine=self.machine, + local_workdir=self.local_workdir, + remote_workdir=self.remote_workdir, + to_receive_files={}, # Will be pulled from XCom + task_group=self, + ) + + # Create parse task using the abstract method + parse_task = PythonOperator( + task_id='parse', + python_callable=self.parse, + op_kwargs={'local_workdir': self.local_workdir}, + task_group=self, + ) + + # Set up dependencies within the task group + prepare_task >> upload_op >> submit_op >> update_op >> receive_op >> parse_task + + @abstractmethod + def prepare(self, **context) -> Dict[str, Any]: + """Abstract method to prepare job inputs""" + pass + + @abstractmethod + def parse(self, local_workdir: str, **context) -> tuple[int, Dict[str, Any]]: + """Abstract method to parse job outputs + + Returns: + tuple[int, Dict[str, Any]]: (exit_status, results) + exit_status: 0 for success, non-zero for failure/error + results: Dictionary containing parsed results + """ + pass diff --git a/src/airflow_provider_aiida/triggers/async_calcjob.py b/src/airflow_provider_aiida/triggers/async_calcjob.py index a4df454..6445cc5 100644 --- a/src/airflow_provider_aiida/triggers/async_calcjob.py +++ b/src/airflow_provider_aiida/triggers/async_calcjob.py @@ -1,4 +1,5 @@ import asyncio +import logging from datetime import timedelta from pathlib import Path from typing import Any, AsyncIterator @@ -9,6 +10,8 @@ get_authinfo_cached, ) +logger = logging.getLogger(__name__) + class UploadTrigger(BaseTrigger): def __init__( @@ -46,6 +49,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: with transport_queue.request_transport(authinfo) as request: connection = await request + # Create remote workdir if it doesn't exist + connection.makedirs(str(remote_workdir), ignore_existing=True) for localpath, remotepath in self.to_upload_files.items(): connection.putfile( Path(localpath).absolute(), @@ -87,6 +92,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: local_workdir = Path(self.local_workdir) remote_workdir = Path(self.remote_workdir) + # Create local workdir if it doesn't exist + local_workdir.mkdir(parents=True, exist_ok=True) + submission_script_path = local_workdir / Path("submit.sh") submission_script_path.write_text(self.submission_script) @@ -94,13 +102,15 @@ async def run(self) -> 
AsyncIterator[TriggerEvent]: authinfo = get_authinfo_cached(self.machine or "localhost") with transport_queue.request_transport(authinfo) as request: connection = await request + # Create remote workdir if it doesn't exist + connection.makedirs(str(remote_workdir), ignore_existing=True) connection.putfile( submission_script_path, remote_workdir / "submit.sh" ) exit_code, stdout, stderr = connection.exec_command_wait( - f"(bash {submission_script_path} > /dev/null 2>&1 & echo $!) &", - workdir=remote_workdir + f"(bash submit.sh > /dev/null 2>&1 & echo $!) &", + workdir=str(remote_workdir) ) if exit_code != 0: @@ -195,14 +205,33 @@ async def run(self) -> AsyncIterator[TriggerEvent]: local_workdir = Path(self.local_workdir) remote_workdir = Path(self.remote_workdir) + # Create local workdir if it doesn't exist + local_workdir.mkdir(parents=True, exist_ok=True) + transport_queue = get_transport_queue() authinfo = get_authinfo_cached(self.machine or "localhost") with transport_queue.request_transport(authinfo) as request: connection = await request for remotepath, localpath in self.to_receive_files.items(): + remote_file = remote_workdir / Path(remotepath) + local_file = local_workdir / Path(localpath) + logger.info(f"ReceiveTrigger: Attempting to retrieve {remote_file} -> {local_file}") + # Check if remote file exists + try: + if not connection.isfile(str(remote_file)): + logger.error(f"ReceiveTrigger: Remote file does not exist: {remote_file}") + # List remote directory contents for debugging + try: + remote_files = connection.listdir(str(remote_workdir)) + logger.info(f"ReceiveTrigger: Remote workdir contents: {remote_files}") + except Exception as e: + logger.error(f"ReceiveTrigger: Could not list remote workdir: {e}") + except Exception as e: + logger.error(f"ReceiveTrigger: Could not check if file exists: {e}") + connection.getfile( - remote_workdir / Path(remotepath), - local_workdir / Path(localpath) + remote_file, + local_file ) yield TriggerEvent({"status": "success"}) From 5c7e55b80128c865ee8edae1f5b8580cba414622 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Thu, 2 Oct 2025 16:11:53 +0200 Subject: [PATCH 2/2] aiida calcjob using aiida tasks --- .../aiida_example_arithmetic_calcjob.py | 426 ++++++++ .../taskgroups/aiida_calcjob.py | 912 ++++++++++++++++++ 2 files changed, 1338 insertions(+) create mode 100644 src/airflow_provider_aiida/example_dags/aiida_example_arithmetic_calcjob.py create mode 100644 src/airflow_provider_aiida/taskgroups/aiida_calcjob.py diff --git a/src/airflow_provider_aiida/example_dags/aiida_example_arithmetic_calcjob.py b/src/airflow_provider_aiida/example_dags/aiida_example_arithmetic_calcjob.py new file mode 100644 index 0000000..15e8e8b --- /dev/null +++ b/src/airflow_provider_aiida/example_dags/aiida_example_arithmetic_calcjob.py @@ -0,0 +1,426 @@ +""" +Example DAG using AiiDA-style ArithmeticAddCalculation CalcJob. + +This demonstrates how to use the CalcJob TaskGroup interface in an Airflow DAG. +It mimics the interface of aiida.calculations.arithmetic.add.ArithmeticAddCalculation. +""" + +from datetime import datetime +from pathlib import Path +from typing import Optional +import io + +from airflow import DAG +from airflow.decorators import task +from airflow.models.param import Param + +from airflow_provider_aiida.taskgroups.aiida_calcjob import ( + CalcJob, + CalcInfo, + CodeInfo, + ExitCode, + Folder, +) + + +class ArithmeticAddCalculation(CalcJob): + """ + CalcJob to add two numbers using bash. + + This CalcJob: + 1. 
Creates a bash script that adds two numbers + 2. Executes the script + 3. Parses the output to extract the sum + 4. Validates the result + """ + + # Custom exit codes + ERROR_READING_OUTPUT_FILE = ExitCode( + 310, + "The output file could not be read.", + invalidates_cache=True + ) + ERROR_INVALID_OUTPUT = ExitCode( + 320, + "The output file contains invalid output.", + invalidates_cache=True + ) + ERROR_NEGATIVE_NUMBER = ExitCode( + 410, + "The sum of the operands is a negative number." + ) + + def __init__( + self, + group_id: str, + computer: str, + x: int, + y: int, + sleep: int = 0, + input_filename: str = 'aiida.in', + output_filename: str = 'aiida.out', + **kwargs + ): + super().__init__( + group_id=group_id, + computer=computer, + metadata={ + 'options': { + 'input_filename': input_filename, + 'output_filename': output_filename, + 'resources': {'num_machines': 1, 'num_mpiprocs_per_machine': 1}, + } + }, + **kwargs + ) + + self.x = x + self.y = y + self.sleep = sleep + self.input_filename = input_filename + self.output_filename = output_filename + + def prepare_for_submission(self, folder: Folder) -> CalcInfo: + """ + Prepare the calculation for submission. + + Creates a bash script that: + 1. Optionally sleeps for specified time + 2. Echoes the sum of x and y + """ + script_content = "" + + if self.sleep > 0: + script_content += f"sleep {self.sleep}\n" + + script_content += f"echo $(({self.x} + {self.y}))\n" + + # Write input script + folder.create_file_from_filelike( + io.StringIO(script_content), + self.input_filename, + mode='w', + encoding='utf8' + ) + + # Create CodeInfo + codeinfo = CodeInfo() + codeinfo.stdin_name = self.input_filename + codeinfo.stdout_name = self.output_filename + codeinfo.cmdline_params = ['bash'] + + # Create CalcInfo + calcinfo = CalcInfo() + calcinfo.codes_info = [codeinfo] + calcinfo.local_copy_list = [ + ('', folder.get_abs_path(self.input_filename), self.input_filename) + ] + calcinfo.retrieve_list = [self.output_filename] + + return calcinfo + + def parse(self, retrieved_temporary_folder: str) -> Optional[ExitCode]: + """ + Parse the calculation output. + + Reads the output file and extracts the sum. + """ + output_file = Path(retrieved_temporary_folder) / self.output_filename + + if not output_file.exists(): + print(f"ERROR: Output file {self.output_filename} not found") + return self.ERROR_READING_OUTPUT_FILE + + try: + output_content = output_file.read_text().strip() + print(f"Raw output: {output_content}") + except Exception as e: + print(f"ERROR reading output file: {e}") + return self.ERROR_READING_OUTPUT_FILE + + try: + result = int(output_content) + print(f"Parsed result: {self.x} + {self.y} = {result}") + except ValueError: + print(f"ERROR: Invalid output, expected integer but got: {output_content}") + return self.ERROR_INVALID_OUTPUT + + if result < 0: + print(f"WARNING: Result is negative: {result}") + return self.ERROR_NEGATIVE_NUMBER + + # Store output + self.outputs['sum'] = result + + print(f"SUCCESS: {self.x} + {self.y} = {result}") + return ExitCode(0, "Calculation completed successfully") + + +class ArithmeticMultiplyCalculation(CalcJob): + """ + CalcJob to multiply two numbers using bash. + + Similar to ArithmeticAddCalculation but performs multiplication. 
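+
+    Instantiating it inside a DAG builds the upload -> submit -> update ->
+    retrieve -> parse task chain automatically. For example (literal values
+    shown for illustration; the DAG below passes templated params instead):
+
+        multiply_calc = ArithmeticMultiplyCalculation(
+            group_id='multiplication_calc',
+            computer='localhost',
+            x=7,
+            y=8,
+        )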
+ """ + + ERROR_READING_OUTPUT_FILE = ExitCode(310, "The output file could not be read.", invalidates_cache=True) + ERROR_INVALID_OUTPUT = ExitCode(320, "The output file contains invalid output.", invalidates_cache=True) + + def __init__( + self, + group_id: str, + computer: str, + x: int, + y: int, + sleep: int = 0, + input_filename: str = 'multiply.in', + output_filename: str = 'multiply.out', + **kwargs + ): + super().__init__( + group_id=group_id, + computer=computer, + metadata={ + 'options': { + 'input_filename': input_filename, + 'output_filename': output_filename, + } + }, + **kwargs + ) + + self.x = x + self.y = y + self.sleep = sleep + self.input_filename = input_filename + self.output_filename = output_filename + + def prepare_for_submission(self, folder: Folder) -> CalcInfo: + """Prepare multiplication calculation.""" + script_content = "" + + if self.sleep > 0: + script_content += f"sleep {self.sleep}\n" + + script_content += f"echo $(({self.x} * {self.y}))\n" + + folder.create_file_from_filelike( + io.StringIO(script_content), + self.input_filename, + mode='w', + encoding='utf8' + ) + + codeinfo = CodeInfo() + codeinfo.stdin_name = self.input_filename + codeinfo.stdout_name = self.output_filename + codeinfo.cmdline_params = ['bash'] + + calcinfo = CalcInfo() + calcinfo.codes_info = [codeinfo] + calcinfo.local_copy_list = [ + ('', folder.get_abs_path(self.input_filename), self.input_filename) + ] + calcinfo.retrieve_list = [self.output_filename] + + return calcinfo + + def parse(self, retrieved_temporary_folder: str) -> Optional[ExitCode]: + """Parse multiplication results.""" + output_file = Path(retrieved_temporary_folder) / self.output_filename + + if not output_file.exists(): + print(f"ERROR: Output file {self.output_filename} not found") + return self.ERROR_READING_OUTPUT_FILE + + try: + output_content = output_file.read_text().strip() + result = int(output_content) + print(f"Multiplication result: {self.x} * {self.y} = {result}") + + self.outputs['product'] = result + + return ExitCode(0, "Multiplication completed successfully") + + except ValueError: + print(f"ERROR: Invalid output") + return self.ERROR_INVALID_OUTPUT + except Exception as e: + print(f"ERROR reading output: {e}") + return self.ERROR_READING_OUTPUT_FILE + + +# Create DAG +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime(2025, 1, 1), + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 0, +} + +with DAG( + 'aiida_example_arithmetic_calcjob', + default_args=default_args, + description='Example DAG using AiiDA-style CalcJob TaskGroups for arithmetic operations', + schedule=None, + catchup=False, + tags=['example', 'aiida', 'calcjob', 'arithmetic'], + params={ + "computer": Param( + "localhost", + type="string", + description="Computer to run calculations on" + ), + "add_x": Param( + 10, + type="integer", + description="First operand for addition" + ), + "add_y": Param( + 5, + type="integer", + description="Second operand for addition" + ), + "multiply_x": Param( + 7, + type="integer", + description="First operand for multiplication" + ), + "multiply_y": Param( + 8, + type="integer", + description="Second operand for multiplication" + ), + } +) as dag: + + # Create ArithmeticAddCalculation + # This TaskGroup will automatically create tasks: upload -> submit -> update -> retrieve -> parse + add_calc = ArithmeticAddCalculation( + group_id="addition_calc", + computer="{{ params.computer }}", + x="{{ params.add_x }}", + y="{{ params.add_y }}", + sleep=2, 
# Simulate some computation time + ) + + # Create ArithmeticMultiplyCalculation + multiply_calc = ArithmeticMultiplyCalculation( + group_id="multiplication_calc", + computer="{{ params.computer }}", + x="{{ params.multiply_x }}", + y="{{ params.multiply_y }}", + sleep=1, + ) + + @task + def combine_and_validate(**context): + """ + Combine and validate results from both calculations. + + This task: + 1. Retrieves results from both CalcJobs + 2. Validates they completed successfully + 3. Combines the results + 4. Returns final output + """ + task_instance = context['task_instance'] + + # Get results from addition calc + add_result = task_instance.xcom_pull( + task_ids='addition_calc.parse', + key='return_value' + ) + + # Get results from multiplication calc + multiply_result = task_instance.xcom_pull( + task_ids='multiplication_calc.parse', + key='return_value' + ) + + print(f"Addition result: {add_result}") + print(f"Multiplication result: {multiply_result}") + + # Validate both succeeded + add_success = add_result and add_result.get('exit_status', 1) == 0 + multiply_success = multiply_result and multiply_result.get('exit_status', 1) == 0 + + if not add_success: + raise ValueError(f"Addition calculation failed: {add_result}") + + if not multiply_success: + raise ValueError(f"Multiplication calculation failed: {multiply_result}") + + # Combine results + combined = { + 'addition': { + 'success': add_success, + 'exit_status': add_result.get('exit_status'), + 'message': add_result.get('exit_message'), + }, + 'multiplication': { + 'success': multiply_success, + 'exit_status': multiply_result.get('exit_status'), + 'message': multiply_result.get('exit_message'), + }, + 'overall_success': True + } + + print(f"\n{'='*60}") + print(f"FINAL RESULTS:") + print(f" Addition: {add_result.get('exit_message', 'N/A')}") + print(f" Multiplication: {multiply_result.get('exit_message', 'N/A')}") + print(f" Overall: SUCCESS") + print(f"{'='*60}\n") + + return combined + + @task + def report_summary(**context): + """ + Generate a summary report of all calculations. + + This demonstrates accessing CalcJob outputs for reporting. 
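+
+        The value pulled from XCom is the dict returned by combine_and_validate,
+        shaped like:
+
+            {'addition': {'success': True, 'exit_status': 0, 'message': '...'},
+             'multiplication': {'success': True, 'exit_status': 0, 'message': '...'},
+             'overall_success': True}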
+ """ + task_instance = context['task_instance'] + + combined = task_instance.xcom_pull( + task_ids='combine_and_validate' + ) + + params = context['params'] + + report = f""" +╔══════════════════════════════════════════════════════════╗ +║ Arithmetic CalcJob Execution Report ║ +╚══════════════════════════════════════════════════════════╝ + +INPUT PARAMETERS: + Addition: {params['add_x']} + {params['add_y']} + Multiplication: {params['multiply_x']} × {params['multiply_y']} + +EXECUTION STATUS: + Addition: {combined['addition']['message']} + Multiplication: {combined['multiplication']['message']} + +OVERALL STATUS: ✓ All calculations completed successfully + +═══════════════════════════════════════════════════════════ + """ + + print(report) + + return { + 'report': report, + 'timestamp': datetime.now().isoformat(), + 'status': 'SUCCESS' + } + + # Set up task dependencies + # Both CalcJobs run in parallel, then results are combined and reported + combine_task = combine_and_validate() + report_task = report_summary() + + [add_calc, multiply_calc] >> combine_task >> report_task diff --git a/src/airflow_provider_aiida/taskgroups/aiida_calcjob.py b/src/airflow_provider_aiida/taskgroups/aiida_calcjob.py new file mode 100644 index 0000000..3c3e0c0 --- /dev/null +++ b/src/airflow_provider_aiida/taskgroups/aiida_calcjob.py @@ -0,0 +1,912 @@ +""" +CalcJob TaskGroup - Airflow implementation of AiiDA CalcJob interface. + +This module provides a TaskGroup-based implementation that mimics the AiiDA CalcJob interface, +allowing CalcJob-style workflows to be executed in Airflow. + +The interface closely follows: +- aiida.engine.processes.calcjobs.calcjob.CalcJob +- aiida.engine.processes.calcjobs.tasks (upload, submit, update, retrieve) + +Uses actual AiiDA task functions from aiida.engine.processes.calcjobs.tasks +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, Any, Optional, List, Union +import tempfile +import asyncio +import sys + +# Add aiida-core to path +sys.path.insert(0, '/Users/alexgo/code/airflow-provider-aiida/aiida-core-airflow-dev/src') + +from airflow.utils.task_group import TaskGroup +from airflow.operators.python import PythonOperator + +# Import AiiDA task functions +try: + from aiida.engine.processes.calcjobs import tasks as aiida_tasks + from aiida.engine.transports import TransportQueue + from aiida.engine.utils import InterruptableFuture + from aiida.orm import CalcJobNode, load_node + from aiida.common.folders import SandboxFolder + AIIDA_AVAILABLE = True +except ImportError as e: + print(f"Warning: AiiDA not available: {e}") + AIIDA_AVAILABLE = False + + +@dataclass +class CodeInfo: + """ + Information about a code to execute. + + Mimics aiida.common.datastructures.CodeInfo from AiiDA. + """ + code_uuid: Optional[str] = None + cmdline_params: List[str] = field(default_factory=list) + stdin_name: Optional[str] = None + stdout_name: Optional[str] = None + stderr_name: Optional[str] = None + join_files: bool = False + withmpi: Optional[bool] = None + + +@dataclass +class CalcInfo: + """ + Information needed to execute a calculation job. + + Mimics aiida.common.datastructures.CalcInfo from AiiDA. + Returned by prepare_for_submission() to specify how the job should be executed. 
+ """ + # UUID of the calculation (set automatically) + uuid: Optional[str] = None + + # List of codes to run: List[CodeInfo] + codes_info: List[CodeInfo] = field(default_factory=list) + + # Files to copy from local to remote: List[Tuple[local_abs_path, remote_rel_path, depth]] + local_copy_list: List[tuple[str, str, str]] = field(default_factory=list) + + # Files to copy from remote to remote: List[Tuple[remote_machine_uuid, remote_abs_path, dest_rel_path]] + remote_copy_list: List[tuple[str, str, str]] = field(default_factory=list) + + # Remote symlinks: List[Tuple[remote_machine_uuid, remote_abs_path, dest_rel_path]] + remote_symlink_list: List[tuple[str, str, str]] = field(default_factory=list) + + # Files to retrieve after job completion: List[remote_rel_path] + retrieve_list: List[str] = field(default_factory=list) + + # Files to retrieve temporarily (deleted after parsing): List[remote_rel_path] + retrieve_temporary_list: List[str] = field(default_factory=list) + + # Text to prepend to submission script + prepend_text: str = "" + + # Text to append to submission script + append_text: str = "" + + # Skip submission (dry run mode) + skip_submit: bool = False + + # Execution mode for multiple codes + codes_run_mode: Optional[str] = None + + # File copy operation order + file_copy_operation_order: Optional[List[Any]] = None + + +@dataclass +class ExitCode: + """ + Exit code for a calculation job. + + Mimics aiida.engine.processes.exit_code.ExitCode from AiiDA. + """ + status: int = 0 + message: str = "" + invalidates_cache: bool = False + + def __bool__(self): + """Exit code is truthy if status is non-zero (error).""" + return self.status != 0 + + +class Folder: + """ + Simple folder abstraction that mimics aiida.common.folders.Folder. + + Wraps a pathlib.Path to provide compatibility with AiiDA's Folder interface. + """ + def __init__(self, abspath: str): + self.abspath = abspath + self._path = Path(abspath) + + def get_abs_path(self, relpath: str = ".") -> str: + """Get absolute path of a file/folder within this folder.""" + return str(self._path / relpath) + + def create_file_from_filelike(self, filelike, filename: str, mode: str = 'w', encoding: str = 'utf8'): + """Create a file from a filelike object.""" + filepath = self._path / filename + if 'b' in mode: + with open(filepath, mode) as f: + f.write(filelike.read()) + else: + with open(filepath, mode, encoding=encoding) as f: + f.write(filelike.read()) + + def get_subfolder(self, subfolder: str, create: bool = False): + """Get a subfolder, optionally creating it.""" + subfolder_path = self._path / subfolder + if create: + subfolder_path.mkdir(parents=True, exist_ok=True) + return Folder(str(subfolder_path)) + + +class CalcJob(TaskGroup, ABC): + """ + Abstract TaskGroup for CalcJob workflows that mimics the AiiDA CalcJob interface. + + This class provides the same interface as aiida.engine.processes.calcjobs.calcjob.CalcJob: + - prepare_for_submission(folder): Prepare calculation inputs and return CalcInfo + - parse(retrieved_temporary_folder): Parse calculation outputs and return ExitCode + + The execution flow matches AiiDA's CalcJob lifecycle (see tasks.py): + 1. UPLOAD: presubmit() → prepare_for_submission() → upload files to remote + 2. SUBMIT: submit job to scheduler + 3. UPDATE: monitor job status until completion + 4. STASH: optionally stash files on remote + 5. RETRIEVE: retrieve output files from remote + 6. 
PARSE: parse() to extract results + + Key differences from fixed workdir approach: + - Workdir is created dynamically using UUID-based sharding (like AiiDA) + - Each task operates on the calculation node and transport + - No fixed local_workdir/remote_workdir parameters + + Metadata options (simplified from AiiDA's full metadata.options): + - computer: Computer/SSH connection to use + - resources: dict with scheduler resources + - max_wallclock_seconds: job time limit + - queue_name: scheduler queue + - account: scheduler account + - parser_name: parser plugin to use (optional) + - additional_retrieve_list: extra files to retrieve + + Exit codes (matching AiiDA): + - 0: Success + - 100: ERROR_NO_RETRIEVED_FOLDER + - 110: ERROR_SCHEDULER_OUT_OF_MEMORY + - 120: ERROR_SCHEDULER_OUT_OF_WALLTIME + + Example: + class ArithmeticAddCalcJob(CalcJob): + def __init__(self, x: int, y: int, **kwargs): + super().__init__(**kwargs) + self.x = x + self.y = y + + def prepare_for_submission(self, folder: Folder) -> CalcInfo: + # Write input file + import io + folder.create_file_from_filelike( + io.StringIO(f'{self.x}\\n{self.y}'), + 'input.txt' + ) + + # Create code info + code_info = CodeInfo() + code_info.cmdline_params = ['bash', 'add.sh'] + code_info.stdout_name = 'output.txt' + + # Build CalcInfo + calc_info = CalcInfo() + calc_info.codes_info = [code_info] + calc_info.local_copy_list = [ + ('', folder.get_abs_path('input.txt'), 'input.txt') + ] + calc_info.retrieve_list = ['output.txt'] + + return calc_info + + def parse(self, retrieved_temporary_folder: str) -> Optional[ExitCode]: + # Read output + output_file = Path(retrieved_temporary_folder) / 'output.txt' + if not output_file.exists(): + return self.ERROR_NO_RETRIEVED_FOLDER + + result = int(output_file.read_text()) + # Store result... + + return ExitCode(0, "Success") + """ + + # Class variables (like AiiDA) + link_label_retrieved: str = 'retrieved' + CACHE_VERSION: Optional[int] = None + + # Exit codes (mimicking AiiDA's standard exit codes) + EXIT_CODE_SUCCESS = ExitCode(0, "") + ERROR_NO_RETRIEVED_FOLDER = ExitCode(100, "The process did not have the required `retrieved` output") + ERROR_SCHEDULER_OUT_OF_MEMORY = ExitCode(110, "The job ran out of memory") + ERROR_SCHEDULER_OUT_OF_WALLTIME = ExitCode(120, "The job ran out of walltime") + + def __init__( + self, + group_id: str, + computer: str, + code: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + inputs: Optional[Dict[str, Any]] = None, + **kwargs + ): + """ + Initialize the CalcJob task group. + + Args: + group_id: Unique identifier for this task group + computer: Computer/SSH connection ID (like AiiDA's Computer) + code: Optional code identifier/path + metadata: Optional metadata dict with options (resources, wallclock, queue, etc.) + inputs: Optional input data nodes + **kwargs: Additional TaskGroup arguments + """ + super().__init__(group_id=group_id, **kwargs) + self.computer = computer + self.code = code + self.metadata = metadata or {} + self.inputs = inputs or {} + + # Outputs will be populated during execution + self.outputs = {} + + # Remote folder will be set after upload + self.remote_folder = None + + # Build the task group when instantiated + self._build_tasks() + + def _build_tasks(self): + """ + Build all tasks within this task group. 
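+        Each task is a PythonOperator created with task_group=self, so its
+        task_id is prefixed with this group_id (e.g. '<group_id>.upload');
+        the _task_* methods rely on that prefix when pulling XCom values.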
+ + Follows AiiDA's state machine: UPLOAD → SUBMIT → UPDATE → RETRIEVE → PARSE + (STASH is optional and skipped in this simplified version) + """ + + # UPLOAD task - mimics task_upload_job from tasks.py + # Calls presubmit() which calls prepare_for_submission() + upload_task = PythonOperator( + task_id='upload', + python_callable=self._task_upload, + task_group=self, + ) + + # SUBMIT task - mimics task_submit_job from tasks.py + # Submits the job to the scheduler + submit_task = PythonOperator( + task_id='submit', + python_callable=self._task_submit, + task_group=self, + ) + + # UPDATE task - mimics task_update_job from tasks.py + # Monitors job status until completion + update_task = PythonOperator( + task_id='update', + python_callable=self._task_update, + task_group=self, + ) + + # RETRIEVE task - mimics task_retrieve_job from tasks.py + # Retrieves output files from remote + retrieve_task = PythonOperator( + task_id='retrieve', + python_callable=self._task_retrieve, + task_group=self, + ) + + # PARSE task - calls parse() to extract results + parse_task = PythonOperator( + task_id='parse', + python_callable=self._task_parse, + task_group=self, + ) + + # Set up dependencies matching AiiDA's state machine + upload_task >> submit_task >> update_task >> retrieve_task >> parse_task + + def _task_upload(self, **context) -> Dict[str, Any]: + """ + UPLOAD task - uses AiiDA's task_upload_job when available. + + This task: + 1. Creates a temporary sandbox folder + 2. Calls presubmit() which calls prepare_for_submission() + 3. Uploads files to remote machine using AiiDA task + 4. Creates remote working directory + + Returns: + Dict with remote_folder path and calc_info + """ + import uuid + + if AIIDA_AVAILABLE: + # Use actual AiiDA async task + return self._task_upload_aiida(**context) + else: + # Fallback to simplified implementation + return self._task_upload_simple(**context) + + def _task_upload_simple(self, **context) -> Dict[str, Any]: + """Simplified upload implementation without AiiDA.""" + import uuid + + # Generate UUID for this calculation + calc_uuid = str(uuid.uuid4()) + + # Create temporary sandbox folder (persists until system shutdown) + sandbox_path = tempfile.mkdtemp(prefix=f'calcjob_sandbox_{calc_uuid}_') + folder = Folder(sandbox_path) + + # Call presubmit (which calls prepare_for_submission) + calc_info = self.presubmit(folder) + calc_info.uuid = calc_uuid + + # Create remote workdir path (using UUID sharding like AiiDA) + remote_workdir = f"/remote/work/{calc_uuid[:2]}/{calc_uuid[2:4]}/{calc_uuid[4:]}" + self.remote_folder = remote_workdir + + # Push data to XCom for downstream tasks + result = { + 'calc_uuid': calc_uuid, + 'remote_folder': remote_workdir, + 'sandbox_path': sandbox_path, + 'calc_info': { + 'codes_info': [ + { + 'cmdline_params': code_info.cmdline_params, + 'stdin_name': code_info.stdin_name, + 'stdout_name': code_info.stdout_name, + 'stderr_name': code_info.stderr_name, + } + for code_info in calc_info.codes_info + ], + 'local_copy_list': calc_info.local_copy_list, + 'remote_copy_list': calc_info.remote_copy_list, + 'retrieve_list': calc_info.retrieve_list, + 'retrieve_temporary_list': calc_info.retrieve_temporary_list, + 'prepend_text': calc_info.prepend_text, + 'append_text': calc_info.append_text, + }, + 'skip_submit': calc_info.skip_submit, + } + + context['task_instance'].xcom_push(key='upload_result', value=result) + return result + + def _run_async_task(self, async_func): + """Helper to run an async function in an event loop.""" + try: + loop = 
asyncio.get_running_loop() + except RuntimeError: + # No event loop running, create one + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(async_func) + finally: + loop.close() + else: + # Event loop already running, use it + return asyncio.ensure_future(async_func) + + def _create_process_adapter(self, node): + """Create an adapter object that looks like an AiiDA CalcJob process.""" + class ProcessAdapter: + """Adapter to make TaskGroup look like an AiiDA CalcJob process.""" + def __init__(self, calcjob_taskgroup, calcjob_node): + self.taskgroup = calcjob_taskgroup + self.node = calcjob_node + self._outputs = {} + + def presubmit(self, folder): + """Delegate to taskgroup's presubmit.""" + return self.taskgroup.presubmit(folder) + + def out(self, label, value): + """Store output (mimics plumpy's out method).""" + self._outputs[label] = value + # In AiiDA, this would create an output link + # For Airflow, we store it for later retrieval + print(f"Process output: {label} = {value}") + + return ProcessAdapter(self, node) + + def _task_upload_aiida(self, **context) -> Dict[str, Any]: + """Upload using actual AiiDA task_upload_job. + + This requires: + - AiiDA database to be configured and running + - Computer and AuthInfo setup in AiiDA + - CalcJobNode will be created and used + """ + from aiida.orm import load_computer + from aiida.manage import get_manager + + # Get or create the computer + try: + computer = load_computer(self.computer) + except Exception as e: + raise ValueError(f"Could not load computer '{self.computer}': {e}. " + "Make sure AiiDA is configured with: verdi computer setup") + + # Create a CalcJobNode for this calculation + node = CalcJobNode(computer=computer, process_type=self.__class__.__name__) + node.store() + + # Create process adapter + process_adapter = self._create_process_adapter(node) + + # Get transport queue from AiiDA manager + manager = get_manager() + transport_queue = manager.get_transport_queue() + + # Create cancellable future + cancellable = InterruptableFuture() + + # Run the async upload task in event loop + async def run_upload(): + return await aiida_tasks.task_upload_job( + process=process_adapter, + transport_queue=transport_queue, + cancellable=cancellable + ) + + # Execute the async task + skip_submit = self._run_async_task(run_upload()) + + # Extract results from the adapter + remote_folder = process_adapter._outputs.get('remote_folder') + + # Store results for XCom + result = { + 'node_pk': node.pk, + 'node_uuid': node.uuid, + 'remote_folder': remote_folder.get_remote_path() if remote_folder else None, + 'skip_submit': skip_submit, + } + + context['task_instance'].xcom_push(key='upload_result', value=result) + return result + + def _task_submit(self, **context) -> Dict[str, Any]: + """SUBMIT task - uses AiiDA's task_submit_job when available.""" + if AIIDA_AVAILABLE: + return self._task_submit_aiida(**context) + else: + return self._task_submit_simple(**context) + + def _task_submit_simple(self, **context) -> Dict[str, Any]: + """Simplified submit implementation without AiiDA.""" + upload_result = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.upload', + key='upload_result' + ) + + if upload_result.get('skip_submit', False): + return {'job_id': None, 'skipped': True} + + # Placeholder job submission + job_id = "12345" + result = {'job_id': job_id, 'skipped': False} + context['task_instance'].xcom_push(key='submit_result', value=result) + return result + + def 
_task_submit_aiida(self, **context) -> Dict[str, Any]: + """Submit using actual AiiDA task_submit_job.""" + from aiida.manage import get_manager + + # Get upload result + upload_result = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.upload', + key='upload_result' + ) + + if upload_result.get('skip_submit', False): + return {'job_id': None, 'skipped': True} + + # Load the node + node = load_node(upload_result['node_uuid']) + + # Get transport queue + manager = get_manager() + transport_queue = manager.get_transport_queue() + cancellable = InterruptableFuture() + + # Run async submit task + async def run_submit(): + return await aiida_tasks.task_submit_job( + node=node, + transport_queue=transport_queue, + cancellable=cancellable + ) + + job_id = self._run_async_task(run_submit()) + + result = {'job_id': job_id, 'node_uuid': upload_result['node_uuid']} + context['task_instance'].xcom_push(key='submit_result', value=result) + return result + + def _task_update(self, **context) -> Dict[str, Any]: + """UPDATE task - uses AiiDA's task_update_job when available.""" + if AIIDA_AVAILABLE: + return self._task_update_aiida(**context) + else: + return self._task_update_simple(**context) + + def _task_update_simple(self, **context) -> Dict[str, Any]: + """Simplified update implementation without AiiDA.""" + submit_result = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.submit', + key='submit_result' + ) + + if submit_result.get('skipped', False): + return {'job_done': True} + + # Placeholder: assume job is done + job_done = True + result = {'job_done': job_done} + context['task_instance'].xcom_push(key='update_result', value=result) + return result + + def _task_update_aiida(self, **context) -> Dict[str, Any]: + """Update using actual AiiDA task_update_job.""" + from aiida.manage import get_manager + + # Get submit result + submit_result = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.submit', + key='submit_result' + ) + + if submit_result.get('skipped', False): + return {'job_done': True} + + # Load the node + node = load_node(submit_result['node_uuid']) + + # Get job manager + manager = get_manager() + job_manager = manager.get_job_manager() + cancellable = InterruptableFuture() + + # Run async update task + async def run_update(): + return await aiida_tasks.task_update_job( + node=node, + job_manager=job_manager, + cancellable=cancellable + ) + + job_done = self._run_async_task(run_update()) + + result = {'job_done': job_done, 'node_uuid': submit_result['node_uuid']} + context['task_instance'].xcom_push(key='update_result', value=result) + return result + + def _task_retrieve(self, **context) -> Dict[str, Any]: + """RETRIEVE task - uses AiiDA's task_retrieve_job when available.""" + if AIIDA_AVAILABLE: + return self._task_retrieve_aiida(**context) + else: + return self._task_retrieve_simple(**context) + + def _task_retrieve_simple(self, **context) -> Dict[str, Any]: + """Simplified retrieve implementation without AiiDA.""" + import subprocess + + # Get upload result + upload_result = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.upload', + key='upload_result' + ) + + calc_info = upload_result['calc_info'] + sandbox_path = upload_result.get('sandbox_path') + + # Create temporary folder for retrieved files + retrieved_temp_folder = tempfile.mkdtemp(prefix='aiida_retrieved_') + + # Execute calculation locally from sandbox + if sandbox_path and Path(sandbox_path).exists(): + print(f"Executing calculation from sandbox: 
{sandbox_path}") + + for code_info_dict in calc_info['codes_info']: + stdin_name = code_info_dict.get('stdin_name') + stdout_name = code_info_dict.get('stdout_name') + + if stdin_name and stdout_name: + input_file = Path(sandbox_path) / stdin_name + output_file = Path(retrieved_temp_folder) / stdout_name + + if input_file.exists(): + input_content = input_file.read_text() + print(f"Input file {stdin_name} content: {input_content}") + + try: + with open(input_file, 'r') as stdin_f, open(output_file, 'w') as stdout_f: + result = subprocess.run( + ['bash'], + stdin=stdin_f, + stdout=stdout_f, + stderr=subprocess.PIPE, + check=True, + timeout=60 + ) + + if output_file.exists(): + content = output_file.read_text().strip() + print(f"✓ Output: '{content}'") + except Exception as e: + print(f"✗ Execution failed: {e}") + + result = { + 'retrieved_folder': retrieved_temp_folder, + 'retrieve_list': calc_info['retrieve_list'], + } + context['task_instance'].xcom_push(key='retrieve_result', value=result) + return result + + def _task_retrieve_aiida(self, **context) -> Dict[str, Any]: + """Retrieve using actual AiiDA task_retrieve_job.""" + from aiida.manage import get_manager + + # Get update result + update_result = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.update', + key='update_result' + ) + + # Load the node + node = load_node(update_result['node_uuid']) + + # Create process adapter + process_adapter = self._create_process_adapter(node) + + # Get transport queue + manager = get_manager() + transport_queue = manager.get_transport_queue() + cancellable = InterruptableFuture() + + # Create temporary folder for retrieved files + retrieved_temp_folder = tempfile.mkdtemp(prefix='aiida_retrieved_') + + # Run async retrieve task + async def run_retrieve(): + return await aiida_tasks.task_retrieve_job( + process=process_adapter, + transport_queue=transport_queue, + retrieved_temporary_folder=retrieved_temp_folder, + cancellable=cancellable + ) + + retrieved = self._run_async_task(run_retrieve()) + + result = { + 'retrieved_folder': retrieved_temp_folder, + 'node_uuid': update_result['node_uuid'], + 'retrieved': retrieved, + } + context['task_instance'].xcom_push(key='retrieve_result', value=result) + return result + + def _task_parse(self, **context) -> Dict[str, Any]: + """ + PARSE task - calls parse() to extract results. + + This task: + 1. Gets retrieved folder path from XCom + 2. Calls user's parse() method + 3. Processes exit code + 4. 
Cleans up retrieved temporary folder + + Returns: + Dict with exit status and message + """ + # Get retrieve result from XCom + retrieve_result = context['task_instance'].xcom_pull( + task_ids=f'{self.group_id}.retrieve', + key='retrieve_result' + ) + + retrieved_folder = retrieve_result['retrieved_folder'] + + try: + # Call user's parse method + exit_code = self.parse(retrieved_folder) + + # Process exit code (default to success if None returned) + if exit_code is None: + exit_code = self.EXIT_CODE_SUCCESS + + # Store in XCom for downstream tasks + result = { + 'exit_status': exit_code.status, + 'exit_message': exit_code.message, + } + + # If exit code indicates error, raise exception + if exit_code.status != 0: + raise ValueError(f"CalcJob failed with exit code {exit_code.status}: {exit_code.message}") + + return result + + finally: + # Clean up retrieved temporary folder (like AiiDA does) + import shutil + if Path(retrieved_folder).exists(): + shutil.rmtree(retrieved_folder, ignore_errors=True) + + def presubmit(self, folder: Folder) -> CalcInfo: + """ + Presubmit wrapper that calls prepare_for_submission. + + INTERFACE MATCHES: aiida.engine.processes.calcjobs.calcjob.CalcJob.presubmit() + + This method: + 1. Calls prepare_for_submission() to get CalcInfo + 2. Performs validation + 3. Generates submission script from codes_info + 4. Returns CalcInfo + + In AiiDA, this also creates the job template and submission script. + + Args: + folder: Temporary folder (SandboxFolder in AiiDA) + + Returns: + CalcInfo object + """ + # Call user's prepare_for_submission + calc_info = self.prepare_for_submission(folder) + + # Validate calc_info + if not isinstance(calc_info, CalcInfo): + raise TypeError(f"prepare_for_submission must return CalcInfo, got {type(calc_info)}") + + # In AiiDA, this also creates the submission script and stores it in folder + # We'll do a simplified version here + + return calc_info + + def parse_scheduler_output(self, retrieved_folder: str) -> Optional[ExitCode]: + """ + Parse the scheduler output files to detect common scheduler errors. + + INTERFACE MATCHES: aiida.engine.processes.calcjobs.calcjob.CalcJob.parse_scheduler_output() + + This checks for scheduler-level errors like out-of-memory, walltime exceeded, etc. + Override in subclasses to implement scheduler-specific parsing. + + Args: + retrieved_folder: Path to folder containing retrieved files + + Returns: + ExitCode if a scheduler error was detected, None otherwise + """ + # Default implementation - can be overridden by subclasses + # In AiiDA, this parses _scheduler-stdout.txt and _scheduler-stderr.txt + return None + + @abstractmethod + def prepare_for_submission(self, folder: Folder) -> CalcInfo: + """ + Prepare the calculation for submission. + + INTERFACE MATCHES: aiida.engine.processes.calcjobs.calcjob.CalcJob.prepare_for_submission() + + This method should: + 1. Write input files to the folder + 2. 
Build and return a CalcInfo object describing: + - codes_info: List[CodeInfo] - commands to execute + - local_copy_list: List[Tuple] - files to upload from local + - remote_copy_list: List[Tuple] - files to copy from another remote + - retrieve_list: List[str] - files to retrieve after completion + - prepend_text: str - text to add before code execution + - append_text: str - text to add after code execution + + Args: + folder: Temporary folder on the local file system where input files should be written + (aiida.common.folders.Folder equivalent) + + Returns: + CalcInfo object containing calculation metadata + + Example: + def prepare_for_submission(self, folder: Folder) -> CalcInfo: + import io + + # Write input file using folder interface + folder.create_file_from_filelike( + io.StringIO(f'{self.x}\\n{self.y}'), + 'input.txt' + ) + + # Create code info + code_info = CodeInfo() + code_info.cmdline_params = ['bash', 'add.sh'] + code_info.stdout_name = 'output.txt' + + # Build CalcInfo + calc_info = CalcInfo() + calc_info.codes_info = [code_info] + calc_info.local_copy_list = [ + ('', folder.get_abs_path('input.txt'), 'input.txt') + ] + calc_info.retrieve_list = ['output.txt'] + + return calc_info + + Note: + This method is called automatically during the upload phase via presubmit(). + The folder is temporary and will be uploaded to the remote machine. + """ + raise NotImplementedError('Subclasses must implement prepare_for_submission') + + @abstractmethod + def parse(self, retrieved_temporary_folder: str) -> Optional[ExitCode]: + """ + Parse the retrieved calculation outputs. + + INTERFACE MATCHES: The parsing flow from aiida.engine.processes.calcjobs.calcjob.CalcJob + (specifically the parse method that calls parse_retrieved_output) + + This method should: + 1. Read output files from retrieved_temporary_folder + 2. Extract and validate results + 3. Store outputs (via self.outputs dict, XCom, or database) + 4. Return an ExitCode (None or ExitCode(0) for success, non-zero for errors) + + Args: + retrieved_temporary_folder: Absolute path to temporary folder containing retrieved files + (equivalent to the retrieved FolderData in AiiDA) + This folder will be automatically cleaned up after parsing + + Returns: + ExitCode object indicating success or failure: + - None or ExitCode(0) for success + - Non-zero status for errors + + Example: + def parse(self, retrieved_temporary_folder: str) -> Optional[ExitCode]: + from pathlib import Path + + output_file = Path(retrieved_temporary_folder) / 'output.txt' + + # Check if output exists + if not output_file.exists(): + return self.ERROR_NO_RETRIEVED_FOLDER + + # Parse output + try: + result = int(output_file.read_text().strip()) + except ValueError: + return ExitCode(200, "Failed to parse output") + + # Store result + self.outputs['result'] = result + + return ExitCode(0, "Success") + + Note: + This method is called automatically after files are retrieved. + The retrieved_temporary_folder contains all files specified in CalcInfo.retrieve_list. + The folder is automatically deleted after this method returns. + """ + raise NotImplementedError('Subclasses must implement parse')
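+
+
+# Note on ExitCode truthiness: ExitCode.__bool__ makes a zero (success) status
+# falsy, so callers can branch directly on the returned code. A small
+# illustration (the call site is hypothetical):
+#
+#     exit_code = job.parse(retrieved_folder) or ExitCode(0, 'Success')
+#     if exit_code:  # truthy only when status != 0, i.e. on error
+#         raise ValueError(f'{exit_code.status}: {exit_code.message}')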