From 0a5d224decaeb98e9f52d806346ec2105cdd46f3 Mon Sep 17 00:00:00 2001 From: Conrad Date: Sun, 22 Feb 2026 10:31:24 -0500 Subject: [PATCH 1/7] build: Add version fields to proto schemas Reserve field 1 for `version` in the Task message and add TaskVersionEnvelope for pre-deserialization version extraction. Add a `version` field to Ack for observability. --- wool/protobuf/task.proto | 18 +++++++++++++----- wool/protobuf/worker.proto | 1 + wool/src/wool/runtime/protobuf/task.py | 3 ++- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/wool/protobuf/task.proto b/wool/protobuf/task.proto index b7e062d..e3bc813 100644 --- a/wool/protobuf/task.proto +++ b/wool/protobuf/task.proto @@ -3,11 +3,12 @@ syntax = "proto3"; package wool.runtime.protobuf.task; message Task { - string id = 1; - bytes callable = 2; - bytes args = 3; - bytes kwargs = 4; - string caller = 5; + string version = 1; + string id = 2; + bytes callable = 3; + bytes args = 4; + bytes kwargs = 5; + string caller = 6; bytes proxy = 7; string proxy_id = 8; int32 timeout = 9; @@ -17,6 +18,13 @@ message Task { string tag = 13; } +// Minimal envelope for pre-deserialization version extraction. +// Used by the version interceptor to parse field 1 from any +// Task wire format, including future incompatible versions. 
+message TaskVersionEnvelope { + string version = 1; +} + message Result { bytes dump = 1; } diff --git a/wool/protobuf/worker.proto b/wool/protobuf/worker.proto index b973112..293f4dd 100644 --- a/wool/protobuf/worker.proto +++ b/wool/protobuf/worker.proto @@ -20,6 +20,7 @@ message Response { message Ack { // Acknowledgment that the task was received and processing started + string version = 1; } message Nack { diff --git a/wool/src/wool/runtime/protobuf/task.py b/wool/src/wool/runtime/protobuf/task.py index 7d43faf..699a97a 100644 --- a/wool/src/wool/runtime/protobuf/task.py +++ b/wool/src/wool/runtime/protobuf/task.py @@ -2,10 +2,11 @@ from wool.runtime.protobuf.task_pb2 import Exception from wool.runtime.protobuf.task_pb2 import Result from wool.runtime.protobuf.task_pb2 import Task + from wool.runtime.protobuf.task_pb2 import TaskVersionEnvelope from wool.runtime.protobuf.task_pb2 import Worker as Worker except ImportError as e: from wool.runtime.protobuf.exception import ProtobufImportError raise ProtobufImportError(e) from e -__all__ = ["Exception", "Result", "Task", "Worker"] +__all__ = ["Exception", "Result", "Task", "TaskVersionEnvelope", "Worker"] From 3d72c09a2762d81a859da2d071aaa81b34728009 Mon Sep 17 00:00:00 2001 From: Conrad Date: Sun, 22 Feb 2026 10:32:02 -0500 Subject: [PATCH 2/7] feat: Add version to task serialization and Nack handling Stamp wool.__version__ on outgoing Task protobuf messages and on Ack responses. Raise RpcError in WorkerConnection when a Nack is received instead of an Ack. 
--- wool/src/wool/runtime/routine/task.py | 1 + wool/src/wool/runtime/worker/connection.py | 4 ++++ wool/src/wool/runtime/worker/service.py | 2 +- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/wool/src/wool/runtime/routine/task.py b/wool/src/wool/runtime/routine/task.py index a199423..59cae9e 100644 --- a/wool/src/wool/runtime/routine/task.py +++ b/wool/src/wool/runtime/routine/task.py @@ -222,6 +222,7 @@ def from_protobuf(cls, task: pb.task.Task) -> Task: def to_protobuf(self) -> pb.task.Task: return pb.task.Task( + version=wool.__version__, id=str(self.id), callable=cloudpickle.dumps(self.callable), args=cloudpickle.dumps(self.args), diff --git a/wool/src/wool/runtime/worker/connection.py b/wool/src/wool/runtime/worker/connection.py index dea15a3..e62dc18 100644 --- a/wool/src/wool/runtime/worker/connection.py +++ b/wool/src/wool/runtime/worker/connection.py @@ -361,6 +361,10 @@ async def _dispatch(self, ch, task, timeout): call: _DispatchCall = ch.stub.dispatch(task.to_protobuf()) try: response = await anext(aiter(call)) + if response.HasField("nack"): + raise RpcError( + details=f"Task rejected by worker: {response.nack.reason}" + ) if not response.HasField("ack"): raise UnexpectedResponse( f"Expected 'ack' response, " diff --git a/wool/src/wool/runtime/worker/service.py b/wool/src/wool/runtime/worker/service.py index c3421c5..ff3c94b 100644 --- a/wool/src/wool/runtime/worker/service.py +++ b/wool/src/wool/runtime/worker/service.py @@ -143,7 +143,7 @@ async def dispatch( ) with self._tracker(Task.from_protobuf(request)) as task: - yield pb.worker.Response(ack=pb.worker.Ack()) + yield pb.worker.Response(ack=pb.worker.Ack(version=wool.__version__)) try: if isasyncgen(task): async for result in task: From 55913c3eae2ec1c735b5e25da2071e9353b1e5c0 Mon Sep 17 00:00:00 2001 From: Conrad Date: Sun, 22 Feb 2026 10:32:51 -0500 Subject: [PATCH 3/7] feat: Add discovery-time major version filter Add parse_major_version helper and _create_version_filter to 
WorkerProxy. Compose security and version filters into a single compatible predicate used across all discovery modes. --- wool/src/wool/runtime/worker/proxy.py | 47 ++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/wool/src/wool/runtime/worker/proxy.py b/wool/src/wool/runtime/worker/proxy.py index 04ad7a6..7be172c 100644 --- a/wool/src/wool/runtime/worker/proxy.py +++ b/wool/src/wool/runtime/worker/proxy.py @@ -33,6 +33,21 @@ T = TypeVar("T") +def parse_major_version(version: str) -> int | None: + """Extract the major version number from a version string. + + :param version: + A version string (e.g. ``"1.2.3"``). + :returns: + The major version as an integer, or ``None`` if + unparseable. + """ + try: + return int(version.split(".")[0]) + except (ValueError, IndexError): + return None + + class ReducibleAsyncIterator(Generic[T]): """An async iterator that can be pickled via __reduce__. @@ -208,12 +223,16 @@ def __init__( # Create security filter based on resolved credentials security_filter = self._create_security_filter(self._credentials) + version_filter = self._create_version_filter() + + def compatible(w): + return security_filter(w) and version_filter(w) match (pool_uri, discovery, workers): case (pool_uri, None, None) if pool_uri is not None: - # Combine tag filter with security filter + # Combine tag and compatibility filters def combined_filter(w): - return bool({pool_uri, *tags} & w.tags) and security_filter(w) + return bool({pool_uri, *tags} & w.tags) and compatible(w) self._discovery = LocalDiscovery(pool_uri).subscribe( filter=combined_filter @@ -221,8 +240,7 @@ def combined_filter(w): case (None, discovery, None) if discovery is not None: self._discovery = discovery case (None, None, workers) if workers is not None: - # Filter workers by security compatibility - compatible_workers = [w for w in workers if security_filter(w)] + compatible_workers = [w for w in workers if compatible(w)] self._discovery = 
ReducibleAsyncIterator( [ DiscoveryEvent("worker-added", metadata=w) @@ -416,6 +434,27 @@ def _create_security_filter( # Proxy has no credentials: only accept insecure workers return lambda metadata: not metadata.secure + @staticmethod + def _create_version_filter() -> Callable[[WorkerMetadata], bool]: + """Create discovery filter based on major version compatibility. + + Workers must share the same major version as the local proxy. + Workers with unparseable versions are rejected. + + :returns: + Predicate function for filtering workers by version + compatibility. + """ + local_major = parse_major_version(wool.__version__) + + def version_filter(metadata: WorkerMetadata) -> bool: + worker_major = parse_major_version(metadata.version) + if local_major is None or worker_major is None: + return False + return local_major == worker_major + + return version_filter + async def _await_workers(self): while not self._loadbalancer_context or not self._loadbalancer_context.workers: await asyncio.sleep(0) From df13abdb729d1d3249599a07e3fd652d276c5484 Mon Sep 17 00:00:00 2001 From: Conrad Date: Sun, 22 Feb 2026 10:33:26 -0500 Subject: [PATCH 4/7] feat: Add gRPC version interceptor for dispatch Extract the client version from raw request bytes via TaskVersionEnvelope before full deserialization. Reject requests with empty, unparseable, or incompatible major versions with a Nack. 
--- wool/src/wool/runtime/worker/interceptor.py | 80 +++++++++++++++++++++ wool/src/wool/runtime/worker/process.py | 3 +- 2 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 wool/src/wool/runtime/worker/interceptor.py diff --git a/wool/src/wool/runtime/worker/interceptor.py b/wool/src/wool/runtime/worker/interceptor.py new file mode 100644 index 0000000..60088fe --- /dev/null +++ b/wool/src/wool/runtime/worker/interceptor.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import grpc +import grpc.aio + +import wool +from wool.runtime import protobuf as pb +from wool.runtime.worker.proxy import parse_major_version + + +class VersionInterceptor(grpc.aio.ServerInterceptor): + """gRPC server interceptor for wire protocol version checking. + + Intercepts the ``dispatch`` RPC to extract the client version from + field 1 of the raw request bytes using + :class:`~wool.runtime.protobuf.task.TaskVersionEnvelope`. If the + client major version differs from the local worker major version, + the RPC is short-circuited with a + :class:`~wool.runtime.protobuf.worker.Nack` response. + + Requests with empty, missing, or unparseable version fields are + rejected. 
+ """ + + async def intercept_service(self, continuation, handler_call_details): + handler = await continuation(handler_call_details) + if handler is None or not handler_call_details.method.endswith("/dispatch"): + return handler + + original_handler = handler.unary_stream + original_deserializer = handler.request_deserializer + assert original_handler is not None + assert original_deserializer is not None + + async def version_checked_handler(request_bytes, context): + envelope = pb.task.TaskVersionEnvelope() + try: + envelope.ParseFromString(request_bytes) + except Exception: + yield pb.worker.Response( + nack=pb.worker.Nack(reason="Failed to parse version envelope") + ) + return + + client_major = parse_major_version(envelope.version) + local_major = parse_major_version(wool.__version__) + + if client_major is None or local_major is None: + yield pb.worker.Response( + nack=pb.worker.Nack( + reason=( + f"Unparseable version: " + f"client={envelope.version!r}, " + f"worker={wool.__version__!r}" + ) + ) + ) + return + + if client_major != local_major: + yield pb.worker.Response( + nack=pb.worker.Nack( + reason=( + f"Major version mismatch: " + f"client={envelope.version}, " + f"worker={wool.__version__}" + ) + ) + ) + return + + request = original_deserializer(request_bytes) + async for response in original_handler(request, context): # pyright: ignore[reportGeneralTypeIssues] + yield response + + return grpc.unary_stream_rpc_method_handler( + version_checked_handler, + request_deserializer=None, + response_serializer=handler.response_serializer, + ) diff --git a/wool/src/wool/runtime/worker/process.py b/wool/src/wool/runtime/worker/process.py index 64e54bd..d3a3689 100644 --- a/wool/src/wool/runtime/worker/process.py +++ b/wool/src/wool/runtime/worker/process.py @@ -18,6 +18,7 @@ from wool.runtime.resourcepool import ResourcePool from wool.runtime.worker.base import ServerCredentialsType from wool.runtime.worker.base import resolve_server_credentials +from 
wool.runtime.worker.interceptor import VersionInterceptor from wool.runtime.worker.service import WorkerService if TYPE_CHECKING: @@ -172,7 +173,7 @@ async def _serve(self): requests. It creates a gRPC server, adds the worker service, and starts listening for incoming connections. """ - server = grpc.aio.server() + server = grpc.aio.server(interceptors=[VersionInterceptor()]) credentials = resolve_server_credentials(self._credentials) address = self._address(self._host, self._port) From 6505a215582d61aa08af0202cfd46611c6d9b33b Mon Sep 17 00:00:00 2001 From: Conrad Date: Sun, 22 Feb 2026 10:33:54 -0500 Subject: [PATCH 5/7] test: Add wire protocol version compatibility tests Cover version round-tripping through protobuf, Nack handling in WorkerConnection, discovery-time major version filtering in WorkerProxy, and dispatch-time version interception in WorkerService (including empty, unparseable, and incompatible version scenarios). --- wool/tests/runtime/routine/test_task.py | 75 +++++++ wool/tests/runtime/worker/test_connection.py | 70 ++++++ wool/tests/runtime/worker/test_proxy.py | 218 ++++++++++++++++++- wool/tests/runtime/worker/test_service.py | 206 ++++++++++++++++++ 4 files changed, 565 insertions(+), 4 deletions(-) diff --git a/wool/tests/runtime/routine/test_task.py b/wool/tests/runtime/routine/test_task.py index 739e026..71f1c94 100644 --- a/wool/tests/runtime/routine/test_task.py +++ b/wool/tests/runtime/routine/test_task.py @@ -372,6 +372,81 @@ def test_to_protobuf_none_optionals( assert pb_task.line_no == 0 assert pb_task.tag == "" + def test_to_protobuf_with_version_field( + self, sample_async_callable, picklable_proxy, clear_event_handlers + ): + """Test to_protobuf includes wool.__version__ in version field. + + Given: + A Task instance + When: + to_protobuf() is called + Then: + The protobuf Task contains wool.__version__ in the + version field. 
+ """ + # Arrange + task = Task( + id=uuid4(), + callable=sample_async_callable, + args=(), + kwargs={}, + proxy=picklable_proxy, + ) + + # Act + pb_task = task.to_protobuf() + + # Assert + assert pb_task.version == wool.__version__ + + @settings( + max_examples=50, + deadline=None, + ) + @given( + version=st.from_regex(r"\d{1,3}\.\d{1,3}(rc\d{1,3}|\.\d{1,3})", fullmatch=True), + ) + def test_from_protobuf_with_version_roundtrip(self, version): + """Test version field round-trips through protobuf serialization. + + Given: + Any PEP 440-like version string + When: + A protobuf Task with that version is serialized + Then: + The version field is preserved on the wire. + """ + # Arrange + proxy = PicklableProxy() + + async def test_callable(): + return "result" + + pb_task = pb.task.Task( + version=version, + id=str(uuid4()), + callable=cloudpickle.dumps(test_callable), + args=cloudpickle.dumps(()), + kwargs=cloudpickle.dumps({}), + caller="", + proxy=cloudpickle.dumps(proxy), + proxy_id=str(proxy.id), + timeout=0, + filename="", + function="", + line_no=0, + tag="", + ) + + # Act — serialize to bytes and parse back + wire_bytes = pb_task.SerializeToString() + parsed = pb.task.Task() + parsed.ParseFromString(wire_bytes) + + # Assert + assert parsed.version == version + @pytest.mark.asyncio async def test_dispatch_successful_execution( self, diff --git a/wool/tests/runtime/worker/test_connection.py b/wool/tests/runtime/worker/test_connection.py index bf7818e..da6bdff 100644 --- a/wool/tests/runtime/worker/test_connection.py +++ b/wool/tests/runtime/worker/test_connection.py @@ -709,6 +709,76 @@ async def test_dispatch_stream_early_close( # Assert - only got first 2 results assert results == ["result_1", "result_2"] + @pytest.mark.asyncio + async def test_dispatch_with_version_in_ack( + self, mocker: MockerFixture, sample_task, async_stream, mock_grpc_call + ): + """Test dispatch accepts Ack with version field. 
+ + Given: + A mock worker returning Ack with version field + When: + dispatch() is called + Then: + Ack is accepted and stream is returned normally. + """ + # Arrange + responses = ( + pb.worker.Response(ack=pb.worker.Ack(version="1.0.0")), + pb.worker.Response( + result=pb.task.Result(dump=cloudpickle.dumps("test_result")) + ), + ) + mock_call = mock_grpc_call(async_stream(responses)) + + mock_stub = mocker.MagicMock() + mock_stub.dispatch = mocker.MagicMock(return_value=mock_call) + mocker.patch.object(pb.worker, "WorkerStub", return_value=mock_stub) + + connection = WorkerConnection("localhost:50051", limit=10) + + # Act + results = [] + async for result in await connection.dispatch(sample_task): + results.append(result) + + # Assert + assert results == ["test_result"] + + @pytest.mark.asyncio + async def test_dispatch_nack_raises_rpc_error( + self, mocker: MockerFixture, sample_task, async_stream, mock_grpc_call + ): + """Test dispatch raises RpcError on Nack response. + + Given: + A mock worker returning Nack with version mismatch reason + When: + dispatch() is called + Then: + It should raise RpcError with the rejection reason. 
+ """ + # Arrange + responses = ( + pb.worker.Response( + nack=pb.worker.Nack( + reason="Major version mismatch: client=2.0.0, worker=1.0.0" + ) + ), + ) + mock_call = mock_grpc_call(async_stream(responses)) + + mock_stub = mocker.MagicMock() + mock_stub.dispatch = mocker.MagicMock(return_value=mock_call) + mocker.patch.object(pb.worker, "WorkerStub", return_value=mock_stub) + + connection = WorkerConnection("localhost:50051", limit=10) + + # Act & Assert + with pytest.raises(RpcError, match="Task rejected by worker"): + async for _ in await connection.dispatch(sample_task): + pass + class TestResourceLifetime: """Tests for pool-backed resource lifetime management.""" diff --git a/wool/tests/runtime/worker/test_proxy.py b/wool/tests/runtime/worker/test_proxy.py index 2be91bb..16483b3 100644 --- a/wool/tests/runtime/worker/test_proxy.py +++ b/wool/tests/runtime/worker/test_proxy.py @@ -1434,7 +1434,9 @@ async def test_start_invalid_discovery_type_raises_error( # ========================================================================= @pytest.mark.asyncio - async def test_security_filter_with_credentials(self, mock_proxy_session): + async def test_security_filter_with_credentials( + self, mock_proxy_session, mocker: MockerFixture + ): """Test only secure workers are discovered with credentials. Given: @@ -1446,6 +1448,7 @@ async def test_security_filter_with_credentials(self, mock_proxy_session): Only secure workers appear in the workers list. """ # Arrange + mocker.patch.object(wp.wool, "__version__", "1.0.0") secure_worker = WorkerMetadata( uid=uuid.uuid4(), address="192.168.1.100:50051", @@ -1480,17 +1483,19 @@ async def test_security_filter_with_credentials(self, mock_proxy_session): @pytest.mark.asyncio async def test_pool_uri_combined_filter(self, mocker: MockerFixture): - """Test pool URI filter combines tag matching and security filtering. + """Test pool URI filter combines tag matching, security, and version + filtering. 
Given: A WorkerProxy instantiated with a pool URI and extra tags. When: The discovery filter is evaluated against workers. Then: - Only workers matching the tags and security requirements - pass the filter. + Only workers matching the tags, security, and version + requirements pass the filter. """ # Arrange + mocker.patch.object(wp.wool, "__version__", "1.0.0") mock_subscriber = mocker.MagicMock() mock_local_discovery = mocker.MagicMock() mock_local_discovery.subscribe.return_value = mock_subscriber @@ -1535,6 +1540,211 @@ async def test_pool_uri_combined_filter(self, mocker: MockerFixture): ) assert filter_fn(secure_matching) is False + # ========================================================================= + # Version Filtering Tests + # ========================================================================= + + @settings( + max_examples=50, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + local_major=st.integers(min_value=0, max_value=100), + local_minor=st.integers(min_value=0, max_value=100), + local_patch=st.integers(min_value=0, max_value=100), + worker_minor=st.integers(min_value=0, max_value=100), + worker_patch=st.integers(min_value=0, max_value=100), + ) + @pytest.mark.asyncio + async def test__worker_sentinel_with_compatible_major_version( + self, + mock_proxy_session, + local_major, + local_minor, + local_patch, + worker_minor, + worker_patch, + mocker: MockerFixture, + ): + """Test workers with matching major version are accepted. + + Given: + A proxy with version X.a.b and a worker with version + X.c.d (same major version) + When: + The worker is discovered via static workers list + Then: + The worker is accepted into the load balancer context. 
+ """ + # Arrange + local_version = f"{local_major}.{local_minor}.{local_patch}" + worker_version = f"{local_major}.{worker_minor}.{worker_patch}" + mocker.patch.object(wp.wool, "__version__", local_version) + + worker = WorkerMetadata( + uid=uuid.uuid4(), + address="127.0.0.1:50051", + pid=1234, + version=worker_version, + ) + + proxy = WorkerProxy(workers=[worker]) + + # Act + await proxy.start() + await asyncio.sleep(0.05) + + # Assert + assert worker in proxy.workers + + # Cleanup + await proxy.stop() + + @settings( + max_examples=50, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + local_major=st.integers(min_value=0, max_value=100), + worker_major=st.integers(min_value=0, max_value=100), + ) + @pytest.mark.asyncio + async def test__worker_sentinel_with_incompatible_major_version( + self, + mock_proxy_session, + local_major, + worker_major, + mocker: MockerFixture, + ): + """Test workers with different major version are rejected. + + Given: + A proxy with major version X and a worker with a + different major version Y + When: + The worker is discovered via static workers list + Then: + The worker is rejected from the load balancer context. 
+ """ + from hypothesis import assume + + assume(local_major != worker_major) + + # Arrange + local_version = f"{local_major}.0.0" + worker_version = f"{worker_major}.0.0" + mocker.patch.object(wp.wool, "__version__", local_version) + + worker = WorkerMetadata( + uid=uuid.uuid4(), + address="127.0.0.1:50051", + pid=1234, + version=worker_version, + ) + + proxy = WorkerProxy(workers=[worker]) + + # Act + await proxy.start() + await asyncio.sleep(0.05) + + # Assert + assert worker not in proxy.workers + + # Cleanup + await proxy.stop() + + @settings( + max_examples=50, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + version_text=st.text(min_size=0, max_size=20).filter( + lambda s: not s or not s.split(".")[0].strip().isdigit() + ), + ) + @pytest.mark.asyncio + async def test__worker_sentinel_with_unparseable_version( + self, + mock_proxy_session, + version_text, + mocker: MockerFixture, + ): + """Test workers with unparseable version are rejected. + + Given: + A proxy with a valid version and a worker whose + version string has no parseable integer prefix + When: + The worker is discovered via static workers list + Then: + The worker is rejected from the load balancer context. + """ + # Arrange + mocker.patch.object(wp.wool, "__version__", "1.0.0") + + worker = WorkerMetadata( + uid=uuid.uuid4(), + address="127.0.0.1:50051", + pid=1234, + version=version_text, + ) + + proxy = WorkerProxy(workers=[worker]) + + # Act + await proxy.start() + await asyncio.sleep(0.05) + + # Assert + assert worker not in proxy.workers + + # Cleanup + await proxy.stop() + + @pytest.mark.asyncio + async def test__worker_sentinel_with_combined_security_and_version_filter( + self, + mock_proxy_session, + mocker: MockerFixture, + ): + """Test version filter applies alongside security filter. 
+ + Given: + A proxy with credentials and version 1.x + When: + A secure worker with version 2.y is discovered + Then: + The worker is rejected (version filter applies alongside + security filter). + """ + # Arrange + mocker.patch.object(wp.wool, "__version__", "1.0.0") + mock_credentials = object() + + worker = WorkerMetadata( + uid=uuid.uuid4(), + address="127.0.0.1:50051", + pid=1234, + version="2.0.0", + secure=True, + ) + + proxy = WorkerProxy( + workers=[worker], + credentials=mock_credentials, + ) + + # Act + await proxy.start() + await asyncio.sleep(0.05) + + # Assert + assert worker not in proxy.workers + + # Cleanup + await proxy.stop() + # ============================================================================ # Property-Based Tests diff --git a/wool/tests/runtime/worker/test_service.py b/wool/tests/runtime/worker/test_service.py index 46f5334..ce6a59d 100644 --- a/wool/tests/runtime/worker/test_service.py +++ b/wool/tests/runtime/worker/test_service.py @@ -9,17 +9,29 @@ import grpc import pytest from grpc import StatusCode +from hypothesis import HealthCheck +from hypothesis import assume +from hypothesis import given +from hypothesis import settings +from hypothesis import strategies as st from pytest_mock import MockerFixture +import wool from wool.runtime import protobuf as pb from wool.runtime.protobuf.worker import WorkerStub from wool.runtime.protobuf.worker import add_WorkerServicer_to_server from wool.runtime.routine.task import Task from wool.runtime.routine.task import TaskEvent +from wool.runtime.worker.interceptor import VersionInterceptor from wool.runtime.worker.service import WorkerService from wool.runtime.worker.service import _ReadOnlyEvent +@pytest.fixture(scope="function") +def grpc_interceptors(): + return [VersionInterceptor()] + + @pytest.fixture(scope="function") def grpc_add_to_server(): return add_WorkerServicer_to_server @@ -1191,3 +1203,197 @@ async def sample_task(): # Cleanup worker loops for entry in 
service._loop_pool._cache.values(): WorkerService._destroy_worker_loop(entry.obj) + + @pytest.mark.asyncio + @pytest.mark.dependency("test_dispatch_task_that_returns") + async def test_dispatch_with_version_in_ack( + self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache + ): + """Test dispatch returns Ack with wool.__version__. + + Given: + A running WorkerService + When: + A task is dispatched with a compatible version + Then: + The Ack response contains wool.__version__. + """ + + # Arrange + async def sample_task(): + return "test_result" + + mock_proxy = mocker.MagicMock() + mock_proxy.id = "test-proxy-id" + + wool_task = Task( + id=uuid4(), + callable=sample_task, + args=(), + kwargs={}, + proxy=mock_proxy, + ) + + request = wool_task.to_protobuf() + + # Act + async with grpc_aio_stub() as stub: + stream = stub.dispatch(request) + responses = [r async for r in stream] + + # Assert + ack_response = responses[0] + assert ack_response.HasField("ack") + assert ack_response.ack.version == wool.__version__ + + @pytest.mark.asyncio + @pytest.mark.dependency("test_dispatch_task_that_returns") + async def test_dispatch_with_empty_client_version( + self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache + ): + """Test dispatch rejects tasks with empty version field. + + Given: + A running WorkerService with the version interceptor + When: + A task with empty version field is dispatched + Then: + The worker responds with a Nack citing unparseable + version. 
+ """ + + # Arrange + async def sample_task(): + return "test_result" + + mock_proxy = mocker.MagicMock() + mock_proxy.id = "test-proxy-id" + + wool_task = Task( + id=uuid4(), + callable=sample_task, + args=(), + kwargs={}, + proxy=mock_proxy, + ) + + request = wool_task.to_protobuf() + request.ClearField("version") + + # Act + async with grpc_aio_stub() as stub: + stream = stub.dispatch(request) + responses = [r async for r in stream] + + # Assert + assert len(responses) == 1 + assert responses[0].HasField("nack") + assert "Unparseable version" in responses[0].nack.reason + + @settings( + max_examples=20, + deadline=None, + suppress_health_check=[HealthCheck.function_scoped_fixture], + ) + @given( + local_major=st.integers(min_value=0, max_value=100), + client_major=st.integers(min_value=0, max_value=100), + ) + @pytest.mark.asyncio + @pytest.mark.dependency("test_dispatch_task_that_returns") + async def test_dispatch_with_incompatible_major_version( + self, + grpc_aio_stub, + mocker: MockerFixture, + mock_worker_proxy_cache, + local_major, + client_major, + ): + """Test dispatch yields Nack for incompatible major version. + + Given: + Two semver-like version strings with different major + versions, or empty or unparseable version + When: + A task is dispatched with the client version through the + version interceptor + Then: + The dispatch yields a Nack with a reason citing version + mismatch. 
+ """ + assume(local_major != client_major) + + # Arrange + mocker.patch.object(wool, "__version__", f"{local_major}.0.0") + + async def sample_task(): + return "should_not_execute" + + mock_proxy = mocker.MagicMock() + mock_proxy.id = "test-proxy-id" + + wool_task = Task( + id=uuid4(), + callable=sample_task, + args=(), + kwargs={}, + proxy=mock_proxy, + ) + + request = wool_task.to_protobuf() + # Override version field to simulate incompatible client + request.version = f"{client_major}.0.0" + + # Act + async with grpc_aio_stub() as stub: + stream = stub.dispatch(request) + responses = [r async for r in stream] + + # Assert + assert len(responses) == 1 + assert responses[0].HasField("nack") + assert "Major version mismatch" in responses[0].nack.reason + + @pytest.mark.asyncio + @pytest.mark.dependency("test_dispatch_task_that_returns") + async def test_dispatch_with_unparseable_client_version( + self, grpc_aio_stub, mocker: MockerFixture, mock_worker_proxy_cache + ): + """Test dispatch rejects tasks with unparseable version. + + Given: + A running WorkerService with the version interceptor + When: + A task with unparseable version string is dispatched + Then: + The worker responds with a Nack citing unparseable + version. 
+ """ + + # Arrange + async def sample_task(): + return "should_not_execute" + + mock_proxy = mocker.MagicMock() + mock_proxy.id = "test-proxy-id" + + wool_task = Task( + id=uuid4(), + callable=sample_task, + args=(), + kwargs={}, + proxy=mock_proxy, + ) + + request = wool_task.to_protobuf() + request.version = "not-a-version" + + # Act + async with grpc_aio_stub() as stub: + stream = stub.dispatch(request) + responses = [r async for r in stream] + + # Assert + assert len(responses) == 1 + assert responses[0].HasField("nack") + assert "Unparseable version" in responses[0].nack.reason From aa56dcfafd9735d6c6d4fc3d4b2af31dbf4d7fff Mon Sep 17 00:00:00 2001 From: Conrad Date: Sun, 22 Feb 2026 10:34:11 -0500 Subject: [PATCH 6/7] docs: Add protobuf subpackage README Document the wire protocol, dispatch sequence, serialization strategy, version compatibility guarantees, and schema evolution rules for users of Wool. --- wool/src/wool/runtime/protobuf/README.md | 84 ++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 wool/src/wool/runtime/protobuf/README.md diff --git a/wool/src/wool/runtime/protobuf/README.md b/wool/src/wool/runtime/protobuf/README.md new file mode 100644 index 0000000..cbcaf87 --- /dev/null +++ b/wool/src/wool/runtime/protobuf/README.md @@ -0,0 +1,84 @@ +# Wire protocol + +Wool uses a binary wire protocol built on Protocol Buffers and gRPC +for all communication between clients and workers. + +## Dispatch sequence + +The `Worker.dispatch` RPC uses a server-streaming pattern. The client +sends a single `Task` message and receives a stream of `Response` +messages: + +``` +Client Worker + | | + |── Task ──────────────────────>| + | | + |<──────── Response(Ack) ───────| (or Nack on rejection) + |<──────── Response(Result) ────| (one or more results) + |<──────── Response(Exception) ─| (on failure) + | | +``` + +### Response types + +1. **Ack** — The worker accepted the task and started processing. 
+ Carries the worker's `version` string for observability. +2. **Nack** — The worker rejected the task. The `reason` field + describes why (e.g., major version mismatch, unparseable version). + No further responses follow a Nack. +3. **Result** — A cloudpickle-serialized return value. Coroutine + tasks yield exactly one result; async generator tasks yield one + per iteration. +4. **Exception** — A cloudpickle-serialized exception from the + remote execution. Terminates the stream. + +## Serialization + +Wool uses a hybrid serialization approach: + +- **Protobuf envelope** — Structured metadata fields (`id`, + `version`, `caller`, `timeout`, etc.) are native protobuf fields + for efficient parsing and forward compatibility. +- **cloudpickle payloads** — The `callable`, `args`, `kwargs`, and + `proxy` fields are serialized with cloudpickle and stored as + `bytes` fields. This allows arbitrary Python objects to be + transmitted without schema changes. +- **Results and exceptions** — `Result.dump` and `Exception.dump` + are cloudpickle-serialized bytes. + +## Version compatibility + +Wool enforces major-version compatibility at two layers. + +### Discovery-time filtering + +`WorkerProxy` applies a version filter during worker discovery. +Workers whose major version differs from the client's are excluded +from the load balancer and never receive tasks. + +### Dispatch-time interception + +`VersionInterceptor` is a gRPC server interceptor that extracts the +version field from raw request bytes *before* full deserialization. +This uses `TaskVersionEnvelope` — a minimal protobuf message +containing only `string version = 1` — which can parse field 1 from +any `Task` wire format, including future incompatible versions. + +Requests with empty, missing, or unparseable version fields are +rejected with a `Nack` response. If the client's major version +differs from the worker's, the interceptor yields a `Nack` without +attempting full deserialization. 
This prevents deserialization errors +when the wire format has changed across major versions. + +## Schema evolution rules + +- **Additive-only within a major version.** New fields may be + appended with new field numbers. Existing field numbers and types + must not change within the same major version. +- **Major version = wire compatibility boundary.** A major version + bump permits breaking changes to the protobuf schema (field + renumbering, type changes, removal). +- **Field 1 is always `version`.** The `Task` message reserves + field 1 for the version string. This invariant enables + pre-deserialization version extraction via `TaskVersionEnvelope`. From bc1c4b7532f03a0f98fc8a794eba03b3a2eb8ad8 Mon Sep 17 00:00:00 2001 From: Conrad Date: Sun, 22 Feb 2026 11:11:32 -0500 Subject: [PATCH 7/7] ci: Fetch tags in checkout step for version resolution The default shallow clone fetches no tags, causing the build-hooks git version parser to fall back to "0+". This version string is unparseable by parse_major_version, failing all test_service.py tests in CI. Setting "fetch-depth: 0" together with "fetch-tags: true" fetches the full history and its tags so that a real tag is resolved. --- .github/workflows/run-tests.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index e214af2..d6c7aa5 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -44,7 +44,9 @@ jobs: steps: - name: Checkout code uses: actions/checkout@v4 - + with: + fetch-depth: 0 + fetch-tags: true - name: Install uv and prepare python uses: astral-sh/setup-uv@v5 with: