dagrun_operator in Airflow Version 1.10.10 ERRORS _run_raw_task result = task_copy.execute(context=context) #9197

@shanit-saha

Description

We migrated Airflow from v1.10.2 to v1.10.10. One of our DAGs has a task of type dagrun_operator.

The task looks roughly like the snippet below. Please assume that the DAG dag_process_pos exists.

import os
from datetime import datetime

from airflow.operators.dagrun_operator import TriggerDagRunOperator


def set_up_dag_run_preprocessing(context, dag_run_obj):
    # Build the payload for the triggered DAG from this run's conf and XCom.
    ti = context['ti']
    dag_name = context['ti'].task.trigger_dag_id
    dag_run = context['dag_run']
    trans_id = dag_run.conf['transaction_id']
    routing_info = ti.xcom_pull(task_ids="json_validation", key="route_info")
    new_file_path = routing_info['file_location']
    new_file_name = os.path.basename(routing_info['new_file_name'])
    file_path = os.path.join(new_file_path, new_file_name)
    batch_id = "123-AD-FF"
    dag_run_obj.payload = {'inputfilepath': file_path,
                           'transaction_id': trans_id,
                           'Id': batch_id}
    # In 1.10.x the operator triggers only if the callable returns the run order.
    return dag_run_obj


# log_failure and the enclosing DAG are assumed to be defined elsewhere in the DAG file.
task_trigger_dag_positional = TriggerDagRunOperator(
    trigger_dag_id="dag_process_pos",
    python_callable=set_up_dag_run_preprocessing,
    task_id="trigger_preprocess_dag",
    on_failure_callback=log_failure,
    execution_date=datetime.now(),
    provide_context=False,
    owner='airflow')
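
The callable's payload logic can be checked outside Airflow. The sketch below is an illustration only: it calls a trimmed copy of the callable with stand-in objects. `FakeTaskInstance`, the sample `route_info` values, the transaction id, and the `trig__test` run id are all hypothetical and not taken from the real DAG. The copy returns `dag_run_obj`, which the 1.10.x operator uses to decide whether to trigger.

```python
import os
from types import SimpleNamespace


def set_up_dag_run_preprocessing(context, dag_run_obj):
    """Trimmed copy of the callable, returning the run order at the end."""
    ti = context['ti']
    dag_run = context['dag_run']
    trans_id = dag_run.conf['transaction_id']
    routing_info = ti.xcom_pull(task_ids="json_validation", key="route_info")
    file_path = os.path.join(routing_info['file_location'],
                             os.path.basename(routing_info['new_file_name']))
    dag_run_obj.payload = {'inputfilepath': file_path,
                           'transaction_id': trans_id,
                           'Id': "123-AD-FF"}
    return dag_run_obj


class FakeTaskInstance:
    """Hypothetical stand-in for an Airflow TaskInstance (xcom_pull only)."""

    def __init__(self, xcom_value):
        self._xcom_value = xcom_value

    def xcom_pull(self, task_ids=None, key=None):
        # Always hand back the canned value, whatever task or key is asked for.
        return self._xcom_value


# Made-up sample values, for illustration only.
route_info = {'file_location': '/data/in',
              'new_file_name': '/tmp/upload/pos.json'}
context = {
    'ti': FakeTaskInstance(route_info),
    'dag_run': SimpleNamespace(conf={'transaction_id': 'tx-1'}),
}
order = set_up_dag_run_preprocessing(
    context, SimpleNamespace(run_id='trig__test', payload=None))
print(order.payload)
```

If this prints the expected payload, the callable itself is unlikely to be the source of the failure, which matches the traceback: the exception is raised later, inside `trigger_dag`.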

The DAG runs fine up to this task. In fact, the python callable of the task runs through to its last line; only then does the task error out.

[2020-06-09 11:36:22,838] {taskinstance.py:1145} ERROR - No row was found for one()
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 95, in execute
    replace_microseconds=False)
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 141, in trigger_dag
    replace_microseconds=replace_microseconds,
  File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 98, in _trigger_dag
    external_trigger=True,
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1471, in create_dagrun
    run.refresh_from_db()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dagrun.py", line 109, in refresh_from_db
    DR.run_id == self.run_id
  File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3446, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()

After that, the task's on_failure_callback is executed, and all the code in that callable runs as expected. The question is: why did the dagrun_operator task fail after the python callable had completed?

P.S.: The DAG triggered by the TriggerDagRunOperator, in this case dag_process_pos, starts with a task of type dummy_operator. The target DAG does actually get triggered; however, the Airflow task that triggers it fails. [There is no DAG dependency in this case.]
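
The traceback and the observation above fit together if the new run's row is written first and the follow-up lookup in `refresh_from_db` then matches no row: the target DAG starts, yet `.one()` raises. The toy model below illustrates that sequence in plain Python. It does not use Airflow or SQLAlchemy. `ToyRunTable`, the local `NoResultFound` class, the three-field match, and the microsecond-dropping column are assumptions chosen for illustration (one possible way an exact-match lookup can miss a row that was just written), not a confirmed diagnosis of this report.

```python
from datetime import datetime


class NoResultFound(Exception):
    """Local stand-in for sqlalchemy.orm.exc.NoResultFound."""


class ToyRunTable:
    """Hypothetical in-memory table whose datetime column drops microseconds."""

    def __init__(self):
        self.rows = []

    def insert(self, dag_id, run_id, execution_date):
        # Assumption: the storage layer keeps whole seconds only.
        stored = execution_date.replace(microsecond=0)
        self.rows.append((dag_id, run_id, stored))

    def one(self, dag_id, run_id, execution_date):
        # Exact match on every field, like a filtered query ending in .one().
        matches = [r for r in self.rows
                   if r == (dag_id, run_id, execution_date)]
        if not matches:
            raise NoResultFound("No row was found for one()")
        return matches[0]


table = ToyRunTable()
execution_date = datetime(2020, 6, 9, 11, 36, 22, 838000)

# Step 1: the run is written, so the target DAG can be picked up.
table.insert("dag_process_pos", "trig__demo", execution_date)

# Step 2: the caller re-reads the row using its in-memory values.
try:
    table.one("dag_process_pos", "trig__demo", execution_date)
    refreshed = True
except NoResultFound as exc:
    refreshed = False
    print("insert succeeded, refresh failed:", exc)
```

In this model the row exists after step 1 even though step 2 raises, which mirrors the reported behaviour of a triggered target DAG combined with a failed triggering task. Whether a value mismatch of this kind is what happens in the real deployment would need to be checked against the metadata database.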
