
Reschedule not working as expected with http_sensor #10082

@paramesh

Description


Apache Airflow version: 1.10.10

Environment:
Linux RHEL, Python 3.6, CeleryExecutor with MySQL/Redis.

What happened:
I am facing issues when trying to use mode='reschedule' with the HttpSensor.

The following works fine without issues.

task_http_sensor_check_1 = HttpSensor(
    task_id='http_sensor_check_1',
    http_conn_id='stub_default',
    endpoint='/api/v1/products',
    request_params={},
    response_check=lambda response: "new185" in response.text,
    poke_interval=5,
    dag=dag)

If I just add mode='reschedule' to this, the task runs into a continuous retry loop.

task_http_sensor_check_1 = HttpSensor(
    task_id='http_sensor_check_1',
    http_conn_id='stub_default',
    endpoint='/api/v1/products',
    request_params={},
    response_check=lambda response: "new185" in response.text,
    poke_interval=5,
    mode='reschedule',  # the only change
    dag=dag)

The above runs in an indefinite loop even after the response_check lambda succeeds. Following is the log I am seeing:

INFO - Task is not able to be run
[2020-07-31 18:40:33,549] {taskinstance.py:664} INFO - Dependencies not met for <TaskInstance: simplelineardagnovar185.http_sensor_check_1 2020-07-30T00:42:00+00:00 [up_for_retry]>, dependency 'Not In Retry Period' FAILED: Task is not ready for retry yet but will be retried automatically. Current date is 2020-07-31T18:40:33.548968+00:00 and task will be retried at 2020-07-31T18:41:02.308501+00:00.
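As a side note, the wait reported in that log line is consistent with the retry_delay=timedelta(seconds=30) configured in the DAG's default_args below, i.e. the rescheduled sensor appears to be going through the retry machinery. A quick check of the two timestamps (pure Python, no Airflow required):

```python
from datetime import datetime, timedelta

# Timestamps copied verbatim from the "Not In Retry Period" log line above.
now = datetime.fromisoformat("2020-07-31T18:40:33.548968+00:00")
retry_at = datetime.fromisoformat("2020-07-31T18:41:02.308501+00:00")

# Remaining wait is just under the DAG's retry_delay of 30 seconds.
remaining = retry_at - now
print(remaining)                            # 0:00:28.759533
print(remaining <= timedelta(seconds=30))   # True
```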

On the Scheduler side I am seeing the following:
2020-07-31 18:40:32,287 ERROR - Executor reports task instance <TaskInstance: simplelineardagnovar185.http_sensor_check_1 2020-07-30 00:42:00+00:00 [queued]> finished (success) although the task says its queued. Was the task killed externally?

What you expected to happen:
The task should be rescheduled and pass when the condition is met.

How to reproduce it:
Use an HttpSensor with mode='reschedule' to reproduce the issue.

Anything else we need to know:

Please find below the complete DAG; it is quite simple.

from datetime import datetime, timedelta
import json

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.sensors.http_sensor import HttpSensor

default_args = {
    'owner': 'Paramesh',
    'depends_on_past': True,
    'start_date': datetime(2020, 7, 5),
    'email': 'params85@gmail.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 10,
    'retry_delay': timedelta(seconds=30),
}

# Catch-up is set to false as we don't want any backfill.
dag = DAG('simplelineardagnovar185', schedule_interval='*/7 * * * *', default_args=default_args, catchup=False)

POST_PRODUCTS = SimpleHttpOperator(
    task_id='POST_PRODUCTS',
    method='POST',
    http_conn_id='stub_default',
    endpoint='/api/v1/products',
    data=json.dumps(
        {"description": "new185", "name": "product3", "price": "500.0", "sleep": "300000"}),
    headers={"Content-Type": "application/json"},
    xcom_push=True,
    dag=dag)

task_http_sensor_check_1 = HttpSensor(
    task_id='http_sensor_check_1',
    http_conn_id='stub_default',
    endpoint='/api/v1/products',
    request_params={},
    response_check=lambda response: "new185" in response.text,
    mode='reschedule',
    poke_interval=5,
    dag=dag)

DELETE_PRODUCTS = SimpleHttpOperator(
    task_id='DELETE_PRODUCTS',
    method='DELETE',
    http_conn_id='stub_default',
    endpoint='/api/v1/products/desc/new185',
    headers={"Content-Type": "application/json"},
    dag=dag)

POST_PRODUCTS >> task_http_sensor_check_1 >> DELETE_PRODUCTS

POST_PRODUCTS posts an asynchronous request that takes ~5 minutes to be committed to the DB, and http_sensor_check_1 just performs a GET to identify whether the POST was successful.
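For context on what mode='reschedule' changes: in poke mode one task run sleeps between pokes and holds its worker slot until the check passes, while in reschedule mode the sensor exits after each failed poke (via AirflowRescheduleException) and asks the scheduler to re-queue it after poke_interval. A simplified pure-Python sketch of this difference (an illustration, not Airflow's actual implementation):

```python
class RescheduleSignal(Exception):
    """Analogue of Airflow's AirflowRescheduleException: the task exits
    and asks the scheduler to run it again after poke_interval."""

def http_poke(response_text: str) -> bool:
    # Stand-in for the HttpSensor response_check; the HTTP call is faked.
    return "new185" in response_text

def run_sensor(poke, responses, mode="poke", poke_interval=5):
    """Simplified sensor loop; `responses` yields one fake response per poke."""
    for text in responses:
        if poke(text):
            return "success"
        if mode == "reschedule":
            # Free the worker slot; a fresh run happens after poke_interval.
            raise RescheduleSignal(poke_interval)
        # poke mode would time.sleep(poke_interval) here and try again
    return "timeout"

# poke mode: a single task run polls until the condition is met
print(run_sensor(http_poke, ["pending", "pending", "desc: new185"]))  # success

# reschedule mode: each run does exactly one poke, then exits
try:
    run_sensor(http_poke, ["pending"], mode="reschedule")
except RescheduleSignal:
    print("rescheduled")  # the scheduler would re-queue the task later
```

The bug reported here is that in reschedule mode these re-queued runs never converge to success, even once response_check would return True.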
