Skip to content

Commit e32946f

Browse files
authored
🐛 Fix infinite "Waiting for cluster" state when the dask-scheduler is restarted (ITISFoundation#5252)
1 parent 42d7630 commit e32946f

File tree

8 files changed

+358
-256
lines changed

8 files changed

+358
-256
lines changed

services/director-v2/src/simcore_service_director_v2/models/dask_subsystem.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from enum import Enum
2+
from typing import TypeAlias
23

34

45
# NOTE: mypy fails with src/simcore_service_director_v2/modules/dask_client.py:101:5: error: Dict entry 0 has incompatible type "str": "auto"; expected "Any": "DaskClientTaskState" [dict-item]
@@ -11,3 +12,7 @@ class DaskClientTaskState(str, Enum):
1112
ERRED = "ERRED"
1213
ABORTED = "ABORTED"
1314
SUCCESS = "SUCCESS"
15+
16+
17+
DaskJobID: TypeAlias = str
18+
DaskResources: TypeAlias = dict[str, int | float]

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py

Lines changed: 70 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import asyncio
1414
import datetime
1515
import logging
16-
import traceback
1716
from abc import ABC, abstractmethod
1817
from dataclasses import dataclass, field
1918
from typing import Final
@@ -551,7 +550,7 @@ async def _start_tasks(
551550
project_id: ProjectID,
552551
scheduled_tasks: dict[NodeID, CompTaskAtDB],
553552
pipeline_params: ScheduledPipelineParams,
554-
) -> list:
553+
) -> None:
555554
...
556555

557556
@abstractmethod
@@ -663,7 +662,7 @@ async def _schedule_pipeline(
663662
user_id, project_id, iteration, RunningState.ABORTED
664663
)
665664
self.scheduled_pipelines.pop((user_id, project_id, iteration), None)
666-
except DaskClientAcquisisitonError:
665+
except (DaskClientAcquisisitonError, ClustersKeeperNotAvailableError):
667666
_logger.exception(
668667
"Unexpected error while connecting with computational backend, aborting pipeline"
669668
)
@@ -692,12 +691,14 @@ async def _schedule_tasks_to_stop(
692691
) -> None:
693692
# get any running task and stop them
694693
comp_tasks_repo = CompTasksRepository.instance(self.db_engine)
695-
await comp_tasks_repo.mark_project_published_tasks_as_aborted(project_id)
694+
await comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted(
695+
project_id
696+
)
696697
# stop any remaining running task, these are already submitted
697698
tasks_to_stop = [t for t in comp_tasks.values() if t.state in PROCESSING_STATES]
698699
await self._stop_tasks(user_id, tasks_to_stop, pipeline_params)
699700

700-
async def _schedule_tasks_to_start(
701+
async def _schedule_tasks_to_start( # noqa: C901
701702
self,
702703
user_id: UserID,
703704
project_id: ProjectID,
@@ -729,77 +730,32 @@ async def _schedule_tasks_to_start(
729730
return comp_tasks
730731

731732
try:
732-
results = await self._start_tasks(
733+
await self._start_tasks(
733734
user_id=user_id,
734735
project_id=project_id,
735736
scheduled_tasks=tasks_ready_to_start,
736737
pipeline_params=pipeline_params,
737738
)
739+
except (
740+
ComputationalBackendNotConnectedError,
741+
ComputationalSchedulerChangedError,
742+
):
743+
_logger.exception(
744+
"Issue with computational backend. Tasks are set back "
745+
"to WAITING_FOR_CLUSTER state until scheduler comes back!",
746+
)
747+
await CompTasksRepository.instance(
748+
self.db_engine
749+
).update_project_tasks_state(
750+
project_id,
751+
list(tasks_ready_to_start.keys()),
752+
RunningState.WAITING_FOR_CLUSTER,
753+
)
754+
for task in tasks_ready_to_start:
755+
comp_tasks[
756+
NodeIDStr(f"{task}")
757+
].state = RunningState.WAITING_FOR_CLUSTER
738758

739-
# Handling errors raised when _start_tasks(...)
740-
for r, t in zip(results, tasks_ready_to_start, strict=True):
741-
if isinstance(r, TaskSchedulingError):
742-
_logger.error(
743-
"Project '%s''s task '%s' could not be scheduled due to the following: %s",
744-
r.project_id,
745-
r.node_id,
746-
f"{r}",
747-
)
748-
749-
await CompTasksRepository.instance(
750-
self.db_engine
751-
).update_project_tasks_state(
752-
project_id,
753-
[r.node_id],
754-
RunningState.FAILED,
755-
r.get_errors(),
756-
optional_progress=1.0,
757-
optional_stopped=arrow.utcnow().datetime,
758-
)
759-
comp_tasks[NodeIDStr(f"{t}")].state = RunningState.FAILED
760-
elif isinstance(
761-
r,
762-
ComputationalBackendNotConnectedError
763-
| ComputationalSchedulerChangedError,
764-
):
765-
_logger.error(
766-
"Issue with computational backend: %s. Tasks are set back "
767-
"to WAITING_FOR_CLUSTER state until scheduler comes back!",
768-
r,
769-
)
770-
# we should try re-connecting.
771-
# in the meantime we cannot schedule tasks on the scheduler,
772-
# let's put these tasks back to WAITING_FOR_CLUSTER, so they might be re-submitted later
773-
await CompTasksRepository.instance(
774-
self.db_engine
775-
).update_project_tasks_state(
776-
project_id,
777-
list(tasks_ready_to_start.keys()),
778-
RunningState.WAITING_FOR_CLUSTER,
779-
)
780-
comp_tasks[
781-
NodeIDStr(f"{t}")
782-
].state = RunningState.WAITING_FOR_CLUSTER
783-
elif isinstance(r, Exception):
784-
_logger.error(
785-
"Unexpected error for %s with %s on %s happened when scheduling %s:\n%s\n%s",
786-
f"{user_id=}",
787-
f"{project_id=}",
788-
f"{pipeline_params.cluster_id=}",
789-
f"{tasks_ready_to_start.keys()=}",
790-
f"{r}",
791-
"".join(traceback.format_tb(r.__traceback__)),
792-
)
793-
await CompTasksRepository.instance(
794-
self.db_engine
795-
).update_project_tasks_state(
796-
project_id,
797-
[t],
798-
RunningState.FAILED,
799-
optional_progress=1.0,
800-
optional_stopped=arrow.utcnow().datetime,
801-
)
802-
comp_tasks[NodeIDStr(f"{t}")].state = RunningState.FAILED
803759
except ComputationalBackendOnDemandNotReadyError as exc:
804760
_logger.info(
805761
"The on demand computational backend is not ready yet: %s", exc
@@ -819,8 +775,10 @@ async def _schedule_tasks_to_start(
819775
list(tasks_ready_to_start.keys()),
820776
RunningState.WAITING_FOR_CLUSTER,
821777
)
822-
for task in comp_tasks.values():
823-
task.state = RunningState.WAITING_FOR_CLUSTER
778+
for task in tasks_ready_to_start:
779+
comp_tasks[
780+
NodeIDStr(f"{task}")
781+
].state = RunningState.WAITING_FOR_CLUSTER
824782
except ClustersKeeperNotAvailableError:
825783
_logger.exception("Unexpected error while starting tasks:")
826784
await publish_project_log(
@@ -840,8 +798,46 @@ async def _schedule_tasks_to_start(
840798
optional_progress=1.0,
841799
optional_stopped=arrow.utcnow().datetime,
842800
)
843-
for task in comp_tasks.values():
844-
task.state = RunningState.FAILED
801+
for task in tasks_ready_to_start:
802+
comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED
803+
raise
804+
except TaskSchedulingError as exc:
805+
_logger.exception(
806+
"Project '%s''s task '%s' could not be scheduled",
807+
exc.project_id,
808+
exc.node_id,
809+
)
810+
await CompTasksRepository.instance(
811+
self.db_engine
812+
).update_project_tasks_state(
813+
project_id,
814+
[exc.node_id],
815+
RunningState.FAILED,
816+
exc.get_errors(),
817+
optional_progress=1.0,
818+
optional_stopped=arrow.utcnow().datetime,
819+
)
820+
comp_tasks[NodeIDStr(f"{exc.node_id}")].state = RunningState.FAILED
821+
except Exception:
822+
_logger.exception(
823+
"Unexpected error for %s with %s on %s happened when scheduling %s:",
824+
f"{user_id=}",
825+
f"{project_id=}",
826+
f"{pipeline_params.cluster_id=}",
827+
f"{tasks_ready_to_start.keys()=}",
828+
)
829+
await CompTasksRepository.instance(
830+
self.db_engine
831+
).update_project_tasks_state(
832+
project_id,
833+
list(tasks_ready_to_start.keys()),
834+
RunningState.FAILED,
835+
optional_progress=1.0,
836+
optional_stopped=arrow.utcnow().datetime,
837+
)
838+
for task in tasks_ready_to_start:
839+
comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED
840+
raise
845841

846842
return comp_tasks
847843

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from ...models.comp_runs import RunMetadataDict
3131
from ...models.comp_tasks import CompTaskAtDB
3232
from ...models.dask_subsystem import DaskClientTaskState
33-
from ...modules.dask_client import DaskClient
33+
from ...modules.dask_client import DaskClient, PublishedComputationTask
3434
from ...modules.dask_clients_pool import DaskClientsPool
3535
from ...modules.db.repositories.clusters import ClustersRepository
3636
from ...modules.db.repositories.comp_runs import CompRunsRepository
@@ -105,7 +105,7 @@ async def _start_tasks(
105105
project_id: ProjectID,
106106
scheduled_tasks: dict[NodeID, CompTaskAtDB],
107107
pipeline_params: ScheduledPipelineParams,
108-
) -> list:
108+
) -> None:
109109
# now transfer the pipeline to the dask scheduler
110110
async with _cluster_dask_client(user_id, pipeline_params, self) as client:
111111
# Change the tasks state to PENDING
@@ -116,9 +116,7 @@ async def _start_tasks(
116116
RunningState.PENDING,
117117
)
118118
# each task is started independently
119-
results: list[
120-
list[tuple[NodeID, str]] | BaseException
121-
] = await asyncio.gather(
119+
results: list[list[PublishedComputationTask]] = await asyncio.gather(
122120
*(
123121
client.send_computation_tasks(
124122
user_id=user_id,
@@ -131,20 +129,18 @@ async def _start_tasks(
131129
)
132130
for node_id, task in scheduled_tasks.items()
133131
),
134-
return_exceptions=True,
135132
)
136133

137134
# update the database so we do have the correct job_ids there
138135
await asyncio.gather(
139-
*[
136+
*(
140137
comp_tasks_repo.update_project_task_job_id(
141-
project_id, tasks_sent[0][0], tasks_sent[0][1]
138+
project_id, task.node_id, task.job_id
142139
)
143-
for tasks_sent in results
144-
if not isinstance(tasks_sent, BaseException)
145-
]
140+
for task_sents in results
141+
for task in task_sents
142+
)
146143
)
147-
return results
148144

149145
async def _get_tasks_status(
150146
self,

0 commit comments

Comments (0)