From ce1aada072850aa202a5bc25503313db2e5a51e5 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 17 Feb 2025 18:12:44 +0900 Subject: [PATCH] fix(BA-594): Handle cancel and timeout when creating kernels (#3648) Backported-from: main (25.2) Backported-to: 24.03 Backport-of: 3648 --- changes/3648.fix.md | 1 + src/ai/backend/manager/defs.py | 1 + src/ai/backend/manager/exceptions.py | 2 +- src/ai/backend/manager/registry.py | 41 ++----------------- .../backend/manager/scheduler/dispatcher.py | 6 +-- 5 files changed, 10 insertions(+), 41 deletions(-) create mode 100644 changes/3648.fix.md diff --git a/changes/3648.fix.md b/changes/3648.fix.md new file mode 100644 index 00000000000..e42983e8b33 --- /dev/null +++ b/changes/3648.fix.md @@ -0,0 +1 @@ +Handle cancel and timeout when creating kernels diff --git a/src/ai/backend/manager/defs.py b/src/ai/backend/manager/defs.py index e43c543c4f1..d47bcc93f92 100644 --- a/src/ai/backend/manager/defs.py +++ b/src/ai/backend/manager/defs.py @@ -82,3 +82,4 @@ class LockID(enum.IntEnum): DEFAULT_KEYPAIR_RATE_LIMIT: Final = 10000 DEFAULT_SHARED_MEMORY_SIZE: Final[str] = "64m" +START_SESSION_TIMEOUT_SEC: Final[float] = 60 * 30 # 30 min diff --git a/src/ai/backend/manager/exceptions.py b/src/ai/backend/manager/exceptions.py index 02258a53244..f2441207af3 100644 --- a/src/ai/backend/manager/exceptions.py +++ b/src/ai/backend/manager/exceptions.py @@ -78,7 +78,7 @@ class ErrorStatusInfo(TypedDict): def convert_to_status_data( - e: Exception, + e: BaseException, is_debug: bool = False, *, src: str | None = None, diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index b9ce09dab78..781e182bf0c 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -130,7 +130,7 @@ ) from .config import LocalConfig, SharedConfig from .defs import DEFAULT_IMAGE_ARCH, DEFAULT_ROLE, DEFAULT_SHARED_MEMORY_SIZE, INTRINSIC_SLOTS -from .exceptions import MultiAgentError, convert_to_status_data +from .exceptions import MultiAgentError from .models import ( AGENT_RESOURCE_OCCUPYING_KERNEL_STATUSES, AGENT_RESOURCE_OCCUPYING_SESSION_STATUSES, @@ -1802,42 +1802,9 @@ async def _update_kernel() -> None: [binding.kernel.id for binding in items], agent_alloc_ctx.agent_id, ) - except (asyncio.TimeoutError, asyncio.CancelledError): - log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id) - except Exception as e: - # The agent has already cancelled or issued the destruction lifecycle event - # for this batch of kernels. - ex = e - for binding in items: - kernel_id = binding.kernel.id - - async def _update_failure() -> None: - async with self.db.begin_session() as db_sess: - now = datetime.now(tzutc()) - query = ( - sa.update(KernelRow) - .where(KernelRow.id == kernel_id) - .values( - status=KernelStatus.ERROR, - status_info=f"other-error ({ex!r})", - status_changed=now, - terminated_at=now, - status_history=sql_json_merge( - KernelRow.status_history, - (), - { - KernelStatus.ERROR.name: ( - now.isoformat() - ), # ["PULLING", "PREPARING"] - }, - ), - status_data=convert_to_status_data(ex, self.debug), - ) - ) - await db_sess.execute(query) - - await execute_with_retry(_update_failure) - raise + except (asyncio.TimeoutError, asyncio.CancelledError): + log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id) + raise async def create_cluster_ssh_keypair(self) -> ClusterSSHKeyPair: key = rsa.generate_private_key( diff --git a/src/ai/backend/manager/scheduler/dispatcher.py b/src/ai/backend/manager/scheduler/dispatcher.py index 1de10a98462..6f6799958b3 100644 --- a/src/ai/backend/manager/scheduler/dispatcher.py +++ b/src/ai/backend/manager/scheduler/dispatcher.py @@ -72,7 +72,7 @@ InstanceNotAvailable, SessionNotFound, ) -from ..defs import SERVICE_MAX_RETRIES, LockID +from ..defs import SERVICE_MAX_RETRIES, START_SESSION_TIMEOUT_SEC, LockID from ..exceptions import convert_to_status_data from ..models import ( AgentRow, @@ -1338,7 +1338,7 @@ async def _mark_session_preparing() -> Sequence[SessionRow]: scheduled_sessions = await execute_with_retry(_mark_session_preparing) log.debug("prepare(): preparing {} session(s)", len(scheduled_sessions)) async with ( - async_timeout.timeout(delay=50.0), + async_timeout.timeout(delay=START_SESSION_TIMEOUT_SEC), aiotools.PersistentTaskGroup() as tg, ): for scheduled_session in scheduled_sessions: @@ -1593,7 +1593,7 @@ async def start_session( try: assert len(session.kernels) > 0 await self.registry.start_session(sched_ctx, session) - except Exception as e: + except (asyncio.CancelledError, Exception) as e: status_data = convert_to_status_data(e, self.local_config["debug"]["enabled"]) log.warning(log_fmt + "failed-starting", *log_args, exc_info=True) # TODO: instead of instantly cancelling upon exception, we could mark it as