From f49a8ea1d1299cde8524089e6a6336dea3ec7b02 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Sun, 26 Jan 2025 18:46:38 +0100 Subject: [PATCH] wip --- .github/workflows/lint.yml | 14 +- .github/workflows/test.yml | 58 ++-- Makefile | 3 +- granian/__init__.py | 2 +- granian/_compat.py | 5 + granian/_granian.pyi | 1 + granian/_internal.py | 2 +- granian/_loops.py | 17 +- granian/cli.py | 4 +- granian/server/__init__.py | 7 + granian/{server.py => server/common.py} | 373 ++++-------------------- granian/server/mp.py | 299 +++++++++++++++++++ granian/server/mt.py | 291 ++++++++++++++++++ pyproject.toml | 32 +- src/asgi/callbacks.rs | 71 +++++ src/asgi/serve.rs | 18 +- src/lib.rs | 8 +- src/rsgi/callbacks.rs | 41 +++ src/rsgi/serve.rs | 18 +- src/runtime.rs | 102 ++++++- src/workers.rs | 166 +++-------- src/wsgi/serve.rs | 237 ++++++++++++++- 22 files changed, 1245 insertions(+), 524 deletions(-) create mode 100644 granian/_compat.py create mode 100644 granian/server/__init__.py rename granian/{server.py => server/common.py} (57%) create mode 100644 granian/server/mp.py create mode 100644 granian/server/mt.py diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 6de785a7..138e7d39 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -13,18 +13,18 @@ jobs: lint: runs-on: ubuntu-latest + env: + UV_PYTHON: ${{ matrix.python-version }} steps: - uses: actions/checkout@v4 - - name: Set up Python ${{ env.PYTHON_VERSION }} - uses: actions/setup-python@v5 + - uses: astral-sh/setup-uv@v5 with: - python-version: ${{ env.PYTHON_VERSION }} + enable-cache: false - name: Install run: | - python -m venv .venv - source .venv/bin/activate - pip install maturin - maturin develop --extras=lint + uv python install ${{ env.UV_PYTHON }} + uv venv .venv + uv sync --group lint - name: Lint run: | source .venv/bin/activate diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2d2bb819..7041c9aa 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -21,24 +21,25 @@ jobs: - '3.11' - '3.12' - '3.13' + - '3.13t' + env: + UV_PYTHON: ${{ matrix.python-version }} steps: - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + - uses: astral-sh/setup-uv@v5 with: - python-version: ${{ matrix.python-version }} - allow-prereleases: true + enable-cache: false - name: Install run: | - python -m venv .venv - source .venv/bin/activate - pip install maturin - maturin develop --extras=test + uv python install ${{ env.UV_PYTHON }} + uv venv .venv + uv sync --group all + uv run --no-sync maturin develop --uv - name: Test run: | source .venv/bin/activate - py.test -v tests + make test macos: runs-on: macos-latest @@ -51,24 +52,25 @@ jobs: - '3.11' - '3.12' - '3.13' + - '3.13t' + env: + UV_PYTHON: ${{ matrix.python-version }} steps: - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + - uses: astral-sh/setup-uv@v5 with: - python-version: ${{ matrix.python-version }} - allow-prereleases: true + enable-cache: false - name: Install run: | - python -m venv .venv - source .venv/bin/activate - pip install maturin - maturin develop --extras=test + uv python install ${{ env.UV_PYTHON }} + uv venv .venv + uv sync --group all + uv run --no-sync maturin develop --uv - name: Test run: | source .venv/bin/activate - py.test -v tests + make test windows: runs-on: windows-latest @@ -81,21 +83,21 @@ jobs: - '3.11' - '3.12' - '3.13' + - '3.13t' + env: + UV_PYTHON: ${{ matrix.python-version }} steps: - uses: actions/checkout@v4 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + - uses: astral-sh/setup-uv@v5 with: - python-version: ${{ matrix.python-version }} - allow-prereleases: true + enable-cache: false - name: Install run: | - python -m venv venv - venv/Scripts/Activate.ps1 - pip install maturin - maturin develop --extras=test + uv python install ${{ env.UV_PYTHON }} + uv venv .venv + uv sync --group all + uv run --no-sync maturin develop --uv - name: Test run: | - venv/Scripts/Activate.ps1 - py.test -v tests + uv run --no-sync pytest -v tests diff --git a/Makefile b/Makefile index 304c0eeb..37f342e1 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,8 @@ pysources = granian tests .PHONY: build-dev build-dev: @rm -f granian/*.so - maturin develop --extras lint,test + uv sync --group all + maturin develop .PHONY: format format: diff --git a/granian/__init__.py b/granian/__init__.py index be65c048..074353f9 100644 --- a/granian/__init__.py +++ b/granian/__init__.py @@ -1,2 +1,2 @@ from ._granian import __version__ # noqa: F401 -from .server import Granian as Granian +from .server import Server as Granian # noqa: F401 diff --git a/granian/_compat.py b/granian/_compat.py new file mode 100644 index 00000000..ba320145 --- /dev/null +++ b/granian/_compat.py @@ -0,0 +1,5 @@ +import sys + + +_PYV = int(sys.version_info.major * 100 + sys.version_info.minor) +_PY_39 = 309 diff --git a/granian/_granian.pyi b/granian/_granian.pyi index e095f60b..6321a75c 100644 --- a/granian/_granian.pyi +++ b/granian/_granian.pyi @@ -5,6 +5,7 @@ from ._types import WebsocketMessage from .http import HTTP1Settings, HTTP2Settings __version__: str +BUILD_GIL: bool class RSGIHeaders: def __contains__(self, key: str) -> bool: ... diff --git a/granian/_internal.py b/granian/_internal.py index 89bee15c..ba8f44e3 100644 --- a/granian/_internal.py +++ b/granian/_internal.py @@ -41,7 +41,7 @@ def load_module(module_name: str, raise_on_failure: bool = True) -> Optional[Mod except ImportError: if sys.exc_info()[-1].tb_next: raise RuntimeError( - f"While importing '{module_name}', an ImportError was raised:" f'\n\n{traceback.format_exc()}' + f"While importing '{module_name}', an ImportError was raised:\n\n{traceback.format_exc()}" ) elif raise_on_failure: raise RuntimeError(f"Could not import '{module_name}'.") diff --git a/granian/_loops.py b/granian/_loops.py index 4e19cd8b..5e6212d7 100644 --- a/granian/_loops.py +++ b/granian/_loops.py @@ -3,6 +3,8 @@ import sys from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple +from ._compat import _PY_39, _PYV + WrappableT = Callable[..., Any] LoopBuilderT = Callable[..., asyncio.AbstractEventLoop] @@ -71,23 +73,24 @@ def get(self, key: str) -> asyncio.AbstractEventLoop: @loops.register('asyncio') def build_asyncio_loop(): loop = asyncio.new_event_loop() if os.name != 'nt' else asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) + if _PYV == _PY_39: + asyncio.set_event_loop(loop) return loop @loops.register('uvloop', packages=['uvloop']) def build_uv_loop(uvloop): - asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = uvloop.new_event_loop() + if _PYV == _PY_39: + asyncio.set_event_loop(loop) return loop @loops.register('rloop', packages=['rloop']) def build_rloop(rloop): - asyncio.set_event_loop_policy(rloop.EventLoopPolicy()) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = rloop.new_event_loop() + if _PYV == _PY_39: + asyncio.set_event_loop(loop) return loop diff --git a/granian/cli.py b/granian/cli.py index 882dda00..e23602ae 100644 --- a/granian/cli.py +++ b/granian/cli.py @@ -9,7 +9,7 @@ from .errors import FatalError from .http import HTTP1Settings, HTTP2Settings from .log import LogLevels -from .server import Granian +from .server import Server _AnyCallable = Callable[..., Any] @@ -313,7 +313,7 @@ def cli( print('Unable to parse provided logging config.') raise click.exceptions.Exit(1) - server = Granian( + server = Server( app, address=host, port=port, diff --git a/granian/server/__init__.py b/granian/server/__init__.py new file mode 100644 index 00000000..50cb725c --- /dev/null +++ b/granian/server/__init__.py @@ -0,0 +1,7 @@ +from .._granian import BUILD_GIL + + +if BUILD_GIL: + from .mp import MPServer as Server +else: + from .mt import MTServer as Server # noqa: F401 diff --git a/granian/server.py b/granian/server/common.py similarity index 57% rename from granian/server.py rename to granian/server/common.py index 77294a75..fd051ee7 100644 --- a/granian/server.py +++ b/granian/server/common.py @@ -10,28 +10,25 @@ import time from functools import partial from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type +from typing import Any, Callable, Dict, Generic, List, Optional, Sequence, Type, TypeVar -from ._futures import _future_watcher_wrapper, _new_cbscheduler -from ._granian import ASGIWorker, RSGIWorker, WSGIWorker -from ._imports import anyio, setproctitle, watchfiles -from ._internal import load_target -from ._signals import set_main_signals -from .asgi import LifespanProtocol, _callback_wrapper as _asgi_call_wrap -from .constants import HTTPModes, Interfaces, Loops, TaskImpl, ThreadModes -from .errors import ConfigurationError, PidFileError -from .http import HTTP1Settings, HTTP2Settings -from .log import DEFAULT_ACCESSLOG_FMT, LogLevels, configure_logging, logger -from .net import SocketHolder -from .rsgi import _callback_wrapper as _rsgi_call_wrap -from .wsgi import _callback_wrapper as _wsgi_call_wrap +from .._imports import anyio, setproctitle, watchfiles +from .._internal import load_target +from .._signals import set_main_signals +from ..constants import HTTPModes, Interfaces, Loops, TaskImpl, ThreadModes +from ..errors import ConfigurationError, PidFileError +from ..http import HTTP1Settings, HTTP2Settings +from ..log import DEFAULT_ACCESSLOG_FMT, LogLevels, configure_logging, logger +from ..net import SocketHolder -multiprocessing.allow_connection_pickling() +WT = TypeVar('WT') -class Worker: - def __init__(self, parent: Granian, idx: int, target: Any, args: Any): +class AbstractWorker: + _idl = 'id' + + def __init__(self, parent: AbstractServer, idx: int, target: Any, args: Any): self.parent = parent self.idx = idx self.interrupt_by_parent = False @@ -39,10 +36,13 @@ def __init__(self, parent: Granian, idx: int, target: Any, args: Any): self._spawn(target, args) def _spawn(self, target, args): - self.proc = multiprocessing.get_context().Process(name='granian-worker', target=target, args=args) + raise NotImplementedError + + def _id(self): + raise NotImplementedError def _watcher(self): - self.proc.join() + self.inner.join() if not self.interrupt_by_parent: logger.error(f'Unexpected exit from worker-{self.idx + 1}') self.parent.interrupt_children.append(self.idx) @@ -53,23 +53,24 @@ def _watch(self): watcher.start() def start(self): - self.proc.start() - logger.info(f'Spawning worker-{self.idx + 1} with pid: {self.proc.pid}') + self.inner.start() + logger.info(f'Spawning worker-{self.idx + 1} with {self._idl}: {self._id()}') self._watch() + def is_alive(self): + return self.inner.is_alive() + def terminate(self): - self.interrupt_by_parent = True - self.proc.terminate() + raise NotImplementedError def kill(self): - self.interrupt_by_parent = True - self.proc.kill() + raise NotImplementedError def join(self, timeout=None): - self.proc.join(timeout=timeout) + self.inner.join(timeout=timeout) -class Granian: +class AbstractServer(Generic[WT]): def __init__( self, target: str, @@ -158,11 +159,11 @@ def __init__( self.build_ssl_context(ssl_cert, ssl_key, ssl_key_password) self._shd = None self._sfd = None - self.procs: List[Worker] = [] + self.wrks: List[WT] = [] self.main_loop_interrupt = threading.Event() self.interrupt_signal = False self.interrupt_children = [] - self.respawned_procs = {} + self.respawned_wrks = {} self.reload_signal = False self.lifetime_signal = False self.pid = None @@ -179,230 +180,6 @@ def build_ssl_context(self, cert: Optional[Path], key: Optional[Path], password: # key_contents = f.read() self.ssl_ctx = (True, str(cert.resolve()), str(key.resolve()), password) - @staticmethod - def _spawn_asgi_worker( - worker_id: int, - process_name: Optional[str], - callback_loader: Callable[..., Any], - socket: socket.socket, - loop_impl: Loops, - threads: int, - blocking_threads: int, - backpressure: int, - threading_mode: ThreadModes, - task_impl: TaskImpl, - http_mode: HTTPModes, - http1_settings: Optional[HTTP1Settings], - http2_settings: Optional[HTTP2Settings], - websockets: bool, - log_enabled: bool, - log_level: LogLevels, - log_config: Dict[str, Any], - log_access_fmt: Optional[str], - ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], - scope_opts: Dict[str, Any], - ): - from granian._loops import loops - from granian._signals import set_loop_signals - - if process_name: - setproctitle.setproctitle(f'{process_name} worker-{worker_id}') - configure_logging(log_level, log_config, log_enabled) - - loop = loops.get(loop_impl) - sfd = socket.fileno() - callback = callback_loader() - shutdown_event = set_loop_signals(loop) - wcallback = _asgi_call_wrap(callback, scope_opts, {}, log_access_fmt) - - worker = ASGIWorker( - worker_id, - sfd, - threads, - blocking_threads, - backpressure, - http_mode, - http1_settings, - http2_settings, - websockets, - *ssl_ctx, - ) - serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) - scheduler = _new_cbscheduler( - loop, _future_watcher_wrapper(wcallback), impl_asyncio=task_impl == TaskImpl.asyncio - ) - serve(scheduler, loop, shutdown_event) - - @staticmethod - def _spawn_asgi_lifespan_worker( - worker_id: int, - process_name: Optional[str], - callback_loader: Callable[..., Any], - socket: socket.socket, - loop_impl: Loops, - threads: int, - blocking_threads: int, - backpressure: int, - threading_mode: ThreadModes, - task_impl: TaskImpl, - http_mode: HTTPModes, - http1_settings: Optional[HTTP1Settings], - http2_settings: Optional[HTTP2Settings], - websockets: bool, - log_enabled: bool, - log_level: LogLevels, - log_config: Dict[str, Any], - log_access_fmt: Optional[str], - ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], - scope_opts: Dict[str, Any], - ): - from granian._loops import loops - from granian._signals import set_loop_signals - - if process_name: - setproctitle.setproctitle(f'{process_name} worker-{worker_id}') - configure_logging(log_level, log_config, log_enabled) - - loop = loops.get(loop_impl) - sfd = socket.fileno() - callback = callback_loader() - lifespan_handler = LifespanProtocol(callback) - - loop.run_until_complete(lifespan_handler.startup()) - if lifespan_handler.interrupt: - logger.error('ASGI lifespan startup failed', exc_info=lifespan_handler.exc) - sys.exit(1) - - shutdown_event = set_loop_signals(loop) - wcallback = _asgi_call_wrap(callback, scope_opts, lifespan_handler.state, log_access_fmt) - - worker = ASGIWorker( - worker_id, - sfd, - threads, - blocking_threads, - backpressure, - http_mode, - http1_settings, - http2_settings, - websockets, - *ssl_ctx, - ) - serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) - scheduler = _new_cbscheduler( - loop, _future_watcher_wrapper(wcallback), impl_asyncio=task_impl == TaskImpl.asyncio - ) - serve(scheduler, loop, shutdown_event) - loop.run_until_complete(lifespan_handler.shutdown()) - - @staticmethod - def _spawn_rsgi_worker( - worker_id: int, - process_name: Optional[str], - callback_loader: Callable[..., Any], - socket: socket.socket, - loop_impl: Loops, - threads: int, - blocking_threads: int, - backpressure: int, - threading_mode: ThreadModes, - task_impl: TaskImpl, - http_mode: HTTPModes, - http1_settings: Optional[HTTP1Settings], - http2_settings: Optional[HTTP2Settings], - websockets: bool, - log_enabled: bool, - log_level: LogLevels, - log_config: Dict[str, Any], - log_access_fmt: Optional[str], - ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], - scope_opts: Dict[str, Any], - ): - from granian._loops import loops - from granian._signals import set_loop_signals - - if process_name: - setproctitle.setproctitle(f'{process_name} worker-{worker_id}') - configure_logging(log_level, log_config, log_enabled) - - loop = loops.get(loop_impl) - sfd = socket.fileno() - target = callback_loader() - callback = getattr(target, '__rsgi__') if hasattr(target, '__rsgi__') else target - callback_init = ( - getattr(target, '__rsgi_init__') if hasattr(target, '__rsgi_init__') else lambda *args, **kwargs: None - ) - callback_del = ( - getattr(target, '__rsgi_del__') if hasattr(target, '__rsgi_del__') else lambda *args, **kwargs: None - ) - callback = _rsgi_call_wrap(callback, log_access_fmt) - shutdown_event = set_loop_signals(loop) - callback_init(loop) - - worker = RSGIWorker( - worker_id, - sfd, - threads, - blocking_threads, - backpressure, - http_mode, - http1_settings, - http2_settings, - websockets, - *ssl_ctx, - ) - serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) - scheduler = _new_cbscheduler( - loop, _future_watcher_wrapper(callback), impl_asyncio=task_impl == TaskImpl.asyncio - ) - serve(scheduler, loop, shutdown_event) - callback_del(loop) - - @staticmethod - def _spawn_wsgi_worker( - worker_id: int, - process_name: Optional[str], - callback_loader: Callable[..., Any], - socket: socket.socket, - loop_impl: Loops, - threads: int, - blocking_threads: int, - backpressure: int, - threading_mode: ThreadModes, - task_impl: TaskImpl, - http_mode: HTTPModes, - http1_settings: Optional[HTTP1Settings], - http2_settings: Optional[HTTP2Settings], - websockets: bool, - log_enabled: bool, - log_level: LogLevels, - log_config: Dict[str, Any], - log_access_fmt: Optional[str], - ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], - scope_opts: Dict[str, Any], - ): - from granian._loops import loops - from granian._signals import set_sync_signals - - if process_name: - setproctitle.setproctitle(f'{process_name} worker-{worker_id}') - configure_logging(log_level, log_config, log_enabled) - - loop = loops.get(loop_impl) - sfd = socket.fileno() - callback = callback_loader() - shutdown_event = set_sync_signals() - - worker = WSGIWorker( - worker_id, sfd, threads, blocking_threads, backpressure, http_mode, http1_settings, http2_settings, *ssl_ctx - ) - serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) - scheduler = _new_cbscheduler( - loop, _wsgi_call_wrap(callback, scope_opts, log_access_fmt), impl_asyncio=task_impl == TaskImpl.asyncio - ) - serve(scheduler, loop, shutdown_event) - shutdown_event.qs.wait() - def _init_shared_socket(self): self._shd = SocketHolder.from_address(self.bind_addr, self.bind_port, self.backlog) self._sfd = self._shd.get_fd() @@ -415,88 +192,62 @@ def signal_handler_reload(self, *args, **kwargs): self.reload_signal = True self.main_loop_interrupt.set() - def _spawn_proc(self, idx, target, callback_loader, socket_loader) -> Worker: - return Worker( - parent=self, - idx=idx, - target=target, - args=( - idx + 1, - self.process_name, - callback_loader, - socket_loader(), - self.loop, - self.threads, - self.blocking_threads, - self.backpressure, - self.threading_mode, - self.task_impl, - self.http, - self.http1_settings, - self.http2_settings, - self.websockets, - self.log_enabled, - self.log_level, - self.log_config, - self.log_access_format if self.log_access else None, - self.ssl_ctx, - {'url_path_prefix': self.url_path_prefix}, - ), - ) + def _spawn_worker(self, idx, target, callback_loader, socket_loader) -> WT: + raise NotImplementedError def _spawn_workers(self, sock, spawn_target, target_loader): def socket_loader(): return sock for idx in range(self.workers): - proc = self._spawn_proc( + wrk = self._spawn_worker( idx=idx, target=spawn_target, callback_loader=target_loader, socket_loader=socket_loader ) - proc.start() - self.procs.append(proc) + wrk.start() + self.wrks.append(wrk) def _respawn_workers(self, workers, sock, spawn_target, target_loader, delay: float = 0): def socket_loader(): return sock for idx in workers: - self.respawned_procs[idx] = time.time() + self.respawned_wrks[idx] = time.time() logger.info(f'Respawning worker-{idx + 1}') - old_proc = self.procs.pop(idx) - proc = self._spawn_proc( + old_wrk = self.wrks.pop(idx) + wrk = self._spawn_worker( idx=idx, target=spawn_target, callback_loader=target_loader, socket_loader=socket_loader ) - proc.start() - self.procs.insert(idx, proc) + wrk.start() + self.wrks.insert(idx, wrk) time.sleep(delay) logger.info(f'Stopping old worker-{idx + 1}') - old_proc.terminate() - old_proc.join(self.workers_kill_timeout) + old_wrk.terminate() + old_wrk.join(self.workers_kill_timeout) if self.workers_kill_timeout: - # the process might still be reported alive after `join`, let's context switch - if old_proc.proc.is_alive(): + # the worker might still be reported alive after `join`, let's context switch + if old_wrk.is_alive(): time.sleep(0.001) - if old_proc.proc.is_alive(): + if old_wrk.is_alive(): logger.warning(f'Killing old worker-{idx + 1} after it refused to gracefully stop') - old_proc.kill() - old_proc.join() + old_wrk.kill() + old_wrk.join() def _stop_workers(self): - for proc in self.procs: - proc.terminate() + for wrk in self.wrks: + wrk.terminate() - for proc in self.procs: - proc.join(self.workers_kill_timeout) + for wrk in self.wrks: + wrk.join(self.workers_kill_timeout) if self.workers_kill_timeout: - # the process might still be reported after `join`, let's context switch - if proc.proc.is_alive(): + # the worker might still be reported after `join`, let's context switch + if wrk.is_alive(): time.sleep(0.001) - if proc.proc.is_alive(): - logger.warning(f'Killing worker-{proc.idx} after it refused to gracefully stop') - proc.kill() - proc.join() + if wrk.is_alive(): + logger.warning(f'Killing worker-{wrk.idx} after it refused to gracefully stop') + wrk.kill() + wrk.join() - self.procs.clear() + self.wrks.clear() def _workers_lifetime_watcher(self, ttl): time.sleep(ttl) @@ -584,7 +335,7 @@ def _reload(self, sock, spawn_target, target_loader): logger.info('HUP signal received, gracefully respawning workers..') workers = list(range(self.workers)) self.reload_signal = False - self.respawned_procs.clear() + self.respawned_wrks.clear() self.main_loop_interrupt.clear() self._respawn_workers(workers, sock, spawn_target, target_loader, delay=self.respawn_interval) @@ -599,13 +350,13 @@ def _serve_loop(self, sock, spawn_target, target_loader): break cycle = time.time() - if any(cycle - self.respawned_procs.get(idx, 0) <= 5.5 for idx in self.interrupt_children): + if any(cycle - self.respawned_wrks.get(idx, 0) <= 5.5 for idx in self.interrupt_children): logger.error('Worker crash loop detected, exiting') break workers = list(self.interrupt_children) self.interrupt_children.clear() - self.respawned_procs.clear() + self.respawned_wrks.clear() self.main_loop_interrupt.clear() self._respawn_workers(workers, sock, spawn_target, target_loader) @@ -618,7 +369,7 @@ def _serve_loop(self, sock, spawn_target, target_loader): ttl = self.workers_lifetime * 0.95 now = time.time() etas = [self.workers_lifetime] - for worker in list(self.procs): + for worker in list(self.wrks): if (now - worker.birth) >= ttl: logger.info(f'worker-{worker.idx + 1} lifetime expired, gracefully respawning..') self._respawn_workers( diff --git a/granian/server/mp.py b/granian/server/mp.py new file mode 100644 index 00000000..9a59a4dc --- /dev/null +++ b/granian/server/mp.py @@ -0,0 +1,299 @@ +import multiprocessing +import socket +import sys +from typing import Any, Callable, Dict, Optional, Tuple + +from .._futures import _future_watcher_wrapper, _new_cbscheduler +from .._granian import ASGIWorker, RSGIWorker, WSGIWorker +from ..asgi import LifespanProtocol, _callback_wrapper as _asgi_call_wrap +from ..rsgi import _callback_wrapper as _rsgi_call_wrap +from ..wsgi import _callback_wrapper as _wsgi_call_wrap +from .common import ( + AbstractServer, + AbstractWorker, + HTTP1Settings, + HTTP2Settings, + HTTPModes, + LogLevels, + Loops, + TaskImpl, + ThreadModes, + configure_logging, + logger, + setproctitle, +) + + +multiprocessing.allow_connection_pickling() + + +class WorkerProcess(AbstractWorker): + _idl = 'PID' + + def _spawn(self, target, args): + self.inner = multiprocessing.get_context().Process(name='granian-worker', target=target, args=args) + + def _id(self): + return self.inner.pid + + def terminate(self): + self.interrupt_by_parent = True + self.inner.terminate() + + def kill(self): + self.interrupt_by_parent = True + self.inner.kill() + + +class MPServer(AbstractServer[WorkerProcess]): + @staticmethod + def _spawn_asgi_worker( + worker_id: int, + process_name: Optional[str], + callback_loader: Callable[..., Any], + socket: socket.socket, + loop_impl: Loops, + threads: int, + blocking_threads: int, + backpressure: int, + threading_mode: ThreadModes, + task_impl: TaskImpl, + http_mode: HTTPModes, + http1_settings: Optional[HTTP1Settings], + http2_settings: Optional[HTTP2Settings], + websockets: bool, + log_enabled: bool, + log_level: LogLevels, + log_config: Dict[str, Any], + log_access_fmt: Optional[str], + ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], + scope_opts: Dict[str, Any], + ): + from granian._loops import loops + from granian._signals import set_loop_signals + + if process_name: + setproctitle.setproctitle(f'{process_name} worker-{worker_id}') + configure_logging(log_level, log_config, log_enabled) + + loop = loops.get(loop_impl) + sfd = socket.fileno() + callback = callback_loader() + shutdown_event = set_loop_signals(loop) + wcallback = _asgi_call_wrap(callback, scope_opts, {}, log_access_fmt) + + worker = ASGIWorker( + worker_id, + sfd, + threads, + blocking_threads, + backpressure, + http_mode, + http1_settings, + http2_settings, + websockets, + *ssl_ctx, + ) + serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) + scheduler = _new_cbscheduler( + loop, _future_watcher_wrapper(wcallback), impl_asyncio=task_impl == TaskImpl.asyncio + ) + serve(scheduler, loop, shutdown_event) + + @staticmethod + def _spawn_asgi_lifespan_worker( + worker_id: int, + process_name: Optional[str], + callback_loader: Callable[..., Any], + socket: socket.socket, + loop_impl: Loops, + threads: int, + blocking_threads: int, + backpressure: int, + threading_mode: ThreadModes, + task_impl: TaskImpl, + http_mode: HTTPModes, + http1_settings: Optional[HTTP1Settings], + http2_settings: Optional[HTTP2Settings], + websockets: bool, + log_enabled: bool, + log_level: LogLevels, + log_config: Dict[str, Any], + log_access_fmt: Optional[str], + ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], + scope_opts: Dict[str, Any], + ): + from granian._loops import loops + from granian._signals import set_loop_signals + + if process_name: + setproctitle.setproctitle(f'{process_name} worker-{worker_id}') + configure_logging(log_level, log_config, log_enabled) + + loop = loops.get(loop_impl) + sfd = socket.fileno() + callback = callback_loader() + lifespan_handler = LifespanProtocol(callback) + + loop.run_until_complete(lifespan_handler.startup()) + if lifespan_handler.interrupt: + logger.error('ASGI lifespan startup failed', exc_info=lifespan_handler.exc) + sys.exit(1) + + shutdown_event = set_loop_signals(loop) + wcallback = _asgi_call_wrap(callback, scope_opts, lifespan_handler.state, log_access_fmt) + + worker = ASGIWorker( + worker_id, + sfd, + threads, + blocking_threads, + backpressure, + http_mode, + http1_settings, + http2_settings, + websockets, + *ssl_ctx, + ) + serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) + scheduler = _new_cbscheduler( + loop, _future_watcher_wrapper(wcallback), impl_asyncio=task_impl == TaskImpl.asyncio + ) + serve(scheduler, loop, shutdown_event) + loop.run_until_complete(lifespan_handler.shutdown()) + + @staticmethod + def _spawn_rsgi_worker( + worker_id: int, + process_name: Optional[str], + callback_loader: Callable[..., Any], + socket: socket.socket, + loop_impl: Loops, + threads: int, + blocking_threads: int, + backpressure: int, + threading_mode: ThreadModes, + task_impl: TaskImpl, + http_mode: HTTPModes, + http1_settings: Optional[HTTP1Settings], + http2_settings: Optional[HTTP2Settings], + websockets: bool, + log_enabled: bool, + log_level: LogLevels, + log_config: Dict[str, Any], + log_access_fmt: Optional[str], + ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], + scope_opts: Dict[str, Any], + ): + from granian._loops import loops + from granian._signals import set_loop_signals + + if process_name: + setproctitle.setproctitle(f'{process_name} worker-{worker_id}') + configure_logging(log_level, log_config, log_enabled) + + loop = loops.get(loop_impl) + sfd = socket.fileno() + target = callback_loader() + callback = getattr(target, '__rsgi__') if hasattr(target, '__rsgi__') else target + callback_init = ( + getattr(target, '__rsgi_init__') if hasattr(target, '__rsgi_init__') else lambda *args, **kwargs: None + ) + callback_del = ( + getattr(target, '__rsgi_del__') if hasattr(target, '__rsgi_del__') else lambda *args, **kwargs: None + ) + callback = _rsgi_call_wrap(callback, log_access_fmt) + shutdown_event = set_loop_signals(loop) + callback_init(loop) + + worker = RSGIWorker( + worker_id, + sfd, + threads, + blocking_threads, + backpressure, + http_mode, + http1_settings, + http2_settings, + websockets, + *ssl_ctx, + ) + serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) + scheduler = _new_cbscheduler( + loop, _future_watcher_wrapper(callback), impl_asyncio=task_impl == TaskImpl.asyncio + ) + serve(scheduler, loop, shutdown_event) + callback_del(loop) + + @staticmethod + def _spawn_wsgi_worker( + worker_id: int, + process_name: Optional[str], + callback_loader: Callable[..., Any], + socket: socket.socket, + loop_impl: Loops, + threads: int, + blocking_threads: int, + backpressure: int, + threading_mode: ThreadModes, + task_impl: TaskImpl, + http_mode: HTTPModes, + http1_settings: Optional[HTTP1Settings], + http2_settings: Optional[HTTP2Settings], + websockets: bool, + log_enabled: bool, + log_level: LogLevels, + log_config: Dict[str, Any], + log_access_fmt: Optional[str], + ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], + scope_opts: Dict[str, Any], + ): + from granian._loops import loops + from granian._signals import set_sync_signals + + if process_name: + setproctitle.setproctitle(f'{process_name} worker-{worker_id}') + configure_logging(log_level, log_config, log_enabled) + + loop = loops.get(loop_impl) + sfd = socket.fileno() + callback = callback_loader() + shutdown_event = set_sync_signals() + + worker = WSGIWorker( + worker_id, sfd, threads, blocking_threads, backpressure, http_mode, http1_settings, http2_settings, *ssl_ctx + ) + serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) + scheduler = _new_cbscheduler( + loop, _wsgi_call_wrap(callback, scope_opts, log_access_fmt), impl_asyncio=task_impl == TaskImpl.asyncio + ) + serve(scheduler, loop, shutdown_event) + + def _spawn_worker(self, idx, target, callback_loader, socket_loader) -> WorkerProcess: + return WorkerProcess( + parent=self, + idx=idx, + target=target, + args=( + idx + 1, + self.process_name, + callback_loader, + socket_loader(), + self.loop, + self.threads, + self.blocking_threads, + self.backpressure, + self.threading_mode, + self.task_impl, + self.http, + self.http1_settings, + self.http2_settings, + self.websockets, + self.log_enabled, + self.log_level, + self.log_config, + self.log_access_format if self.log_access else None, + self.ssl_ctx, + {'url_path_prefix': self.url_path_prefix}, + ), + ) diff --git a/granian/server/mt.py b/granian/server/mt.py new file mode 100644 index 00000000..c63b163f --- /dev/null +++ b/granian/server/mt.py @@ -0,0 +1,291 @@ +import socket +import sys +import threading +from typing import Any, Callable, Dict, Optional, Tuple + +from .._futures import _future_watcher_wrapper, _new_cbscheduler +from .._granian import ASGIWorker, RSGIWorker, WorkerSignal, WorkerSignalSync, WSGIWorker +from .._loops import loops +from ..asgi import LifespanProtocol, _callback_wrapper as _asgi_call_wrap +from ..errors import ConfigurationError, FatalError +from ..rsgi import _callback_wrapper as _rsgi_call_wrap +from ..wsgi import _callback_wrapper as _wsgi_call_wrap +from .common import ( + AbstractServer, + AbstractWorker, + HTTP1Settings, + HTTP2Settings, + HTTPModes, + Interfaces, + Loops, + TaskImpl, + ThreadModes, + logger, +) + + +class WorkerThread(AbstractWorker): + _idl = 'TID' + + def __init__(self, parent, idx, target, args, sig): + self._sig = sig + super().__init__(parent, idx, target, args) + + def _spawn(self, target, args): + self.inner = threading.Thread(name='granian-worker', target=target, args=args) + self._alive = True + + def _id(self): + return self.inner.native_id + + def _watcher(self): + self.inner.join() + self._alive = False + if not self.interrupt_by_parent: + logger.error(f'Unexpected exit from worker-{self.idx + 1}') + self.parent.interrupt_children.append(self.idx) + self.parent.main_loop_interrupt.set() + + def terminate(self): + self._alive = False + self.interrupt_by_parent = True + self._sig.set() + + def is_alive(self): + if not self._alive: + return False + return self.inner.is_alive() + + +class MTServer(AbstractServer[WorkerThread]): + @staticmethod + def _spawn_asgi_worker( + worker_id: int, + shutdown_event: Any, + callback: Any, + socket: socket.socket, + loop_impl: Loops, + threads: int, + blocking_threads: int, + backpressure: int, + threading_mode: ThreadModes, + task_impl: TaskImpl, + http_mode: HTTPModes, + http1_settings: Optional[HTTP1Settings], + http2_settings: Optional[HTTP2Settings], + websockets: bool, + log_access_fmt: Optional[str], + ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], + scope_opts: Dict[str, Any], + ): + loop = loops.get(loop_impl) + sfd = socket.fileno() + wcallback = _asgi_call_wrap(callback, scope_opts, {}, log_access_fmt) + + worker = ASGIWorker( + worker_id, + sfd, + threads, + blocking_threads, + backpressure, + http_mode, + http1_settings, + http2_settings, + websockets, + *ssl_ctx, + ) + serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) + scheduler = _new_cbscheduler( + loop, _future_watcher_wrapper(wcallback), impl_asyncio=task_impl == TaskImpl.asyncio + ) + serve(scheduler, loop, shutdown_event) + + @staticmethod + def _spawn_asgi_lifespan_worker( + worker_id: int, + shutdown_event: Any, + callback: Any, + socket: socket.socket, + loop_impl: Loops, + threads: int, + blocking_threads: int, + backpressure: int, + threading_mode: ThreadModes, + task_impl: TaskImpl, + http_mode: HTTPModes, + http1_settings: Optional[HTTP1Settings], + http2_settings: Optional[HTTP2Settings], + websockets: bool, + log_access_fmt: Optional[str], + ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], + scope_opts: Dict[str, Any], + ): + loop = loops.get(loop_impl) + sfd = socket.fileno() + + lifespan_handler = LifespanProtocol(callback) + loop.run_until_complete(lifespan_handler.startup()) + if lifespan_handler.interrupt: + logger.error('ASGI lifespan startup failed', exc_info=lifespan_handler.exc) + sys.exit(1) + + wcallback = _asgi_call_wrap(callback, scope_opts, lifespan_handler.state, log_access_fmt) + + worker = ASGIWorker( + worker_id, + sfd, + threads, + blocking_threads, + backpressure, + http_mode, + http1_settings, + http2_settings, + websockets, + *ssl_ctx, + ) + serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) + scheduler = _new_cbscheduler( + loop, _future_watcher_wrapper(wcallback), impl_asyncio=task_impl == TaskImpl.asyncio + ) + serve(scheduler, loop, shutdown_event) + + @staticmethod + def _spawn_rsgi_worker( + worker_id: int, + shutdown_event: Any, + callback: Any, + socket: socket.socket, + loop_impl: Loops, + threads: int, + blocking_threads: int, + backpressure: int, + threading_mode: ThreadModes, + task_impl: TaskImpl, + http_mode: HTTPModes, + http1_settings: Optional[HTTP1Settings], + http2_settings: Optional[HTTP2Settings], + websockets: bool, + log_access_fmt: Optional[str], + ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], + scope_opts: Dict[str, Any], + ): + loop = loops.get(loop_impl) + sfd = socket.fileno() + callback_init = ( + getattr(callback, '__rsgi_init__') if hasattr(callback, '__rsgi_init__') else lambda *args, **kwargs: None + ) + callback_del = ( + getattr(callback, '__rsgi_del__') if hasattr(callback, '__rsgi_del__') else lambda *args, **kwargs: None + ) + callback = getattr(callback, '__rsgi__') if hasattr(callback, '__rsgi__') else callback + callback = _rsgi_call_wrap(callback, log_access_fmt) + callback_init(loop) + + worker = RSGIWorker( + worker_id, + sfd, + threads, + blocking_threads, + backpressure, + http_mode, + http1_settings, + http2_settings, + websockets, + *ssl_ctx, + ) + serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) + scheduler = _new_cbscheduler( + loop, _future_watcher_wrapper(callback), impl_asyncio=task_impl == TaskImpl.asyncio + ) + serve(scheduler, loop, shutdown_event) + callback_del(loop) + + @staticmethod + def _spawn_wsgi_worker( + worker_id: int, + shutdown_event: Any, + callback: Any, + socket: socket.socket, + loop_impl: Loops, + threads: int, + blocking_threads: int, + backpressure: int, + threading_mode: ThreadModes, + task_impl: TaskImpl, + http_mode: HTTPModes, + http1_settings: Optional[HTTP1Settings], + http2_settings: Optional[HTTP2Settings], + websockets: bool, + log_access_fmt: Optional[str], + ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]], + scope_opts: Dict[str, Any], + ): + loop = loops.get(loop_impl) + sfd = socket.fileno() + + worker = WSGIWorker( + worker_id, sfd, threads, blocking_threads, backpressure, http_mode, http1_settings, http2_settings, *ssl_ctx + ) + serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode]) + scheduler = _new_cbscheduler( + loop, _wsgi_call_wrap(callback, scope_opts, log_access_fmt), impl_asyncio=task_impl == TaskImpl.asyncio + ) + serve(scheduler, loop, shutdown_event) + + def _spawn_worker(self, idx, target, callback_loader, socket_loader) -> WorkerThread: + sig = WorkerSignalSync(threading.Event()) if self.interface == Interfaces.WSGI else WorkerSignal() + + return WorkerThread( + parent=self, + idx=idx, + target=target, + args=( + idx + 1, + sig, + callback_loader, + socket_loader(), + self.loop, + self.threads, + self.blocking_threads, + self.backpressure, + self.threading_mode, + self.task_impl, + self.http, + self.http1_settings, + self.http2_settings, + self.websockets, + self.log_access_format if self.log_access else None, + self.ssl_ctx, + {'url_path_prefix': self.url_path_prefix}, + ), + sig=sig, + ) + + def _check_gil(self): + try: + assert sys._is_gil_enabled() is False + except Exception: + logger.error('Cannot run a free-threaded Granian build with GIL enabled') + raise FatalError('GIL enabled on free-threaded build') + + def _serve(self, spawn_target, target_loader): + target = target_loader() + self._check_gil() + sock = self.startup(spawn_target, target) + self._serve_loop(sock, spawn_target, target) + self.shutdown() + + def _serve_with_reloader(self, spawn_target, target_loader): + raise NotImplementedError + + def serve( + self, + spawn_target: Optional[Callable[..., None]] = None, + target_loader: Optional[Callable[..., Callable[..., Any]]] = None, + wrap_loader: bool = True, + ): + if self.reload_on_changes: + logger.error('The changes reloader is not supported on the free-threaded build') + raise ConfigurationError('reload') + + super().serve(spawn_target, target_loader, wrap_loader) diff --git a/pyproject.toml b/pyproject.toml index ccd00c76..e4e12315 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,6 @@ dynamic = [ requires-python = '>=3.9' dependencies = [ 'click>=8.0.0', - 'uvloop>=0.18.0; sys_platform != "win32" and platform_python_implementation == "CPython"', ] [project.optional-dependencies] @@ -44,6 +43,24 @@ pname = [ reload = [ 'watchfiles~=0.21', ] +rloop = [ + 'rloop; sys_platform != "win32"', +] +uvloop = [ + 'uvloop>=0.18.0; sys_platform != "win32" and platform_python_implementation == "CPython"', +] + +all = ['granian[pname,reload]'] + +[project.urls] +Homepage = 'https://github.com/emmett-framework/granian' +Funding = 'https://github.com/sponsors/gi0baro' +Source = 'https://github.com/emmett-framework/granian' + +[dependency-groups] +build = [ + 'maturin~=1.8, +] lint = [ 'ruff~=0.5.0', ] @@ -54,19 +71,18 @@ test = [ 'sniffio~=1.3', 'websockets~=11.0', ] -all = ['granian[pname,reload]'] -dev = ['granian[all,lint,test]'] -[project.urls] -Homepage = 'https://github.com/emmett-framework/granian' -Funding = 'https://github.com/sponsors/gi0baro' -Source = 'https://github.com/emmett-framework/granian' +all = [ + { include-group = 'build' }, + { include-group = 'lint' }, + { include-group = 'test' }, +] [project.scripts] granian = 'granian:cli.entrypoint' [build-system] -requires = ['maturin>=1.1.0,<2'] +requires = ['maturin>=1.8.0,<2'] build-backend = 'maturin' [tool.maturin] diff --git a/src/asgi/callbacks.rs b/src/asgi/callbacks.rs index 053fea64..438a16fc 100644 --- a/src/asgi/callbacks.rs +++ b/src/asgi/callbacks.rs @@ -138,6 +138,7 @@ impl CallbackWatcherWebsocket { // } // } +#[cfg(not(Py_GIL_DISABLED))] #[inline] pub(crate) fn call_http( cb: ArcCBScheduler, @@ -174,6 +175,41 @@ pub(crate) fn call_http( rx } +#[cfg(Py_GIL_DISABLED)] +#[inline] +pub(crate) fn call_http( + cb: ArcCBScheduler, + rt: RuntimeRef, + server_addr: SocketAddr, + client_addr: SocketAddr, + scheme: &str, + req: hyper::http::request::Parts, + body: hyper::body::Incoming, +) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let protocol = HTTPProtocol::new(rt, body, tx); + let scheme: Arc = scheme.into(); + + scope_native_parts!( + req, + server_addr, + client_addr, + path, + query_string, + version, + server, + client + ); + Python::with_gil(|py| { + let scope = build_scope_http(py, &req, version, server, client, &scheme, &path, query_string).unwrap(); + let watcher = Py::new(py, CallbackWatcherHTTP::new(py, protocol, scope)).unwrap(); + cb.get().schedule(py, watcher.as_any()); + }); + + rx +} + +#[cfg(not(Py_GIL_DISABLED))] #[inline] pub(crate) fn call_ws( cb: ArcCBScheduler, @@ -210,3 +246,38 @@ pub(crate) fn call_ws( rx } + +#[cfg(Py_GIL_DISABLED)] +#[inline] +pub(crate) fn call_ws( + cb: ArcCBScheduler, + rt: RuntimeRef, + server_addr: SocketAddr, + client_addr: SocketAddr, + scheme: &str, + ws: HyperWebsocket, + req: hyper::http::request::Parts, + upgrade: UpgradeData, +) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let protocol = WebsocketProtocol::new(rt, tx, ws, upgrade); + let scheme: Arc = scheme.into(); + + scope_native_parts!( + req, + server_addr, + client_addr, + path, + query_string, + version, + server, + client + ); + Python::with_gil(|py| { + let scope = build_scope_ws(py, &req, version, server, client, &scheme, &path, query_string).unwrap(); + let watcher = Py::new(py, CallbackWatcherWebsocket::new(py, protocol, scope)).unwrap(); + cb.get().schedule(py, watcher.as_any()); + }); + + rx +} diff --git a/src/asgi/serve.rs b/src/asgi/serve.rs index dd7d1380..8537d8c2 100644 --- a/src/asgi/serve.rs +++ b/src/asgi/serve.rs @@ -4,7 +4,7 @@ use super::http::{handle, handle_ws}; use crate::callbacks::CallbackScheduler; use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py}; -use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal, WorkerSignals}; +use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal}; #[pyclass(frozen, module = "granian._granian")] pub struct ASGIWorker { @@ -79,19 +79,19 @@ impl ASGIWorker { fn serve_rth(&self, callback: Py, event_loop: &Bound, signal: Py) { match (self.config.websockets_enabled, self.config.ssl_enabled) { - (false, false) => self._serve_rth(callback, event_loop, WorkerSignals::Tokio(signal)), - (true, false) => self._serve_rth_ws(callback, event_loop, WorkerSignals::Tokio(signal)), - (false, true) => self._serve_rth_ssl(callback, event_loop, WorkerSignals::Tokio(signal)), - (true, true) => self._serve_rth_ssl_ws(callback, event_loop, WorkerSignals::Tokio(signal)), + (false, false) => self._serve_rth(callback, event_loop, signal), + (true, false) => self._serve_rth_ws(callback, event_loop, signal), + (false, true) => self._serve_rth_ssl(callback, event_loop, signal), + (true, true) => self._serve_rth_ssl_ws(callback, event_loop, signal), } } fn serve_wth(&self, callback: Py, event_loop: &Bound, signal: Py) { match (self.config.websockets_enabled, self.config.ssl_enabled) { - (false, false) => self._serve_wth(callback, event_loop, WorkerSignals::Tokio(signal)), - (true, false) => self._serve_wth_ws(callback, event_loop, WorkerSignals::Tokio(signal)), - (false, true) => self._serve_wth_ssl(callback, event_loop, WorkerSignals::Tokio(signal)), - (true, true) => self._serve_wth_ssl_ws(callback, event_loop, WorkerSignals::Tokio(signal)), + (false, false) => self._serve_wth(callback, event_loop, signal), + (true, false) => self._serve_wth_ws(callback, event_loop, signal), + (false, true) => self._serve_wth_ssl(callback, event_loop, signal), + (true, true) => self._serve_wth_ssl_ws(callback, event_loop, signal), } } } diff --git a/src/lib.rs b/src/lib.rs index 8fc30800..e1c3d2ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,11 @@ mod workers; mod ws; mod wsgi; +#[cfg(not(Py_GIL_DISABLED))] +const BUILD_GIL: bool = true; +#[cfg(Py_GIL_DISABLED)] +const BUILD_GIL: bool = false; + pub fn get_granian_version() -> &'static str { static GRANIAN_VERSION: OnceLock = OnceLock::new(); @@ -26,9 +31,10 @@ pub fn get_granian_version() -> &'static str { }) } -#[pymodule] +#[pymodule(gil_used = false)] fn _granian(py: Python, module: &Bound) -> PyResult<()> { module.add("__version__", get_granian_version())?; + module.add("BUILD_GIL", BUILD_GIL)?; module.add_class::()?; asgi::init_pymodule(module)?; rsgi::init_pymodule(py, module)?; diff --git a/src/rsgi/callbacks.rs b/src/rsgi/callbacks.rs index d00ef51e..e5998005 100644 --- a/src/rsgi/callbacks.rs +++ b/src/rsgi/callbacks.rs @@ -108,6 +108,7 @@ impl CallbackWatcherWebsocket { } } +#[cfg(not(Py_GIL_DISABLED))] #[inline] pub(crate) fn call_http( cb: ArcCBScheduler, @@ -129,6 +130,26 @@ pub(crate) fn call_http( rx } +#[cfg(Py_GIL_DISABLED)] +#[inline] +pub(crate) fn call_http( + cb: ArcCBScheduler, + rt: RuntimeRef, + body: hyper::body::Incoming, + scope: HTTPScope, +) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let protocol = HTTPProtocol::new(rt, tx, body); + + Python::with_gil(|py| { + let watcher = Py::new(py, CallbackWatcherHTTP::new(py, protocol, scope)).unwrap(); + cb.get().schedule(py, watcher.as_any()); + }); + + rx +} + +#[cfg(not(Py_GIL_DISABLED))] #[inline] pub(crate) fn call_ws( cb: ArcCBScheduler, @@ -150,3 +171,23 @@ pub(crate) fn call_ws( rx } + +#[cfg(Py_GIL_DISABLED)] +#[inline] +pub(crate) fn call_ws( + cb: ArcCBScheduler, + rt: RuntimeRef, + ws: HyperWebsocket, + upgrade: UpgradeData, + scope: WebsocketScope, +) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let protocol = WebsocketProtocol::new(rt, tx, ws, upgrade); + + Python::with_gil(|py| { + let watcher = Py::new(py, CallbackWatcherWebsocket::new(py, protocol, scope)).unwrap(); + cb.get().schedule(py, watcher.as_any()); + }); + + rx +} diff --git a/src/rsgi/serve.rs b/src/rsgi/serve.rs index bc904b91..57f4e35b 100644 --- a/src/rsgi/serve.rs +++ b/src/rsgi/serve.rs @@ -4,7 +4,7 @@ use super::http::{handle, handle_ws}; use crate::callbacks::CallbackScheduler; use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py}; -use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal, WorkerSignals}; +use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal}; #[pyclass(frozen, module = "granian._granian")] pub struct RSGIWorker { @@ -79,19 +79,19 @@ impl RSGIWorker { fn serve_rth(&self, callback: Py, event_loop: &Bound, signal: Py) { match (self.config.websockets_enabled, self.config.ssl_enabled) { - (false, false) => self._serve_rth(callback, event_loop, WorkerSignals::Tokio(signal)), - (true, false) => self._serve_rth_ws(callback, event_loop, WorkerSignals::Tokio(signal)), - (false, true) => self._serve_rth_ssl(callback, event_loop, WorkerSignals::Tokio(signal)), - (true, true) => self._serve_rth_ssl_ws(callback, event_loop, WorkerSignals::Tokio(signal)), + (false, false) => self._serve_rth(callback, event_loop, signal), + (true, false) => self._serve_rth_ws(callback, event_loop, signal), + (false, true) => self._serve_rth_ssl(callback, event_loop, signal), + (true, true) => self._serve_rth_ssl_ws(callback, event_loop, signal), } } fn serve_wth(&self, callback: Py, event_loop: &Bound, signal: Py) { match (self.config.websockets_enabled, self.config.ssl_enabled) { - (false, false) => self._serve_wth(callback, event_loop, WorkerSignals::Tokio(signal)), - (true, false) => self._serve_wth_ws(callback, event_loop, WorkerSignals::Tokio(signal)), - (false, true) => self._serve_wth_ssl(callback, event_loop, WorkerSignals::Tokio(signal)), - (true, true) => self._serve_wth_ssl_ws(callback, event_loop, WorkerSignals::Tokio(signal)), + (false, false) => self._serve_wth(callback, event_loop, signal), + (true, false) => self._serve_wth_ws(callback, event_loop, signal), + (false, true) => self._serve_wth_ssl(callback, event_loop, signal), + (true, true) => self._serve_wth_ssl_ws(callback, event_loop, signal), } } } diff --git a/src/runtime.rs b/src/runtime.rs index f5437fdd..e822b267 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -34,6 +34,7 @@ pub trait Runtime: Send + 'static { where F: Future + Send + 'static; + #[cfg(not(Py_GIL_DISABLED))] fn blocking(&self) -> BlockingRunner; } @@ -103,6 +104,7 @@ impl Runtime for RuntimeRef { self.inner.spawn(fut) } + #[cfg(not(Py_GIL_DISABLED))] fn blocking(&self) -> BlockingRunner { self.innerb.clone() } @@ -144,6 +146,7 @@ pub(crate) fn init_runtime_st(blocking_threads: usize, py_loop: Arc) - // It consumes more cpu-cycles than `future_into_py_futlike`, // but for "quick" operations it's something like 12% faster. #[allow(unused_must_use)] +#[cfg(not(Py_GIL_DISABLED))] pub(crate) fn future_into_py_iter(rt: R, py: Python, fut: F) -> PyResult> where R: Runtime + ContextExt + Clone, @@ -158,7 +161,7 @@ where let _ = rb.run(move || { Python::with_gil(|py| { aw.get().set_result(py, result); - drop(aw) + drop(aw); }); }); }); @@ -166,13 +169,34 @@ where Ok(py_fut.into_any().into_bound(py)) } +#[allow(unused_must_use)] +#[cfg(Py_GIL_DISABLED)] +pub(crate) fn future_into_py_iter(rt: R, py: Python, fut: F) -> PyResult> +where + R: Runtime + ContextExt + Clone, + F: Future + Send + 'static, +{ + let aw = Py::new(py, PyIterAwaitable::new())?; + let py_fut = aw.clone_ref(py); + + rt.spawn(async move { + let result = fut.await; + Python::with_gil(|py| { + aw.get().set_result(py, result); + drop(aw); + }); + }); + + Ok(py_fut.into_any().into_bound(py)) +} + // NOTE: // `future_into_py_futlike` relies on an `asyncio.Future` like implementation. // This is generally ~38% faster than `pyo3_asyncio.future_into_py` implementation. // It won't consume more cpu-cycles than standard asyncio implementation, // and for "long" operations it's something like 6% faster than `future_into_py_iter`. #[allow(unused_must_use)] -#[cfg(unix)] +#[cfg(all(unix, not(Py_GIL_DISABLED)))] pub(crate) fn future_into_py_futlike(rt: R, py: Python, fut: F) -> PyResult> where R: Runtime + ContextExt + Clone, @@ -198,7 +222,32 @@ where } #[allow(unused_must_use)] -#[cfg(windows)] +#[cfg(all(unix, Py_GIL_DISABLED))] +pub(crate) fn future_into_py_futlike(rt: R, py: Python, fut: F) -> PyResult> +where + R: Runtime + ContextExt + Clone, + F: Future + Send + 'static, +{ + let event_loop = rt.py_event_loop(py); + let (aw, cancel_tx) = PyFutureAwaitable::new(event_loop).to_spawn(py)?; + let py_fut = aw.clone_ref(py); + + rt.spawn(async move { + tokio::select! { + result = fut => { + Python::with_gil(|py| PyFutureAwaitable::set_result(aw, py, result)); + }, + () = cancel_tx.notified() => { + Python::with_gil(|_| drop(aw)); + } + } + }); + + Ok(py_fut.into_any().into_bound(py)) +} + +#[allow(unused_must_use)] +#[cfg(all(windows, not(Py_GIL_DISABLED)))] pub(crate) fn future_into_py_futlike(rt: R, py: Python, fut: F) -> PyResult> where R: Runtime + ContextExt + Clone, @@ -249,6 +298,53 @@ where Ok(py_fut.into_bound(py)) } +#[allow(unused_must_use)] +#[cfg(all(windows, Py_GIL_DISABLED))] +pub(crate) fn future_into_py_futlike(rt: R, py: Python, fut: F) -> PyResult> +where + R: Runtime + ContextExt + Clone, + F: Future + Send + 'static, +{ + let event_loop = rt.py_event_loop(py); + let event_loop_ref = event_loop.clone_ref(py); + let cancel_tx = Arc::new(tokio::sync::Notify::new()); + + let py_fut = event_loop.call_method0(py, pyo3::intern!(py, "create_future"))?; + py_fut.call_method1( + py, + pyo3::intern!(py, "add_done_callback"), + (PyFutureDoneCallback { + cancel_tx: cancel_tx.clone(), + },), + )?; + let fut_ref = py_fut.clone_ref(py); + + rt.spawn(async move { + tokio::select! { + result = fut => { + Python::with_gil(|py| { + let pyres = result.into_pyobject(py).map(Bound::unbind); + let (cb, value) = match pyres { + Ok(val) => (fut_ref.getattr(py, pyo3::intern!(py, "set_result")).unwrap(), val), + Err(err) => (fut_ref.getattr(py, pyo3::intern!(py, "set_exception")).unwrap(), err.into_py_any(py).unwrap()) + }; + let _ = event_loop_ref.call_method1(py, pyo3::intern!(py, "call_soon_threadsafe"), (PyFutureResultSetter, cb, value)); + drop(fut_ref); + drop(event_loop_ref); + }); + }, + () = cancel_tx.notified() => { + Python::with_gil(|_| { + drop(fut_ref); + drop(event_loop_ref); + }); + } + } + }); + + Ok(py_fut.into_bound(py)) +} + #[allow(clippy::unnecessary_wraps)] #[inline(always)] pub(crate) fn empty_future_into_py(py: Python) -> PyResult> { diff --git a/src/workers.rs b/src/workers.rs index ebb63a5a..09e71dbe 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -12,11 +12,6 @@ use super::rsgi::serve::RSGIWorker; use super::tls::{load_certs as tls_load_certs, load_private_key as tls_load_pkey}; use super::wsgi::serve::WSGIWorker; -pub(crate) enum WorkerSignals { - Tokio(Py), - Crossbeam(Py), -} - #[pyclass(frozen, module = "granian._granian")] pub(crate) struct WorkerSignal { pub rx: Mutex>>, @@ -592,9 +587,9 @@ macro_rules! serve_rth { &self, callback: Py, event_loop: &Bound, - signal: crate::workers::WorkerSignals, + signal: Py, ) { - pyo3_log::init(); + _ = pyo3_log::try_init(); let worker_id = self.config.id; log::info!("Started worker-{}", worker_id); @@ -613,23 +608,7 @@ macro_rules! serve_rth { std::sync::Arc::new(event_loop.clone().unbind()), ); let rth = rt.handler(); - let mut srx = match signal { - crate::workers::WorkerSignals::Crossbeam(sig) => { - let (stx, srx) = tokio::sync::watch::channel(false); - std::thread::spawn(move || { - let pyrx = sig.get().rx.lock().unwrap().take().unwrap(); - let _ = pyrx.recv(); - stx.send(true).unwrap(); - - Python::with_gil(|py| { - let _ = sig.get().release(py); - drop(sig); - }); - }); - srx - } - crate::workers::WorkerSignals::Tokio(sig) => sig.get().rx.lock().unwrap().take().unwrap(), - }; + let mut srx = signal.get().rx.lock().unwrap().take().unwrap(); let main_loop = crate::runtime::run_until_complete(rt.handler(), event_loop.clone(), async move { crate::workers::loop_match!( @@ -671,10 +650,9 @@ macro_rules! serve_rth_ssl { &self, callback: Py, event_loop: &Bound, - // context: Bound, - signal: crate::workers::WorkerSignals, + signal: Py, ) { - pyo3_log::init(); + _ = pyo3_log::try_init(); let worker_id = self.config.id; log::info!("Started worker-{}", worker_id); @@ -686,7 +664,6 @@ macro_rules! serve_rth_ssl { let http2_opts = self.config.http2_opts.clone(); let backpressure = self.config.backpressure.clone(); let tls_cfg = self.config.tls_cfg(); - // let callback_wrapper = crate::callbacks::CallbackWrapper::new(callback, event_loop.clone(), context); let callback_wrapper = std::sync::Arc::new(callback); let rt = crate::runtime::init_runtime_mt( @@ -695,23 +672,7 @@ macro_rules! serve_rth_ssl { std::sync::Arc::new(event_loop.clone().unbind()), ); let rth = rt.handler(); - let mut srx = match signal { - crate::workers::WorkerSignals::Crossbeam(sig) => { - let (stx, srx) = tokio::sync::watch::channel(false); - std::thread::spawn(move || { - let pyrx = sig.get().rx.lock().unwrap().take().unwrap(); - let _ = pyrx.recv(); - stx.send(true).unwrap(); - - Python::with_gil(|py| { - let _ = sig.get().release(py); - drop(sig); - }); - }); - srx - } - crate::workers::WorkerSignals::Tokio(sig) => sig.get().rx.lock().unwrap().take().unwrap(), - }; + let mut srx = signal.get().rx.lock().unwrap().take().unwrap(); let main_loop = crate::runtime::run_until_complete(rt.handler(), event_loop.clone(), async move { crate::workers::loop_match_tls!( @@ -750,7 +711,6 @@ macro_rules! serve_rth_ssl { macro_rules! serve_wth_inner { ($self:expr, $target:expr, $callback:expr, $event_loop:expr, $wid:expr, $workers:expr, $srx:expr) => { - // let callback_wrapper = crate::callbacks::CallbackWrapper::new($callback, $event_loop.clone(), $context); let callback_wrapper = std::sync::Arc::new($callback); let py_loop = std::sync::Arc::new($event_loop.clone().unbind()); @@ -807,10 +767,9 @@ macro_rules! serve_wth { &self, callback: Py, event_loop: &Bound, - // context: Bound, - signal: crate::workers::WorkerSignals, + signal: Py, ) { - pyo3_log::init(); + _ = pyo3_log::try_init(); let worker_id = self.config.id; log::info!("Started worker-{}", worker_id); @@ -819,52 +778,31 @@ macro_rules! serve_wth { let mut workers = vec![]; crate::workers::serve_wth_inner!(self, $target, callback, event_loop, worker_id, workers, srx); - match signal { - crate::workers::WorkerSignals::Tokio(sig) => { - let rtm = crate::runtime::init_runtime_mt(1, 1, std::sync::Arc::new(event_loop.clone().unbind())); - let mut pyrx = sig.get().rx.lock().unwrap().take().unwrap(); - let main_loop = crate::runtime::run_until_complete(rtm.handler(), event_loop.clone(), async move { - let _ = pyrx.changed().await; - stx.send(true).unwrap(); - log::info!("Stopping worker-{}", worker_id); - while let Some(worker) = workers.pop() { - worker.join().unwrap(); - } - Ok(()) - }); - - match main_loop { - Ok(()) => {} - Err(err) => { - log::error!("{}", err); - std::process::exit(1); - } - }; + let rtm = crate::runtime::init_runtime_mt(1, 1, std::sync::Arc::new(event_loop.clone().unbind())); + let mut pyrx = signal.get().rx.lock().unwrap().take().unwrap(); + let main_loop = crate::runtime::run_until_complete(rtm.handler(), event_loop.clone(), async move { + let _ = pyrx.changed().await; + stx.send(true).unwrap(); + log::info!("Stopping worker-{}", worker_id); + while let Some(worker) = workers.pop() { + worker.join().unwrap(); } - crate::workers::WorkerSignals::Crossbeam(sig) => { - std::thread::spawn(move || { - let pyrx = sig.get().rx.lock().unwrap().take().unwrap(); - let _ = pyrx.recv(); - stx.send(true).unwrap(); - log::info!("Stopping worker-{}", worker_id); - while let Some(worker) = workers.pop() { - worker.join().unwrap(); - } + Ok(()) + }); - Python::with_gil(|py| { - let _ = sig.get().release(py); - drop(sig); - }); - }); + match main_loop { + Ok(()) => {} + Err(err) => { + log::error!("{}", err); + std::process::exit(1); } - } + }; } }; } macro_rules! serve_wth_ssl_inner { ($self:expr, $target:expr, $callback:expr, $event_loop:expr, $wid:expr, $workers:expr, $srx:expr) => { - // let callback_wrapper = crate::callbacks::CallbackWrapper::new($callback, $event_loop.clone(), $context); let callback_wrapper = std::sync::Arc::new($callback); let py_loop = std::sync::Arc::new($event_loop.clone().unbind()); @@ -919,10 +857,9 @@ macro_rules! serve_wth_ssl { &self, callback: Py, event_loop: &Bound, - // context: Bound, - signal: crate::workers::WorkerSignals, + signal: Py, ) { - pyo3_log::init(); + _ = pyo3_log::try_init(); let worker_id = self.config.id; log::info!("Started worker-{}", worker_id); @@ -931,42 +868,25 @@ macro_rules! serve_wth_ssl { let mut workers = vec![]; crate::workers::serve_wth_ssl_inner!(self, $target, callback, event_loop, worker_id, workers, srx); - match signal { - crate::workers::WorkerSignals::Tokio(sig) => { - let rtm = crate::runtime::init_runtime_mt(1, 1, std::sync::Arc::new(event_loop.clone().unbind())); - let mut pyrx = sig.get().rx.lock().unwrap().take().unwrap(); - let main_loop = crate::runtime::run_until_complete(rtm.handler(), event_loop.clone(), async move { - let _ = pyrx.changed().await; - stx.send(true).unwrap(); - log::info!("Stopping worker-{}", worker_id); - while let Some(worker) = workers.pop() { - worker.join().unwrap(); - } - Ok(()) - }); - - match main_loop { - Ok(()) => {} - Err(err) => { - log::error!("{}", err); - std::process::exit(1); - } - }; + let rtm = crate::runtime::init_runtime_mt(1, 1, std::sync::Arc::new(event_loop.clone().unbind())); + let mut pyrx = signal.get().rx.lock().unwrap().take().unwrap(); + let main_loop = crate::runtime::run_until_complete(rtm.handler(), event_loop.clone(), async move { + let _ = pyrx.changed().await; + stx.send(true).unwrap(); + log::info!("Stopping worker-{}", worker_id); + while let Some(worker) = workers.pop() { + worker.join().unwrap(); } - crate::workers::WorkerSignals::Crossbeam(sig) => { - let py = event_loop.py(); - let pyrx = sig.get().rx.lock().unwrap().take().unwrap(); - - py.allow_threads(|| { - let _ = pyrx.recv(); - stx.send(true).unwrap(); - log::info!("Stopping worker-{}", worker_id); - while let Some(worker) = workers.pop() { - worker.join().unwrap(); - } - }); + Ok(()) + }); + + match main_loop { + Ok(()) => {} + Err(err) => { + log::error!("{}", err); + std::process::exit(1); } - } + }; } }; } diff --git a/src/wsgi/serve.rs b/src/wsgi/serve.rs index f10922ee..14ae1721 100644 --- a/src/wsgi/serve.rs +++ b/src/wsgi/serve.rs @@ -4,9 +4,7 @@ use super::http::handle; use crate::callbacks::CallbackScheduler; use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py}; -use crate::workers::{ - serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignalSync, WorkerSignals, -}; +use crate::workers::{WorkerConfig, WorkerSignalSync}; #[pyclass(frozen, module = "granian._granian")] pub struct WSGIWorker { @@ -14,10 +12,211 @@ pub struct WSGIWorker { } impl WSGIWorker { - serve_rth!(_serve_rth, handle); - serve_wth!(_serve_wth, handle); - serve_rth_ssl!(_serve_rth_ssl, handle); - serve_wth_ssl!(_serve_wth_ssl, handle); + fn _serve_rth( + &self, + py: Python, + callback: Py, + event_loop: &Bound, + signal: Py, + ) { + _ = pyo3_log::try_init(); + + let worker_id = self.config.id; + log::info!("Started worker-{}", worker_id); + + let tcp_listener = self.config.tcp_listener(); + let http_mode = self.config.http_mode.clone(); + let http_upgrades = self.config.websockets_enabled; + let http1_opts = self.config.http1_opts.clone(); + let http2_opts = self.config.http2_opts.clone(); + let backpressure = self.config.backpressure; + let callback_wrapper = std::sync::Arc::new(callback); + + let rt = crate::runtime::init_runtime_mt( + self.config.threads, + self.config.blocking_threads, + std::sync::Arc::new(event_loop.clone().unbind()), + ); + let rth = rt.handler(); + + let (stx, mut srx) = tokio::sync::watch::channel(false); + let main_loop = rt.handler().inner.spawn(async move { + crate::workers::loop_match!( + http_mode, + http_upgrades, + tcp_listener, + srx, + backpressure, + rth, + callback_wrapper, + tokio::spawn, + hyper_util::rt::TokioExecutor::new, + http1_opts, + http2_opts, + hyper_util::rt::TokioIo::new, + handle + ); + + log::info!("Stopping worker-{}", worker_id); + + Python::with_gil(|_| drop(callback_wrapper)); + }); + + let pysig = signal.clone_ref(py); + std::thread::spawn(move || { + let pyrx = pysig.get().rx.lock().unwrap().take().unwrap(); + _ = pyrx.recv(); + stx.send(true).unwrap(); + + while !main_loop.is_finished() { + std::thread::sleep(std::time::Duration::from_millis(1)); + } + + Python::with_gil(|py| { + _ = pysig.get().release(py); + drop(pysig); + }); + }); + + _ = signal.get().qs.call_method0(py, pyo3::intern!(py, "wait")); + } + + fn _serve_wth( + &self, + py: Python, + callback: Py, + event_loop: &Bound, + signal: Py, + ) { + _ = pyo3_log::try_init(); + + let worker_id = self.config.id; + log::info!("Started worker-{}", worker_id); + + let (stx, srx) = tokio::sync::watch::channel(false); + let mut workers = vec![]; + crate::workers::serve_wth_inner!(self, handle, callback, event_loop, worker_id, workers, srx); + + let pysig = signal.clone_ref(py); + std::thread::spawn(move || { + let pyrx = pysig.get().rx.lock().unwrap().take().unwrap(); + _ = pyrx.recv(); + stx.send(true).unwrap(); + log::info!("Stopping worker-{}", worker_id); + while let Some(worker) = workers.pop() { + worker.join().unwrap(); + } + + Python::with_gil(|py| { + _ = pysig.get().release(py); + drop(pysig); + }); + }); + + _ = signal.get().qs.call_method0(py, pyo3::intern!(py, "wait")); + } + + fn _serve_rth_ssl( + &self, + py: Python, + callback: Py, + event_loop: &Bound, + signal: Py, + ) { + _ = pyo3_log::try_init(); + + let worker_id = self.config.id; + log::info!("Started worker-{}", worker_id); + + let tcp_listener = self.config.tcp_listener(); + let http_mode = self.config.http_mode.clone(); + let http_upgrades = self.config.websockets_enabled; + let http1_opts = self.config.http1_opts.clone(); + let http2_opts = self.config.http2_opts.clone(); + let backpressure = self.config.backpressure; + let tls_cfg = self.config.tls_cfg(); + let callback_wrapper = std::sync::Arc::new(callback); + + let rt = crate::runtime::init_runtime_mt( + self.config.threads, + self.config.blocking_threads, + std::sync::Arc::new(event_loop.clone().unbind()), + ); + let rth = rt.handler(); + + let (stx, mut srx) = tokio::sync::watch::channel(false); + rt.handler().inner.spawn(async move { + crate::workers::loop_match_tls!( + http_mode, + http_upgrades, + tcp_listener, + tls_cfg, + srx, + backpressure, + rth, + callback_wrapper, + tokio::spawn, + hyper_util::rt::TokioExecutor::new, + http1_opts, + http2_opts, + hyper_util::rt::TokioIo::new, + handle + ); + + log::info!("Stopping worker-{}", worker_id); + + Python::with_gil(|_| drop(callback_wrapper)); + }); + + let pysig = signal.clone_ref(py); + std::thread::spawn(move || { + let pyrx = pysig.get().rx.lock().unwrap().take().unwrap(); + _ = pyrx.recv(); + stx.send(true).unwrap(); + + Python::with_gil(|py| { + _ = pysig.get().release(py); + drop(pysig); + }); + }); + + _ = signal.get().qs.call_method0(py, pyo3::intern!(py, "wait")); + } + + fn _serve_wth_ssl( + &self, + py: Python, + callback: Py, + event_loop: &Bound, + signal: Py, + ) { + _ = pyo3_log::try_init(); + + let worker_id = self.config.id; + log::info!("Started worker-{}", worker_id); + + let (stx, srx) = tokio::sync::watch::channel(false); + let mut workers = vec![]; + crate::workers::serve_wth_ssl_inner!(self, handle, callback, event_loop, worker_id, workers, srx); + + let pysig = signal.clone_ref(py); + std::thread::spawn(move || { + let pyrx = pysig.get().rx.lock().unwrap().take().unwrap(); + _ = pyrx.recv(); + stx.send(true).unwrap(); + log::info!("Stopping worker-{}", worker_id); + while let Some(worker) = workers.pop() { + worker.join().unwrap(); + } + + Python::with_gil(|py| { + _ = pysig.get().release(py); + drop(pysig); + }); + }); + + _ = signal.get().qs.call_method0(py, pyo3::intern!(py, "wait")); + } } #[pymethods] @@ -73,17 +272,29 @@ impl WSGIWorker { }) } - fn serve_rth(&self, callback: Py, event_loop: &Bound, signal: Py) { + fn serve_rth( + &self, + py: Python, + callback: Py, + event_loop: &Bound, + signal: Py, + ) { match self.config.ssl_enabled { - false => self._serve_rth(callback, event_loop, WorkerSignals::Crossbeam(signal)), - true => self._serve_rth_ssl(callback, event_loop, WorkerSignals::Crossbeam(signal)), + false => self._serve_rth(py, callback, event_loop, signal), + true => self._serve_rth_ssl(py, callback, event_loop, signal), } } - fn serve_wth(&self, callback: Py, event_loop: &Bound, signal: Py) { + fn serve_wth( + &self, + py: Python, + callback: Py, + event_loop: &Bound, + signal: Py, + ) { match self.config.ssl_enabled { - false => self._serve_wth(callback, event_loop, WorkerSignals::Crossbeam(signal)), - true => self._serve_wth_ssl(callback, event_loop, WorkerSignals::Crossbeam(signal)), + false => self._serve_wth(py, callback, event_loop, signal), + true => self._serve_wth_ssl(py, callback, event_loop, signal), } } }