[Serve] Add gRPC inter-deployment communication #59908
Conversation
Code Review
This pull request introduces a significant new feature to Ray Serve: inter-deployment communication via gRPC. This provides an alternative to the default Ray actor calls, which can be beneficial for performance and for certain network environments. The feature is enabled by setting the new `_by_reference` option to `False` on deployment handles.
The changes are comprehensive, touching multiple layers of the Serve stack:
- A new `InterDeploymentService` is defined in the protobuf, with RPCs for unary and streaming requests, including support for backpressure.
- A flexible serialization layer (`RPCSerializer`) is added, supporting `cloudpickle`, `pickle`, `msgpack`, `orjson`, and a `noop` mode for raw bytes. This allows users to choose the best serialization method for their use case.
- Replicas now run a gRPC server to handle these incoming requests. The logic is nicely encapsulated in a decorator (`_wrap_inter_deployment_grpc_call`).
- On the client side, a new `gRPCReplicaWrapper` and `gRPCReplicaResult` are introduced to handle sending requests and receiving responses over gRPC.
- The `DeploymentHandle.options()` method is extended with `_by_reference`, `_request_serialization`, and `_response_serialization` to control this new communication channel.
- New tests are added to validate the gRPC transport, including different serialization methods and streaming.
Overall, this is a well-designed and well-implemented feature. The code is clear and the changes are well-contained. I have one suggestion for refactoring to reduce code duplication.
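For illustration, a minimal sketch of how these handle options might be combined (the serialization string values are assumptions based on the formats listed above, not confirmed API values):
```python
from ray import serve

handle = serve.get_deployment_handle("Downstream", "app")

# Opt this handle into gRPC transport and choose per-direction serializers.
# The string values below are assumptions based on the supported formats.
grpc_handle = handle.options(
    _by_reference=False,
    _request_serialization="msgpack",
    _response_serialization="orjson",
)
result = await grpc_handle.remote({"x": 1})
```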
```proto
rpc HandleRequestWithRejection(InterDeploymentRequest) returns (InterDeploymentResponse);
// Streaming request with rejection support for backpressure.
rpc HandleRequestWithRejectionStreaming(InterDeploymentRequest) returns (stream InterDeploymentResponse);
}
```
Proto file modified: requires RPC standards review (Bugbot Rules)
Medium Severity
This PR modifies .proto files. Please review the RPC fault-tolerance & idempotency standards guide here:
https://github.com/ray-project/ray/tree/master/doc/source/ray-core/internals/rpc-fault-tolerance.rst
Identical to parity implementation
Add support for deployments to communicate via gRPC instead of Ray actor
calls. This is enabled by setting `_by_reference=False` on a deployment
handle:
```python
handle = serve.get_deployment_handle("Downstream", "app")
grpc_handle = handle.options(_by_reference=False)
result = await grpc_handle.remote(data)
```
Changes:
- Add `InterDeploymentService` protobuf with `HandleRequest`, `HandleRequestStreaming`, `HandleRequestWithRejection`, and `HandleRequestWithRejectionStreaming` RPCs
- Add `_by_reference`, `_request_serialization`, and `_response_serialization` handle options
- Add `RPCSerializer` supporting `cloudpickle`, `pickle`, `msgpack`, `orjson`, and `noop`
- Add `gRPCReplicaWrapper` for client-side gRPC transport
- Add `gRPCReplicaResult` for handling gRPC responses
- Add gRPC server to `Replica` implementing `InterDeploymentServiceServicer`
- Add `grpc_port` field to `RunningReplicaInfo`
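The PR summary doesn't show `RPCSerializer`'s interface; purely as a sketch of the pluggable-serializer idea it describes (names, structure, and the registry shape are assumptions, and only stdlib formats are shown):
```python
import json
import pickle
from typing import Any, Callable, Dict, Tuple

# Hypothetical registry mapping a serialization name to (dumps, loads) pairs.
# The real RPCSerializer also supports cloudpickle, msgpack, and orjson.
_SERIALIZERS: Dict[str, Tuple[Callable[[Any], bytes], Callable[[bytes], Any]]] = {
    "pickle": (pickle.dumps, pickle.loads),
    "json": (
        lambda obj: json.dumps(obj).encode("utf-8"),
        lambda raw: json.loads(raw.decode("utf-8")),
    ),
    # "noop" passes raw bytes through untouched.
    "noop": (lambda obj: obj, lambda raw: raw),
}


def serialize(obj: Any, method: str = "pickle") -> bytes:
    """Encode a request/response payload for the gRPC wire."""
    dumps, _ = _SERIALIZERS[method]
    return dumps(obj)


def deserialize(raw: bytes, method: str = "pickle") -> Any:
    """Decode a payload received over gRPC."""
    _, loads = _SERIALIZERS[method]
    return loads(raw)
```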
Signed-off-by: Seiji Eicher <seiji@anyscale.com>
```python
logger = logging.getLogger(SERVE_LOGGER_NAME)


def _wrap_grpc_call(f):
```
Identical to parity implementation
```python
with self._handle_errors_and_metrics(request_metadata) as status_code_callback:
    yield status_code_callback


@_wrap_grpc_call
```
Following code block identical to parity implementation
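For readers unfamiliar with the pattern, a hedged sketch of how a wrapper like `_wrap_grpc_call` could tie into the context manager above; only the decorator name and `_handle_errors_and_metrics` appear in the diff, while `_parse_request_metadata` and the body are hypothetical:
```python
import functools

import grpc


def _wrap_grpc_call(f):
    """Apply shared error handling and metrics to a gRPC servicer method.

    Sketch only: assumes the replica exposes the _handle_errors_and_metrics
    context manager shown above, which yields a status-code callback.
    """

    @functools.wraps(f)
    async def wrapper(self, request, context):
        # Hypothetical helper to extract request metadata from the proto.
        request_metadata = self._parse_request_metadata(request)
        with self._handle_errors_and_metrics(request_metadata) as status_code_callback:
            response = await f(self, request, context)
            status_code_callback(grpc.StatusCode.OK)
            return response

    return wrapper
```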
```python
    return self._obj_ref_gen


class gRPCReplicaResult(ReplicaResult):
```
Identical to parity implementation
```python
    GRPC_CONTEXT_ARG_NAME,
    HEALTH_CHECK_METHOD,
    RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
    RAY_SERVE_GRPC_MAX_MESSAGE_SIZE,
```
Divergence: Currently set to 2GB. Should we bump this to 4GB?
let's bump it up
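For context, an env-var-backed constant of this shape might be defined as below; the 2GB-1 default comes from the PR description. Note that gRPC's message-length options are int32-valued, so 2**31 - 1 is effectively the ceiling, and a literal 4GB would not be representable:
```python
import os

# 2**31 - 1 bytes (~2 GiB): the largest value gRPC's int32-based
# message-length options accept; default taken from the PR description.
RAY_SERVE_GRPC_MAX_MESSAGE_SIZE = int(
    os.environ.get("RAY_SERVE_GRPC_MAX_MESSAGE_SIZE", 2**31 - 1)
)
```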
```python
)


class gRPCReplicaWrapper(ReplicaWrapper):
```
Identical to parity implementation
```python
    _by_reference=handle_options._by_reference,
    _on_separate_loop=init_options._run_router_in_separate_loop,
    request_serialization=handle_options.request_serialization,
    response_serialization=handle_options.response_serialization,
```
Additions identical to parity implementation
| "RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP", "1" | ||
| ) | ||
|
|
||
| # For now, this is used only for testing. In the suite of tests that |
Logic identical to parity implementation
```python
MESSAGE_PACK_OFFSET = 9


def asyncio_grpc_exception_handler(loop, context):
```
Identical to parity implementation
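As a sketch, an asyncio exception handler of this shape typically logs unhandled callback errors rather than letting them crash the loop; the body below is an assumption, and the real handler may filter specific benign gRPC errors:
```python
import asyncio
import logging

logger = logging.getLogger("ray.serve")


def asyncio_grpc_exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
    """Log unhandled exceptions from gRPC asyncio callbacks (illustrative)."""
    exc = context.get("exception")
    if exc is not None:
        logger.warning("Unhandled exception in gRPC asyncio callback: %r", exc)
    else:
        logger.warning("asyncio error: %s", context.get("message"))


# Usage sketch: install the handler on the replica's event loop.
# asyncio.get_running_loop().set_exception_handler(asyncio_grpc_exception_handler)
```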
```python
            RAY_SERVE_GRPC_MAX_MESSAGE_SIZE,
        )
    ]
)
```
Missing max_send_message_length in gRPC server configuration
High Severity
The gRPC server and client channel configurations only set grpc.max_receive_message_length but are missing grpc.max_send_message_length. The existing DEFAULT_GRPC_SERVER_OPTIONS in constants.py sets both options. Without max_send_message_length, the server cannot send responses larger than the default 4MB limit, and the client cannot send requests larger than 4MB. This will cause failures for inter-deployment communication with large payloads, which contradicts the PR's benchmarks testing up to 10MB messages.
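A sketch of the fix the bot is suggesting: pass both length options to the server and the client channel. The constant value and the target address are placeholders:
```python
import asyncio

import grpc

RAY_SERVE_GRPC_MAX_MESSAGE_SIZE = 2**31 - 1  # placeholder; real value lives in constants.py

# Mirror DEFAULT_GRPC_SERVER_OPTIONS by setting both directions explicitly.
GRPC_LENGTH_OPTIONS = [
    ("grpc.max_send_message_length", RAY_SERVE_GRPC_MAX_MESSAGE_SIZE),
    ("grpc.max_receive_message_length", RAY_SERVE_GRPC_MAX_MESSAGE_SIZE),
]


async def main():
    server = grpc.aio.server(options=GRPC_LENGTH_OPTIONS)
    # "localhost:50051" is a placeholder target for the replica's gRPC port.
    channel = grpc.aio.insecure_channel("localhost:50051", options=GRPC_LENGTH_OPTIONS)


asyncio.run(main())
```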
Why are these changes needed?
This PR adds gRPC-based inter-deployment communication for Ray Serve, allowing deployments to communicate with each other using gRPC transport instead of Ray actor calls. This can provide performance benefits in certain scenarios.
Key Changes
- gRPC Server on Replicas: Each replica now starts a gRPC server that can handle requests from other deployments.
- gRPC Replica Wrapper: A new `gRPCReplicaWrapper` class handles sending requests via gRPC and processing responses.
- Handle Options: The `_by_reference` option on handles controls whether to use Ray actor calls (`True`) or gRPC transport (`False`).
- New Environment Variables (a usage sketch follows the next section):
  - `RAY_SERVE_USE_GRPC_BY_DEFAULT`: Master flag to enable gRPC transport by default for all inter-deployment communication
  - `RAY_SERVE_PROXY_USE_GRPC`: Controls whether the proxy uses gRPC transport (defaults to the master flag value)
  - `RAY_SERVE_GRPC_MAX_MESSAGE_SIZE`: Configures the maximum gRPC message size (default: 2GB-1)
Related issue number
N/A
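A usage sketch for the environment variables above (values are illustrative; the flags presumably need to be set before Serve-related modules are imported, since such constants are typically read at import time):
```python
import os

# Route all inter-deployment traffic over gRPC by default.
os.environ["RAY_SERVE_USE_GRPC_BY_DEFAULT"] = "1"
# Keep the proxy on Ray actor calls even with the master flag on.
os.environ["RAY_SERVE_PROXY_USE_GRPC"] = "0"
# Allow inter-deployment messages up to 100 MiB.
os.environ["RAY_SERVE_GRPC_MAX_MESSAGE_SIZE"] = str(100 * 1024 * 1024)

from ray import serve  # imported after setting the flags

serve.start()
```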
Checks
- I've run `scripts/format.sh` to lint the changes in this PR.
- As this is a temporary testing hook, I've added it under the API Reference (Experimental) page.
Test Plan
- `python/ray/serve/tests/test_grpc_e2e.py`
- `python/ray/serve/tests/test_grpc_replica_wrapper.py`
- `python/ray/serve/tests/unit/test_grpc_replica_result.py`

Benchmarks
Script available here
Results show throughput/latency improvements with gRPC for message sizes < ~1MB.