From 93bf8aef40b97853cc71df25dc84e3bebf796630 Mon Sep 17 00:00:00 2001
From: David Brochart
Date: Tue, 29 Nov 2022 16:53:50 +0100
Subject: [PATCH] Extract out kernels

---
 jupyter_server/base/handlers.py | 8 +
 jupyter_server/base/zmqhandlers.py | 2 +-
 jupyter_server/gateway/managers.py | 2 +-
 jupyter_server/kernelspecs/handlers.py | 2 +-
 jupyter_server/serverapp.py | 194 +----
 jupyter_server/services/kernels/__init__.py | 0
 .../services/kernels/connection/__init__.py | 0
 .../services/kernels/connection/abc.py | 33 -
 .../services/kernels/connection/base.py | 156 ----
 .../services/kernels/connection/channels.py | 812 ------------------
 jupyter_server/services/kernels/handlers.py | 111 ---
 .../services/kernels/kernelmanager.py | 691 ---------------
 jupyter_server/services/kernels/websocket.py | 81 --
 .../services/kernelspecs/__init__.py | 0
 .../services/kernelspecs/handlers.py | 109 ---
 .../services/sessions/sessionmanager.py | 2 +-
 16 files changed, 52 insertions(+), 2151 deletions(-)
 delete mode 100644 jupyter_server/services/kernels/__init__.py
 delete mode 100644 jupyter_server/services/kernels/connection/__init__.py
 delete mode 100644 jupyter_server/services/kernels/connection/abc.py
 delete mode 100644 jupyter_server/services/kernels/connection/base.py
 delete mode 100644 jupyter_server/services/kernels/connection/channels.py
 delete mode 100644 jupyter_server/services/kernels/handlers.py
 delete mode 100644 jupyter_server/services/kernels/kernelmanager.py
 delete mode 100644 jupyter_server/services/kernels/websocket.py
 delete mode 100644 jupyter_server/services/kernelspecs/__init__.py
 delete mode 100644 jupyter_server/services/kernelspecs/handlers.py

diff --git a/jupyter_server/base/handlers.py b/jupyter_server/base/handlers.py
index 945a3096e5..c24f63b8cf 100644
--- a/jupyter_server/base/handlers.py
+++ b/jupyter_server/base/handlers.py
@@ -322,6 +322,14 @@ def contents_manager(self):
 
     @property
     def session_manager(self):
+        if "session_manager" not in self.settings:
+            self.settings["session_manager"] = self.settings["session_manager_class"](
+                parent=self.serverapp,
+                log=self.settings["log"],
+                kernel_manager=self.settings["kernel_manager"],
+                contents_manager=self.contents_manager,
+            )
+
         return self.settings["session_manager"]
 
     @property
diff --git a/jupyter_server/base/zmqhandlers.py b/jupyter_server/base/zmqhandlers.py
index 4490380a34..2a328a90f9 100644
--- a/jupyter_server/base/zmqhandlers.py
+++ b/jupyter_server/base/zmqhandlers.py
@@ -5,7 +5,7 @@
 from tornado.websocket import WebSocketHandler
 
 from jupyter_server.base.websocket import WebSocketMixin
-from jupyter_server.services.kernels.connection.base import (
+from jupyter_server_kernels.kernels.connection.base import (
     deserialize_binary_message,
     deserialize_msg_from_ws_v1,
     serialize_binary_message,
diff --git a/jupyter_server/gateway/managers.py b/jupyter_server/gateway/managers.py
index e826670b08..ea4bdbd394 100644
--- a/jupyter_server/gateway/managers.py
+++ b/jupyter_server/gateway/managers.py
@@ -16,12 +16,12 @@
 from jupyter_client.kernelspec import KernelSpecManager
 from jupyter_client.manager import AsyncKernelManager
 from jupyter_client.managerabc import KernelManagerABC
+from jupyter_server_kernels.kernels.kernelmanager import AsyncMappingKernelManager
 from tornado import web
 from tornado.escape import json_decode, json_encode, url_escape, utf8
 from traitlets import DottedObjectName, Instance, Type, default
 
 from .._tz import UTC
-from ..services.kernels.kernelmanager import AsyncMappingKernelManager
 from ..services.sessions.sessionmanager import SessionManager
 from ..utils import ensure_async, url_path_join
 from .gateway_client import GatewayClient, gateway_request
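# A stand-alone sketch (not part of the patch) of the lazy-initialization
# pattern the base/handlers.py hunk above introduces: the session manager is no
# longer built at startup; the handler creates it on first access from a class
# stored in the shared settings dict, then caches it there. The names
# "session_manager_class"/"session_manager" follow the hunk; DummySessionManager
# is a hypothetical stand-in.
class DummySessionManager:
    def __init__(self, **kwargs):
        self.kwargs = kwargs

settings = {"session_manager_class": DummySessionManager}

def get_session_manager(settings):
    # create on first use, then cache in settings for all later handlers
    if "session_manager" not in settings:
        settings["session_manager"] = settings["session_manager_class"]()
    return settings["session_manager"]

assert get_session_manager(settings) is get_session_manager(settings)  # cached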
diff --git a/jupyter_server/kernelspecs/handlers.py b/jupyter_server/kernelspecs/handlers.py
index 3ac8506a31..76cd69662e 100644
--- a/jupyter_server/kernelspecs/handlers.py
+++ b/jupyter_server/kernelspecs/handlers.py
@@ -3,7 +3,7 @@
 from jupyter_server.auth import authorized
 
 from ..base.handlers import JupyterHandler
-from ..services.kernelspecs.handlers import kernel_name_regex
+from jupyter_server_kernels.kernelspecs.handlers import kernel_name_regex
 
 AUTH_RESOURCE = "kernelspecs"
 
diff --git a/jupyter_server/serverapp.py b/jupyter_server/serverapp.py
index 025c98746c..c09a758952 100644
--- a/jupyter_server/serverapp.py
+++ b/jupyter_server/serverapp.py
@@ -86,12 +86,12 @@
 from jupyter_server.extension.config import ExtensionConfigManager
 from jupyter_server.extension.manager import ExtensionManager
 from jupyter_server.extension.serverextension import ServerExtensionApp
-from jupyter_server.gateway.managers import (
-    GatewayClient,
-    GatewayKernelSpecManager,
-    GatewayMappingKernelManager,
-    GatewaySessionManager,
-)
+#from jupyter_server.gateway.managers import (
+#    GatewayClient,
+#    GatewayKernelSpecManager,
+#    GatewayMappingKernelManager,
+#    GatewaySessionManager,
+#)
 from jupyter_server.log import log_request
 from jupyter_server.services.config import ConfigManager
 from jupyter_server.services.contents.filemanager import (
@@ -103,16 +103,6 @@
     AsyncContentsManager,
     ContentsManager,
 )
-from jupyter_server.services.kernels.connection.base import (
-    BaseKernelWebsocketConnection,
-)
-from jupyter_server.services.kernels.connection.channels import (
-    ZMQChannelsWebsocketConnection,
-)
-from jupyter_server.services.kernels.kernelmanager import (
-    AsyncMappingKernelManager,
-    MappingKernelManager,
-)
 from jupyter_server.services.sessions.sessionmanager import SessionManager
 from jupyter_server.utils import (
     check_pid,
@@ -168,12 +158,12 @@
     contents=["jupyter_server.services.contents.handlers"],
     files=["jupyter_server.files.handlers"],
     kernels=[
-        "jupyter_server.services.kernels.handlers",
-        "jupyter_server.services.kernels.websocket",
+        "jupyter_server_kernels.kernels.handlers",
+        "jupyter_server_kernels.kernels.websocket",
     ],
     kernelspecs=[
         "jupyter_server.kernelspecs.handlers",
-        "jupyter_server.services.kernelspecs.handlers",
+        "jupyter_server_kernels.kernelspecs.handlers",
     ],
     nbconvert=[
         "jupyter_server.nbconvert.handlers",
@@ -222,9 +212,8 @@ def __init__(
         self,
         jupyter_app,
         default_services,
-        kernel_manager,
         contents_manager,
-        session_manager,
+        session_manager_class,
         kernel_spec_manager,
         config_manager,
         event_logger,
@@ -259,9 +248,8 @@ def __init__(
 
         settings = self.init_settings(
             jupyter_app,
-            kernel_manager,
             contents_manager,
-            session_manager,
+            session_manager_class,
             kernel_spec_manager,
             config_manager,
             event_logger,
@@ -282,9 +270,8 @@ def init_settings(
         self,
         jupyter_app,
-        kernel_manager,
         contents_manager,
-        session_manager,
+        session_manager_class,
         kernel_spec_manager,
         config_manager,
         event_logger,
@@ -341,6 +328,7 @@ def init_settings(
 
         settings = dict(
             # basics
+            log=log,
            log_function=log_request,
             base_url=base_url,
             default_url=default_url,
@@ -354,13 +342,6 @@ def init_settings(
                 "no_cache_paths": [url_path_join(base_url, "static", "custom")],
             },
             version_hash=version_hash,
-            # kernel message protocol over websocket
-            kernel_ws_protocol=jupyter_app.kernel_ws_protocol,
-            # rate limits
-            limit_rate=jupyter_app.limit_rate,
-            iopub_msg_rate_limit=jupyter_app.iopub_msg_rate_limit,
-            iopub_data_rate_limit=jupyter_app.iopub_data_rate_limit,
-            rate_limit_window=jupyter_app.rate_limit_window,
             # authentication
             cookie_secret=jupyter_app.cookie_secret,
             login_url=url_path_join(base_url, "/login"),
@@ -370,15 +351,13 @@ def init_settings(
             local_hostnames=jupyter_app.local_hostnames,
             authenticate_prometheus=jupyter_app.authenticate_prometheus,
             # managers
-            kernel_manager=kernel_manager,
             contents_manager=contents_manager,
-            session_manager=session_manager,
+            session_manager_class=session_manager_class,
             kernel_spec_manager=kernel_spec_manager,
             config_manager=config_manager,
             authorizer=authorizer,
             identity_provider=identity_provider,
             event_logger=event_logger,
-            kernel_websocket_connection_class=kernel_websocket_connection_class,
             # handlers
             extra_services=extra_services,
             # Jupyter stuff
@@ -435,15 +414,15 @@ def init_handlers(self, default_services, settings):
         handlers.extend(settings["identity_provider"].get_handlers())
 
         # If gateway mode is enabled, replace appropriate handlers to perform redirection
-        if GatewayClient.instance().gateway_enabled:
-            # for each handler required for gateway, locate its pattern
-            # in the current list and replace that entry...
-            gateway_handlers = load_handlers("jupyter_server.gateway.handlers")
-            for _, gwh in enumerate(gateway_handlers):
-                for j, h in enumerate(handlers):
-                    if gwh[0] == h[0]:
-                        handlers[j] = (gwh[0], gwh[1])
-                        break
+        #if GatewayClient.instance().gateway_enabled:
+        #    # for each handler required for gateway, locate its pattern
+        #    # in the current list and replace that entry...
+        #    gateway_handlers = load_handlers("jupyter_server.gateway.handlers")
+        #    for _, gwh in enumerate(gateway_handlers):
+        #        for j, h in enumerate(handlers):
+        #            if gwh[0] == h[0]:
+        #                handlers[j] = (gwh[0], gwh[1])
+        #                break
 
         # register base handlers last
         handlers.extend(load_handlers("jupyter_server.base.handlers"))
@@ -775,21 +754,18 @@ class ServerApp(JupyterApp):
     classes = [
         KernelManager,
         Session,
-        MappingKernelManager,
         KernelSpecManager,
-        AsyncMappingKernelManager,
         ContentsManager,
         FileContentsManager,
         AsyncContentsManager,
         AsyncFileContentsManager,
         NotebookNotary,
-        GatewayMappingKernelManager,
-        GatewayKernelSpecManager,
-        GatewaySessionManager,
-        GatewayClient,
+        #GatewayMappingKernelManager,
+        #GatewayKernelSpecManager,
+        #GatewaySessionManager,
+        #GatewayClient,
         Authorizer,
         EventLogger,
-        ZMQChannelsWebsocketConnection,
     ]
 
     subcommands = dict(
@@ -1457,18 +1433,6 @@ def template_file_path(self):
         help=_i18n("The content manager class to use."),
     )
 
-    kernel_manager_class = Type(
-        klass=MappingKernelManager,
-        config=True,
-        help=_i18n("The kernel manager class to use."),
-    )
-
-    @default("kernel_manager_class")
-    def _default_kernel_manager_class(self):
-        if self.gateway_config.gateway_enabled:
-            return "jupyter_server.gateway.managers.GatewayMappingKernelManager"
-        return AsyncMappingKernelManager
-
     session_manager_class = Type(
         config=True,
         help=_i18n("The session manager class to use."),
@@ -1476,17 +1440,10 @@ def _default_kernel_manager_class(self):
 
     @default("session_manager_class")
     def _default_session_manager_class(self):
-        if self.gateway_config.gateway_enabled:
-            return "jupyter_server.gateway.managers.GatewaySessionManager"
+        #if self.gateway_config.gateway_enabled:
+        #    return "jupyter_server.gateway.managers.GatewaySessionManager"
         return SessionManager
 
-    kernel_websocket_connection_class = Type(
-        default_value=ZMQChannelsWebsocketConnection,
-        klass=BaseKernelWebsocketConnection,
-        config=True,
-        help=_i18n("The kernel websocket connection class to use."),
-    )
-
     config_manager_class = Type(
         default_value=ConfigManager,
         config=True,
@@ -1508,8 +1465,8 @@ def _default_session_manager_class(self):
 
     @default("kernel_spec_manager_class")
     def _default_kernel_spec_manager_class(self):
-        if self.gateway_config.gateway_enabled:
-            return "jupyter_server.gateway.managers.GatewayKernelSpecManager"
+        #if self.gateway_config.gateway_enabled:
+        #    return "jupyter_server.gateway.managers.GatewayKernelSpecManager"
         return KernelSpecManager
 
     login_handler_class = Type(
@@ -1731,56 +1688,6 @@ def _update_server_extensions(self, change):
         help=_i18n("Reraise exceptions encountered loading server extensions?"),
     )
 
-    kernel_ws_protocol = Unicode(
-        allow_none=True,
-        config=True,
-        help=_i18n("DEPRECATED. Use ZMQChannelsWebsocketConnection.kernel_ws_protocol"),
-    )
-
-    @observe("kernel_ws_protocol")
-    def _deprecated_kernel_ws_protocol(self, change):
-        self._warn_deprecated_config(change, "ZMQChannelsWebsocketConnection")
-
-    limit_rate = Bool(
-        allow_none=True,
-        config=True,
-        help=_i18n("DEPRECATED. Use ZMQChannelsWebsocketConnection.limit_rate"),
-    )
-
-    @observe("limit_rate")
-    def _deprecated_limit_rate(self, change):
-        self._warn_deprecated_config(change, "ZMQChannelsWebsocketConnection")
-
-    iopub_msg_rate_limit = Float(
-        allow_none=True,
-        config=True,
-        help=_i18n("DEPRECATED. Use ZMQChannelsWebsocketConnection.iopub_msg_rate_limit"),
-    )
-
-    @observe("iopub_msg_rate_limit")
-    def _deprecated_iopub_msg_rate_limit(self, change):
-        self._warn_deprecated_config(change, "ZMQChannelsWebsocketConnection")
-
-    iopub_data_rate_limit = Float(
-        allow_none=True,
-        config=True,
-        help=_i18n("DEPRECATED. Use ZMQChannelsWebsocketConnection.iopub_data_rate_limit"),
-    )
-
-    @observe("iopub_data_rate_limit")
-    def _deprecated_iopub_data_rate_limit(self, change):
-        self._warn_deprecated_config(change, "ZMQChannelsWebsocketConnection")
-
-    rate_limit_window = Float(
-        allow_none=True,
-        config=True,
-        help=_i18n("DEPRECATED. Use ZMQChannelsWebsocketConnection.rate_limit_window"),
-    )
-
-    @observe("rate_limit_window")
-    def _deprecated_rate_limit_window(self, change):
-        self._warn_deprecated_config(change, "ZMQChannelsWebsocketConnection")
-
     shutdown_no_activity_timeout = Integer(
         0,
         config=True,
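# A stand-alone sketch (not part of the patch) of the traitlets deprecation
# pattern used by the block deleted above: an @observe hook fires whenever the
# deprecated trait is assigned and points users at its new owner. MyApp and
# "NewOwner" are illustrative names only.
import warnings
from traitlets import Float, observe
from traitlets.config import Configurable

class MyApp(Configurable):
    rate_limit_window = Float(
        None,
        allow_none=True,
        config=True,
        help="DEPRECATED. Use NewOwner.rate_limit_window",
    )

    @observe("rate_limit_window")
    def _deprecated_rate_limit_window(self, change):
        warnings.warn(
            f"MyApp.{change['name']} is deprecated; use NewOwner.{change['name']}",
            DeprecationWarning,
            stacklevel=2,
        )

MyApp(rate_limit_window=3.0)  # emits a DeprecationWarning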
@@ -1858,14 +1765,7 @@ def init_configurables(self):
 
         # If gateway server is configured, replace appropriate managers to perform redirection. To make
         # this determination, instantiate the GatewayClient config singleton.
-        self.gateway_config = GatewayClient.instance(parent=self)
-
-        if not issubclass(self.kernel_manager_class, AsyncMappingKernelManager):
-            warnings.warn(
-                "The synchronous MappingKernelManager class is deprecated and will not be supported in Jupyter Server 3.0",
-                DeprecationWarning,
-                stacklevel=2,
-            )
+        #self.gateway_config = GatewayClient.instance(parent=self)
 
         if not issubclass(self.contents_manager_class, AsyncContentsManager):
             warnings.warn(
@@ -1877,22 +1777,10 @@ def init_configurables(self):
         self.kernel_spec_manager = self.kernel_spec_manager_class(
             parent=self,
         )
-        self.kernel_manager = self.kernel_manager_class(
-            parent=self,
-            log=self.log,
-            connection_dir=self.runtime_dir,
-            kernel_spec_manager=self.kernel_spec_manager,
-        )
         self.contents_manager = self.contents_manager_class(
             parent=self,
             log=self.log,
         )
-        self.session_manager = self.session_manager_class(
-            parent=self,
-            log=self.log,
-            kernel_manager=self.kernel_manager,
-            contents_manager=self.contents_manager,
-        )
         self.config_manager = self.config_manager_class(
             parent=self,
             log=self.log,
@@ -2034,9 +1922,8 @@ def init_webapp(self):
         self.web_app = ServerWebApplication(
             self,
             self.default_services,
-            self.kernel_manager,
             self.contents_manager,
-            self.session_manager,
+            self.session_manager_class,
             self.kernel_spec_manager,
             self.config_manager,
             self.event_logger,
@@ -2048,7 +1935,6 @@ def init_webapp(self):
             self.jinja_environment_options,
             authorizer=self.authorizer,
             identity_provider=self.identity_provider,
-            kernel_websocket_connection_class=self.kernel_websocket_connection_class,
         )
         if self.certfile:
             self.ssl_options["certfile"] = self.certfile
@@ -2576,7 +2462,7 @@ def running_server_info(self, kernel_count=True):
         "Return the current working directory and the server url information"
         info = self.contents_manager.info_string() + "\n"
         if kernel_count:
-            n_kernels = len(self.kernel_manager.list_kernel_ids())
+            n_kernels = len(self.web_app.settings["kernel_manager"].list_kernel_ids())
             kernel_msg = trans.ngettext("%d active kernel", "%d active kernels", n_kernels)
             info += kernel_msg % n_kernels
             info += "\n"
@@ -2586,11 +2472,11 @@ def running_server_info(self, kernel_count=True):
                 version=ServerApp.version, url=self.display_url
             )
         )
-        if self.gateway_config.gateway_enabled:
-            info += (
-                _i18n("\nKernels will be managed by the Gateway server running at:\n%s")
-                % self.gateway_config.url
-            )
+        #if self.gateway_config.gateway_enabled:
+        #    info += (
+        #        _i18n("\nKernels will be managed by the Gateway server running at:\n%s")
+        #        % self.gateway_config.url
+        #    )
         return info
 
     def server_info(self):
@@ -2848,7 +2734,7 @@ async def _cleanup(self):
         self.remove_browser_open_files()
         await self.cleanup_extensions()
         await self.cleanup_kernels()
-        await self.kernel_websocket_connection_class.close_all()
+        await self.web_app.settings["kernel_websocket_connection_class"].close_all()
         if getattr(self, "kernel_manager", None):
             self.kernel_manager.__del__()
         if getattr(self, "session_manager", None):
diff --git a/jupyter_server/services/kernels/__init__.py b/jupyter_server/services/kernels/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/jupyter_server/services/kernels/connection/__init__.py b/jupyter_server/services/kernels/connection/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/jupyter_server/services/kernels/connection/abc.py b/jupyter_server/services/kernels/connection/abc.py
deleted file mode 100644
index 4bdf6e3edc..0000000000
--- a/jupyter_server/services/kernels/connection/abc.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from abc import ABC, abstractmethod
-from typing import Any
-
-
-class KernelWebsocketConnectionABC(ABC):
-    """
-    This class defines a minimal interface that should
-    be used to bridge the connection between Jupyter
-    Server's websocket API and a kernel's ZMQ socket
-    interface.
-    """
-
-    websocket_handler: Any
-
-    @abstractmethod
-    async def connect(self):
-        """Connect the kernel websocket to the kernel ZMQ connections"""
-        ...
-
-    @abstractmethod
-    async def disconnect(self):
-        """Disconnect the kernel websocket from the kernel ZMQ connections"""
-        ...
-
-    @abstractmethod
-    def handle_incoming_message(self, incoming_msg: str) -> None:
-        """Broker the incoming websocket message to the appropriate ZMQ channel."""
-        ...
-
-    @abstractmethod
-    def handle_outgoing_message(self, stream: str, outgoing_msg: list) -> None:
-        """Broker outgoing ZMQ messages to the kernel websocket."""
-        ...
diff --git a/jupyter_server/services/kernels/connection/base.py b/jupyter_server/services/kernels/connection/base.py
deleted file mode 100644
index 880c5e69d3..0000000000
--- a/jupyter_server/services/kernels/connection/base.py
+++ /dev/null
@@ -1,156 +0,0 @@
-import json
-import struct
-import sys
-
-from jupyter_client.session import Session
-from tornado.websocket import WebSocketHandler
-from traitlets import Float, Instance, default
-from traitlets.config import LoggingConfigurable
-
-try:
-    from jupyter_client.jsonutil import json_default
-except ImportError:
-    from jupyter_client.jsonutil import date_default as json_default
-
-from jupyter_client.jsonutil import extract_dates
-
-from .abc import KernelWebsocketConnectionABC
-
-
-def serialize_binary_message(msg):
-    """serialize a message as a binary blob
-
-    Header:
-
-    4 bytes: number of msg parts (nbufs) as 32b int
-    4 * nbufs bytes: offset for each buffer as integer as 32b int
-
-    Offsets are from the start of the buffer, including the header.
-
-    Returns
-    -------
-    The message serialized to bytes.
-
-    """
-    # don't modify msg or buffer list in-place
-    msg = msg.copy()
-    buffers = list(msg.pop("buffers"))
-    if sys.version_info < (3, 4):
-        buffers = [x.tobytes() for x in buffers]
-    bmsg = json.dumps(msg, default=json_default).encode("utf8")
-    buffers.insert(0, bmsg)
-    nbufs = len(buffers)
-    offsets = [4 * (nbufs + 1)]
-    for buf in buffers[:-1]:
-        offsets.append(offsets[-1] + len(buf))
-    offsets_buf = struct.pack("!" + "I" * (nbufs + 1), nbufs, *offsets)
-    buffers.insert(0, offsets_buf)
-    return b"".join(buffers)
-
-
-def deserialize_binary_message(bmsg):
-    """deserialize a message from a binary blob
-
-    Header:
-
-    4 bytes: number of msg parts (nbufs) as 32b int
-    4 * nbufs bytes: offset for each buffer as integer as 32b int
-
-    Offsets are from the start of the buffer, including the header.
-
-    Returns
-    -------
-    message dictionary
-    """
-    nbufs = struct.unpack("!i", bmsg[:4])[0]
-    offsets = list(struct.unpack("!" + "I" * nbufs, bmsg[4 : 4 * (nbufs + 1)]))
-    offsets.append(None)
-    bufs = []
-    for start, stop in zip(offsets[:-1], offsets[1:]):
-        bufs.append(bmsg[start:stop])
-    msg = json.loads(bufs[0].decode("utf8"))
-    msg["header"] = extract_dates(msg["header"])
-    msg["parent_header"] = extract_dates(msg["parent_header"])
-    msg["buffers"] = bufs[1:]
-    return msg
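# A runnable round-trip sketch (not part of the patch) of the legacy binary
# framing implemented above: a big-endian 32-bit buffer count, one 32-bit
# offset per part, then the JSON body and the raw buffers. The import works
# against jupyter_server releases where this module still ships (pre-extraction).
from jupyter_server.services.kernels.connection.base import (
    deserialize_binary_message,
    serialize_binary_message,
)

msg = {
    "header": {},  # extract_dates() is a no-op on an empty header
    "parent_header": {},
    "content": {"data": "x"},
    "buffers": [memoryview(b"\x00\x01\x02")],
}
blob = serialize_binary_message(msg)
roundtrip = deserialize_binary_message(blob)
assert roundtrip["buffers"] == [b"\x00\x01\x02"]
assert roundtrip["content"] == {"data": "x"}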
-
-
-def serialize_msg_to_ws_v1(msg_or_list, channel, pack=None):
-    if pack:
-        msg_list = [
-            pack(msg_or_list["header"]),
-            pack(msg_or_list["parent_header"]),
-            pack(msg_or_list["metadata"]),
-            pack(msg_or_list["content"]),
-        ]
-    else:
-        msg_list = msg_or_list
-    channel = channel.encode("utf-8")
-    offsets: list = []
-    offsets.append(8 * (1 + 1 + len(msg_list) + 1))
-    offsets.append(len(channel) + offsets[-1])
-    for msg in msg_list:
-        offsets.append(len(msg) + offsets[-1])
-    offset_number = len(offsets).to_bytes(8, byteorder="little")
-    offsets = [offset.to_bytes(8, byteorder="little") for offset in offsets]
-    bin_msg = b"".join([offset_number] + offsets + [channel] + msg_list)
-    return bin_msg
-
-
-def deserialize_msg_from_ws_v1(ws_msg):
-    offset_number = int.from_bytes(ws_msg[:8], "little")
-    offsets = [
-        int.from_bytes(ws_msg[8 * (i + 1) : 8 * (i + 2)], "little") for i in range(offset_number)
-    ]
-    channel = ws_msg[offsets[0] : offsets[1]].decode("utf-8")
-    msg_list = [ws_msg[offsets[i] : offsets[i + 1]] for i in range(1, offset_number - 1)]
-    return channel, msg_list
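# A runnable sketch (not part of the patch) of the "v1.kernel.websocket.jupyter.org"
# framing implemented above: a little-endian 64-bit offset count, the offset
# table, the channel name, then the raw message parts. Same import caveat as the
# previous sketch: valid for pre-extraction jupyter_server releases.
from jupyter_server.services.kernels.connection.base import (
    deserialize_msg_from_ws_v1,
    serialize_msg_to_ws_v1,
)

channel = "shell"
parts = [b'{"msg_type":"kernel_info_request"}', b"{}", b"{}", b"{}"]
wire = serialize_msg_to_ws_v1(parts, channel)
assert deserialize_msg_from_ws_v1(wire) == ("shell", parts)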
-
-
-class BaseKernelWebsocketConnection(LoggingConfigurable):
-    """A configurable base class for connecting Kernel WebSockets to ZMQ sockets."""
-
-    @property
-    def kernel_manager(self):
-        return self.parent
-
-    @property
-    def multi_kernel_manager(self):
-        return self.kernel_manager.parent
-
-    @property
-    def kernel_id(self):
-        return self.kernel_manager.kernel_id
-
-    @property
-    def session_id(self):
-        return self.session.session
-
-    kernel_info_timeout = Float()
-
-    @default("kernel_info_timeout")
-    def _default_kernel_info_timeout(self):
-        return self.multi_kernel_manager.kernel_info_timeout
-
-    session = Instance(klass=Session, config=True)
-
-    @default("session")
-    def _default_session(self):
-        return Session(config=self.config)
-
-    websocket_handler = Instance(WebSocketHandler)
-
-    async def connect(self):
-        raise NotImplementedError()
-
-    async def disconnect(self):
-        raise NotImplementedError()
-
-    def handle_incoming_message(self, incoming_msg: str) -> None:
-        raise NotImplementedError()
-
-    def handle_outgoing_message(self, stream: str, outgoing_msg: list) -> None:
-        raise NotImplementedError()
-
-
-KernelWebsocketConnectionABC.register(BaseKernelWebsocketConnection)
diff --git a/jupyter_server/services/kernels/connection/channels.py b/jupyter_server/services/kernels/connection/channels.py
deleted file mode 100644
index 13beab3cc3..0000000000
--- a/jupyter_server/services/kernels/connection/channels.py
+++ /dev/null
@@ -1,812 +0,0 @@
-import asyncio
-import json
-import time
-import weakref
-from concurrent.futures import Future
-from textwrap import dedent
-from typing import MutableSet
-
-from jupyter_client import protocol_version as client_protocol_version
-from tornado import gen, web
-from tornado.ioloop import IOLoop
-from tornado.websocket import WebSocketClosedError
-from traitlets import Any, Bool, Dict, Float, Instance, Int, List, Unicode, default
-
-try:
-    from jupyter_client.jsonutil import json_default
-except ImportError:
-    from jupyter_client.jsonutil import date_default as json_default
-
-from jupyter_client.utils import ensure_async
-
-from jupyter_server.transutils import _i18n
-
-from .abc import KernelWebsocketConnectionABC
-from .base import (
-    BaseKernelWebsocketConnection,
-    deserialize_binary_message,
-    deserialize_msg_from_ws_v1,
-    serialize_binary_message,
-    serialize_msg_to_ws_v1,
-)
-
-
-def _ensure_future(f):
-    """Wrap a concurrent future as an asyncio future if there is a running loop."""
-    try:
-        asyncio.get_running_loop()
-        return asyncio.wrap_future(f)
-    except RuntimeError:
-        return f
-
-
-class ZMQChannelsWebsocketConnection(BaseKernelWebsocketConnection):
-    """A Jupyter Server Websocket Connection"""
-
-    limit_rate = Bool(
-        True,
-        config=True,
-        help=_i18n(
-            "Whether to limit the rate of IOPub messages (default: True). "
-            "If True, use iopub_msg_rate_limit, iopub_data_rate_limit and/or rate_limit_window "
-            "to tune the rate."
-        ),
-    )
-
-    iopub_msg_rate_limit = Float(
-        1000,
-        config=True,
-        help=_i18n(
-            """(msgs/sec)
-        Maximum rate at which messages can be sent on iopub before they are
-        limited."""
-        ),
-    )
-
-    iopub_data_rate_limit = Float(
-        1000000,
-        config=True,
-        help=_i18n(
-            """(bytes/sec)
-        Maximum rate at which stream output can be sent on iopub before they are
-        limited."""
-        ),
-    )
-
-    rate_limit_window = Float(
-        3,
-        config=True,
-        help=_i18n(
-            """(sec) Time window used to
-        check the message and data rate limits."""
-        ),
-    )
-
-    kernel_ws_protocol = Unicode(
-        None,
-        allow_none=True,
-        config=True,
-        help=_i18n(
-            "Preferred kernel message protocol over websocket to use (default: None). "
-            "If an empty string is passed, select the legacy protocol. If None, "
-            "the selected protocol will depend on what the front-end supports "
-            "(usually the most recent protocol supported by the back-end and the "
-            "front-end)."
-        ),
-    )
-
-    @property
-    def write_message(self):
-        """Alias to the websocket handler's write_message method."""
-        return self.websocket_handler.write_message
-
-    # class-level registry of open sessions
-    # allows checking for conflict on session-id,
-    # which is used as a zmq identity and must be unique.
-    _open_sessions: dict = {}
-    _open_sockets: MutableSet["ZMQChannelsWebsocketConnection"] = weakref.WeakSet()
-
-    _kernel_info_future: Future
-    _close_future: Future
-
-    channels = Dict({})
-    kernel_info_channel = Any(allow_none=True)
-
-    _kernel_info_future = Instance(klass=Future)
-
-    @default("_kernel_info_future")
-    def _default_kernel_info_future(self):
-        return Future()
-
-    _close_future = Instance(klass=Future)
-
-    @default("_close_future")
-    def _default_close_future(self):
-        return Future()
-
-    session_key = Unicode("")
-
-    _iopub_window_msg_count = Int()
-    _iopub_window_byte_count = Int()
-    _iopub_msgs_exceeded = Bool(False)
-    _iopub_data_exceeded = Bool(False)
-    # Queue of (time stamp, byte count)
-    # Allows you to specify that the byte count should be lowered
-    # by a delta amount at some point in the future.
-    _iopub_window_byte_queue = List([])
-
-    @classmethod
-    async def close_all(cls):
-        """Tornado does not provide a way to close open sockets, so add one."""
-        for connection in list(cls._open_sockets):
-            connection.disconnect()
-            await _ensure_future(connection._close_future)
-
-    @property
-    def subprotocol(self):
-        try:
-            protocol = self.websocket_handler.selected_subprotocol
-        except Exception:
-            protocol = None
-        return protocol
-
-    def create_stream(self):
-        identity = self.session.bsession
-        for channel in ("iopub", "shell", "control", "stdin"):
-            meth = getattr(self.kernel_manager, "connect_" + channel)
-            self.channels[channel] = stream = meth(identity=identity)
-            stream.channel = channel
-
-    def nudge(self):
-        """Nudge the zmq connections with kernel_info_requests
-        Returns a Future that will resolve when we have received
-        a shell or control reply and at least one iopub message,
-        ensuring that zmq subscriptions are established,
-        sockets are fully connected, and kernel is responsive.
-        Keeps retrying kernel_info_request until these are both received.
-        """
-        # Do not nudge busy kernels as kernel info requests sent to shell are
-        # queued behind execution requests.
-        # nudging in this case would cause a potentially very long wait
-        # before connections are opened,
-        # plus it is *very* unlikely that a busy kernel will not finish
-        # establishing its zmq subscriptions before processing the next request.
-        if getattr(self.kernel_manager, "execution_state", None) == "busy":
-            self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
-            f: Future = Future()
-            f.set_result(None)
-            return _ensure_future(f)
-        # Use a transient shell channel to prevent leaking
-        # shell responses to the front-end.
-        shell_channel = self.kernel_manager.connect_shell()
-        # Use a transient control channel to prevent leaking
-        # control responses to the front-end.
-        control_channel = self.kernel_manager.connect_control()
-        # The IOPub used by the client, whose subscriptions we are verifying.
-        iopub_channel = self.channels["iopub"]
-
-        info_future: Future = Future()
-        iopub_future: Future = Future()
-        both_done = gen.multi([info_future, iopub_future])
-
-        def finish(_=None):
-            """Ensure all futures are resolved
-            which in turn triggers cleanup
-            """
-            for f in (info_future, iopub_future):
-                if not f.done():
-                    f.set_result(None)
-
-        def cleanup(_=None):
-            """Common cleanup"""
-            loop.remove_timeout(nudge_handle)
-            iopub_channel.stop_on_recv()
-            if not shell_channel.closed():
-                shell_channel.close()
-            if not control_channel.closed():
-                control_channel.close()
-
-        # trigger cleanup when both message futures are resolved
-        both_done.add_done_callback(cleanup)
-
-        def on_shell_reply(msg):
-            self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
-            if not info_future.done():
-                self.log.debug("Nudge: resolving shell future: %s", self.kernel_id)
-                info_future.set_result(None)
-
-        def on_control_reply(msg):
-            self.log.debug("Nudge: control info reply received: %s", self.kernel_id)
-            if not info_future.done():
-                self.log.debug("Nudge: resolving control future: %s", self.kernel_id)
-                info_future.set_result(None)
-
-        def on_iopub(msg):
-            self.log.debug("Nudge: IOPub received: %s", self.kernel_id)
-            if not iopub_future.done():
-                iopub_channel.stop_on_recv()
-                self.log.debug("Nudge: resolving iopub future: %s", self.kernel_id)
-                iopub_future.set_result(None)
-
-        iopub_channel.on_recv(on_iopub)
-        shell_channel.on_recv(on_shell_reply)
-        control_channel.on_recv(on_control_reply)
-        loop = IOLoop.current()
-
-        # Nudge the kernel with kernel info requests until we get an IOPub message
-        def nudge(count):
-            count += 1
-            # check for stopped kernel
-            if self.kernel_id not in self.multi_kernel_manager:
-                self.log.debug("Nudge: cancelling on stopped kernel: %s", self.kernel_id)
-                finish()
-                return
-
-            # check for closed zmq socket
-            if shell_channel.closed():
-                self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id)
-                finish()
-                return
-
-            # check for closed zmq socket
-            if control_channel.closed():
-                self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id)
-                finish()
-                return
-
-            if not both_done.done():
-                log = self.log.warning if count % 10 == 0 else self.log.debug
-                log(f"Nudge: attempt {count} on kernel {self.kernel_id}")
-                self.session.send(shell_channel, "kernel_info_request")
-                self.session.send(control_channel, "kernel_info_request")
-                nonlocal nudge_handle  # type: ignore[misc]
-                nudge_handle = loop.call_later(0.5, nudge, count)
-
-        nudge_handle = loop.call_later(0, nudge, count=0)
-
-        # resolve with a timeout if we get no response
-        future = gen.with_timeout(loop.time() + self.kernel_info_timeout, both_done)
-        # ensure we have no dangling resources or unresolved Futures in case of timeout
-        future.add_done_callback(finish)
-        return _ensure_future(future)
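# A distilled, stand-alone sketch (not part of the patch) of the nudge()
# strategy above: keep re-sending a cheap probe until the first reply arrives,
# bounded by an overall timeout, then tear the prober down. asyncio stands in
# for the tornado/zmq machinery; `probe` and `reply_received` are hypothetical
# stand-ins for session.send(...) and the on_recv callbacks.
import asyncio

async def nudge(probe, reply_received: asyncio.Event, timeout=60.0, interval=0.5):
    async def _loop():
        while not reply_received.is_set():
            probe()  # e.g. session.send(shell_channel, "kernel_info_request")
            await asyncio.sleep(interval)

    prober = asyncio.create_task(_loop())
    try:
        await asyncio.wait_for(reply_received.wait(), timeout)
    finally:
        prober.cancel()  # cleanup, like remove_timeout()/stop_on_recv() above

async def demo():
    replied = asyncio.Event()
    asyncio.get_running_loop().call_later(1.2, replied.set)  # reply after ~3 probes
    await nudge(lambda: print("kernel_info_request"), replied)

asyncio.run(demo())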
-
-    async def _register_session(self):
-        """Ensure we aren't creating a duplicate session.
-
-        If a previous identical session is still open, close it to avoid collisions.
-        This is likely due to a client reconnecting from a lost network connection,
-        where the socket on our side has not been cleaned up yet.
-        """
-        self.session_key = f"{self.kernel_id}:{self.session.session}"
-        stale_handler = self._open_sessions.get(self.session_key)
-        if stale_handler:
-            self.log.warning("Replacing stale connection: %s", self.session_key)
-            stale_handler.close()
-        if (
-            self.kernel_id in self.multi_kernel_manager
-        ):  # only update open sessions if kernel is actively managed
-            self._open_sessions[self.session_key] = self.websocket_handler
-
-    async def prepare(self):
-        # check session collision:
-        await self._register_session()
-        # then request kernel info, waiting up to a certain time before giving up.
-        # We don't want to wait forever, because browsers don't take it well when
-        # servers never respond to websocket connection requests.
-
-        if hasattr(self.kernel_manager, "ready"):
-            ready = self.kernel_manager.ready
-            if not isinstance(ready, asyncio.Future):
-                ready = asyncio.wrap_future(ready)
-            try:
-                await ready
-            except Exception as e:
-                self.kernel_manager.execution_state = "dead"
-                self.kernel_manager.reason = str(e)
-                raise web.HTTPError(500, str(e)) from e
-
-        t0 = time.time()
-        while not await ensure_async(self.kernel_manager.is_alive()):
-            await asyncio.sleep(0.1)
-            if (time.time() - t0) > self.multi_kernel_manager.kernel_info_timeout:
-                raise TimeoutError("Kernel never reached an 'alive' state.")
-
-        self.session.key = self.kernel_manager.session.key
-        future = self.request_kernel_info()
-
-        def give_up():
-            """Don't wait forever for the kernel to reply"""
-            if future.done():
-                return
-            self.log.warning("Timeout waiting for kernel_info reply from %s", self.kernel_id)
-            future.set_result({})
-
-        loop = IOLoop.current()
-        loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
-        # actually wait for it
-        await asyncio.wrap_future(future)
-
-    def connect(self):
-        self.multi_kernel_manager.notify_connect(self.kernel_id)
-
-        # on new connections, flush the message buffer
-        buffer_info = self.multi_kernel_manager.get_buffer(self.kernel_id, self.session_key)
-        if buffer_info and buffer_info["session_key"] == self.session_key:
-            self.log.info("Restoring connection for %s", self.session_key)
-            if self.multi_kernel_manager.ports_changed(self.kernel_id):
-                # If the kernel's ports have changed (some restarts trigger this)
-                # then reset the channels so nudge() is using the correct iopub channel
-                self.create_stream()
-            else:
-                # The kernel's ports have not changed; use the channels captured in the buffer
-                self.channels = buffer_info["channels"]
-
-            connected = self.nudge()
-
-            def replay(value):
-                replay_buffer = buffer_info["buffer"]
-                if replay_buffer:
-                    self.log.info("Replaying %s buffered messages", len(replay_buffer))
-                    for channel, msg_list in replay_buffer:
-                        stream = self.channels[channel]
-                        self.handle_outgoing_message(stream, msg_list)
-
-            connected.add_done_callback(replay)
-        else:
-            try:
-                self.create_stream()
-                connected = self.nudge()
-            except web.HTTPError as e:
-                # Do not log error if the kernel is already shutdown,
-                # as it's normal that it's not responding
-                try:
-                    self.multi_kernel_manager.get_kernel(self.kernel_id)
-                    self.log.error("Error opening stream: %s", e)
-                except KeyError:
-                    pass
-                # WebSockets don't respond to traditional error codes so we
-                # close the connection.
-                for _, stream in self.channels.items():
-                    if not stream.closed():
-                        stream.close()
-                self.disconnect()
-                return
-
-        self.multi_kernel_manager.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
-        self.multi_kernel_manager.add_restart_callback(
-            self.kernel_id, self.on_restart_failed, "dead"
-        )
-
-        def subscribe(value):
-            for _, stream in self.channels.items():
-                stream.on_recv_stream(self.handle_outgoing_message)
-
-        connected.add_done_callback(subscribe)
-        ZMQChannelsWebsocketConnection._open_sockets.add(self)
-        return connected
-
-    def close(self):
-        return self.disconnect()
-
-    def disconnect(self):
-        self.log.debug("Websocket closed %s", self.session_key)
-        # unregister myself as an open session (only if it's really me)
-        if self._open_sessions.get(self.session_key) is self:
-            self._open_sessions.pop(self.session_key)
-
-        if self.kernel_id in self.multi_kernel_manager:
-            self.multi_kernel_manager.notify_disconnect(self.kernel_id)
-            self.multi_kernel_manager.remove_restart_callback(
-                self.kernel_id,
-                self.on_kernel_restarted,
-            )
-            self.multi_kernel_manager.remove_restart_callback(
-                self.kernel_id,
-                self.on_restart_failed,
-                "dead",
-            )
-
-            # start buffering instead of closing if this was the last connection
-            if self.multi_kernel_manager._kernel_connections[self.kernel_id] == 0:
-                self.multi_kernel_manager.start_buffering(
-                    self.kernel_id, self.session_key, self.channels
-                )
-                ZMQChannelsWebsocketConnection._open_sockets.remove(self)
-                self._close_future.set_result(None)
-                return
-
-        # This method can be called twice, once by self.kernel_died and once
-        # from the WebSocket close event. If the WebSocket connection is
-        # closed before the ZMQ streams are setup, they could be None.
-        for _, stream in self.channels.items():
-            if stream is not None and not stream.closed():
-                stream.on_recv(None)
-                stream.close()
-
-        self.channels = {}
-        try:
-            ZMQChannelsWebsocketConnection._open_sockets.remove(self)
-            self._close_future.set_result(None)
-        except Exception:
-            pass
-
-    def handle_incoming_message(self, incoming_msg: str) -> None:
-        """Handle incoming messages from Websocket to ZMQ Sockets."""
-        ws_msg = incoming_msg
-        if not self.channels:
-            # already closed, ignore the message
-            self.log.debug("Received message on closed websocket %r", ws_msg)
-            return
-
-        if self.subprotocol == "v1.kernel.websocket.jupyter.org":
-            channel, msg_list = deserialize_msg_from_ws_v1(ws_msg)
-            msg = {
-                "header": None,
-            }
-        else:
-            if isinstance(ws_msg, bytes):
-                msg = deserialize_binary_message(ws_msg)
-            else:
-                msg = json.loads(ws_msg)
-            msg_list = []
-            channel = msg.pop("channel", None)
-
-        if channel is None:
-            self.log.warning("No channel specified, assuming shell: %s", msg)
-            channel = "shell"
-        if channel not in self.channels:
-            self.log.warning("No such channel: %r", channel)
-            return
-        am = self.multi_kernel_manager.allowed_message_types
-        ignore_msg = False
-        if am:
-            msg["header"] = self.get_part("header", msg["header"], msg_list)
-            assert msg["header"] is not None
-            if msg["header"]["msg_type"] not in am:
-                self.log.warning(
-                    'Received message of type "%s", which is not allowed. Ignoring.'
-                    % msg["header"]["msg_type"]
-                )
-                ignore_msg = True
-        if not ignore_msg:
-            stream = self.channels[channel]
-            if self.subprotocol == "v1.kernel.websocket.jupyter.org":
-                self.session.send_raw(stream, msg_list)
-            else:
-                self.session.send(stream, msg)
-
-    def handle_outgoing_message(self, stream: str, outgoing_msg: list) -> None:
-        """Handle the outgoing messages from ZMQ sockets to Websocket."""
-        msg_list = outgoing_msg
-        _, fed_msg_list = self.session.feed_identities(msg_list)
-
-        if self.subprotocol == "v1.kernel.websocket.jupyter.org":
-            msg = {"header": None, "parent_header": None, "content": None}
-        else:
-            msg = self.session.deserialize(fed_msg_list)
-
-        if isinstance(stream, str):
-            stream = self.channels[stream]
-
-        channel = getattr(stream, "channel", None)
-        parts = fed_msg_list[1:]
-
-        self._on_error(channel, msg, parts)
-
-        if self._limit_rate(channel, msg, parts):
-            return
-
-        if self.subprotocol == "v1.kernel.websocket.jupyter.org":
-            self._on_zmq_reply(stream, parts)
-        else:
-            self._on_zmq_reply(stream, msg)
-
-    def get_part(self, field, value, msg_list):
-        if value is None:
-            field2idx = {
-                "header": 0,
-                "parent_header": 1,
-                "content": 3,
-            }
-            value = self.session.unpack(msg_list[field2idx[field]])
-        return value
-
-    def _reserialize_reply(self, msg_or_list, channel=None):
-        """Reserialize a reply message using JSON.
-
-        msg_or_list can be an already-deserialized msg dict or the zmq buffer list.
-        If it is the zmq list, it will be deserialized with self.session.
-
-        This takes the msg list from the ZMQ socket and serializes the result for the websocket.
-        This method should be used by self._on_zmq_reply to build messages that can
-        be sent back to the browser.
-
-        """
-        if isinstance(msg_or_list, dict):
-            # already unpacked
-            msg = msg_or_list
-        else:
-            _, msg_list = self.session.feed_identities(msg_or_list)
-            msg = self.session.deserialize(msg_list)
-        if channel:
-            msg["channel"] = channel
-        if msg["buffers"]:
-            buf = serialize_binary_message(msg)
-            return buf
-        else:
-            return json.dumps(msg, default=json_default)
-
-    def select_subprotocol(self, subprotocols):
-        preferred_protocol = self.kernel_ws_protocol
-        if preferred_protocol is None:
-            preferred_protocol = "v1.kernel.websocket.jupyter.org"
-        elif preferred_protocol == "":
-            preferred_protocol = None
-        selected_subprotocol = preferred_protocol if preferred_protocol in subprotocols else None
-        # None is the default, "legacy" protocol
-        return selected_subprotocol
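# A stand-alone sketch (not part of the patch) mirroring select_subprotocol()
# above: kernel_ws_protocol=None prefers the v1 protocol, "" forces the legacy
# protocol, and any preference the client did not offer falls back to None
# (legacy).
def select_subprotocol(subprotocols, kernel_ws_protocol=None):
    preferred = kernel_ws_protocol
    if preferred is None:
        preferred = "v1.kernel.websocket.jupyter.org"
    elif preferred == "":
        preferred = None
    return preferred if preferred in subprotocols else None

offered = ["v1.kernel.websocket.jupyter.org"]
assert select_subprotocol(offered) == "v1.kernel.websocket.jupyter.org"
assert select_subprotocol(offered, kernel_ws_protocol="") is None      # legacy forced
assert select_subprotocol([], kernel_ws_protocol="v2.custom") is None  # not offered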
-
-    def _on_zmq_reply(self, stream, msg_list):
-        # Sometimes this gets triggered when the on_close method is scheduled in the
-        # eventloop but hasn't been called.
-        if stream.closed():
-            self.log.warning("zmq message arrived on closed channel")
-            self.disconnect()
-            return
-        channel = getattr(stream, "channel", None)
-        if self.subprotocol == "v1.kernel.websocket.jupyter.org":
-            bin_msg = serialize_msg_to_ws_v1(msg_list, channel)
-            self.write_message(bin_msg, binary=True)
-        else:
-            try:
-                msg = self._reserialize_reply(msg_list, channel=channel)
-            except Exception:
-                self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
-            else:
-                try:
-                    self.write_message(msg, binary=isinstance(msg, bytes))
-                except WebSocketClosedError as e:
-                    self.log.warning(str(e))
-
-    def request_kernel_info(self):
-        """send a request for kernel_info"""
-        try:
-            # check for previous request
-            future = self.kernel_manager._kernel_info_future
-        except AttributeError:
-            self.log.debug("Requesting kernel info from %s", self.kernel_id)
-            # Create a kernel_info channel to query the kernel protocol version.
-            # This channel will be closed after the kernel_info reply is received.
-            if self.kernel_info_channel is None:
-                self.kernel_info_channel = self.multi_kernel_manager.connect_shell(self.kernel_id)
-            assert self.kernel_info_channel is not None
-            self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
-            self.session.send(self.kernel_info_channel, "kernel_info_request")
-            # store the future on the kernel, so only one request is sent
-            self.kernel_manager._kernel_info_future = self._kernel_info_future
-        else:
-            if not future.done():
-                self.log.debug("Waiting for pending kernel_info request")
-            future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
-        return _ensure_future(self._kernel_info_future)
-
-    def _handle_kernel_info_reply(self, msg):
-        """process the kernel_info_reply
-
-        enabling msg spec adaptation, if necessary
-        """
-        idents, msg = self.session.feed_identities(msg)
-        try:
-            msg = self.session.deserialize(msg)
-        except BaseException:
-            self.log.error("Bad kernel_info reply", exc_info=True)
-            self._kernel_info_future.set_result({})
-            return
-        else:
-            info = msg["content"]
-            self.log.debug("Received kernel info: %s", info)
-            if msg["msg_type"] != "kernel_info_reply" or "protocol_version" not in info:
-                self.log.error("Kernel info request failed, assuming current %s", info)
-                info = {}
-            self._finish_kernel_info(info)
-
-        # close the kernel_info channel, we don't need it anymore
-        if self.kernel_info_channel:
-            self.kernel_info_channel.close()
-        self.kernel_info_channel = None
-
-    def _finish_kernel_info(self, info):
-        """Finish handling kernel_info reply
-
-        Set up protocol adaptation, if needed,
-        and signal that connection can continue.
-        """
-        protocol_version = info.get("protocol_version", client_protocol_version)
-        if protocol_version != client_protocol_version:
-            self.session.adapt_version = int(protocol_version.split(".")[0])
-            self.log.info(
-                "Adapting from protocol version {protocol_version} (kernel {kernel_id}) to {client_protocol_version} (client).".format(
-                    protocol_version=protocol_version,
-                    kernel_id=self.kernel_id,
-                    client_protocol_version=client_protocol_version,
-                )
-            )
-        if not self._kernel_info_future.done():
-            self._kernel_info_future.set_result(info)
-
-    def write_stderr(self, error_message, parent_header):
-        self.log.warning(error_message)
-        err_msg = self.session.msg(
-            "stream",
-            content={"text": error_message + "\n", "name": "stderr"},
-            parent=parent_header,
-        )
-        if self.subprotocol == "v1.kernel.websocket.jupyter.org":
-            bin_msg = serialize_msg_to_ws_v1(err_msg, "iopub", self.session.pack)
-            self.write_message(bin_msg, binary=True)
-        else:
-            err_msg["channel"] = "iopub"
-            self.write_message(json.dumps(err_msg, default=json_default))
-
-    def _limit_rate(self, channel, msg, msg_list):
-        if not (self.limit_rate and channel == "iopub"):
-            return False
-
-        msg["header"] = self.get_part("header", msg["header"], msg_list)
-
-        msg_type = msg["header"]["msg_type"]
-        if msg_type == "status":
-            msg["content"] = self.get_part("content", msg["content"], msg_list)
-            if msg["content"].get("execution_state") == "idle":
-                # reset rate limit counter on status=idle,
-                # to avoid 'Run All' hitting limits prematurely.
-                self._iopub_window_byte_queue = []
-                self._iopub_window_msg_count = 0
-                self._iopub_window_byte_count = 0
-                self._iopub_msgs_exceeded = False
-                self._iopub_data_exceeded = False
-
-        if msg_type not in {"status", "comm_open", "execute_input"}:
-            # Remove the counts queued for removal.
-            now = IOLoop.current().time()
-            while len(self._iopub_window_byte_queue) > 0:
-                queued = self._iopub_window_byte_queue[0]
-                if now >= queued[0]:
-                    self._iopub_window_byte_count -= queued[1]
-                    self._iopub_window_msg_count -= 1
-                    del self._iopub_window_byte_queue[0]
-                else:
-                    # This part of the queue hasn't been reached yet, so we can
-                    # abort the loop.
-                    break
-
-            # Increment the bytes and message count
-            self._iopub_window_msg_count += 1
-            if msg_type == "stream":
-                byte_count = sum(len(x) for x in msg_list)
-            else:
-                byte_count = 0
-            self._iopub_window_byte_count += byte_count
-
-            # Queue a removal of the byte and message count for a time in the
-            # future, when we are no longer interested in it.
-            self._iopub_window_byte_queue.append((now + self.rate_limit_window, byte_count))
-
-            # Check the limits, set the limit flags, and reset the
-            # message and data counts.
-            msg_rate = float(self._iopub_window_msg_count) / self.rate_limit_window
-            data_rate = float(self._iopub_window_byte_count) / self.rate_limit_window
-
-            # Check the msg rate
-            if self.iopub_msg_rate_limit > 0 and msg_rate > self.iopub_msg_rate_limit:
-                if not self._iopub_msgs_exceeded:
-                    self._iopub_msgs_exceeded = True
-                    msg["parent_header"] = self.get_part(
-                        "parent_header", msg["parent_header"], msg_list
-                    )
-                    self.write_stderr(
-                        dedent(
-                            """\
-                    IOPub message rate exceeded.
-                    The Jupyter server will temporarily stop sending output
-                    to the client in order to avoid crashing it.
-                    To change this limit, set the config variable
-                    `--ServerApp.iopub_msg_rate_limit`.
-
-                    Current values:
-                    ServerApp.iopub_msg_rate_limit={} (msgs/sec)
-                    ServerApp.rate_limit_window={} (secs)
-                    """.format(
-                                self.iopub_msg_rate_limit, self.rate_limit_window
-                            )
-                        ),
-                        msg["parent_header"],
-                    )
-            else:
-                # resume once we've got some headroom below the limit
-                if self._iopub_msgs_exceeded and msg_rate < (0.8 * self.iopub_msg_rate_limit):
-                    self._iopub_msgs_exceeded = False
-                    if not self._iopub_data_exceeded:
-                        self.log.warning("iopub messages resumed")
-
-            # Check the data rate
-            if self.iopub_data_rate_limit > 0 and data_rate > self.iopub_data_rate_limit:
-                if not self._iopub_data_exceeded:
-                    self._iopub_data_exceeded = True
-                    msg["parent_header"] = self.get_part(
-                        "parent_header", msg["parent_header"], msg_list
-                    )
-                    self.write_stderr(
-                        dedent(
-                            """\
-                    IOPub data rate exceeded.
-                    The Jupyter server will temporarily stop sending output
-                    to the client in order to avoid crashing it.
-                    To change this limit, set the config variable
-                    `--ServerApp.iopub_data_rate_limit`.
-
-                    Current values:
-                    ServerApp.iopub_data_rate_limit={} (bytes/sec)
-                    ServerApp.rate_limit_window={} (secs)
-                    """.format(
-                                self.iopub_data_rate_limit, self.rate_limit_window
-                            )
-                        ),
-                        msg["parent_header"],
-                    )
-            else:
-                # resume once we've got some headroom below the limit
-                if self._iopub_data_exceeded and data_rate < (0.8 * self.iopub_data_rate_limit):
-                    self._iopub_data_exceeded = False
-                    if not self._iopub_msgs_exceeded:
-                        self.log.warning("iopub messages resumed")
-
-            # If either of the limit flags are set, do not send the message.
-            if self._iopub_msgs_exceeded or self._iopub_data_exceeded:
-                # we didn't send it, remove the current message from the calculus
-                self._iopub_window_msg_count -= 1
-                self._iopub_window_byte_count -= byte_count
-                self._iopub_window_byte_queue.pop(-1)
-                return True
-
-        return False
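# A stand-alone sketch (not part of the patch) of the sliding window used by
# _limit_rate() above: each message enqueues an (expiry, byte_count) pair,
# expired entries are drained before the rates are computed, and the window
# length is rate_limit_window seconds.
import time

class Window:
    def __init__(self, window=3.0):
        self.window, self.queue, self.msgs, self.nbytes = window, [], 0, 0

    def add(self, byte_count, now=None):
        now = time.monotonic() if now is None else now
        # drain the counts whose expiry has passed
        while self.queue and now >= self.queue[0][0]:
            _, expired_bytes = self.queue.pop(0)
            self.nbytes -= expired_bytes
            self.msgs -= 1
        self.msgs += 1
        self.nbytes += byte_count
        self.queue.append((now + self.window, byte_count))
        return self.msgs / self.window, self.nbytes / self.window  # msg, data rates

w = Window(window=3.0)
assert w.add(1024, now=0.0) == (1 / 3.0, 1024 / 3.0)
w.add(0, now=10.0)  # the first entry has expired by now
assert w.msgs == 1 and w.nbytes == 0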
-
-    def _send_status_message(self, status):
-        iopub = self.channels.get("iopub", None)
-        if iopub and not iopub.closed():
-            # flush IOPub before sending a restarting/dead status message
-            # ensures proper ordering on the IOPub channel
-            # that all messages from the stopped kernel have been delivered
-            iopub.flush()
-        msg = self.session.msg("status", {"execution_state": status})
-        if self.subprotocol == "v1.kernel.websocket.jupyter.org":
-            bin_msg = serialize_msg_to_ws_v1(msg, "iopub", self.session.pack)
-            self.write_message(bin_msg, binary=True)
-        else:
-            msg["channel"] = "iopub"
-            self.write_message(json.dumps(msg, default=json_default))
-
-    def on_kernel_restarted(self):
-        self.log.warning("kernel %s restarted", self.kernel_id)
-        self._send_status_message("restarting")
-
-    def on_restart_failed(self):
-        self.log.error("kernel %s restart failed!", self.kernel_id)
-        self._send_status_message("dead")
-
-    def _on_error(self, channel, msg, msg_list):
-        if self.multi_kernel_manager.allow_tracebacks:
-            return
-
-        if channel == "iopub":
-            msg["header"] = self.get_part("header", msg["header"], msg_list)
-            if msg["header"]["msg_type"] == "error":
-                msg["content"] = self.get_part("content", msg["content"], msg_list)
-                msg["content"]["ename"] = "ExecutionError"
-                msg["content"]["evalue"] = "Execution error"
-                msg["content"]["traceback"] = [self.kernel_manager.traceback_replacement_message]
-                if self.subprotocol == "v1.kernel.websocket.jupyter.org":
-                    msg_list[3] = self.session.pack(msg["content"])
-
-
-KernelWebsocketConnectionABC.register(ZMQChannelsWebsocketConnection)
diff --git a/jupyter_server/services/kernels/handlers.py b/jupyter_server/services/kernels/handlers.py
deleted file mode 100644
index ef2f2e0c74..0000000000
--- a/jupyter_server/services/kernels/handlers.py
+++ /dev/null
@@ -1,111 +0,0 @@
-"""Tornado handlers for kernels.
-
-Preliminary documentation at https://github.com/ipython/ipython/wiki/IPEP-16%3A-Notebook-multi-directory-dashboard-and-URL-mapping#kernels-api
-"""
-# Copyright (c) Jupyter Development Team.
-# Distributed under the terms of the Modified BSD License.
-import json
-from traceback import format_tb
-
-try:
-    from jupyter_client.jsonutil import json_default
-except ImportError:
-    from jupyter_client.jsonutil import date_default as json_default
-
-from tornado import web
-
-from jupyter_server.auth import authorized
-from jupyter_server.utils import ensure_async, url_escape, url_path_join
-
-from ...base.handlers import APIHandler
-
-AUTH_RESOURCE = "kernels"
-
-
-class KernelsAPIHandler(APIHandler):
-    auth_resource = AUTH_RESOURCE
-
-
-class MainKernelHandler(KernelsAPIHandler):
-    @web.authenticated
-    @authorized
-    async def get(self):
-        km = self.kernel_manager
-        kernels = await ensure_async(km.list_kernels())
-        self.finish(json.dumps(kernels, default=json_default))
-
-    @web.authenticated
-    @authorized
-    async def post(self):
-        km = self.kernel_manager
-        model = self.get_json_body()
-        if model is None:
-            model = {"name": km.default_kernel_name}
-        else:
-            model.setdefault("name", km.default_kernel_name)
-
-        kernel_id = await ensure_async(
-            km.start_kernel(kernel_name=model["name"], path=model.get("path"))
-        )
-        model = await ensure_async(km.kernel_model(kernel_id))
-        location = url_path_join(self.base_url, "api", "kernels", url_escape(kernel_id))
-        self.set_header("Location", location)
-        self.set_status(201)
-        self.finish(json.dumps(model, default=json_default))
-
-
-class KernelHandler(KernelsAPIHandler):
-    @web.authenticated
-    @authorized
-    async def get(self, kernel_id):
-        km = self.kernel_manager
-        model = await ensure_async(km.kernel_model(kernel_id))
-        self.finish(json.dumps(model, default=json_default))
-
-    @web.authenticated
-    @authorized
-    async def delete(self, kernel_id):
-        km = self.kernel_manager
-        await ensure_async(km.shutdown_kernel(kernel_id))
-        self.set_status(204)
-        self.finish()
-
-
-class KernelActionHandler(KernelsAPIHandler):
-    @web.authenticated
-    @authorized
-    async def post(self, kernel_id, action):
-        km = self.kernel_manager
-        if action == "interrupt":
-            await ensure_async(km.interrupt_kernel(kernel_id))
-            self.set_status(204)
-        if action == "restart":
-
-            try:
-                await km.restart_kernel(kernel_id)
-            except Exception as e:
-                message = "Exception restarting kernel"
-                self.log.error(message, exc_info=True)
-                traceback = format_tb(e.__traceback__)
-                self.write(json.dumps(dict(message=message, traceback=traceback)))
-                self.set_status(500)
-            else:
-                model = await ensure_async(km.kernel_model(kernel_id))
-                self.write(json.dumps(model, default=json_default))
-        self.finish()
-
-
-# -----------------------------------------------------------------------------
-# URL to handler mappings
-# -----------------------------------------------------------------------------
-_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
-_kernel_action_regex = r"(?P<action>restart|interrupt)"
-
-default_handlers = [
-    (r"/api/kernels", MainKernelHandler),
-    (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
-    (
-        rf"/api/kernels/{_kernel_id_regex}/{_kernel_action_regex}",
-        KernelActionHandler,
-    ),
-]
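# A hedged sketch (not part of the patch) of driving the REST API the deleted
# handlers expose, using `requests`. The URL, token, and kernel name are
# placeholders for a locally running server; the endpoints match the
# default_handlers table above.
import requests

base = "http://localhost:8888"
headers = {"Authorization": "token <your-token>"}

# POST /api/kernels starts a kernel; replies 201 plus the new kernel's model
model = requests.post(
    f"{base}/api/kernels", json={"name": "python3"}, headers=headers
).json()
kernel_id = model["id"]

# POST .../interrupt and .../restart map to KernelActionHandler
requests.post(f"{base}/api/kernels/{kernel_id}/interrupt", headers=headers)
requests.post(f"{base}/api/kernels/{kernel_id}/restart", headers=headers)

# DELETE /api/kernels/<kernel_id> shuts the kernel down (204)
requests.delete(f"{base}/api/kernels/{kernel_id}", headers=headers)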
diff --git a/jupyter_server/services/kernels/kernelmanager.py b/jupyter_server/services/kernels/kernelmanager.py
deleted file mode 100644
index a657f08562..0000000000
--- a/jupyter_server/services/kernels/kernelmanager.py
+++ /dev/null
@@ -1,691 +0,0 @@
-"""A MultiKernelManager for use in the Jupyter server
-
-- raises HTTPErrors
-- creates REST API models
-"""
-# Copyright (c) Jupyter Development Team.
-# Distributed under the terms of the Modified BSD License.
-import asyncio
-import os
-import warnings
-from collections import defaultdict
-from datetime import datetime, timedelta
-from functools import partial
-
-from jupyter_client.ioloop.manager import AsyncIOLoopKernelManager
-from jupyter_client.multikernelmanager import (
-    AsyncMultiKernelManager,
-    MultiKernelManager,
-)
-from jupyter_client.session import Session
-from jupyter_core.paths import exists
-from tornado import web
-from tornado.concurrent import Future
-from tornado.ioloop import IOLoop, PeriodicCallback
-from traitlets import (
-    Any,
-    Bool,
-    Dict,
-    Float,
-    Instance,
-    Integer,
-    List,
-    TraitError,
-    Unicode,
-    default,
-    validate,
-)
-
-from jupyter_server._tz import isoformat, utcnow
-from jupyter_server.prometheus.metrics import KERNEL_CURRENTLY_RUNNING_TOTAL
-from jupyter_server.utils import ensure_async, import_item, to_os_path
-
-
-class MappingKernelManager(MultiKernelManager):
-    """A KernelManager that handles
-    - File mapping
-    - HTTP error handling
-    - Kernel message filtering
-    """
-
-    @default("kernel_manager_class")
-    def _default_kernel_manager_class(self):
-        return "jupyter_client.ioloop.IOLoopKernelManager"
-
-    kernel_argv = List(Unicode())
-
-    root_dir = Unicode(config=True)
-
-    _kernel_connections = Dict()
-
-    _kernel_ports = Dict()
-
-    _culler_callback = None
-
-    _initialized_culler = False
-
-    @default("root_dir")
-    def _default_root_dir(self):
-        try:
-            return self.parent.root_dir
-        except AttributeError:
-            return os.getcwd()
-
-    @validate("root_dir")
-    def _update_root_dir(self, proposal):
-        """Do a bit of validation of the root dir."""
-        value = proposal["value"]
-        if not os.path.isabs(value):
-            # If we receive a non-absolute path, make it absolute.
-            value = os.path.abspath(value)
-        if not exists(value) or not os.path.isdir(value):
-            raise TraitError("kernel root dir %r is not a directory" % value)
-        return value
-
-    cull_idle_timeout = Integer(
-        0,
-        config=True,
-        help="""Timeout (in seconds) after which a kernel is considered idle and ready to be culled.
-        Values of 0 or lower disable culling. Very short timeouts may result in kernels being culled
-        for users with poor network connections.""",
-    )
-
-    cull_interval_default = 300  # 5 minutes
-    cull_interval = Integer(
-        cull_interval_default,
-        config=True,
-        help="""The interval (in seconds) on which to check for idle kernels exceeding the cull timeout value.""",
-    )
-
-    cull_connected = Bool(
-        False,
-        config=True,
-        help="""Whether to consider culling kernels which have one or more connections.
-        Only effective if cull_idle_timeout > 0.""",
-    )
-
-    cull_busy = Bool(
-        False,
-        config=True,
-        help="""Whether to consider culling kernels which are busy.
-        Only effective if cull_idle_timeout > 0.""",
-    )
-
-    buffer_offline_messages = Bool(
-        True,
-        config=True,
-        help="""Whether messages from kernels whose frontends have disconnected should be buffered in-memory.
-
-        When True (default), messages are buffered and replayed on reconnect,
-        avoiding lost messages due to interrupted connectivity.
-
-        Disable if long-running kernels will produce too much output while
-        no frontends are connected.
-        """,
-    )
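# A stand-alone sketch (not part of the patch) of how the culling traits above
# are typically acted on: a periodic check computes idle time from each
# kernel's last activity and culls kernels past cull_idle_timeout. The kernel
# model shape here is illustrative, not the actual MappingKernelManager
# internals.
from datetime import datetime, timedelta, timezone

def kernels_to_cull(kernels, cull_idle_timeout=600, cull_busy=False, cull_connected=False):
    now = datetime.now(timezone.utc)
    doomed = []
    for k in kernels:
        if k["execution_state"] == "busy" and not cull_busy:
            continue
        if k["connections"] > 0 and not cull_connected:
            continue
        if (now - k["last_activity"]) > timedelta(seconds=cull_idle_timeout):
            doomed.append(k["id"])
    return doomed

old = datetime.now(timezone.utc) - timedelta(hours=1)
kernels = [
    {"id": "a", "execution_state": "idle", "connections": 0, "last_activity": old},
    {"id": "b", "execution_state": "busy", "connections": 0, "last_activity": old},
    {"id": "c", "execution_state": "idle", "connections": 2, "last_activity": old},
]
assert kernels_to_cull(kernels) == ["a"]  # busy and connected kernels are spared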
- """, - ) - - kernel_info_timeout = Float( - 60, - config=True, - help="""Timeout for giving up on a kernel (in seconds). - - On starting and restarting kernels, we check whether the - kernel is running and responsive by sending kernel_info_requests. - This sets the timeout in seconds for how long the kernel can take - before being presumed dead. - This affects the MappingKernelManager (which handles kernel restarts) - and the ZMQChannelsHandler (which handles the startup). - """, - ) - - _kernel_buffers = Any() - - @default("_kernel_buffers") - def _default_kernel_buffers(self): - return defaultdict(lambda: {"buffer": [], "session_key": "", "channels": {}}) - - last_kernel_activity = Instance( - datetime, - help="The last activity on any kernel, including shutting down a kernel", - ) - - def __init__(self, **kwargs): - self.pinned_superclass = MultiKernelManager - self._pending_kernel_tasks = {} - self.pinned_superclass.__init__(self, **kwargs) - self.last_kernel_activity = utcnow() - - allowed_message_types = List( - trait=Unicode(), - config=True, - help="""White list of allowed kernel message types. - When the list is empty, all message types are allowed. - """, - ) - - allow_tracebacks = Bool( - True, config=True, help=("Whether to send tracebacks to clients on exceptions.") - ) - - traceback_replacement_message = Unicode( - "An exception occurred at runtime, which is not shown due to security reasons.", - config=True, - help=("Message to print when allow_tracebacks is False, and an exception occurs"), - ) - - # ------------------------------------------------------------------------- - # Methods for managing kernels and sessions - # ------------------------------------------------------------------------- - - def _handle_kernel_died(self, kernel_id): - """notice that a kernel died""" - self.log.warning("Kernel %s died, removing from map.", kernel_id) - self.remove_kernel(kernel_id) - - def cwd_for_path(self, path, **kwargs): - """Turn API path into absolute OS path.""" - os_path = to_os_path(path, self.root_dir) - # in the case of documents and kernels not being on the same filesystem, - # walk up to root_dir if the paths don't exist - while not os.path.isdir(os_path) and os_path != self.root_dir: - os_path = os.path.dirname(os_path) - return os_path - - async def _remove_kernel_when_ready(self, kernel_id, kernel_awaitable): - await super()._remove_kernel_when_ready(kernel_id, kernel_awaitable) - self._kernel_connections.pop(kernel_id, None) - self._kernel_ports.pop(kernel_id, None) - - async def _async_start_kernel(self, kernel_id=None, path=None, **kwargs): - """Start a kernel for a session and return its kernel_id. - - Parameters - ---------- - kernel_id : uuid - The uuid to associate the new kernel with. If this - is not None, this kernel will be persistent whenever it is - requested. - path : API path - The API path (unicode, '/' delimited) for the cwd. - Will be transformed to an OS path relative to root_dir. - kernel_name : str - The name identifying which kernel spec to launch. This is ignored if - an existing kernel is returned, but it may be checked in the future. 
- """ - if kernel_id is None or kernel_id not in self: - if path is not None: - kwargs["cwd"] = self.cwd_for_path(path, env=kwargs.get("env", {})) - if kernel_id is not None: - kwargs["kernel_id"] = kernel_id - kernel_id = await self.pinned_superclass._async_start_kernel(self, **kwargs) - self._kernel_connections[kernel_id] = 0 - task = asyncio.create_task(self._finish_kernel_start(kernel_id)) - if not getattr(self, "use_pending_kernels", None): - await task - else: - self._pending_kernel_tasks[kernel_id] = task - # add busy/activity markers: - kernel = self.get_kernel(kernel_id) - kernel.execution_state = "starting" - kernel.reason = "" - kernel.last_activity = utcnow() - self.log.info("Kernel started: %s", kernel_id) - self.log.debug("Kernel args: %r", kwargs) - - # Increase the metric of number of kernels running - # for the relevant kernel type by 1 - KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).inc() - - else: - self.log.info("Using existing kernel: %s", kernel_id) - - # Initialize culling if not already - if not self._initialized_culler: - self.initialize_culler() - - return kernel_id - - start_kernel = _async_start_kernel - - async def _finish_kernel_start(self, kernel_id): - km = self.get_kernel(kernel_id) - if hasattr(km, "ready"): - ready = km.ready - if not isinstance(ready, asyncio.Future): - ready = asyncio.wrap_future(ready) - try: - await ready - except Exception: - self.log.exception("Error waiting for kernel manager ready") - return - - self._kernel_ports[kernel_id] = km.ports - self.start_watching_activity(kernel_id) - # register callback for failed auto-restart - self.add_restart_callback( - kernel_id, - lambda: self._handle_kernel_died(kernel_id), - "dead", - ) - - def ports_changed(self, kernel_id): - """Used by ZMQChannelsHandler to determine how to coordinate nudge and replays. - - Ports are captured when starting a kernel (via MappingKernelManager). Ports - are considered changed (following restarts) if the referenced KernelManager - is using a set of ports different from those captured at startup. If changes - are detected, the captured set is updated and a value of True is returned. - - NOTE: Use is exclusive to ZMQChannelsHandler because this object is a singleton - instance while ZMQChannelsHandler instances are per WebSocket connection that - can vary per kernel lifetime. - """ - changed_ports = self._get_changed_ports(kernel_id) - if changed_ports: - # If changed, update captured ports and return True, else return False. - self.log.debug("Port change detected for kernel: %s", kernel_id) - self._kernel_ports[kernel_id] = changed_ports - return True - return False - - def _get_changed_ports(self, kernel_id): - """Internal method to test if a kernel's ports have changed and, if so, return their values. - - This method does NOT update the captured ports for the kernel as that can only be done - by ZMQChannelsHandler, but instead returns the new list of ports if they are different - than those captured at startup. This enables the ability to conditionally restart - activity monitoring immediately following a kernel's restart (if ports have changed). - """ - # Get current ports and return comparison with ports captured at startup. 
- km = self.get_kernel(kernel_id) - if km.ports != self._kernel_ports[kernel_id]: - return km.ports - return None - - def start_buffering(self, kernel_id, session_key, channels): - """Start buffering messages for a kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to start buffering for. - session_key : str - The session_key whose messages should be buffered. - Only a reconnecting session with a matching session_key - will get the buffer back. - channels : dict({'channel': ZMQStream}) - The zmq channels whose messages should be buffered. - """ - - if not self.buffer_offline_messages: - for _, stream in channels.items(): - stream.close() - return - - self.log.info("Starting buffering for %s", session_key) - self._check_kernel_id(kernel_id) - # clear previous buffering state - self.stop_buffering(kernel_id) - buffer_info = self._kernel_buffers[kernel_id] - # record the session key because only one session can buffer - buffer_info["session_key"] = session_key - # TODO: the buffer should likely be a memory-bounded queue; we're starting with a list to keep it simple - buffer_info["buffer"] = [] - buffer_info["channels"] = channels - - # forward any future messages to the internal buffer - def buffer_msg(channel, msg_parts): - self.log.debug("Buffering msg on %s:%s", kernel_id, channel) - buffer_info["buffer"].append((channel, msg_parts)) - - for channel, stream in channels.items(): - stream.on_recv(partial(buffer_msg, channel)) - - def get_buffer(self, kernel_id, session_key): - """Get the buffer for a given kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel whose buffer to retrieve. - session_key : str, optional - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - """ - self.log.debug("Getting buffer for %s", kernel_id) - if kernel_id not in self._kernel_buffers: - return - - buffer_info = self._kernel_buffers[kernel_id] - if buffer_info["session_key"] == session_key: - # remove buffer - self._kernel_buffers.pop(kernel_id) - # only return buffer_info if it's a match - return buffer_info - else: - self.stop_buffering(kernel_id) - - def stop_buffering(self, kernel_id): - """Stop buffering kernel messages - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. 
- """ - self.log.debug("Clearing buffer for %s", kernel_id) - self._check_kernel_id(kernel_id) - - if kernel_id not in self._kernel_buffers: - return - buffer_info = self._kernel_buffers.pop(kernel_id) - # close buffering streams - for stream in buffer_info["channels"].values(): - if not stream.socket.closed: - stream.on_recv(None) - stream.close() - - msg_buffer = buffer_info["buffer"] - if msg_buffer: - self.log.info( - "Discarding %s buffered messages for %s", - len(msg_buffer), - buffer_info["session_key"], - ) - - async def _async_shutdown_kernel(self, kernel_id, now=False, restart=False): - """Shutdown a kernel by kernel_id""" - self._check_kernel_id(kernel_id) - - # Decrease the metric of number of kernels - # running for the relevant kernel type by 1 - KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).dec() - - if kernel_id in self._pending_kernel_tasks: - task = self._pending_kernel_tasks.pop(kernel_id) - task.cancel() - - self.stop_watching_activity(kernel_id) - self.stop_buffering(kernel_id) - - return await self.pinned_superclass._async_shutdown_kernel( - self, kernel_id, now=now, restart=restart - ) - - shutdown_kernel = _async_shutdown_kernel - - async def _async_restart_kernel(self, kernel_id, now=False): - """Restart a kernel by kernel_id""" - self._check_kernel_id(kernel_id) - await self.pinned_superclass._async_restart_kernel(self, kernel_id, now=now) - kernel = self.get_kernel(kernel_id) - # return a Future that will resolve when the kernel has successfully restarted - channel = kernel.connect_shell() - future: Future = Future() - - def finish(): - """Common cleanup when restart finishes/fails for any reason.""" - if not channel.closed(): - channel.close() - loop.remove_timeout(timeout) - kernel.remove_restart_callback(on_restart_failed, "dead") - kernel._pending_restart_cleanup = None - - def on_reply(msg): - self.log.debug("Kernel info reply received: %s", kernel_id) - finish() - if not future.done(): - future.set_result(msg) - - def on_timeout(): - self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) - finish() - if not future.done(): - future.set_exception(TimeoutError("Timeout waiting for restart")) - - def on_restart_failed(): - self.log.warning("Restarting kernel failed: %s", kernel_id) - finish() - if not future.done(): - future.set_exception(RuntimeError("Restart failed")) - - kernel.add_restart_callback(on_restart_failed, "dead") - kernel._pending_restart_cleanup = finish - kernel.session.send(channel, "kernel_info_request") - channel.on_recv(on_reply) - loop = IOLoop.current() - timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) - # Re-establish activity watching if ports have changed... - if self._get_changed_ports(kernel_id) is not None: - self.stop_watching_activity(kernel_id) - self.start_watching_activity(kernel_id) - return future - - restart_kernel = _async_restart_kernel - - def notify_connect(self, kernel_id): - """Notice a new connection to a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] += 1 - - def notify_disconnect(self, kernel_id): - """Notice a disconnection from a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] -= 1 - - def kernel_model(self, kernel_id): - """Return a JSON-safe dict representing a kernel - - For use in representing kernels in the JSON APIs. 
- """ - self._check_kernel_id(kernel_id) - kernel = self._kernels[kernel_id] - - model = { - "id": kernel_id, - "name": kernel.kernel_name, - "last_activity": isoformat(kernel.last_activity), - "execution_state": kernel.execution_state, - "connections": self._kernel_connections.get(kernel_id, 0), - } - if getattr(kernel, "reason", None): - model["reason"] = kernel.reason - return model - - def list_kernels(self): - """Returns a list of kernel_id's of kernels running.""" - kernels = [] - kernel_ids = self.pinned_superclass.list_kernel_ids(self) - for kernel_id in kernel_ids: - try: - model = self.kernel_model(kernel_id) - kernels.append(model) - except (web.HTTPError, KeyError): - pass # Probably due to a (now) non-existent kernel, continue building the list - return kernels - - # override _check_kernel_id to raise 404 instead of KeyError - def _check_kernel_id(self, kernel_id): - """Check a that a kernel_id exists and raise 404 if not.""" - if kernel_id not in self: - raise web.HTTPError(404, "Kernel does not exist: %s" % kernel_id) - - # monitoring activity: - - def start_watching_activity(self, kernel_id): - """Start watching IOPub messages on a kernel for activity. - - - update last_activity on every message - - record execution_state from status messages - """ - kernel = self._kernels[kernel_id] - # add busy/activity markers: - kernel.execution_state = "starting" - kernel.reason = "" - kernel.last_activity = utcnow() - kernel._activity_stream = kernel.connect_iopub() - session = Session( - config=kernel.session.config, - key=kernel.session.key, - ) - - def record_activity(msg_list): - """Record an IOPub message arriving from a kernel""" - self.last_kernel_activity = kernel.last_activity = utcnow() - - idents, fed_msg_list = session.feed_identities(msg_list) - msg = session.deserialize(fed_msg_list, content=False) - - msg_type = msg["header"]["msg_type"] - if msg_type == "status": - msg = session.deserialize(fed_msg_list) - kernel.execution_state = msg["content"]["execution_state"] - self.log.debug( - "activity on %s: %s (%s)", - kernel_id, - msg_type, - kernel.execution_state, - ) - else: - self.log.debug("activity on %s: %s", kernel_id, msg_type) - - kernel._activity_stream.on_recv(record_activity) - - def stop_watching_activity(self, kernel_id): - """Stop watching IOPub messages on a kernel for activity.""" - kernel = self._kernels[kernel_id] - if getattr(kernel, "_activity_stream", None): - if not kernel._activity_stream.socket.closed: - kernel._activity_stream.close() - kernel._activity_stream = None - if getattr(kernel, "_pending_restart_cleanup", None): - kernel._pending_restart_cleanup() - - def initialize_culler(self): - """Start idle culler if 'cull_idle_timeout' is greater than zero. - - Regardless of that value, set flag that we've been here. 
- """ - if not self._initialized_culler and self.cull_idle_timeout > 0: - if self._culler_callback is None: - _ = IOLoop.current() - if self.cull_interval <= 0: # handle case where user set invalid value - self.log.warning( - "Invalid value for 'cull_interval' detected (%s) - using default value (%s).", - self.cull_interval, - self.cull_interval_default, - ) - self.cull_interval = self.cull_interval_default - self._culler_callback = PeriodicCallback( - self.cull_kernels, 1000 * self.cull_interval - ) - self.log.info( - "Culling kernels with idle durations > %s seconds at %s second intervals ...", - self.cull_idle_timeout, - self.cull_interval, - ) - if self.cull_busy: - self.log.info("Culling kernels even if busy") - if self.cull_connected: - self.log.info("Culling kernels even with connected clients") - self._culler_callback.start() - - self._initialized_culler = True - - async def cull_kernels(self): - self.log.debug( - "Polling every %s seconds for kernels idle > %s seconds...", - self.cull_interval, - self.cull_idle_timeout, - ) - """Create a separate list of kernels to avoid conflicting updates while iterating""" - for kernel_id in list(self._kernels): - try: - await self.cull_kernel_if_idle(kernel_id) - except Exception as e: - self.log.exception( - "The following exception was encountered while checking the idle duration of kernel %s: %s", - kernel_id, - e, - ) - - async def cull_kernel_if_idle(self, kernel_id): - kernel = self._kernels[kernel_id] - - if getattr(kernel, "execution_state", None) == "dead": - self.log.warning( - "Culling '%s' dead kernel '%s' (%s).", - kernel.execution_state, - kernel.kernel_name, - kernel_id, - ) - await ensure_async(self.shutdown_kernel(kernel_id)) - return - - if hasattr( - kernel, "last_activity" - ): # last_activity is monkey-patched, so ensure that has occurred - self.log.debug( - "kernel_id=%s, kernel_name=%s, last_activity=%s", - kernel_id, - kernel.kernel_name, - kernel.last_activity, - ) - dt_now = utcnow() - dt_idle = dt_now - kernel.last_activity - # Compute idle properties - is_idle_time = dt_idle > timedelta(seconds=self.cull_idle_timeout) - is_idle_execute = self.cull_busy or (kernel.execution_state != "busy") - connections = self._kernel_connections.get(kernel_id, 0) - is_idle_connected = self.cull_connected or not connections - # Cull the kernel if all three criteria are met - if is_idle_time and is_idle_execute and is_idle_connected: - idle_duration = int(dt_idle.total_seconds()) - self.log.warning( - "Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", - kernel.execution_state, - kernel.kernel_name, - kernel_id, - connections, - idle_duration, - ) - await ensure_async(self.shutdown_kernel(kernel_id)) - - -# AsyncMappingKernelManager inherits as much as possible from MappingKernelManager, -# overriding only what is different. -class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager): - @default("kernel_manager_class") - def _default_kernel_manager_class(self): - return "jupyter_server.services.kernels.kernelmanager.ServerKernelManager" - - @validate("kernel_manager_class") - def _validate_kernel_manager_class(self, proposal): - km_class_value = proposal.value - km_class = import_item(km_class_value) - if not issubclass(km_class, ServerKernelManager): - warnings.warn( - f"KernelManager class '{km_class}' is not a subclass of 'ServerKernelManager'. 
Custom " - "KernelManager classes should derive from 'ServerKernelManager' beginning with jupyter-server 2.0 " - "or risk missing functionality. Continuing...", - FutureWarning, - stacklevel=3, - ) - return km_class_value - - def __init__(self, **kwargs): - self.pinned_superclass = MultiKernelManager - self._pending_kernel_tasks = {} - self.pinned_superclass.__init__(self, **kwargs) - self.last_kernel_activity = utcnow() - - -class ServerKernelManager(AsyncIOLoopKernelManager): - - # Define activity-related attributes: - execution_state = Unicode( - None, allow_none=True, help="The current execution state of the kernel" - ) - reason = Unicode("", help="The reason for the last failure against the kernel") - last_activity = Instance(datetime, help="The last activity on the kernel") diff --git a/jupyter_server/services/kernels/websocket.py b/jupyter_server/services/kernels/websocket.py deleted file mode 100644 index 2806053a98..0000000000 --- a/jupyter_server/services/kernels/websocket.py +++ /dev/null @@ -1,81 +0,0 @@ -"""Tornado handlers for WebSocket <-> ZMQ sockets.""" -# Copyright (c) Jupyter Development Team. -# Distributed under the terms of the Modified BSD License. - -from tornado import web -from tornado.websocket import WebSocketHandler - -from jupyter_server.base.handlers import JupyterHandler -from jupyter_server.base.websocket import WebSocketMixin - -from .handlers import _kernel_id_regex - -AUTH_RESOURCE = "kernels" - - -class KernelWebsocketHandler(WebSocketMixin, WebSocketHandler, JupyterHandler): - """The kernels websocket should connecte""" - - auth_resource = AUTH_RESOURCE - - @property - def kernel_websocket_connection_class(self): - return self.settings.get("kernel_websocket_connection_class") - - def set_default_headers(self): - """Undo the set_default_headers in JupyterHandler - - which doesn't make sense for websockets - """ - pass - - def get_compression_options(self): - return self.settings.get("websocket_compression_options", None) - - async def pre_get(self): - # authenticate first - user = self.current_user - if user is None: - self.log.warning("Couldn't authenticate WebSocket connection") - raise web.HTTPError(403) - - # authorize the user. - if not self.authorizer.is_authorized(self, user, "execute", "kernels"): - raise web.HTTPError(403) - - kernel = self.kernel_manager.get_kernel(self.kernel_id) - self.connection = self.kernel_websocket_connection_class( - parent=kernel, websocket_handler=self, config=self.config - ) - - if self.get_argument("session_id", None): - self.connection.session.session = self.get_argument("session_id") - else: - self.log.warning("No session ID specified") - # For backwards compatibility with older versions - # of the websocket connection, call a prepare method if found. - if hasattr(self.connection, "prepare"): - await self.connection.prepare() - - async def get(self, kernel_id): - self.kernel_id = kernel_id - await self.pre_get() - await super().get(kernel_id=kernel_id) - - async def open(self, kernel_id): - # Wait for the kernel to emit an idle status. 
- self.log.info(f"Connecting to kernel {self.kernel_id}.") - await self.connection.connect() - - def on_message(self, ws_message): - """Get a kernel message from the websocket and turn it into a ZMQ message.""" - self.connection.handle_incoming_message(ws_message) - - def on_close(self): - self.connection.disconnect() - self.connection = None - - -default_handlers = [ - (r"/api/kernels/%s/channels" % _kernel_id_regex, KernelWebsocketHandler), -] diff --git a/jupyter_server/services/kernelspecs/__init__.py b/jupyter_server/services/kernelspecs/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/jupyter_server/services/kernelspecs/handlers.py b/jupyter_server/services/kernelspecs/handlers.py deleted file mode 100644 index 6cd5d9dcba..0000000000 --- a/jupyter_server/services/kernelspecs/handlers.py +++ /dev/null @@ -1,109 +0,0 @@ -"""Tornado handlers for kernel specifications. - -Preliminary documentation at https://github.com/ipython/ipython/wiki/IPEP-25%3A-Registry-of-installed-kernels#rest-api -""" -# Copyright (c) Jupyter Development Team. -# Distributed under the terms of the Modified BSD License. -import glob -import json -import os - -pjoin = os.path.join - -from tornado import web - -from jupyter_server.auth import authorized - -from ...base.handlers import APIHandler -from ...utils import ensure_async, url_path_join, url_unescape - -AUTH_RESOURCE = "kernelspecs" - - -def kernelspec_model(handler, name, spec_dict, resource_dir): - """Load a KernelSpec by name and return the REST API model""" - d = {"name": name, "spec": spec_dict, "resources": {}} - - # Add resource files if they exist - for resource in ["kernel.js", "kernel.css"]: - if os.path.exists(pjoin(resource_dir, resource)): - d["resources"][resource] = url_path_join( - handler.base_url, "kernelspecs", name, resource - ) - for logo_file in glob.glob(pjoin(resource_dir, "logo-*")): - fname = os.path.basename(logo_file) - no_ext, _ = os.path.splitext(fname) - d["resources"][no_ext] = url_path_join(handler.base_url, "kernelspecs", name, fname) - return d - - -def is_kernelspec_model(spec_dict): - """Returns True if spec_dict is already in proper form. 
This will occur when using a gateway.""" - return ( - isinstance(spec_dict, dict) - and "name" in spec_dict - and "spec" in spec_dict - and "resources" in spec_dict - ) - - -class KernelSpecsAPIHandler(APIHandler): - auth_resource = AUTH_RESOURCE - - -class MainKernelSpecHandler(KernelSpecsAPIHandler): - @web.authenticated - @authorized - async def get(self): - ksm = self.kernel_spec_manager - km = self.kernel_manager - model = {} - model["default"] = km.default_kernel_name - model["kernelspecs"] = specs = {} - kspecs = await ensure_async(ksm.get_all_specs()) - for kernel_name, kernel_info in kspecs.items(): - try: - if is_kernelspec_model(kernel_info): - d = kernel_info - else: - d = kernelspec_model( - self, - kernel_name, - kernel_info["spec"], - kernel_info["resource_dir"], - ) - except Exception: - self.log.error("Failed to load kernel spec: '%s'", kernel_name, exc_info=True) - continue - specs[kernel_name] = d - self.set_header("Content-Type", "application/json") - self.finish(json.dumps(model)) - - -class KernelSpecHandler(KernelSpecsAPIHandler): - @web.authenticated - @authorized - async def get(self, kernel_name): - ksm = self.kernel_spec_manager - kernel_name = url_unescape(kernel_name) - try: - spec = await ensure_async(ksm.get_kernel_spec(kernel_name)) - except KeyError as e: - raise web.HTTPError(404, "Kernel spec %s not found" % kernel_name) from e - if is_kernelspec_model(spec): - model = spec - else: - model = kernelspec_model(self, kernel_name, spec.to_dict(), spec.resource_dir) - self.set_header("Content-Type", "application/json") - self.finish(json.dumps(model)) - - -# URL to handler mappings - -kernel_name_regex = r"(?P<kernel_name>[\w\.\-%]+)" - -default_handlers = [ - (r"/api/kernelspecs", MainKernelSpecHandler), - (r"/api/kernelspecs/%s" % kernel_name_regex, KernelSpecHandler), -] diff --git a/jupyter_server/services/sessions/sessionmanager.py b/jupyter_server/services/sessions/sessionmanager.py index e14fcc3768..039dbf5955 100644 --- a/jupyter_server/services/sessions/sessionmanager.py +++ b/jupyter_server/services/sessions/sessionmanager.py @@ -180,7 +180,7 @@ def _validate_database_filepath(self, proposal): raise TraitError("The given file is not an SQLite database file.") return value - kernel_manager = Instance("jupyter_server.services.kernels.kernelmanager.MappingKernelManager") + kernel_manager = Instance("jupyter_server_kernels.kernels.kernelmanager.MappingKernelManager") contents_manager = InstanceFromClasses( [ "jupyter_server.services.contents.manager.ContentsManager",
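Finally, the kernelspec handlers deleted above keep the same REST shape in their new jupyter_server_kernels home. A hypothetical client sketch, reusing the placeholder server and token from the earlier /api/kernels example:

    import requests

    BASE = "http://localhost:8888"
    HEADERS = {"Authorization": "token <your-token>"}  # placeholder credential

    # GET /api/kernelspecs -> MainKernelSpecHandler.get:
    # {"default": <name>, "kernelspecs": {<name>: {"name", "spec", "resources"}}}
    specs = requests.get(f"{BASE}/api/kernelspecs", headers=HEADERS).json()
    print(specs["default"], sorted(specs["kernelspecs"]))

    # GET /api/kernelspecs/<name> -> KernelSpecHandler.get (404 if the spec is unknown)
    print(requests.get(f"{BASE}/api/kernelspecs/python3", headers=HEADERS).json())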