diff --git a/.vscode/launch.json b/.vscode/launch.json index 2e46aee..9b034e6 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -19,14 +19,14 @@ ] }, { - "name": "API Serve", + "name": "AIO-FLUID EXAMPLE Serve", "type": "python", "request": "launch", "program": "${workspaceFolder}/examples/main.py", "cwd": "${workspaceFolder}", "justMyCode": false, "args": [ - "ls" + "serve" ], "debugOptions": [ "RedirectOutput" diff --git a/fluid/scheduler/consumer.py b/fluid/scheduler/consumer.py index 5109b34..392c9ec 100644 --- a/fluid/scheduler/consumer.py +++ b/fluid/scheduler/consumer.py @@ -169,6 +169,7 @@ def __init__(self, **config: Any) -> None: self._queue_tasks_worker = WorkerFunction( self._queue_task, name="queue-task-worker" ) + self.add_workers(self._queue_tasks_worker) for i in range(self.config.max_concurrent_tasks): worker_name = f"task-worker-{i+1}" self.add_workers( diff --git a/fluid/scheduler/endpoints.py b/fluid/scheduler/endpoints.py index a3cd0b6..31dfb94 100644 --- a/fluid/scheduler/endpoints.py +++ b/fluid/scheduler/endpoints.py @@ -47,6 +47,17 @@ async def get_tasks(task_manager: TaskManagerDep) -> list[TaskInfo]: return await task_manager.broker.get_tasks_info() +@router.get( + "/tasks/status", + response_model=dict, + summary="Task consumer status", + description="Retrieve a list of tasks runs", +) +async def get_task_status(task_manager: TaskManagerDep) -> dict: + if isinstance(task_manager, Worker): + return await task_manager.status() + return {} + @router.post( "/tasks", response_model=TaskRun,