Skip to content

Commit

Permalink
add dag_run_ids and task_ids filter for the batch task instance API e…
Browse files Browse the repository at this point in the history
…ndpoint (apache#32705)

* add dag_run_ids and task_ids as filter types for the batch task instance endpoint

* add version notice to the new filters

* Update the released version in OpenAPI spec

---------

Co-authored-by: pierrejeambrun <pierrejbrun@gmail.com>
  • Loading branch information
karakanb and pierrejeambrun authored Aug 7, 2023
1 parent 9311d34 commit 08b1e8d
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
base_query = select(TI).join(TI.dag_run)

base_query = _apply_array_filter(base_query, key=TI.dag_id, values=data["dag_ids"])
base_query = _apply_array_filter(base_query, key=TI.run_id, values=data["dag_run_ids"])
base_query = _apply_array_filter(base_query, key=TI.task_id, values=data["task_ids"])
base_query = _apply_range_filter(
base_query,
key=DR.execution_date,
Expand Down
20 changes: 20 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4410,6 +4410,26 @@ components:
Return objects with specific DAG IDs.

The value can be repeated to retrieve multiple matching values (OR condition).
dag_run_ids:
type: array
items:
type: string
description:
Return objects with specific DAG Run IDs.

The value can be repeated to retrieve multiple matching values (OR condition).

*New in version 2.7.1*
task_ids:
type: array
items:
type: string
description:
Return objects with specific task IDs.

The value can be repeated to retrieve multiple matching values (OR condition).

*New in version 2.7.1*
execution_date_gte:
type: string
format: date-time
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class TaskInstanceBatchFormSchema(Schema):
page_offset = fields.Int(load_default=0, validate=validate.Range(min=0))
page_limit = fields.Int(load_default=100, validate=validate.Range(min=1))
dag_ids = fields.List(fields.Str(), load_default=None)
dag_run_ids = fields.List(fields.Str(), load_default=None)
task_ids = fields.List(fields.Str(), load_default=None)
execution_date_gte = fields.DateTime(load_default=None, validate=validate_istimezone)
execution_date_lte = fields.DateTime(load_default=None, validate=validate_istimezone)
start_date_gte = fields.DateTime(load_default=None, validate=validate_istimezone)
Expand Down
12 changes: 12 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1983,6 +1983,18 @@ export interface components {
* The value can be repeated to retrieve multiple matching values (OR condition).
*/
dag_ids?: string[];
/**
* @description Return objects with specific DAG Run IDs.
* The value can be repeated to retrieve multiple matching values (OR condition).
* *New in version 2.7.1*
*/
dag_run_ids?: string[];
/**
* @description Return objects with specific task IDs.
* The value can be repeated to retrieve multiple matching values (OR condition).
* *New in version 2.7.1*
*/
task_ids?: string[];
/**
* Format: date-time
* @description Returns objects greater or equal to the specified date.
Expand Down
30 changes: 30 additions & 0 deletions tests/api_connexion/endpoints/test_task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,36 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
"test",
id="with execution date filter",
),
pytest.param(
[
{"execution_date": DEFAULT_DATETIME_1},
{"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1)},
{"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2)},
{"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=3)},
],
False,
{
"dag_run_ids": ["TEST_DAG_RUN_ID_0", "TEST_DAG_RUN_ID_1"],
},
2,
"test",
id="test dag run id filter",
),
pytest.param(
[
{"execution_date": DEFAULT_DATETIME_1},
{"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=1)},
{"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=2)},
{"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=3)},
],
False,
{
"task_ids": ["print_the_context", "log_sql_query"],
},
2,
"test",
id="test task id filter",
),
],
)
def test_should_respond_200(
Expand Down

0 comments on commit 08b1e8d

Please sign in to comment.