Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(BA-594): Handle cancel and timeout when creating kernels (#3648) #3733

Open
wants to merge 2 commits into
base: 24.03
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/3648.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle cancel and timeout when creating kernels
1 change: 1 addition & 0 deletions src/ai/backend/manager/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@ class LockID(enum.IntEnum):
DEFAULT_KEYPAIR_RATE_LIMIT: Final = 10000

DEFAULT_SHARED_MEMORY_SIZE: Final[str] = "64m"
START_SESSION_TIMEOUT_SEC: Final[float] = 60 * 30 # 30 min
2 changes: 1 addition & 1 deletion src/ai/backend/manager/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ErrorStatusInfo(TypedDict):


def convert_to_status_data(
e: Exception,
e: BaseException,
is_debug: bool = False,
*,
src: str | None = None,
Expand Down
41 changes: 4 additions & 37 deletions src/ai/backend/manager/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
)
from .config import LocalConfig, SharedConfig
from .defs import DEFAULT_IMAGE_ARCH, DEFAULT_ROLE, DEFAULT_SHARED_MEMORY_SIZE, INTRINSIC_SLOTS
from .exceptions import MultiAgentError, convert_to_status_data
from .exceptions import MultiAgentError
from .models import (
AGENT_RESOURCE_OCCUPYING_KERNEL_STATUSES,
AGENT_RESOURCE_OCCUPYING_SESSION_STATUSES,
Expand Down Expand Up @@ -1802,42 +1802,9 @@ async def _update_kernel() -> None:
[binding.kernel.id for binding in items],
agent_alloc_ctx.agent_id,
)
except (asyncio.TimeoutError, asyncio.CancelledError):
log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id)
except Exception as e:
# The agent has already cancelled or issued the destruction lifecycle event
# for this batch of kernels.
ex = e
for binding in items:
kernel_id = binding.kernel.id

async def _update_failure() -> None:
async with self.db.begin_session() as db_sess:
now = datetime.now(tzutc())
query = (
sa.update(KernelRow)
.where(KernelRow.id == kernel_id)
.values(
status=KernelStatus.ERROR,
status_info=f"other-error ({ex!r})",
status_changed=now,
terminated_at=now,
status_history=sql_json_merge(
KernelRow.status_history,
(),
{
KernelStatus.ERROR.name: (
now.isoformat()
), # ["PULLING", "PREPARING"]
},
),
status_data=convert_to_status_data(ex, self.debug),
)
)
await db_sess.execute(query)

await execute_with_retry(_update_failure)
raise
except (asyncio.TimeoutError, asyncio.CancelledError):
log.warning("_create_kernels_in_one_agent(s:{}) cancelled", scheduled_session.id)
raise

async def create_cluster_ssh_keypair(self) -> ClusterSSHKeyPair:
key = rsa.generate_private_key(
Expand Down
6 changes: 3 additions & 3 deletions src/ai/backend/manager/scheduler/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
InstanceNotAvailable,
SessionNotFound,
)
from ..defs import SERVICE_MAX_RETRIES, LockID
from ..defs import SERVICE_MAX_RETRIES, START_SESSION_TIMEOUT_SEC, LockID
from ..exceptions import convert_to_status_data
from ..models import (
AgentRow,
Expand Down Expand Up @@ -1338,7 +1338,7 @@ async def _mark_session_preparing() -> Sequence[SessionRow]:
scheduled_sessions = await execute_with_retry(_mark_session_preparing)
log.debug("prepare(): preparing {} session(s)", len(scheduled_sessions))
async with (
async_timeout.timeout(delay=50.0),
async_timeout.timeout(delay=START_SESSION_TIMEOUT_SEC),
aiotools.PersistentTaskGroup() as tg,
):
for scheduled_session in scheduled_sessions:
Expand Down Expand Up @@ -1593,7 +1593,7 @@ async def start_session(
try:
assert len(session.kernels) > 0
await self.registry.start_session(sched_ctx, session)
except Exception as e:
except (asyncio.CancelledError, Exception) as e:
status_data = convert_to_status_data(e, self.local_config["debug"]["enabled"])
log.warning(log_fmt + "failed-starting", *log_args, exc_info=True)
# TODO: instead of instantly cancelling upon exception, we could mark it as
Expand Down
Loading