Skip to content

Commit

Permalink
fix(BA-594): Handle cancel and timeout when creating kernels (#3648)
Browse files Browse the repository at this point in the history
Backported-from: main (25.2)
Backported-to: 24.03
Backport-of: 3648
  • Loading branch information
fregataa committed Feb 17, 2025
1 parent f509d7d commit ce1aada
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 41 deletions.
1 change: 1 addition & 0 deletions changes/3648.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle cancel and timeout when creating kernels
1 change: 1 addition & 0 deletions src/ai/backend/manager/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@ class LockID(enum.IntEnum):
DEFAULT_KEYPAIR_RATE_LIMIT: Final = 10000

DEFAULT_SHARED_MEMORY_SIZE: Final[str] = "64m"
START_SESSION_TIMEOUT_SEC: Final[float] = 60 * 30 # 30 min
2 changes: 1 addition & 1 deletion src/ai/backend/manager/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ErrorStatusInfo(TypedDict):


def convert_to_status_data(
e: Exception,
e: BaseException,
is_debug: bool = False,
*,
src: str | None = None,
Expand Down
41 changes: 4 additions & 37 deletions src/ai/backend/manager/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
)
from .config import LocalConfig, SharedConfig
from .defs import DEFAULT_IMAGE_ARCH, DEFAULT_ROLE, DEFAULT_SHARED_MEMORY_SIZE, INTRINSIC_SLOTS
from .exceptions import MultiAgentError, convert_to_status_data
from .exceptions import MultiAgentError
from .models import (
AGENT_RESOURCE_OCCUPYING_KERNEL_STATUSES,
AGENT_RESOURCE_OCCUPYING_SESSION_STATUSES,
Expand Down Expand Up @@ -1802,42 +1802,9 @@ async def _update_kernel() -> None:
[binding.kernel.id for binding in items],
agent_alloc_ctx.agent_id,
)
except (asyncio.TimeoutError, asyncio.CancelledError):
log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id)
except Exception as e:
# The agent has already cancelled or issued the destruction lifecycle event
# for this batch of kernels.
ex = e
for binding in items:
kernel_id = binding.kernel.id

async def _update_failure() -> None:
async with self.db.begin_session() as db_sess:
now = datetime.now(tzutc())
query = (
sa.update(KernelRow)
.where(KernelRow.id == kernel_id)
.values(
status=KernelStatus.ERROR,
status_info=f"other-error ({ex!r})",
status_changed=now,
terminated_at=now,
status_history=sql_json_merge(
KernelRow.status_history,
(),
{
KernelStatus.ERROR.name: (
now.isoformat()
), # ["PULLING", "PREPARING"]
},
),
status_data=convert_to_status_data(ex, self.debug),
)
)
await db_sess.execute(query)

await execute_with_retry(_update_failure)
raise
except (asyncio.TimeoutError, asyncio.CancelledError):
log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id)
raise

async def create_cluster_ssh_keypair(self) -> ClusterSSHKeyPair:
key = rsa.generate_private_key(
Expand Down
6 changes: 3 additions & 3 deletions src/ai/backend/manager/scheduler/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
InstanceNotAvailable,
SessionNotFound,
)
from ..defs import SERVICE_MAX_RETRIES, LockID
from ..defs import SERVICE_MAX_RETRIES, START_SESSION_TIMEOUT_SEC, LockID
from ..exceptions import convert_to_status_data
from ..models import (
AgentRow,
Expand Down Expand Up @@ -1338,7 +1338,7 @@ async def _mark_session_preparing() -> Sequence[SessionRow]:
scheduled_sessions = await execute_with_retry(_mark_session_preparing)
log.debug("prepare(): preparing {} session(s)", len(scheduled_sessions))
async with (
async_timeout.timeout(delay=50.0),
async_timeout.timeout(delay=START_SESSION_TIMEOUT_SEC),
aiotools.PersistentTaskGroup() as tg,
):
for scheduled_session in scheduled_sessions:
Expand Down Expand Up @@ -1593,7 +1593,7 @@ async def start_session(
try:
assert len(session.kernels) > 0
await self.registry.start_session(sched_ctx, session)
except Exception as e:
except (asyncio.CancelledError, Exception) as e:
status_data = convert_to_status_data(e, self.local_config["debug"]["enabled"])
log.warning(log_fmt + "failed-starting", *log_args, exc_info=True)
# TODO: instead of instantly cancelling upon exception, we could mark it as
Expand Down

0 comments on commit ce1aada

Please sign in to comment.