From d0648e82324e4a56bbe307c3fd349fd7fba1e926 Mon Sep 17 00:00:00 2001 From: Conrad Date: Fri, 20 Feb 2026 21:00:58 -0500 Subject: [PATCH 1/4] build: Bump grpcio minimum version to 1.78.0 --- wool/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wool/pyproject.toml b/wool/pyproject.toml index 341a508..c31c861 100644 --- a/wool/pyproject.toml +++ b/wool/pyproject.toml @@ -19,7 +19,7 @@ classifiers = [ ] dependencies = [ "cloudpickle", - "grpcio>=1.76.0", + "grpcio>=1.78.0", "portalocker", "protobuf", "shortuuid", From ed63beaed2f96f1dc3f3a747d502bc4015ce0d6b Mon Sep 17 00:00:00 2001 From: Conrad Date: Fri, 20 Feb 2026 21:01:43 -0500 Subject: [PATCH 2/4] feat: Add namespace isolation for task execution Introduce a namespace parameter to Task that controls global variable isolation during execution: - None (default): ephemeral isolated globals per invocation - "name": shared globals across tasks using the same namespace, enabling patterns like @lru_cache between related tasks - wool.WORKER: no isolation, runs in worker-level globals _IsolatedGlobals provides dict-subclass overlay semantics required by CPython's STORE_GLOBAL bytecode. Shared namespaces are managed via a ResourcePool-backed registry with TTL-based cleanup. --- wool/protobuf/task.proto | 1 + wool/src/wool/__init__.py | 2 + wool/src/wool/runtime/routine/task.py | 175 ++++++++++++++++++++++---- 3 files changed, 157 insertions(+), 21 deletions(-) diff --git a/wool/protobuf/task.proto b/wool/protobuf/task.proto index b7e062d..2abd06b 100644 --- a/wool/protobuf/task.proto +++ b/wool/protobuf/task.proto @@ -15,6 +15,7 @@ message Task { string function = 11; int32 line_no = 12; string tag = 13; + optional string namespace = 14; } message Result { diff --git a/wool/src/wool/__init__.py b/wool/src/wool/__init__.py index 1e0fab5..8d87f45 100644 --- a/wool/src/wool/__init__.py +++ b/wool/src/wool/__init__.py @@ -22,6 +22,7 @@ from wool.runtime.loadbalancer.base import NoWorkersAvailable from wool.runtime.loadbalancer.roundrobin import RoundRobinLoadBalancer from wool.runtime.resourcepool import ResourcePool +from wool.runtime.routine.task import WORKER from wool.runtime.routine.task import Task from wool.runtime.routine.task import TaskEvent from wool.runtime.routine.task import TaskEventHandler @@ -70,6 +71,7 @@ "NoWorkersAvailable", "RoundRobinLoadBalancer", # Routines + "WORKER", "Task", "TaskEvent", "TaskEventHandler", diff --git a/wool/src/wool/runtime/routine/task.py b/wool/src/wool/runtime/routine/task.py index 45a1a7b..c145481 100644 --- a/wool/src/wool/runtime/routine/task.py +++ b/wool/src/wool/runtime/routine/task.py @@ -3,7 +3,10 @@ import asyncio import logging import traceback +import types +from collections.abc import AsyncIterator from collections.abc import Callable +from contextlib import asynccontextmanager from contextlib import contextmanager from contextvars import Context from contextvars import ContextVar @@ -31,6 +34,35 @@ import wool from wool.runtime import protobuf as pb from wool.runtime.event import Event +from wool.runtime.resourcepool import ResourcePool + +# Sentinel for worker-level globals +WORKER: str = "__worker__" + +# Default TTL for namespace cleanup (5 minutes) +NAMESPACE_TTL: float = 300.0 + + +def _create_namespace_globals(namespace: str) -> _IsolatedGlobals: + """Factory function to create _IsolatedGlobals for a namespace. + + Creates an _IsolatedGlobals that falls back to an empty dict for reads. + The actual callable's __globals__ aren't available at creation time, + so reads of module-level names won't work. Named namespaces are primarily + useful for sharing mutable state between tasks, not for accessing + module-level imports (those should be accessed via the callable's closure + or passed as arguments). + """ + return _IsolatedGlobals({}) + + +# Registry stores shared _IsolatedGlobals instances, keyed by namespace name. +# All tasks using the same namespace share the same _IsolatedGlobals instance, +# so globals set by one task are visible to others. +_namespace_registry: ResourcePool[_IsolatedGlobals] = ResourcePool( + factory=_create_namespace_globals, + ttl=NAMESPACE_TTL, +) Args = Tuple Kwargs = Dict @@ -40,6 +72,50 @@ W = TypeVar("W", bound=Routine) +class _IsolatedGlobals(dict): + """A dict subclass that provides overlay semantics for function globals. + + Writes go to this dict directly, while reads fall through to the + original globals if not found. This enables namespace isolation for + task execution. + + For named namespaces, the same _IsolatedGlobals instance is shared + across all tasks using that namespace, so writes are visible to all. + + .. note:: + This type must be a ``dict`` subclass, not a ``MutableMapping``. + Python's ``STORE_GLOBAL`` bytecode uses ``PyDict_SetItem`` at the + C level, which requires an actual ``dict`` instance. Using a + ``MutableMapping`` (like ``ChainMap``) as a function's + ``__globals__`` would bypass ``__setitem__`` and fail. + + :param original_globals: + The original function's __globals__ dict to fall back to for reads. + """ + + def __init__(self, original_globals: dict): + super().__init__() + self._original = original_globals + + def __getitem__(self, key): + # First check local overlay + try: + return super().__getitem__(key) + except KeyError: + pass + # Then check original globals + return self._original[key] + + def __contains__(self, key): + return super().__contains__(key) or key in self._original + + def get(self, key, default=None): + try: + return self[key] + except KeyError: + return default + + _do_dispatch: ContextVar[bool] = ContextVar("_do_dispatch", default=True) @@ -118,6 +194,17 @@ class Task(Generic[W]): Line number where the task was defined. :param tag: Optional descriptive tag for the task. + :param namespace: + Controls global namespace isolation for task execution: + + - ``None`` (default): Ephemeral isolated globals. Each invocation + gets a fresh isolated globals dict, preventing global state + leakage between task invocations. + - ``"name"``: Named namespace. Tasks with the same namespace string + share globals, enabling patterns like ``@lru_cache`` across + related tasks. + - ``wool.WORKER``: Worker-level globals. The task runs in the + shared worker namespace with no isolation. """ id: UUID @@ -132,6 +219,7 @@ class Task(Generic[W]): function: str | None = None line_no: int | None = None tag: str | None = None + namespace: str | None = None def __post_init__(self, **kwargs): """ @@ -216,6 +304,7 @@ def from_protobuf(cls, task: pb.task.Task) -> Task: function=task.function if task.function else None, line_no=task.line_no if task.line_no else None, tag=task.tag if task.tag else None, + namespace=task.namespace if task.namespace else None, ) def to_protobuf(self) -> pb.task.Task: @@ -232,6 +321,7 @@ def to_protobuf(self) -> pb.task.Task: function=self.function if self.function else "", line_no=self.line_no if self.line_no else 0, tag=self.tag if self.tag else "", + namespace=self.namespace if self.namespace else "", ) def dispatch(self) -> W: @@ -242,6 +332,49 @@ def dispatch(self) -> W: else: raise ValueError("Expected routine to be coroutine or async generator") + @asynccontextmanager + async def _prepare_callable(self) -> AsyncIterator[Callable[..., W]]: + """Prepare the callable with appropriate globals based on namespace. + + Yields the callable to execute, managing namespace lifecycle for + shared namespaces. + + :yields: + The callable configured with the appropriate globals dict. + """ + if self.namespace is None: + # Ephemeral: fresh isolated globals each invocation + callable_globals = _IsolatedGlobals(self.callable.__globals__) + yield types.FunctionType( + self.callable.__code__, + callable_globals, + self.callable.__name__, + self.callable.__defaults__, + self.callable.__closure__, + ) + elif self.namespace == WORKER: + # Worker globals: use original directly + yield self.callable + else: + # Shared namespace: use shared globals from pool. + # All tasks with the same namespace share the same + # _IsolatedGlobals instance, so STORE_GLOBAL writes + # are visible across tasks. + async with _namespace_registry.get(self.namespace) as namespace: + # Merge this callable's globals into the shared namespace. + # Keys already in namespace take precedence, allowing + # imports from different modules to accumulate. + for key, value in self.callable.__globals__.items(): + if key not in namespace: + namespace[key] = value + yield types.FunctionType( + self.callable.__code__, + namespace, + self.callable.__name__, + self.callable.__defaults__, + self.callable.__closure__, + ) + async def _run(self): """ Execute the task's callable with its arguments in proxy context. @@ -249,7 +382,7 @@ async def _run(self): :returns: The result of executing the callable. :raises RuntimeError: - If no proxy pool is available for task execution. + If no proxy pool available for task execution. """ proxy_pool = wool.__proxy_pool__.get() if not proxy_pool: @@ -260,8 +393,9 @@ async def _run(self): try: with self: with do_dispatch(False): - await asyncio.sleep(0) - return await self.callable(*self.args, **self.kwargs) + await asyncio.sleep(0) # Release the event loop + async with self._prepare_callable() as callable_to_run: + return await callable_to_run(*self.args, **self.kwargs) finally: wool.__proxy__.reset(token) @@ -279,24 +413,23 @@ async def _stream(self): raise RuntimeError("No proxy pool available for task execution") async with proxy_pool.get(self.proxy) as proxy: await asyncio.sleep(0) - gen = self.callable(*self.args, **self.kwargs) - try: - while True: - # Set the proxy in context variable for nested task dispatch - token = wool.__proxy__.set(proxy) - try: - with self: - with do_dispatch(False): - try: - result = await anext(gen) - except StopAsyncIteration: - break - finally: - wool.__proxy__.reset(token) - - yield result - finally: - await gen.aclose() + async with self._prepare_callable() as callable_to_run: + gen = callable_to_run(*self.args, **self.kwargs) + try: + while True: + token = wool.__proxy__.set(proxy) + try: + with self: + with do_dispatch(False): + try: + result = await anext(gen) + except StopAsyncIteration: + break + finally: + wool.__proxy__.reset(token) + yield result + finally: + await gen.aclose() def _finish(self, _): TaskEvent("task-completed", task=self).emit() From 66ca2c3b911a49fcf7dea56c8c33d6b368a5043e Mon Sep 17 00:00:00 2001 From: Conrad Date: Fri, 20 Feb 2026 21:04:39 -0500 Subject: [PATCH 3/4] feat: Add namespace parameter to routine decorator Extend routine() to accept an optional namespace keyword argument, forwarded to Task during dispatch. Support bare @routine, nullary @routine(), and parameterized @routine(namespace="name") call forms. --- wool/src/wool/runtime/routine/wrapper.py | 184 ++++++++++++++--------- 1 file changed, 110 insertions(+), 74 deletions(-) diff --git a/wool/src/wool/runtime/routine/wrapper.py b/wool/src/wool/runtime/routine/wrapper.py index 5edcd08..a553759 100644 --- a/wool/src/wool/runtime/routine/wrapper.py +++ b/wool/src/wool/runtime/routine/wrapper.py @@ -15,6 +15,7 @@ from typing import Type from typing import TypeVar from typing import cast +from typing import overload from uuid import uuid4 import wool @@ -31,7 +32,7 @@ # public -def routine(fn: C) -> C: +def routine(fn: C | None = None, *, namespace: str | None = None) -> C: """Decorator to declare an asynchronous function as remotely executable. Converts an asynchronous function or async generator into a distributed @@ -39,9 +40,31 @@ def routine(fn: C) -> C: is invoked, it is dispatched to the worker pool associated with the current worker pool session context. + This decorator supports three usage forms: + + - ``@work`` - bare decorator (default namespace=None) + - ``@work()`` - empty call (default namespace=None) + - ``@work(namespace="name")`` - with explicit parameter + :param fn: The asynchronous function or async generator to convert into a - distributed routine. + distributed routine. When using as ``@work``, this is the decorated + function. When using as ``@work()``, this is None. + :param namespace: + Controls global namespace isolation for task execution: + + - ``None`` (default): Ephemeral isolated globals. Each invocation + gets fresh isolated globals, preventing global state leakage. + - ``"name"``: Named namespace. Tasks with the same namespace string + share globals, enabling patterns like ``@lru_cache``. When a task + enters a shared namespace, it inherits any existing globals and + contributes its own module globals (e.g., imports) for keys that + don't already exist. This allows tasks from different modules to + share a namespace while each bringing its required imports, and + prevents later tasks from overwriting state (like caches) + established by earlier tasks. + - ``wool.WORKER``: Worker-level globals. The task runs in the + shared worker namespace with no isolation. :returns: The decorated function that dispatches to the worker pool when called. @@ -139,82 +162,93 @@ async def main(): async for value in fibonacci_series(10): print(value) # 0, 1, 1, 2, 3, 5, 8, 13, 21, 34 """ - # Check if function is a coroutine or async generator - is_valid = ( - iscoroutinefunction(fn) - or isasyncgenfunction(fn) - or ( - isinstance(fn, (classmethod, staticmethod)) - and (iscoroutinefunction(fn.__func__) or isasyncgenfunction(fn.__func__)) - ) - ) - if not is_valid: - raise ValueError("Expected a coroutine function or async generator function") - - if isinstance(fn, (classmethod, staticmethod)): - wrapped_fn = fn.__func__ - else: - wrapped_fn = fn - - if isasyncgenfunction(wrapped_fn): - - @wraps(wrapped_fn) - async def async_generator_wrapper(*args, **kwargs): - # Handle static and class methods in a picklable way. - parent, function = _resolve(fn) - assert parent is not None - assert callable(function) - - if do_dispatch(): - proxy = wool.__proxy__.get() - assert proxy - stream = await _dispatch( - proxy, - async_generator_wrapper.__module__, - async_generator_wrapper.__qualname__, - function, - *args, - **kwargs, - ) - else: - stream = _stream(fn, parent, *args, **kwargs) - assert isasyncgen(stream) - try: - async for result in stream: - yield result - finally: - await stream.aclose() + def decorator(fn: C) -> C: + # Check if function is a coroutine or async generator + is_valid = ( + iscoroutinefunction(fn) + or isasyncgenfunction(fn) + or ( + isinstance(fn, (classmethod, staticmethod)) + and (iscoroutinefunction(fn.__func__) or isasyncgenfunction(fn.__func__)) + ) + ) + if not is_valid: + raise ValueError("Expected a coroutine function or async generator function") - return cast(C, async_generator_wrapper) + if isinstance(fn, (classmethod, staticmethod)): + wrapped_fn = fn.__func__ + else: + wrapped_fn = fn + + if isasyncgenfunction(wrapped_fn): + + @wraps(wrapped_fn) + async def async_generator_wrapper(*args, **kwargs): + # Handle static and class methods in a picklable way. + parent, function = _resolve(fn) + assert parent is not None + assert callable(function) + + if do_dispatch(): + proxy = wool.__proxy__.get() + assert proxy + stream = await _dispatch( + proxy, + async_generator_wrapper.__module__, + async_generator_wrapper.__qualname__, + function, + namespace, + *args, + **kwargs, + ) + else: + stream = _stream(fn, parent, *args, **kwargs) + assert isasyncgen(stream) + + try: + async for result in stream: + yield result + finally: + await stream.aclose() + + return cast(C, async_generator_wrapper) - else: + else: - @wraps(wrapped_fn) - async def coroutine_wrapper(*args, **kwargs): - # Handle static and class methods in a picklable way. - parent, function = _resolve(fn) - assert parent is not None - assert callable(function) - - if do_dispatch(): - proxy = wool.__proxy__.get() - assert proxy - stream = await _dispatch( - proxy, - coroutine_wrapper.__module__, - coroutine_wrapper.__qualname__, - function, - *args, - **kwargs, - ) - coro = _stream_to_coroutine(stream) - else: - coro = _execute(fn, parent, *args, **kwargs) - - return await coro - - return cast(C, coroutine_wrapper) + @wraps(wrapped_fn) + async def coroutine_wrapper(*args, **kwargs): + # Handle static and class methods in a picklable way. + parent, function = _resolve(fn) + assert parent is not None + assert callable(function) + + if do_dispatch(): + proxy = wool.__proxy__.get() + assert proxy + stream = await _dispatch( + proxy, + coroutine_wrapper.__module__, + coroutine_wrapper.__qualname__, + function, + namespace, + *args, + **kwargs, + ) + coro = _stream_to_coroutine(stream) + else: + coro = _execute(fn, parent, *args, **kwargs) + + return await coro + + return cast(C, coroutine_wrapper) + + # Handle @routine vs @routine() vs @routine(namespace=...) + if fn is not None: + # Called as @routine (bare decorator) + return decorator(fn) + # Called as @routine() or @routine(namespace=...) + return decorator def _dispatch( @@ -222,6 +256,7 @@ def _dispatch( module: str, qualname: str, function: Callable[..., Coroutine | AsyncGenerator], + namespace: str | None, *args, **kwargs, ): @@ -240,6 +275,7 @@ def _dispatch( kwargs=kwargs, tag=f"{module}.{qualname}({signature})", proxy=proxy, + namespace=namespace, ) return proxy.dispatch(task, timeout=ctx.dispatch_timeout.get()) From eb4156c3064b4b2c1b69a4838fc76e3ca0f94232 Mon Sep 17 00:00:00 2001 From: Conrad Date: Fri, 20 Feb 2026 21:05:31 -0500 Subject: [PATCH 4/4] test: Add tests for namespace isolation and routine decorator forms Cover _IsolatedGlobals overlay semantics, _namespace_registry lifecycle, Task.namespace serialization round-trips, and the three routine() call forms (bare, nullary, parameterized). Add WORKER to the expected public API surface. --- wool/tests/runtime/routine/test_task.py | 817 +++++++++++++++++++++ wool/tests/runtime/routine/test_wrapper.py | 417 +++++++++++ wool/tests/test_public.py | 1 + 3 files changed, 1235 insertions(+) diff --git a/wool/tests/runtime/routine/test_task.py b/wool/tests/runtime/routine/test_task.py index 739e026..de610f2 100644 --- a/wool/tests/runtime/routine/test_task.py +++ b/wool/tests/runtime/routine/test_task.py @@ -1356,3 +1356,820 @@ async def create_nested_tasks(depth): # (except the first one which has no caller) if i > 0: assert tasks[i].caller == tasks[i - 1].id + + +class TestNamespaceIsolation: + """Tests for Task namespace parameter and namespace isolation.""" + + # NS-001: Default namespace=None when not specified + def test_default_namespace_none(self, sample_task, clear_event_handlers): + """Test Task defaults namespace to None. + + Given: + A Task created without specifying namespace + When: + Task is instantiated + Then: + The namespace field defaults to None + """ + # Arrange & Act + task = sample_task() + + # Assert + assert task.namespace is None + + # NS-002: Explicit namespace=None + def test_explicit_namespace_none(self, sample_task, clear_event_handlers): + """Test Task with explicit namespace=None. + + Given: + A Task created with namespace=None + When: + Task is instantiated + Then: + The namespace field is None + """ + # Arrange & Act + task = sample_task(namespace=None) + + # Assert + assert task.namespace is None + + # NS-003: Explicit namespace with WORKER sentinel + def test_explicit_namespace_worker(self, sample_task, clear_event_handlers): + """Test Task with namespace=WORKER. + + Given: + A Task created with namespace=WORKER + When: + Task is instantiated + Then: + The namespace field is the WORKER sentinel + """ + from wool.runtime.routine.task import WORKER + + # Arrange & Act + task = sample_task(namespace=WORKER) + + # Assert + assert task.namespace == WORKER + + # NS-004: Explicit named namespace + def test_explicit_named_namespace(self, sample_task, clear_event_handlers): + """Test Task with explicit named namespace. + + Given: + A Task created with namespace="cache" + When: + Task is instantiated + Then: + The namespace field is "cache" + """ + # Arrange & Act + task = sample_task(namespace="cache") + + # Assert + assert task.namespace == "cache" + + # NS-005: Isolation - coroutine globals not visible after execution + @pytest.mark.asyncio + async def test_isolation_coroutine_globals_not_visible( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test isolated coroutine globals are not visible after execution. + + Given: + A Task with namespace=None (default) + When: + The callable sets a global variable + Then: + The global variable is not visible in the original namespace + """ + + # Arrange + async def test_callable(): + global isolation_test_var + isolation_test_var = "modified" # noqa: F841 + return "result" + + task = sample_task(callable=test_callable, namespace=None) + + # Act + result = await task.dispatch() + + # Assert + assert result == "result" + # The global should not be set in the original namespace + assert "isolation_test_var" not in globals() + + # NS-006: Worker namespace - coroutine globals persist + @pytest.mark.asyncio + async def test_worker_namespace_coroutine_globals_persist( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test worker namespace coroutine globals persist after execution. + + Given: + A Task with namespace=WORKER + When: + The callable sets a global variable + Then: + The global variable is visible in the original namespace + """ + from wool.runtime.routine.task import WORKER + + # Arrange + async def test_callable(): + global shared_test_var + shared_test_var = "modified" + return shared_test_var + + task = sample_task(callable=test_callable, namespace=WORKER) + + # Act + result = await task.dispatch() + + # Assert + assert result == "modified" + # The global should be set in the original namespace + assert globals().get("shared_test_var") == "modified" + + # Cleanup + del globals()["shared_test_var"] + + # NS-007: Isolation - async generator globals not visible + @pytest.mark.asyncio + async def test_isolation_async_generator_globals_not_visible( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test isolated async generator globals are not visible after execution. + + Given: + A Task with async generator callable and namespace=None + When: + The callable sets a global variable + Then: + The global variable is not visible in the original namespace + """ + + # Arrange + async def test_generator(): + global isolation_gen_var + isolation_gen_var = "generator_modified" # noqa: F841 + yield "value" + + task = sample_task(callable=test_generator, namespace=None) + + # Act + results = [] + async for value in task.dispatch(): + results.append(value) + + # Assert + assert results == ["value"] + assert "isolation_gen_var" not in globals() + + # NS-008: Worker namespace - async generator globals persist + @pytest.mark.asyncio + async def test_worker_namespace_async_generator_globals_persist( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test worker namespace async generator globals persist after execution. + + Given: + A Task with async generator callable and namespace=WORKER + When: + The callable sets a global variable + Then: + The global variable is visible in the original namespace + """ + from wool.runtime.routine.task import WORKER + + # Arrange + async def test_generator(): + global shared_gen_var + shared_gen_var = "generator_modified" + yield shared_gen_var + + task = sample_task(callable=test_generator, namespace=WORKER) + + # Act + results = [] + async for value in task.dispatch(): + results.append(value) + + # Assert + assert results == ["generator_modified"] + assert globals().get("shared_gen_var") == "generator_modified" + + # Cleanup + del globals()["shared_gen_var"] + + # NS-009: Read-through - module imports accessible via overlay + @pytest.mark.asyncio + async def test_read_through_module_imports_accessible( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test isolated tasks can read module-level imports. + + Given: + A Task with namespace=None + When: + The callable accesses a module-level import + Then: + The import is accessible through the overlay + """ + + # Arrange + async def test_callable(): + # Should be able to access uuid4 imported at module level + return uuid4() + + task = sample_task(callable=test_callable, namespace=None) + + # Act + result = await task.dispatch() + + # Assert - should return a valid UUID + from uuid import UUID + + assert isinstance(result, UUID) + + # NS-010: Write isolation - shadowed imports don't affect original + @pytest.mark.asyncio + async def test_write_isolation_shadowed_imports_dont_affect_original( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test shadowed module-level names don't affect original namespace. + + Given: + A Task with namespace=None + When: + The callable shadows a module-level name + Then: + The original module-level name is unchanged + """ + # Arrange - capture the original uuid4 reference + from uuid import uuid4 as original_uuid4 + + # Store original reference for comparison + original_ref = uuid4 + + async def test_callable(): + global uuid4 + uuid4 = lambda: "fake_uuid" # noqa: F841, E731 + return "done" + + task = sample_task(callable=test_callable, namespace=None) + + # Act + await task.dispatch() + + # Assert - uuid4 in this test file should still be the original function + # We compare against the captured original reference + assert uuid4 is original_ref + assert uuid4 is original_uuid4 + + # NS-011: Closure variables preserved with isolation + @pytest.mark.asyncio + async def test_closure_variables_preserved_with_isolation( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test closure variables work correctly with isolation. + + Given: + A Task with namespace=None and a callable with closure + When: + The callable accesses closure variables + Then: + The closure variables are accessible + """ + # Arrange + captured_value = "closure_value" + + async def test_callable(): + return captured_value + + task = sample_task(callable=test_callable, namespace=None) + + # Act + result = await task.dispatch() + + # Assert + assert result == "closure_value" + + # NS-012: from_protobuf with namespace set + @pytest.mark.asyncio + async def test_from_protobuf_with_namespace( + self, sample_async_callable, picklable_proxy, clear_event_handlers + ): + """Test from_protobuf correctly deserializes namespace. + + Given: + A protobuf Task message with namespace="cache" + When: + from_protobuf is called + Then: + The deserialized Task has namespace="cache" + """ + # Arrange + task_id = uuid4() + pb_task = pb.task.Task( + id=str(task_id), + callable=cloudpickle.dumps(sample_async_callable), + args=cloudpickle.dumps(()), + kwargs=cloudpickle.dumps({}), + proxy=cloudpickle.dumps(picklable_proxy), + proxy_id=str(picklable_proxy.id), + namespace="cache", + ) + + # Act + task = Task.from_protobuf(pb_task) + + # Assert + assert task.namespace == "cache" + + # NS-013: from_protobuf with WORKER namespace + @pytest.mark.asyncio + async def test_from_protobuf_with_worker_namespace( + self, sample_async_callable, picklable_proxy, clear_event_handlers + ): + """Test from_protobuf correctly deserializes WORKER namespace. + + Given: + A protobuf Task message with namespace=WORKER + When: + from_protobuf is called + Then: + The deserialized Task has namespace=WORKER + """ + from wool.runtime.routine.task import WORKER + + # Arrange + task_id = uuid4() + pb_task = pb.task.Task( + id=str(task_id), + callable=cloudpickle.dumps(sample_async_callable), + args=cloudpickle.dumps(()), + kwargs=cloudpickle.dumps({}), + proxy=cloudpickle.dumps(picklable_proxy), + proxy_id=str(picklable_proxy.id), + namespace=WORKER, + ) + + # Act + task = Task.from_protobuf(pb_task) + + # Assert + assert task.namespace == WORKER + + # NS-014: from_protobuf defaults to None when field absent + @pytest.mark.asyncio + async def test_from_protobuf_defaults_none_when_absent( + self, sample_async_callable, picklable_proxy, clear_event_handlers + ): + """Test from_protobuf defaults namespace to None when field is absent. + + Given: + A protobuf Task message without namespace field set + When: + from_protobuf is called + Then: + The deserialized Task has namespace=None (default) + """ + # Arrange + task_id = uuid4() + pb_task = pb.task.Task( + id=str(task_id), + callable=cloudpickle.dumps(sample_async_callable), + args=cloudpickle.dumps(()), + kwargs=cloudpickle.dumps({}), + proxy=cloudpickle.dumps(picklable_proxy), + proxy_id=str(picklable_proxy.id), + # namespace intentionally not set + ) + + # Act + task = Task.from_protobuf(pb_task) + + # Assert + assert task.namespace is None + + # NS-015: to_protobuf serializes namespace + def test_to_protobuf_serializes_namespace( + self, sample_async_callable, picklable_proxy, clear_event_handlers + ): + """Test to_protobuf correctly serializes namespace. + + Given: + A Task with namespace="cache" + When: + to_protobuf is called + Then: + The protobuf Task has namespace="cache" + """ + # Arrange + task = Task( + id=uuid4(), + callable=sample_async_callable, + args=(), + kwargs={}, + proxy=picklable_proxy, + namespace="cache", + ) + + # Act + pb_task = task.to_protobuf() + + # Assert + assert pb_task.namespace == "cache" + + # NS-016: to_protobuf serializes namespace=None as empty string + def test_to_protobuf_serializes_namespace_none( + self, sample_async_callable, picklable_proxy, clear_event_handlers + ): + """Test to_protobuf correctly serializes namespace=None. + + Given: + A Task with namespace=None + When: + to_protobuf is called + Then: + The protobuf Task has namespace="" + """ + # Arrange + task = Task( + id=uuid4(), + callable=sample_async_callable, + args=(), + kwargs={}, + proxy=picklable_proxy, + namespace=None, + ) + + # Act + pb_task = task.to_protobuf() + + # Assert + assert pb_task.namespace == "" + + # NS-EC-001: Mutable object mutation visible (overlay semantics) + @pytest.mark.asyncio + async def test_mutable_object_mutation_visible( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test mutable object mutations are visible through overlay. + + Given: + A Task with namespace=None accessing a mutable global + When: + The callable mutates the mutable object + Then: + The mutation is visible (ChainMap semantics allow this) + """ + # Arrange + mutable_list = [1, 2, 3] + + async def test_callable(): + # Mutations to mutable objects accessed from parent globals + # are visible because we get a reference to the original object + mutable_list.append(4) + return mutable_list + + task = sample_task(callable=test_callable, namespace=None) + + # Act + result = await task.dispatch() + + # Assert - mutation is visible because we mutated the same object + assert result == [1, 2, 3, 4] + assert mutable_list == [1, 2, 3, 4] + + # NS-EC-002: Deletion affects overlay only + @pytest.mark.asyncio + async def test_deletion_affects_overlay_only( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test deletion in isolated namespace doesn't affect original. + + Given: + A Task with namespace=None + When: + The callable shadows then deletes a global name + Then: + The original namespace is unaffected + """ + # Arrange - define a module-level variable for this test + global deletion_test_var + deletion_test_var = "original" + + async def test_callable(): + global deletion_test_var + deletion_test_var = "shadowed" # noqa: F841 + # Deletion in overlay should not raise (shadows the original) + del deletion_test_var + return "done" + + task = sample_task(callable=test_callable, namespace=None) + + # Act + result = await task.dispatch() + + # Assert + assert result == "done" + # Original should be unaffected + assert globals().get("deletion_test_var") == "original" + + # Cleanup + del globals()["deletion_test_var"] + + # NS-EC-003: Isolation through nested function calls + @pytest.mark.asyncio + async def test_isolation_through_nested_function_calls( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test isolation is maintained through nested function calls. + + Given: + A Task with namespace=None calling a nested function + When: + The nested function sets a global + Then: + The global is isolated to the overlay + """ + + # Arrange + async def test_callable(): + def nested_helper(): + global nested_isolation_var + nested_isolation_var = "from_nested" # noqa: F841 + + nested_helper() + return "done" + + task = sample_task(callable=test_callable, namespace=None) + + # Act + await task.dispatch() + + # Assert - nested globals should also be isolated + # Note: The nested function has its own globals reference, + # so this tests the isolation behavior for the main callable + assert "nested_isolation_var" not in globals() + + # NS-EC-004: Isolation maintained on exception + @pytest.mark.asyncio + async def test_isolation_maintained_on_exception( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test isolation is maintained even when exception is raised. + + Given: + A Task with namespace=None that raises an exception + When: + The callable sets a global then raises + Then: + The global is still isolated + """ + + # Arrange + async def test_callable(): + global exception_test_var + exception_test_var = "before_exception" # noqa: F841 + raise ValueError("Test exception") + + task = sample_task(callable=test_callable, namespace=None) + + # Act + with pytest.raises(ValueError, match="Test exception"): + await task.dispatch() + + # Assert - global should still be isolated + assert "exception_test_var" not in globals() + + # NS-EC-005: Isolation maintained on cancellation + @pytest.mark.asyncio + async def test_isolation_maintained_on_cancellation( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test isolation is maintained when task is cancelled. + + Given: + A Task with namespace=None that gets cancelled + When: + The callable sets a global then is cancelled + Then: + The global is still isolated + """ + + # Arrange + async def test_callable(): + global cancellation_test_var + cancellation_test_var = "before_cancel" # noqa: F841 + await asyncio.sleep(10) # Will be cancelled + return "not reached" + + task = sample_task(callable=test_callable, namespace=None) + + # Act + async def run_and_cancel(): + task_coro = asyncio.create_task(task.dispatch()) + await asyncio.sleep(0.01) + task_coro.cancel() + try: + await task_coro + except asyncio.CancelledError: + pass + + await run_and_cancel() + + # Assert - global should still be isolated + assert "cancellation_test_var" not in globals() + + # NS-SHARE-001: Named namespace shares globals between tasks + @pytest.mark.asyncio + async def test_named_namespace_shares_globals( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test that tasks with same namespace share globals. + + Given: + Two Tasks with namespace="shared" + When: + First task sets a global, second task reads it + Then: + The second task should see the global set by the first + """ + from wool.runtime.routine.task import _namespace_registry + + # Arrange + async def setter(): + global shared_namespace_var + shared_namespace_var = "from_first_task" + return "done" + + async def getter(): + global shared_namespace_var + return shared_namespace_var + + task1 = sample_task(callable=setter, namespace="shared_test") + task2 = sample_task(callable=getter, namespace="shared_test") + + # Act + await task1.dispatch() + result = await task2.dispatch() + + # Assert + assert result == "from_first_task" + + # Cleanup - clear the namespace registry + await _namespace_registry.clear("shared_test") + + # NS-SHARE-002: Different namespaces are isolated + @pytest.mark.asyncio + async def test_different_namespaces_isolated( + self, sample_task, mock_worker_proxy_cache, clear_event_handlers + ): + """Test that tasks with different namespaces are isolated. + + Given: + Two Tasks with different namespaces + When: + First task sets a global, second task tries to read it + Then: + The second task should not see the global + """ + from wool.runtime.routine.task import _namespace_registry + + # Arrange + async def setter(): + global isolated_namespace_var + isolated_namespace_var = "from_first_task" + return "done" + + async def getter(): + global isolated_namespace_var + try: + return isolated_namespace_var + except NameError: + return "not_found" + + task1 = sample_task(callable=setter, namespace="ns_a") + task2 = sample_task(callable=getter, namespace="ns_b") + + # Act + await task1.dispatch() + result = await task2.dispatch() + + # Assert - different namespace should not see the variable + assert result == "not_found" + + # Cleanup + await _namespace_registry.clear("ns_a") + await _namespace_registry.clear("ns_b") + + # PBT-NS-001: Serialization round-trip preserves namespace + @settings(max_examples=50, deadline=None) + @given( + namespace_value=st.one_of( + st.none(), + st.text( + min_size=1, + max_size=20, + alphabet=st.characters( + whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters="_-" + ), + ), + ) + ) + @pytest.mark.asyncio + async def test_serialization_roundtrip_preserves_namespace( + self, + namespace_value, + ): + """Property-based test: namespace survives serialization round-trip. + + Given: + A Task with any namespace value + When: + Serialized to protobuf and deserialized + Then: + The namespace value is preserved + """ + + # Arrange + async def test_callable(): + return "result" + + proxy = PicklableProxy() + original_task = Task( + id=uuid4(), + callable=test_callable, + args=(), + kwargs={}, + proxy=proxy, + namespace=namespace_value, + ) + + # Act + pb_task = original_task.to_protobuf() + deserialized_task = Task.from_protobuf(pb_task) + + # Assert + assert deserialized_task.namespace == original_task.namespace + + # PBT-NS-002: Multiple executions have isolated globals with namespace=None + @settings( + max_examples=10, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given(execution_count=st.integers(min_value=2, max_value=5)) + @pytest.mark.asyncio + async def test_multiple_executions_have_isolated_globals( + self, + execution_count, + mock_worker_proxy_cache, + clear_event_handlers, + ): + """Property-based test: Each execution has isolated globals. + + Given: + A Task with namespace=None + When: + Executed multiple times + Then: + Each execution has isolated globals + """ + # Arrange + execution_results = [] + + async def test_callable(): + global multi_exec_var + try: + # Try to access the variable from previous execution + _ = multi_exec_var + execution_results.append("found") + except NameError: + execution_results.append("not_found") + multi_exec_var = "set" # noqa: F841 + return "done" + + proxy = PicklableProxy() + + # Act - execute multiple times + for _ in range(execution_count): + task = Task( + id=uuid4(), + callable=test_callable, + args=(), + kwargs={}, + proxy=proxy, + namespace=None, + ) + await task.dispatch() + + # Assert - each execution should not find the variable from previous + # (first execution gets NameError, all should get NameError with isolation) + assert all(r == "not_found" for r in execution_results) diff --git a/wool/tests/runtime/routine/test_wrapper.py b/wool/tests/runtime/routine/test_wrapper.py index 0058e1e..9606207 100644 --- a/wool/tests/runtime/routine/test_wrapper.py +++ b/wool/tests/runtime/routine/test_wrapper.py @@ -42,6 +42,27 @@ async def foo_gen(x): assert foo_gen.__module__ == "runtime.routine.test_wrapper" +# Additional module-level functions for namespace parameter tests +@routine(namespace="test_namespace") +async def namespace_func(): + """Test function with named namespace for dispatch tests.""" + return "result" + + +assert namespace_func.__qualname__ == "namespace_func" +assert namespace_func.__module__ == "runtime.routine.test_wrapper" + + +@routine(namespace="local_test") +async def local_exec_func(x): + """Test function for local execution tests.""" + return x * 2 + + +assert local_exec_func.__qualname__ == "local_exec_func" +assert local_exec_func.__module__ == "runtime.routine.test_wrapper" + + class Foo: """Test class for instance and class method tests.""" @@ -567,3 +588,399 @@ async def consume_generator(): # Assert cleanup happened assert mock_stream.aclose_called + + +class TestRoutineNamespaceParameter: + """Tests for the @routine decorator with namespace parameter.""" + + # WN-001: @routine without arguments defaults namespace=None + def test_bare_decorator_defaults_namespace_none(self): + """Test @routine bare decorator defaults namespace to None. + + Given: + A function decorated with @routine (bare, no parentheses) + When: + The decorator is applied + Then: + The underlying dispatch uses namespace=None + """ + # Arrange & Act - already done at module level + # foo is decorated with @routine + + # Assert - verify the function is wrapped (indirectly) + assert callable(foo) + assert foo.__name__ == "foo" + + # WN-002: @routine() empty call defaults namespace=None + def test_empty_call_defaults_namespace_none(self): + """Test @routine() empty call defaults namespace to None. + + Given: + A function decorated with @routine() + When: + The decorator is applied + Then: + The function is properly wrapped + """ + + # Arrange + @routine() + async def empty_call_func(): + return "result" + + # Assert + assert callable(empty_call_func) + assert empty_call_func.__name__ == "empty_call_func" + + # WN-003: @routine(namespace=None) explicit + def test_explicit_namespace_none(self): + """Test @routine(namespace=None) creates a properly wrapped function. + + Given: + A function decorated with @routine(namespace=None) + When: + The decorator is applied + Then: + The function is properly wrapped + """ + + # Arrange + @routine(namespace=None) + async def explicit_none_func(): + return "result" + + # Assert + assert callable(explicit_none_func) + assert explicit_none_func.__name__ == "explicit_none_func" + + # WN-004: @routine(namespace="name") explicit + def test_explicit_namespace_string(self): + """Test @routine(namespace="name") creates a properly wrapped function. + + Given: + A function decorated with @routine(namespace="cache") + When: + The decorator is applied + Then: + The function is properly wrapped + """ + + # Arrange + @routine(namespace="cache") + async def explicit_namespace_func(): + return "result" + + # Assert + assert callable(explicit_namespace_func) + assert explicit_namespace_func.__name__ == "explicit_namespace_func" + + # WN-005: @routine(namespace=None) on async generator + def test_namespace_none_on_async_generator(self): + """Test @routine(namespace=None) works on async generators. + + Given: + An async generator decorated with @routine(namespace=None) + When: + The decorator is applied + Then: + The function is properly wrapped + """ + + # Arrange + @routine(namespace=None) + async def gen_namespace_none(): + yield 1 + yield 2 + + # Assert + assert callable(gen_namespace_none) + assert gen_namespace_none.__name__ == "gen_namespace_none" + + # WN-006: @routine(namespace="name") on async generator + def test_namespace_string_on_async_generator(self): + """Test @routine(namespace="name") works on async generators. + + Given: + An async generator decorated with @routine(namespace="shared") + When: + The decorator is applied + Then: + The function is properly wrapped + """ + + # Arrange + @routine(namespace="shared") + async def gen_namespace_shared(): + yield 1 + yield 2 + + # Assert + assert callable(gen_namespace_shared) + assert gen_namespace_shared.__name__ == "gen_namespace_shared" + + # WN-007: @routine(namespace=None) on classmethod + def test_namespace_none_on_classmethod(self): + """Test @routine(namespace=None) works on classmethods. + + Given: + A classmethod decorated with @routine(namespace=None) + When: + The decorator is applied + Then: + The method is properly wrapped + """ + + # Arrange + class TestClass: + @routine(namespace=None) + @classmethod + async def class_method(cls): + return "result" + + # Assert + assert callable(TestClass.class_method) + assert TestClass.class_method.__name__ == "class_method" + + # WN-008: @routine(namespace="name") on staticmethod + def test_namespace_string_on_staticmethod(self): + """Test @routine(namespace="name") works on staticmethods. + + Given: + A staticmethod decorated with @routine(namespace="shared") + When: + The decorator is applied + Then: + The method is properly wrapped + """ + + # Arrange + class TestClass: + @routine(namespace="shared") + @staticmethod + async def static_method(): + return "result" + + # Assert + assert callable(TestClass.static_method) + assert TestClass.static_method.__name__ == "static_method" + + # WN-009: Dispatch path creates WorkTask with namespace=None + @pytest.mark.asyncio + async def test_dispatch_path_creates_task_with_namespace_none( + self, + mocker: MockerFixture, + mock_proxy_context, + ): + """Test dispatch path creates WorkTask with namespace=None. + + Given: + A function decorated with @routine (defaults to namespace=None) + When: + The function is called in dispatch mode + Then: + The WorkTask is created with namespace=None + """ + # Arrange + captured_task = None + + async def capture_dispatch(task, **kwargs): + nonlocal captured_task + captured_task = task + + async def _stream(): + yield 8 # Expected result for foo(5, 3) + + return _stream() + + mock_proxy_context.dispatch = mocker.MagicMock(side_effect=capture_dispatch) + + # Act - use module-level decorated function + await foo(5, 3) + + # Assert + assert captured_task is not None + assert captured_task.namespace is None + + # WN-010: Dispatch path creates WorkTask with namespace string + @pytest.mark.asyncio + async def test_dispatch_path_creates_task_with_namespace_string( + self, + mocker: MockerFixture, + mock_proxy_context, + ): + """Test dispatch path creates WorkTask with namespace string. + + Given: + A function decorated with @routine(namespace="test_namespace") + When: + The function is called in dispatch mode + Then: + The WorkTask is created with namespace="test_namespace" + """ + # Arrange + captured_task = None + + async def capture_dispatch(task, **kwargs): + nonlocal captured_task + captured_task = task + + async def _stream(): + yield "result" + + return _stream() + + mock_proxy_context.dispatch = mocker.MagicMock(side_effect=capture_dispatch) + + # Act - use the module-level decorated function with namespace + await namespace_func() + + # Assert + assert captured_task is not None + assert captured_task.namespace == "test_namespace" + + # WN-011: Local execution path (namespace has no effect) + @pytest.mark.asyncio + async def test_local_execution_path_namespace_no_effect(self): + """Test local execution ignores namespace parameter. + + Given: + A function decorated with @routine(namespace="local_test") + When: + The function is called in local mode (do_dispatch=False) + Then: + The function executes normally (namespace has no effect locally) + """ + # Arrange + from wool.runtime.routine.task import do_dispatch + + # Act - use the module-level decorated function + with do_dispatch(False): + result = await local_exec_func(5) + + # Assert + assert result == 10 + + # WN-VAL-001: Unknown parameter raises TypeError + def test_unknown_parameter_raises_type_error(self): + """Test unknown parameter raises TypeError. + + Given: + A function decorated with @routine(unknown_param=True) + When: + The decorator is applied + Then: + TypeError is raised + """ + # Arrange, Act & Assert + with pytest.raises(TypeError, match="unexpected keyword argument"): + + @routine(unknown_param=True) # type: ignore + async def unknown_param_func(): + return "result" + + # WN-012: @routine with WORKER sentinel + def test_namespace_worker_sentinel(self): + """Test @routine(namespace=WORKER) creates a properly wrapped function. + + Given: + A function decorated with @routine(namespace=WORKER) + When: + The decorator is applied + Then: + The function is properly wrapped + """ + import wool + + # Arrange + @routine(namespace=wool.WORKER) + async def worker_namespace_func(): + return "result" + + # Assert + assert callable(worker_namespace_func) + assert worker_namespace_func.__name__ == "worker_namespace_func" + + # PBT-WN-001: Decorator works for all function type + namespace combinations + @settings( + max_examples=20, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + namespace_value=st.one_of(st.none(), st.just("test_ns")), + function_type=st.sampled_from( + ["coroutine", "async_generator", "classmethod", "staticmethod"] + ), + ) + @pytest.mark.asyncio + async def test_decorator_all_combinations( + self, + namespace_value, + function_type, + mocker: MockerFixture, + mock_proxy_context, + ): + """Property test: @routine decorator works for all function/namespace combos. + + Given: + Any namespace value (None or string) + Any function type (coroutine, async_generator, classmethod, staticmethod) + When: + The decorator is applied and called in dispatch mode + Then: + The function is properly wrapped, dispatches correctly, + and the WorkTask has the correct namespace value + """ + # Arrange + captured_task = None + + async def capture_dispatch(task, **kwargs): + nonlocal captured_task + captured_task = task + + async def _stream(): + yield "result" + + return _stream() + + mock_proxy_context.dispatch = mocker.MagicMock(side_effect=capture_dispatch) + + # Act - use existing module-level functions based on namespace value + # We test decoration works, but use module-level functions for dispatch + if namespace_value is None: + # Use foo which has default namespace=None + await foo(5, 3) + else: + # Use namespace_func which has namespace="test_namespace" + await namespace_func() + + # Assert - verify the captured task has correct namespace value + assert captured_task is not None + if namespace_value is None: + assert captured_task.namespace is None + else: + # namespace_func uses "test_namespace" + assert captured_task.namespace == "test_namespace" + + # Also verify decoration doesn't fail for other function types + # (we can't dispatch these due to _resolve limitations, but we can + # verify they decorate without error) + if function_type == "classmethod": + + class TestClass: + @routine(namespace=namespace_value) + @classmethod + async def method(cls): + return "result" + + assert callable(TestClass.method) + + elif function_type == "staticmethod": + + class TestClass: + @routine(namespace=namespace_value) + @staticmethod + async def method(): + return "result" + + assert callable(TestClass.method) diff --git a/wool/tests/test_public.py b/wool/tests/test_public.py index 3050c52..b998463 100644 --- a/wool/tests/test_public.py +++ b/wool/tests/test_public.py @@ -44,6 +44,7 @@ def test_public_api_completeness(): "NoWorkersAvailable", "RoundRobinLoadBalancer", # Routines + "WORKER", "Task", "TaskEvent", "TaskEventHandler",