Improve tasks execution #332

Open

luisglft wants to merge 4 commits into base: lyft-stable-2.3.4
Conversation

@luisglft commented Oct 3, 2024

Currently, the Airflow scheduler takes max_tis_per_query task instances at a time to check whether they can be executed. It considers several criteria, such as:

  1. The DAG is not paused
  2. The TaskInstance is in the scheduled state
  3. Task instances are sorted by priority_weight
  4. The assigned pool has slots available
  5. The DAG can execute more tasks (max_active_tasks) <<<<<<<
  6. The task's pool_slots are available in the pool (a task can ask for more than 1 slot)
See the code implementation in scheduler_job.py:

# Base query: scheduled task instances from running, non-backfill DAG runs
# of unpaused DAGs, ordered by priority_weight (descending) and execution_date.
query = (
    session.query(TI)
    .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
    .join(TI.dag_run)
    .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
    .join(TI.dag_model)
    .filter(not_(DM.is_paused))
    .filter(TI.state == TaskInstanceState.SCHEDULED)
    .options(selectinload('dag_model'))
    .order_by(-TI.priority_weight, DR.execution_date)
)

# Pools, DAGs and tasks already known to be starved are excluded here,
# but these sets are empty when the query first runs.
if starved_pools:
    query = query.filter(not_(TI.pool.in_(starved_pools)))

if starved_dags:
    query = query.filter(not_(TI.dag_id.in_(starved_dags)))

if starved_tasks:
    task_filter = tuple_in_condition((TaskInstance.dag_id, TaskInstance.task_id), starved_tasks)
    query = query.filter(not_(task_filter))

This PR focuses on improving check number 5 (the DAG can execute more tasks, i.e. max_active_tasks).

The problem is that this check is done after the task instances are retrieved (the starved_dags set is initially empty). Because of this, the first query sometimes fetches 512 task instances of which only a few (<10) can be executed; the rest are rejected with messages like this:

[2024-10-03 15:33:17,496] {scheduler_job.py:426} INFO - Not executing <TaskInstance: experimentatio...tion.dvr_to_pax_sms scheduled__2024-07-01T00:00:00+00:00 [scheduled]> since the number of tasks running or queued from DAG experimentatio...tion is >= to the DAG's max_active_tasks limit of 5
[2024-10-03 15:33:17,497] {scheduler_job.py:419} INFO - DAG experimentatio...tion has 5/5 running and queued tasks
[2024-10-03 15:33:17,497] {scheduler_job.py:426} INFO - Not executing <TaskInstance: experimentatio...tion.daily_active_visitors_rider scheduled__2024-07-01T00:00:00+00:00 [scheduled]> since the number of tasks running or queued from DAG experimentatio...tion is >= to the DAG's max_active_tasks limit of 5
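
For context, those rejections come from a per-task check that runs only after the query has already returned its batch. A simplified sketch of that check (names paraphrased from the loop in scheduler_job.py, not a verbatim excerpt):

for task_instance in task_instances_to_examine:
    dag_id = task_instance.dag_id
    # Running + queued task instances this DAG currently occupies.
    current_task_concurrency = dag_active_tasks_map[dag_id]
    max_active_tasks = task_instance.dag_model.max_active_tasks
    if current_task_concurrency >= max_active_tasks:
        # The DAG is only added to starved_dags at this point, so the filter
        # helps the *next* query, not the batch that was just fetched.
        starved_dags.add(dag_id)
        continue
    # ... otherwise the task instance proceeds to the pool checks and queuing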

The proposed change is to first query for those starved_dags and apply that filter in the main query; this way those DAGs are skipped and the remaining tasks get a chance to be executed.

The scheduler gets stuck because most of the 512 task instances have a high priority_weight, so the same ones are picked up for analysis every scheduling cycle. By ignoring the ones that we already know cannot be executed, we free up a lot of room for those that can.
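
A minimal sketch of the idea (illustrative only; the helper query, names, and exact placement are assumptions rather than the PR's actual diff):

# Pre-compute the DAGs that are already at their max_active_tasks limit and
# exclude them in the main query, instead of discovering them after the
# batch has been fetched.
from sqlalchemy import func, not_

from airflow.models.dag import DagModel as DM
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import TaskInstanceState

# Running + queued task instances per DAG.
occupied = (
    session.query(TI.dag_id, func.count(TI.task_id).label("occupied_slots"))
    .filter(TI.state.in_([TaskInstanceState.RUNNING, TaskInstanceState.QUEUED]))
    .group_by(TI.dag_id)
    .subquery()
)

# DAGs whose running + queued count already meets their max_active_tasks.
starved_dags = {
    dag_id
    for (dag_id,) in session.query(DM.dag_id)
    .join(occupied, occupied.c.dag_id == DM.dag_id)
    .filter(occupied.c.occupied_slots >= DM.max_active_tasks)
}

# Apply the filter before fetching, so task instances from these DAGs never
# take up space in the max_tis_per_query batch.
if starved_dags:
    query = query.filter(not_(TI.dag_id.in_(starved_dags)))

With a pre-filter like this, high-priority task instances from DAGs that are already at their concurrency limit stop crowding out runnable tasks from other DAGs in every scheduling cycle.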
