From f45935a4d760a36bf989ed79bfd02aa7ec203468 Mon Sep 17 00:00:00 2001 From: ohmayr Date: Thu, 12 Dec 2024 03:48:39 +0500 Subject: [PATCH] feat: add client debug logging support for async gRPC (#2291) --- .../%service/transports/_mixins.py.j2 | 20 +-- .../services/%service/transports/grpc.py.j2 | 10 +- .../%service/transports/grpc_asyncio.py.j2 | 100 +++++++++++- .../services/asset_service/transports/grpc.py | 12 +- .../asset_service/transports/grpc_asyncio.py | 130 +++++++++++---- .../iam_credentials/transports/grpc.py | 10 +- .../transports/grpc_asyncio.py | 88 +++++++++- .../services/eventarc/transports/grpc.py | 28 ++-- .../eventarc/transports/grpc_asyncio.py | 136 ++++++++++++---- .../config_service_v2/transports/grpc.py | 16 +- .../transports/grpc_asyncio.py | 152 +++++++++++++----- .../logging_service_v2/transports/grpc.py | 16 +- .../transports/grpc_asyncio.py | 98 +++++++++-- .../metrics_service_v2/transports/grpc.py | 16 +- .../transports/grpc_asyncio.py | 96 +++++++++-- .../services/cloud_redis/transports/grpc.py | 24 +-- .../cloud_redis/transports/grpc_asyncio.py | 118 +++++++++++--- 17 files changed, 867 insertions(+), 203 deletions(-) diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/_mixins.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/_mixins.py.j2 index 84b085ee1a..3458cc78e8 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/_mixins.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/_mixins.py.j2 @@ -32,7 +32,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "delete_operation" not in self._stubs: - self._stubs["delete_operation"] = self.grpc_channel.unary_unary( + self._stubs["delete_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/DeleteOperation", request_serializer=operations_pb2.DeleteOperationRequest.SerializeToString, response_deserializer=None, @@ -52,7 +52,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -72,7 +72,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "wait_operation" not in self._stubs: - self._stubs["wait_operation"] = self.grpc_channel.unary_unary( + self._stubs["wait_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/WaitOperation", request_serializer=operations_pb2.WaitOperationRequest.SerializeToString, response_deserializer=None, @@ -92,7 +92,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -112,7 +112,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, @@ -136,7 +136,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_locations" not in self._stubs: - self._stubs["list_locations"] = self.grpc_channel.unary_unary( + self._stubs["list_locations"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/ListLocations", request_serializer=locations_pb2.ListLocationsRequest.SerializeToString, response_deserializer=locations_pb2.ListLocationsResponse.FromString, @@ -156,7 +156,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_location" not in self._stubs: - self._stubs["get_location"] = self.grpc_channel.unary_unary( + self._stubs["get_location"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/GetLocation", request_serializer=locations_pb2.GetLocationRequest.SerializeToString, response_deserializer=locations_pb2.Location.FromString, @@ -188,7 +188,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "set_iam_policy" not in self._stubs: - self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary( + self._stubs["set_iam_policy"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/SetIamPolicy", request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString, response_deserializer=policy_pb2.Policy.FromString, @@ -216,7 +216,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_iam_policy" not in self._stubs: - self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary( + self._stubs["get_iam_policy"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/GetIamPolicy", request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString, response_deserializer=policy_pb2.Policy.FromString, @@ -246,7 +246,7 @@ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "test_iam_permissions" not in self._stubs: - self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary( + self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/TestIamPermissions", request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString, response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString, diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 index ede5a4831b..cb3ee8cfa6 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 @@ -2,6 +2,7 @@ {% block content %} +import json import logging as std_logging import pickle import warnings @@ -69,7 +70,12 @@ class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO elif isinstance(request, google.protobuf.message.Message): request_payload = MessageToJson(request) else: - request_payload = f"{type(result).__name__}: {pickle.dumps(request)}" + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } grpc_request = { "payload": request_payload, "requestMethod": "grpc", @@ -90,7 +96,7 @@ class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO if logging_enabled: # pragma: NO COVER response_metadata = response.trailing_metadata() # Convert gRPC metadata `` to list of tuples - metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None result = response.result() if isinstance(result, proto.Message): {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2293): Investigate if we can improve this logic diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 index 0bb3126e5f..650f4a2c65 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 @@ -4,6 +4,9 @@ {% import "%namespace/%name_%version/%sub/services/%service/_shared_macros.j2" as shared_macros %} import inspect +import json +import pickle +import logging as std_logging import warnings from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union @@ -16,8 +19,11 @@ from google.api_core import operations_v1 {% endif %} from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore +from google.protobuf.json_format import MessageToJson +import google.protobuf.message import grpc # type: ignore +import proto # type: ignore from grpc.experimental import aio # type: ignore {% filter sort_lines %} @@ -47,6 +53,81 @@ from google.longrunning import operations_pb2 # type: ignore from .base import {{ service.name }}Transport, DEFAULT_CLIENT_INFO from .grpc import {{ service.name }}GrpcTransport +try: + from google.api_core import client_logging # type: ignore + CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER +except ImportError: # pragma: NO COVER + CLIENT_LOGGING_SUPPORTED = False + +_LOGGER = std_logging.getLogger(__name__) + + +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER + async def intercept_unary_unary(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2293): Investigate if we can improve this logic + or wait for next gen protobuf. + #} + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "{{ service.meta.address.proto }}", + "rpcName": str(client_call_details.method), + "request": grpc_request, + {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2275): logging `metadata` seems repetitive and may need to be cleaned up. We're including it within "request" for consistency with REST transport.' #} + "metadata": grpc_request["metadata"], + }, + ) + response = await continuation(client_call_details, request) + if logging_enabled: # pragma: NO COVER + response_metadata = await response.trailing_metadata() + # Convert gRPC metadata `` to list of tuples + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None + result = await response + if isinstance(result, proto.Message): + {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2293): Investigate if we can improve this logic + or wait for next gen protobuf. + #} + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = MessageToJson(result) + else: + response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" + grpc_response = { + "payload": response_payload, + "metadata": metadata, + "status": "OK", + } + _LOGGER.debug( + f"Received response to rpc {client_call_details.method}.", + extra = { + "serviceName": "{{ service.meta.address.proto }}", + "rpcName": str(client_call_details.method), + "response": grpc_response, + {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2275): logging `metadata` seems repetitive and may need to be cleaned up. We're including it within "request" for consistency with REST transport.' #} + "metadata": grpc_response["metadata"], + }, + ) + return response + class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): """gRPC AsyncIO backend transport for {{ service.name }}. @@ -242,8 +323,11 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): ], ) - # Wrap messages. This must be done after self._grpc_channel exists + self._interceptor = _LoggingClientAIOInterceptor() + self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters + # Wrap messages. This must be done after self._logged_channel exists self._prep_wrapped_messages(client_info) @property @@ -267,7 +351,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): # Quick check: Only create a new client if we do not already have one. if self._operations_client is None: self._operations_client = operations_v1.OperationsAsyncClient( - self.grpc_channel + self._logged_channel ) # Return the client from cache. @@ -297,7 +381,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if '{{ method.transport_safe_name|snake_case }}' not in self._stubs: - self._stubs['{{ method.transport_safe_name|snake_case }}'] = self.grpc_channel.{{ method.grpc_stub_type }}( + self._stubs['{{ method.transport_safe_name|snake_case }}'] = self._logged_channel.{{ method.grpc_stub_type }}( '/{{ '.'.join(method.meta.address.package) }}.{{ service.name }}/{{ method.name }}', request_serializer={{ method.input.ident }}.{% if method.input.ident.python_import.module.endswith('_pb2') %}SerializeToString{% else %}serialize{% endif %}, response_deserializer={{ method.output.ident }}.{% if method.output.ident.python_import.module.endswith('_pb2') %}FromString{% else %}deserialize{% endif %}, @@ -325,7 +409,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "set_iam_policy" not in self._stubs: - self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary( + self._stubs["set_iam_policy"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/SetIamPolicy", request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString, response_deserializer=policy_pb2.Policy.FromString, @@ -351,7 +435,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_iam_policy" not in self._stubs: - self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary( + self._stubs["get_iam_policy"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/GetIamPolicy", request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString, response_deserializer=policy_pb2.Policy.FromString, @@ -380,7 +464,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "test_iam_permissions" not in self._stubs: - self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary( + self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/TestIamPermissions", request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString, response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString, @@ -393,7 +477,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): {{ shared_macros.wrap_async_method_macro()|indent(4) }} def close(self): - return self.grpc_channel.close() + return self._logged_channel.close() @property def kind(self) -> str: @@ -405,4 +489,4 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): __all__ = ( '{{ service.name }}GrpcAsyncIOTransport', ) -{% endblock %} +{% endblock %} \ No newline at end of file diff --git a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py index dca9171f12..519ecdec2b 100755 --- a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py +++ b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import logging as std_logging import pickle import warnings @@ -54,7 +55,12 @@ def intercept_unary_unary(self, continuation, client_call_details, request): elif isinstance(request, google.protobuf.message.Message): request_payload = MessageToJson(request) else: - request_payload = f"{type(result).__name__}: {pickle.dumps(request)}" + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } grpc_request = { "payload": request_payload, "requestMethod": "grpc", @@ -74,7 +80,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request): if logging_enabled: # pragma: NO COVER response_metadata = response.trailing_metadata() # Convert gRPC metadata `` to list of tuples - metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None result = response.result() if isinstance(result, proto.Message): response_payload = type(result).to_json(result) @@ -1019,7 +1025,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, diff --git a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py index 36cdea1121..54666f93ec 100755 --- a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py +++ b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py @@ -14,6 +14,9 @@ # limitations under the License. # import inspect +import json +import pickle +import logging as std_logging import warnings from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union @@ -24,8 +27,11 @@ from google.api_core import operations_v1 from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore +from google.protobuf.json_format import MessageToJson +import google.protobuf.message import grpc # type: ignore +import proto # type: ignore from grpc.experimental import aio # type: ignore from google.cloud.asset_v1.types import asset_service @@ -34,6 +40,73 @@ from .base import AssetServiceTransport, DEFAULT_CLIENT_INFO from .grpc import AssetServiceGrpcTransport +try: + from google.api_core import client_logging # type: ignore + CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER +except ImportError: # pragma: NO COVER + CLIENT_LOGGING_SUPPORTED = False + +_LOGGER = std_logging.getLogger(__name__) + + +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER + async def intercept_unary_unary(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.asset.v1.AssetService", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response = await continuation(client_call_details, request) + if logging_enabled: # pragma: NO COVER + response_metadata = await response.trailing_metadata() + # Convert gRPC metadata `` to list of tuples + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None + result = await response + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = MessageToJson(result) + else: + response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" + grpc_response = { + "payload": response_payload, + "metadata": metadata, + "status": "OK", + } + _LOGGER.debug( + f"Received response to rpc {client_call_details.method}.", + extra = { + "serviceName": "google.cloud.asset.v1.AssetService", + "rpcName": str(client_call_details.method), + "response": grpc_response, + "metadata": grpc_response["metadata"], + }, + ) + return response + class AssetServiceGrpcAsyncIOTransport(AssetServiceTransport): """gRPC AsyncIO backend transport for AssetService. @@ -227,8 +300,11 @@ def __init__(self, *, ], ) - # Wrap messages. This must be done after self._grpc_channel exists + self._interceptor = _LoggingClientAIOInterceptor() + self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters + # Wrap messages. This must be done after self._logged_channel exists self._prep_wrapped_messages(client_info) @property @@ -251,7 +327,7 @@ def operations_client(self) -> operations_v1.OperationsAsyncClient: # Quick check: Only create a new client if we do not already have one. if self._operations_client is None: self._operations_client = operations_v1.OperationsAsyncClient( - self.grpc_channel + self._logged_channel ) # Return the client from cache. @@ -288,7 +364,7 @@ def export_assets(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'export_assets' not in self._stubs: - self._stubs['export_assets'] = self.grpc_channel.unary_unary( + self._stubs['export_assets'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/ExportAssets', request_serializer=asset_service.ExportAssetsRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -315,7 +391,7 @@ def list_assets(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_assets' not in self._stubs: - self._stubs['list_assets'] = self.grpc_channel.unary_unary( + self._stubs['list_assets'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/ListAssets', request_serializer=asset_service.ListAssetsRequest.serialize, response_deserializer=asset_service.ListAssetsResponse.deserialize, @@ -347,7 +423,7 @@ def batch_get_assets_history(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'batch_get_assets_history' not in self._stubs: - self._stubs['batch_get_assets_history'] = self.grpc_channel.unary_unary( + self._stubs['batch_get_assets_history'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/BatchGetAssetsHistory', request_serializer=asset_service.BatchGetAssetsHistoryRequest.serialize, response_deserializer=asset_service.BatchGetAssetsHistoryResponse.deserialize, @@ -375,7 +451,7 @@ def create_feed(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_feed' not in self._stubs: - self._stubs['create_feed'] = self.grpc_channel.unary_unary( + self._stubs['create_feed'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/CreateFeed', request_serializer=asset_service.CreateFeedRequest.serialize, response_deserializer=asset_service.Feed.deserialize, @@ -401,7 +477,7 @@ def get_feed(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_feed' not in self._stubs: - self._stubs['get_feed'] = self.grpc_channel.unary_unary( + self._stubs['get_feed'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/GetFeed', request_serializer=asset_service.GetFeedRequest.serialize, response_deserializer=asset_service.Feed.deserialize, @@ -428,7 +504,7 @@ def list_feeds(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_feeds' not in self._stubs: - self._stubs['list_feeds'] = self.grpc_channel.unary_unary( + self._stubs['list_feeds'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/ListFeeds', request_serializer=asset_service.ListFeedsRequest.serialize, response_deserializer=asset_service.ListFeedsResponse.deserialize, @@ -454,7 +530,7 @@ def update_feed(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_feed' not in self._stubs: - self._stubs['update_feed'] = self.grpc_channel.unary_unary( + self._stubs['update_feed'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/UpdateFeed', request_serializer=asset_service.UpdateFeedRequest.serialize, response_deserializer=asset_service.Feed.deserialize, @@ -480,7 +556,7 @@ def delete_feed(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_feed' not in self._stubs: - self._stubs['delete_feed'] = self.grpc_channel.unary_unary( + self._stubs['delete_feed'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/DeleteFeed', request_serializer=asset_service.DeleteFeedRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -509,7 +585,7 @@ def search_all_resources(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'search_all_resources' not in self._stubs: - self._stubs['search_all_resources'] = self.grpc_channel.unary_unary( + self._stubs['search_all_resources'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/SearchAllResources', request_serializer=asset_service.SearchAllResourcesRequest.serialize, response_deserializer=asset_service.SearchAllResourcesResponse.deserialize, @@ -538,7 +614,7 @@ def search_all_iam_policies(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'search_all_iam_policies' not in self._stubs: - self._stubs['search_all_iam_policies'] = self.grpc_channel.unary_unary( + self._stubs['search_all_iam_policies'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/SearchAllIamPolicies', request_serializer=asset_service.SearchAllIamPoliciesRequest.serialize, response_deserializer=asset_service.SearchAllIamPoliciesResponse.deserialize, @@ -565,7 +641,7 @@ def analyze_iam_policy(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'analyze_iam_policy' not in self._stubs: - self._stubs['analyze_iam_policy'] = self.grpc_channel.unary_unary( + self._stubs['analyze_iam_policy'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/AnalyzeIamPolicy', request_serializer=asset_service.AnalyzeIamPolicyRequest.serialize, response_deserializer=asset_service.AnalyzeIamPolicyResponse.deserialize, @@ -602,7 +678,7 @@ def analyze_iam_policy_longrunning(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'analyze_iam_policy_longrunning' not in self._stubs: - self._stubs['analyze_iam_policy_longrunning'] = self.grpc_channel.unary_unary( + self._stubs['analyze_iam_policy_longrunning'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/AnalyzeIamPolicyLongrunning', request_serializer=asset_service.AnalyzeIamPolicyLongrunningRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -634,7 +710,7 @@ def analyze_move(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'analyze_move' not in self._stubs: - self._stubs['analyze_move'] = self.grpc_channel.unary_unary( + self._stubs['analyze_move'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/AnalyzeMove', request_serializer=asset_service.AnalyzeMoveRequest.serialize, response_deserializer=asset_service.AnalyzeMoveResponse.deserialize, @@ -675,7 +751,7 @@ def query_assets(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'query_assets' not in self._stubs: - self._stubs['query_assets'] = self.grpc_channel.unary_unary( + self._stubs['query_assets'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/QueryAssets', request_serializer=asset_service.QueryAssetsRequest.serialize, response_deserializer=asset_service.QueryAssetsResponse.deserialize, @@ -702,7 +778,7 @@ def create_saved_query(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_saved_query' not in self._stubs: - self._stubs['create_saved_query'] = self.grpc_channel.unary_unary( + self._stubs['create_saved_query'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/CreateSavedQuery', request_serializer=asset_service.CreateSavedQueryRequest.serialize, response_deserializer=asset_service.SavedQuery.deserialize, @@ -728,7 +804,7 @@ def get_saved_query(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_saved_query' not in self._stubs: - self._stubs['get_saved_query'] = self.grpc_channel.unary_unary( + self._stubs['get_saved_query'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/GetSavedQuery', request_serializer=asset_service.GetSavedQueryRequest.serialize, response_deserializer=asset_service.SavedQuery.deserialize, @@ -755,7 +831,7 @@ def list_saved_queries(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_saved_queries' not in self._stubs: - self._stubs['list_saved_queries'] = self.grpc_channel.unary_unary( + self._stubs['list_saved_queries'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/ListSavedQueries', request_serializer=asset_service.ListSavedQueriesRequest.serialize, response_deserializer=asset_service.ListSavedQueriesResponse.deserialize, @@ -781,7 +857,7 @@ def update_saved_query(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_saved_query' not in self._stubs: - self._stubs['update_saved_query'] = self.grpc_channel.unary_unary( + self._stubs['update_saved_query'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/UpdateSavedQuery', request_serializer=asset_service.UpdateSavedQueryRequest.serialize, response_deserializer=asset_service.SavedQuery.deserialize, @@ -807,7 +883,7 @@ def delete_saved_query(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_saved_query' not in self._stubs: - self._stubs['delete_saved_query'] = self.grpc_channel.unary_unary( + self._stubs['delete_saved_query'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/DeleteSavedQuery', request_serializer=asset_service.DeleteSavedQueryRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -834,7 +910,7 @@ def batch_get_effective_iam_policies(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'batch_get_effective_iam_policies' not in self._stubs: - self._stubs['batch_get_effective_iam_policies'] = self.grpc_channel.unary_unary( + self._stubs['batch_get_effective_iam_policies'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/BatchGetEffectiveIamPolicies', request_serializer=asset_service.BatchGetEffectiveIamPoliciesRequest.serialize, response_deserializer=asset_service.BatchGetEffectiveIamPoliciesResponse.deserialize, @@ -860,7 +936,7 @@ def analyze_org_policies(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'analyze_org_policies' not in self._stubs: - self._stubs['analyze_org_policies'] = self.grpc_channel.unary_unary( + self._stubs['analyze_org_policies'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/AnalyzeOrgPolicies', request_serializer=asset_service.AnalyzeOrgPoliciesRequest.serialize, response_deserializer=asset_service.AnalyzeOrgPoliciesResponse.deserialize, @@ -888,7 +964,7 @@ def analyze_org_policy_governed_containers(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'analyze_org_policy_governed_containers' not in self._stubs: - self._stubs['analyze_org_policy_governed_containers'] = self.grpc_channel.unary_unary( + self._stubs['analyze_org_policy_governed_containers'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/AnalyzeOrgPolicyGovernedContainers', request_serializer=asset_service.AnalyzeOrgPolicyGovernedContainersRequest.serialize, response_deserializer=asset_service.AnalyzeOrgPolicyGovernedContainersResponse.deserialize, @@ -933,7 +1009,7 @@ def analyze_org_policy_governed_assets(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'analyze_org_policy_governed_assets' not in self._stubs: - self._stubs['analyze_org_policy_governed_assets'] = self.grpc_channel.unary_unary( + self._stubs['analyze_org_policy_governed_assets'] = self._logged_channel.unary_unary( '/google.cloud.asset.v1.AssetService/AnalyzeOrgPolicyGovernedAssets', request_serializer=asset_service.AnalyzeOrgPolicyGovernedAssetsRequest.serialize, response_deserializer=asset_service.AnalyzeOrgPolicyGovernedAssetsResponse.deserialize, @@ -1140,7 +1216,7 @@ def _wrap_method(self, func, *args, **kwargs): return gapic_v1.method_async.wrap_method(func, *args, **kwargs) def close(self): - return self.grpc_channel.close() + return self._logged_channel.close() @property def kind(self) -> str: @@ -1157,7 +1233,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, diff --git a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py index 62413fb4bf..0e55e29785 100755 --- a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py +++ b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import logging as std_logging import pickle import warnings @@ -51,7 +52,12 @@ def intercept_unary_unary(self, continuation, client_call_details, request): elif isinstance(request, google.protobuf.message.Message): request_payload = MessageToJson(request) else: - request_payload = f"{type(result).__name__}: {pickle.dumps(request)}" + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } grpc_request = { "payload": request_payload, "requestMethod": "grpc", @@ -71,7 +77,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request): if logging_enabled: # pragma: NO COVER response_metadata = response.trailing_metadata() # Convert gRPC metadata `` to list of tuples - metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None result = response.result() if isinstance(result, proto.Message): response_payload = type(result).to_json(result) diff --git a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py index 8589ca45f0..8ac65146dc 100755 --- a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py +++ b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py @@ -14,6 +14,9 @@ # limitations under the License. # import inspect +import json +import pickle +import logging as std_logging import warnings from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union @@ -23,14 +26,84 @@ from google.api_core import retry_async as retries from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore +from google.protobuf.json_format import MessageToJson +import google.protobuf.message import grpc # type: ignore +import proto # type: ignore from grpc.experimental import aio # type: ignore from google.iam.credentials_v1.types import common from .base import IAMCredentialsTransport, DEFAULT_CLIENT_INFO from .grpc import IAMCredentialsGrpcTransport +try: + from google.api_core import client_logging # type: ignore + CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER +except ImportError: # pragma: NO COVER + CLIENT_LOGGING_SUPPORTED = False + +_LOGGER = std_logging.getLogger(__name__) + + +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER + async def intercept_unary_unary(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.iam.credentials.v1.IAMCredentials", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response = await continuation(client_call_details, request) + if logging_enabled: # pragma: NO COVER + response_metadata = await response.trailing_metadata() + # Convert gRPC metadata `` to list of tuples + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None + result = await response + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = MessageToJson(result) + else: + response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" + grpc_response = { + "payload": response_payload, + "metadata": metadata, + "status": "OK", + } + _LOGGER.debug( + f"Received response to rpc {client_call_details.method}.", + extra = { + "serviceName": "google.iam.credentials.v1.IAMCredentials", + "rpcName": str(client_call_details.method), + "response": grpc_response, + "metadata": grpc_response["metadata"], + }, + ) + return response + class IAMCredentialsGrpcAsyncIOTransport(IAMCredentialsTransport): """gRPC AsyncIO backend transport for IAMCredentials. @@ -232,8 +305,11 @@ def __init__(self, *, ], ) - # Wrap messages. This must be done after self._grpc_channel exists + self._interceptor = _LoggingClientAIOInterceptor() + self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters + # Wrap messages. This must be done after self._logged_channel exists self._prep_wrapped_messages(client_info) @property @@ -266,7 +342,7 @@ def generate_access_token(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'generate_access_token' not in self._stubs: - self._stubs['generate_access_token'] = self.grpc_channel.unary_unary( + self._stubs['generate_access_token'] = self._logged_channel.unary_unary( '/google.iam.credentials.v1.IAMCredentials/GenerateAccessToken', request_serializer=common.GenerateAccessTokenRequest.serialize, response_deserializer=common.GenerateAccessTokenResponse.deserialize, @@ -293,7 +369,7 @@ def generate_id_token(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'generate_id_token' not in self._stubs: - self._stubs['generate_id_token'] = self.grpc_channel.unary_unary( + self._stubs['generate_id_token'] = self._logged_channel.unary_unary( '/google.iam.credentials.v1.IAMCredentials/GenerateIdToken', request_serializer=common.GenerateIdTokenRequest.serialize, response_deserializer=common.GenerateIdTokenResponse.deserialize, @@ -320,7 +396,7 @@ def sign_blob(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'sign_blob' not in self._stubs: - self._stubs['sign_blob'] = self.grpc_channel.unary_unary( + self._stubs['sign_blob'] = self._logged_channel.unary_unary( '/google.iam.credentials.v1.IAMCredentials/SignBlob', request_serializer=common.SignBlobRequest.serialize, response_deserializer=common.SignBlobResponse.deserialize, @@ -347,7 +423,7 @@ def sign_jwt(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'sign_jwt' not in self._stubs: - self._stubs['sign_jwt'] = self.grpc_channel.unary_unary( + self._stubs['sign_jwt'] = self._logged_channel.unary_unary( '/google.iam.credentials.v1.IAMCredentials/SignJwt', request_serializer=common.SignJwtRequest.serialize, response_deserializer=common.SignJwtResponse.deserialize, @@ -425,7 +501,7 @@ def _wrap_method(self, func, *args, **kwargs): return gapic_v1.method_async.wrap_method(func, *args, **kwargs) def close(self): - return self.grpc_channel.close() + return self._logged_channel.close() @property def kind(self) -> str: diff --git a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py index c7d20f208f..8c602c7fd8 100755 --- a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py +++ b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import logging as std_logging import pickle import warnings @@ -62,7 +63,12 @@ def intercept_unary_unary(self, continuation, client_call_details, request): elif isinstance(request, google.protobuf.message.Message): request_payload = MessageToJson(request) else: - request_payload = f"{type(result).__name__}: {pickle.dumps(request)}" + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } grpc_request = { "payload": request_payload, "requestMethod": "grpc", @@ -82,7 +88,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request): if logging_enabled: # pragma: NO COVER response_metadata = response.trailing_metadata() # Convert gRPC metadata `` to list of tuples - metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None result = response.result() if isinstance(result, proto.Message): response_payload = type(result).to_json(result) @@ -817,7 +823,7 @@ def delete_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "delete_operation" not in self._stubs: - self._stubs["delete_operation"] = self.grpc_channel.unary_unary( + self._stubs["delete_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/DeleteOperation", request_serializer=operations_pb2.DeleteOperationRequest.SerializeToString, response_deserializer=None, @@ -835,7 +841,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -853,7 +859,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -871,7 +877,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, @@ -889,7 +895,7 @@ def list_locations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_locations" not in self._stubs: - self._stubs["list_locations"] = self.grpc_channel.unary_unary( + self._stubs["list_locations"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/ListLocations", request_serializer=locations_pb2.ListLocationsRequest.SerializeToString, response_deserializer=locations_pb2.ListLocationsResponse.FromString, @@ -907,7 +913,7 @@ def get_location( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_location" not in self._stubs: - self._stubs["get_location"] = self.grpc_channel.unary_unary( + self._stubs["get_location"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/GetLocation", request_serializer=locations_pb2.GetLocationRequest.SerializeToString, response_deserializer=locations_pb2.Location.FromString, @@ -932,7 +938,7 @@ def set_iam_policy( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "set_iam_policy" not in self._stubs: - self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary( + self._stubs["set_iam_policy"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/SetIamPolicy", request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString, response_deserializer=policy_pb2.Policy.FromString, @@ -958,7 +964,7 @@ def get_iam_policy( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_iam_policy" not in self._stubs: - self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary( + self._stubs["get_iam_policy"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/GetIamPolicy", request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString, response_deserializer=policy_pb2.Policy.FromString, @@ -986,7 +992,7 @@ def test_iam_permissions( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "test_iam_permissions" not in self._stubs: - self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary( + self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/TestIamPermissions", request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString, response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString, diff --git a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py index 5914da06cb..38009d7f90 100755 --- a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py +++ b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py @@ -14,6 +14,9 @@ # limitations under the License. # import inspect +import json +import pickle +import logging as std_logging import warnings from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union @@ -24,8 +27,11 @@ from google.api_core import operations_v1 from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore +from google.protobuf.json_format import MessageToJson +import google.protobuf.message import grpc # type: ignore +import proto # type: ignore from grpc.experimental import aio # type: ignore from google.cloud.eventarc_v1.types import channel @@ -42,6 +48,73 @@ from .base import EventarcTransport, DEFAULT_CLIENT_INFO from .grpc import EventarcGrpcTransport +try: + from google.api_core import client_logging # type: ignore + CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER +except ImportError: # pragma: NO COVER + CLIENT_LOGGING_SUPPORTED = False + +_LOGGER = std_logging.getLogger(__name__) + + +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER + async def intercept_unary_unary(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.eventarc.v1.Eventarc", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response = await continuation(client_call_details, request) + if logging_enabled: # pragma: NO COVER + response_metadata = await response.trailing_metadata() + # Convert gRPC metadata `` to list of tuples + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None + result = await response + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = MessageToJson(result) + else: + response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" + grpc_response = { + "payload": response_payload, + "metadata": metadata, + "status": "OK", + } + _LOGGER.debug( + f"Received response to rpc {client_call_details.method}.", + extra = { + "serviceName": "google.cloud.eventarc.v1.Eventarc", + "rpcName": str(client_call_details.method), + "response": grpc_response, + "metadata": grpc_response["metadata"], + }, + ) + return response + class EventarcGrpcAsyncIOTransport(EventarcTransport): """gRPC AsyncIO backend transport for Eventarc. @@ -237,8 +310,11 @@ def __init__(self, *, ], ) - # Wrap messages. This must be done after self._grpc_channel exists + self._interceptor = _LoggingClientAIOInterceptor() + self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters + # Wrap messages. This must be done after self._logged_channel exists self._prep_wrapped_messages(client_info) @property @@ -261,7 +337,7 @@ def operations_client(self) -> operations_v1.OperationsAsyncClient: # Quick check: Only create a new client if we do not already have one. if self._operations_client is None: self._operations_client = operations_v1.OperationsAsyncClient( - self.grpc_channel + self._logged_channel ) # Return the client from cache. @@ -286,7 +362,7 @@ def get_trigger(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_trigger' not in self._stubs: - self._stubs['get_trigger'] = self.grpc_channel.unary_unary( + self._stubs['get_trigger'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/GetTrigger', request_serializer=eventarc.GetTriggerRequest.serialize, response_deserializer=trigger.Trigger.deserialize, @@ -312,7 +388,7 @@ def list_triggers(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_triggers' not in self._stubs: - self._stubs['list_triggers'] = self.grpc_channel.unary_unary( + self._stubs['list_triggers'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/ListTriggers', request_serializer=eventarc.ListTriggersRequest.serialize, response_deserializer=eventarc.ListTriggersResponse.deserialize, @@ -339,7 +415,7 @@ def create_trigger(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_trigger' not in self._stubs: - self._stubs['create_trigger'] = self.grpc_channel.unary_unary( + self._stubs['create_trigger'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/CreateTrigger', request_serializer=eventarc.CreateTriggerRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -365,7 +441,7 @@ def update_trigger(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_trigger' not in self._stubs: - self._stubs['update_trigger'] = self.grpc_channel.unary_unary( + self._stubs['update_trigger'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/UpdateTrigger', request_serializer=eventarc.UpdateTriggerRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -391,7 +467,7 @@ def delete_trigger(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_trigger' not in self._stubs: - self._stubs['delete_trigger'] = self.grpc_channel.unary_unary( + self._stubs['delete_trigger'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/DeleteTrigger', request_serializer=eventarc.DeleteTriggerRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -417,7 +493,7 @@ def get_channel(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_channel' not in self._stubs: - self._stubs['get_channel'] = self.grpc_channel.unary_unary( + self._stubs['get_channel'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/GetChannel', request_serializer=eventarc.GetChannelRequest.serialize, response_deserializer=channel.Channel.deserialize, @@ -443,7 +519,7 @@ def list_channels(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_channels' not in self._stubs: - self._stubs['list_channels'] = self.grpc_channel.unary_unary( + self._stubs['list_channels'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/ListChannels', request_serializer=eventarc.ListChannelsRequest.serialize, response_deserializer=eventarc.ListChannelsResponse.deserialize, @@ -470,7 +546,7 @@ def create_channel_(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_channel_' not in self._stubs: - self._stubs['create_channel_'] = self.grpc_channel.unary_unary( + self._stubs['create_channel_'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/CreateChannel', request_serializer=eventarc.CreateChannelRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -496,7 +572,7 @@ def update_channel(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_channel' not in self._stubs: - self._stubs['update_channel'] = self.grpc_channel.unary_unary( + self._stubs['update_channel'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/UpdateChannel', request_serializer=eventarc.UpdateChannelRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -522,7 +598,7 @@ def delete_channel(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_channel' not in self._stubs: - self._stubs['delete_channel'] = self.grpc_channel.unary_unary( + self._stubs['delete_channel'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/DeleteChannel', request_serializer=eventarc.DeleteChannelRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -548,7 +624,7 @@ def get_provider(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_provider' not in self._stubs: - self._stubs['get_provider'] = self.grpc_channel.unary_unary( + self._stubs['get_provider'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/GetProvider', request_serializer=eventarc.GetProviderRequest.serialize, response_deserializer=discovery.Provider.deserialize, @@ -574,7 +650,7 @@ def list_providers(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_providers' not in self._stubs: - self._stubs['list_providers'] = self.grpc_channel.unary_unary( + self._stubs['list_providers'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/ListProviders', request_serializer=eventarc.ListProvidersRequest.serialize, response_deserializer=eventarc.ListProvidersResponse.deserialize, @@ -600,7 +676,7 @@ def get_channel_connection(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_channel_connection' not in self._stubs: - self._stubs['get_channel_connection'] = self.grpc_channel.unary_unary( + self._stubs['get_channel_connection'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/GetChannelConnection', request_serializer=eventarc.GetChannelConnectionRequest.serialize, response_deserializer=channel_connection.ChannelConnection.deserialize, @@ -626,7 +702,7 @@ def list_channel_connections(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_channel_connections' not in self._stubs: - self._stubs['list_channel_connections'] = self.grpc_channel.unary_unary( + self._stubs['list_channel_connections'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/ListChannelConnections', request_serializer=eventarc.ListChannelConnectionsRequest.serialize, response_deserializer=eventarc.ListChannelConnectionsResponse.deserialize, @@ -653,7 +729,7 @@ def create_channel_connection(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_channel_connection' not in self._stubs: - self._stubs['create_channel_connection'] = self.grpc_channel.unary_unary( + self._stubs['create_channel_connection'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/CreateChannelConnection', request_serializer=eventarc.CreateChannelConnectionRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -679,7 +755,7 @@ def delete_channel_connection(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_channel_connection' not in self._stubs: - self._stubs['delete_channel_connection'] = self.grpc_channel.unary_unary( + self._stubs['delete_channel_connection'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/DeleteChannelConnection', request_serializer=eventarc.DeleteChannelConnectionRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -705,7 +781,7 @@ def get_google_channel_config(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_google_channel_config' not in self._stubs: - self._stubs['get_google_channel_config'] = self.grpc_channel.unary_unary( + self._stubs['get_google_channel_config'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/GetGoogleChannelConfig', request_serializer=eventarc.GetGoogleChannelConfigRequest.serialize, response_deserializer=google_channel_config.GoogleChannelConfig.deserialize, @@ -731,7 +807,7 @@ def update_google_channel_config(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_google_channel_config' not in self._stubs: - self._stubs['update_google_channel_config'] = self.grpc_channel.unary_unary( + self._stubs['update_google_channel_config'] = self._logged_channel.unary_unary( '/google.cloud.eventarc.v1.Eventarc/UpdateGoogleChannelConfig', request_serializer=eventarc.UpdateGoogleChannelConfigRequest.serialize, response_deserializer=gce_google_channel_config.GoogleChannelConfig.deserialize, @@ -884,7 +960,7 @@ def _wrap_method(self, func, *args, **kwargs): return gapic_v1.method_async.wrap_method(func, *args, **kwargs) def close(self): - return self.grpc_channel.close() + return self._logged_channel.close() @property def kind(self) -> str: @@ -901,7 +977,7 @@ def delete_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "delete_operation" not in self._stubs: - self._stubs["delete_operation"] = self.grpc_channel.unary_unary( + self._stubs["delete_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/DeleteOperation", request_serializer=operations_pb2.DeleteOperationRequest.SerializeToString, response_deserializer=None, @@ -919,7 +995,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -937,7 +1013,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -955,7 +1031,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, @@ -973,7 +1049,7 @@ def list_locations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_locations" not in self._stubs: - self._stubs["list_locations"] = self.grpc_channel.unary_unary( + self._stubs["list_locations"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/ListLocations", request_serializer=locations_pb2.ListLocationsRequest.SerializeToString, response_deserializer=locations_pb2.ListLocationsResponse.FromString, @@ -991,7 +1067,7 @@ def get_location( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_location" not in self._stubs: - self._stubs["get_location"] = self.grpc_channel.unary_unary( + self._stubs["get_location"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/GetLocation", request_serializer=locations_pb2.GetLocationRequest.SerializeToString, response_deserializer=locations_pb2.Location.FromString, @@ -1016,7 +1092,7 @@ def set_iam_policy( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "set_iam_policy" not in self._stubs: - self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary( + self._stubs["set_iam_policy"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/SetIamPolicy", request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString, response_deserializer=policy_pb2.Policy.FromString, @@ -1042,7 +1118,7 @@ def get_iam_policy( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_iam_policy" not in self._stubs: - self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary( + self._stubs["get_iam_policy"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/GetIamPolicy", request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString, response_deserializer=policy_pb2.Policy.FromString, @@ -1070,7 +1146,7 @@ def test_iam_permissions( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "test_iam_permissions" not in self._stubs: - self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary( + self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary( "/google.iam.v1.IAMPolicy/TestIamPermissions", request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString, response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString, diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py index b8de6fe5d1..02789efdee 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import logging as std_logging import pickle import warnings @@ -54,7 +55,12 @@ def intercept_unary_unary(self, continuation, client_call_details, request): elif isinstance(request, google.protobuf.message.Message): request_payload = MessageToJson(request) else: - request_payload = f"{type(result).__name__}: {pickle.dumps(request)}" + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } grpc_request = { "payload": request_payload, "requestMethod": "grpc", @@ -74,7 +80,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request): if logging_enabled: # pragma: NO COVER response_metadata = response.trailing_metadata() # Convert gRPC metadata `` to list of tuples - metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None result = response.result() if isinstance(result, proto.Message): response_payload = type(result).to_json(result) @@ -1269,7 +1275,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -1287,7 +1293,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -1305,7 +1311,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py index 9479b4476c..78495902e8 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py @@ -14,6 +14,9 @@ # limitations under the License. # import inspect +import json +import pickle +import logging as std_logging import warnings from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union @@ -24,8 +27,11 @@ from google.api_core import operations_v1 from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore +from google.protobuf.json_format import MessageToJson +import google.protobuf.message import grpc # type: ignore +import proto # type: ignore from grpc.experimental import aio # type: ignore from google.cloud.logging_v2.types import logging_config @@ -34,6 +40,73 @@ from .base import ConfigServiceV2Transport, DEFAULT_CLIENT_INFO from .grpc import ConfigServiceV2GrpcTransport +try: + from google.api_core import client_logging # type: ignore + CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER +except ImportError: # pragma: NO COVER + CLIENT_LOGGING_SUPPORTED = False + +_LOGGER = std_logging.getLogger(__name__) + + +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER + async def intercept_unary_unary(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response = await continuation(client_call_details, request) + if logging_enabled: # pragma: NO COVER + response_metadata = await response.trailing_metadata() + # Convert gRPC metadata `` to list of tuples + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None + result = await response + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = MessageToJson(result) + else: + response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" + grpc_response = { + "payload": response_payload, + "metadata": metadata, + "status": "OK", + } + _LOGGER.debug( + f"Received response to rpc {client_call_details.method}.", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "response": grpc_response, + "metadata": grpc_response["metadata"], + }, + ) + return response + class ConfigServiceV2GrpcAsyncIOTransport(ConfigServiceV2Transport): """gRPC AsyncIO backend transport for ConfigServiceV2. @@ -227,8 +300,11 @@ def __init__(self, *, ], ) - # Wrap messages. This must be done after self._grpc_channel exists + self._interceptor = _LoggingClientAIOInterceptor() + self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters + # Wrap messages. This must be done after self._logged_channel exists self._prep_wrapped_messages(client_info) @property @@ -251,7 +327,7 @@ def operations_client(self) -> operations_v1.OperationsAsyncClient: # Quick check: Only create a new client if we do not already have one. if self._operations_client is None: self._operations_client = operations_v1.OperationsAsyncClient( - self.grpc_channel + self._logged_channel ) # Return the client from cache. @@ -276,7 +352,7 @@ def list_buckets(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_buckets' not in self._stubs: - self._stubs['list_buckets'] = self.grpc_channel.unary_unary( + self._stubs['list_buckets'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/ListBuckets', request_serializer=logging_config.ListBucketsRequest.serialize, response_deserializer=logging_config.ListBucketsResponse.deserialize, @@ -302,7 +378,7 @@ def get_bucket(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_bucket' not in self._stubs: - self._stubs['get_bucket'] = self.grpc_channel.unary_unary( + self._stubs['get_bucket'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/GetBucket', request_serializer=logging_config.GetBucketRequest.serialize, response_deserializer=logging_config.LogBucket.deserialize, @@ -331,7 +407,7 @@ def create_bucket_async(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_bucket_async' not in self._stubs: - self._stubs['create_bucket_async'] = self.grpc_channel.unary_unary( + self._stubs['create_bucket_async'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/CreateBucketAsync', request_serializer=logging_config.CreateBucketRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -363,7 +439,7 @@ def update_bucket_async(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_bucket_async' not in self._stubs: - self._stubs['update_bucket_async'] = self.grpc_channel.unary_unary( + self._stubs['update_bucket_async'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/UpdateBucketAsync', request_serializer=logging_config.UpdateBucketRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -391,7 +467,7 @@ def create_bucket(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_bucket' not in self._stubs: - self._stubs['create_bucket'] = self.grpc_channel.unary_unary( + self._stubs['create_bucket'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/CreateBucket', request_serializer=logging_config.CreateBucketRequest.serialize, response_deserializer=logging_config.LogBucket.deserialize, @@ -423,7 +499,7 @@ def update_bucket(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_bucket' not in self._stubs: - self._stubs['update_bucket'] = self.grpc_channel.unary_unary( + self._stubs['update_bucket'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/UpdateBucket', request_serializer=logging_config.UpdateBucketRequest.serialize, response_deserializer=logging_config.LogBucket.deserialize, @@ -454,7 +530,7 @@ def delete_bucket(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_bucket' not in self._stubs: - self._stubs['delete_bucket'] = self.grpc_channel.unary_unary( + self._stubs['delete_bucket'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/DeleteBucket', request_serializer=logging_config.DeleteBucketRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -482,7 +558,7 @@ def undelete_bucket(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'undelete_bucket' not in self._stubs: - self._stubs['undelete_bucket'] = self.grpc_channel.unary_unary( + self._stubs['undelete_bucket'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/UndeleteBucket', request_serializer=logging_config.UndeleteBucketRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -508,7 +584,7 @@ def list_views(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_views' not in self._stubs: - self._stubs['list_views'] = self.grpc_channel.unary_unary( + self._stubs['list_views'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/ListViews', request_serializer=logging_config.ListViewsRequest.serialize, response_deserializer=logging_config.ListViewsResponse.deserialize, @@ -534,7 +610,7 @@ def get_view(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_view' not in self._stubs: - self._stubs['get_view'] = self.grpc_channel.unary_unary( + self._stubs['get_view'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/GetView', request_serializer=logging_config.GetViewRequest.serialize, response_deserializer=logging_config.LogView.deserialize, @@ -561,7 +637,7 @@ def create_view(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_view' not in self._stubs: - self._stubs['create_view'] = self.grpc_channel.unary_unary( + self._stubs['create_view'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/CreateView', request_serializer=logging_config.CreateViewRequest.serialize, response_deserializer=logging_config.LogView.deserialize, @@ -591,7 +667,7 @@ def update_view(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_view' not in self._stubs: - self._stubs['update_view'] = self.grpc_channel.unary_unary( + self._stubs['update_view'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/UpdateView', request_serializer=logging_config.UpdateViewRequest.serialize, response_deserializer=logging_config.LogView.deserialize, @@ -620,7 +696,7 @@ def delete_view(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_view' not in self._stubs: - self._stubs['delete_view'] = self.grpc_channel.unary_unary( + self._stubs['delete_view'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/DeleteView', request_serializer=logging_config.DeleteViewRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -646,7 +722,7 @@ def list_sinks(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_sinks' not in self._stubs: - self._stubs['list_sinks'] = self.grpc_channel.unary_unary( + self._stubs['list_sinks'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/ListSinks', request_serializer=logging_config.ListSinksRequest.serialize, response_deserializer=logging_config.ListSinksResponse.deserialize, @@ -672,7 +748,7 @@ def get_sink(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_sink' not in self._stubs: - self._stubs['get_sink'] = self.grpc_channel.unary_unary( + self._stubs['get_sink'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/GetSink', request_serializer=logging_config.GetSinkRequest.serialize, response_deserializer=logging_config.LogSink.deserialize, @@ -702,7 +778,7 @@ def create_sink(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_sink' not in self._stubs: - self._stubs['create_sink'] = self.grpc_channel.unary_unary( + self._stubs['create_sink'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/CreateSink', request_serializer=logging_config.CreateSinkRequest.serialize, response_deserializer=logging_config.LogSink.deserialize, @@ -733,7 +809,7 @@ def update_sink(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_sink' not in self._stubs: - self._stubs['update_sink'] = self.grpc_channel.unary_unary( + self._stubs['update_sink'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/UpdateSink', request_serializer=logging_config.UpdateSinkRequest.serialize, response_deserializer=logging_config.LogSink.deserialize, @@ -760,7 +836,7 @@ def delete_sink(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_sink' not in self._stubs: - self._stubs['delete_sink'] = self.grpc_channel.unary_unary( + self._stubs['delete_sink'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/DeleteSink', request_serializer=logging_config.DeleteSinkRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -789,7 +865,7 @@ def create_link(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_link' not in self._stubs: - self._stubs['create_link'] = self.grpc_channel.unary_unary( + self._stubs['create_link'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/CreateLink', request_serializer=logging_config.CreateLinkRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -816,7 +892,7 @@ def delete_link(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_link' not in self._stubs: - self._stubs['delete_link'] = self.grpc_channel.unary_unary( + self._stubs['delete_link'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/DeleteLink', request_serializer=logging_config.DeleteLinkRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -842,7 +918,7 @@ def list_links(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_links' not in self._stubs: - self._stubs['list_links'] = self.grpc_channel.unary_unary( + self._stubs['list_links'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/ListLinks', request_serializer=logging_config.ListLinksRequest.serialize, response_deserializer=logging_config.ListLinksResponse.deserialize, @@ -868,7 +944,7 @@ def get_link(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_link' not in self._stubs: - self._stubs['get_link'] = self.grpc_channel.unary_unary( + self._stubs['get_link'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/GetLink', request_serializer=logging_config.GetLinkRequest.serialize, response_deserializer=logging_config.Link.deserialize, @@ -895,7 +971,7 @@ def list_exclusions(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_exclusions' not in self._stubs: - self._stubs['list_exclusions'] = self.grpc_channel.unary_unary( + self._stubs['list_exclusions'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/ListExclusions', request_serializer=logging_config.ListExclusionsRequest.serialize, response_deserializer=logging_config.ListExclusionsResponse.deserialize, @@ -921,7 +997,7 @@ def get_exclusion(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_exclusion' not in self._stubs: - self._stubs['get_exclusion'] = self.grpc_channel.unary_unary( + self._stubs['get_exclusion'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/GetExclusion', request_serializer=logging_config.GetExclusionRequest.serialize, response_deserializer=logging_config.LogExclusion.deserialize, @@ -949,7 +1025,7 @@ def create_exclusion(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_exclusion' not in self._stubs: - self._stubs['create_exclusion'] = self.grpc_channel.unary_unary( + self._stubs['create_exclusion'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/CreateExclusion', request_serializer=logging_config.CreateExclusionRequest.serialize, response_deserializer=logging_config.LogExclusion.deserialize, @@ -976,7 +1052,7 @@ def update_exclusion(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_exclusion' not in self._stubs: - self._stubs['update_exclusion'] = self.grpc_channel.unary_unary( + self._stubs['update_exclusion'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/UpdateExclusion', request_serializer=logging_config.UpdateExclusionRequest.serialize, response_deserializer=logging_config.LogExclusion.deserialize, @@ -1002,7 +1078,7 @@ def delete_exclusion(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_exclusion' not in self._stubs: - self._stubs['delete_exclusion'] = self.grpc_channel.unary_unary( + self._stubs['delete_exclusion'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/DeleteExclusion', request_serializer=logging_config.DeleteExclusionRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -1037,7 +1113,7 @@ def get_cmek_settings(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_cmek_settings' not in self._stubs: - self._stubs['get_cmek_settings'] = self.grpc_channel.unary_unary( + self._stubs['get_cmek_settings'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/GetCmekSettings', request_serializer=logging_config.GetCmekSettingsRequest.serialize, response_deserializer=logging_config.CmekSettings.deserialize, @@ -1077,7 +1153,7 @@ def update_cmek_settings(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_cmek_settings' not in self._stubs: - self._stubs['update_cmek_settings'] = self.grpc_channel.unary_unary( + self._stubs['update_cmek_settings'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/UpdateCmekSettings', request_serializer=logging_config.UpdateCmekSettingsRequest.serialize, response_deserializer=logging_config.CmekSettings.deserialize, @@ -1113,7 +1189,7 @@ def get_settings(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_settings' not in self._stubs: - self._stubs['get_settings'] = self.grpc_channel.unary_unary( + self._stubs['get_settings'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/GetSettings', request_serializer=logging_config.GetSettingsRequest.serialize, response_deserializer=logging_config.Settings.deserialize, @@ -1156,7 +1232,7 @@ def update_settings(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_settings' not in self._stubs: - self._stubs['update_settings'] = self.grpc_channel.unary_unary( + self._stubs['update_settings'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/UpdateSettings', request_serializer=logging_config.UpdateSettingsRequest.serialize, response_deserializer=logging_config.Settings.deserialize, @@ -1183,7 +1259,7 @@ def copy_log_entries(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'copy_log_entries' not in self._stubs: - self._stubs['copy_log_entries'] = self.grpc_channel.unary_unary( + self._stubs['copy_log_entries'] = self._logged_channel.unary_unary( '/google.logging.v2.ConfigServiceV2/CopyLogEntries', request_serializer=logging_config.CopyLogEntriesRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -1453,7 +1529,7 @@ def _wrap_method(self, func, *args, **kwargs): return gapic_v1.method_async.wrap_method(func, *args, **kwargs) def close(self): - return self.grpc_channel.close() + return self._logged_channel.close() @property def kind(self) -> str: @@ -1470,7 +1546,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -1488,7 +1564,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -1506,7 +1582,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py index de25f5f6fd..718d4ac671 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import logging as std_logging import pickle import warnings @@ -53,7 +54,12 @@ def intercept_unary_unary(self, continuation, client_call_details, request): elif isinstance(request, google.protobuf.message.Message): request_payload = MessageToJson(request) else: - request_payload = f"{type(result).__name__}: {pickle.dumps(request)}" + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } grpc_request = { "payload": request_payload, "requestMethod": "grpc", @@ -73,7 +79,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request): if logging_enabled: # pragma: NO COVER response_metadata = response.trailing_metadata() # Convert gRPC metadata `` to list of tuples - metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None result = response.result() if isinstance(result, proto.Message): response_payload = type(result).to_json(result) @@ -493,7 +499,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -511,7 +517,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -529,7 +535,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py index 83a400473f..6933d0f2ba 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py @@ -14,6 +14,9 @@ # limitations under the License. # import inspect +import json +import pickle +import logging as std_logging import warnings from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union @@ -23,8 +26,11 @@ from google.api_core import retry_async as retries from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore +from google.protobuf.json_format import MessageToJson +import google.protobuf.message import grpc # type: ignore +import proto # type: ignore from grpc.experimental import aio # type: ignore from google.cloud.logging_v2.types import logging @@ -33,6 +39,73 @@ from .base import LoggingServiceV2Transport, DEFAULT_CLIENT_INFO from .grpc import LoggingServiceV2GrpcTransport +try: + from google.api_core import client_logging # type: ignore + CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER +except ImportError: # pragma: NO COVER + CLIENT_LOGGING_SUPPORTED = False + +_LOGGER = std_logging.getLogger(__name__) + + +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER + async def intercept_unary_unary(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response = await continuation(client_call_details, request) + if logging_enabled: # pragma: NO COVER + response_metadata = await response.trailing_metadata() + # Convert gRPC metadata `` to list of tuples + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None + result = await response + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = MessageToJson(result) + else: + response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" + grpc_response = { + "payload": response_payload, + "metadata": metadata, + "status": "OK", + } + _LOGGER.debug( + f"Received response to rpc {client_call_details.method}.", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "response": grpc_response, + "metadata": grpc_response["metadata"], + }, + ) + return response + class LoggingServiceV2GrpcAsyncIOTransport(LoggingServiceV2Transport): """gRPC AsyncIO backend transport for LoggingServiceV2. @@ -225,8 +298,11 @@ def __init__(self, *, ], ) - # Wrap messages. This must be done after self._grpc_channel exists + self._interceptor = _LoggingClientAIOInterceptor() + self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters + # Wrap messages. This must be done after self._logged_channel exists self._prep_wrapped_messages(client_info) @property @@ -262,7 +338,7 @@ def delete_log(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_log' not in self._stubs: - self._stubs['delete_log'] = self.grpc_channel.unary_unary( + self._stubs['delete_log'] = self._logged_channel.unary_unary( '/google.logging.v2.LoggingServiceV2/DeleteLog', request_serializer=logging.DeleteLogRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -294,7 +370,7 @@ def write_log_entries(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'write_log_entries' not in self._stubs: - self._stubs['write_log_entries'] = self.grpc_channel.unary_unary( + self._stubs['write_log_entries'] = self._logged_channel.unary_unary( '/google.logging.v2.LoggingServiceV2/WriteLogEntries', request_serializer=logging.WriteLogEntriesRequest.serialize, response_deserializer=logging.WriteLogEntriesResponse.deserialize, @@ -323,7 +399,7 @@ def list_log_entries(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_log_entries' not in self._stubs: - self._stubs['list_log_entries'] = self.grpc_channel.unary_unary( + self._stubs['list_log_entries'] = self._logged_channel.unary_unary( '/google.logging.v2.LoggingServiceV2/ListLogEntries', request_serializer=logging.ListLogEntriesRequest.serialize, response_deserializer=logging.ListLogEntriesResponse.deserialize, @@ -351,7 +427,7 @@ def list_monitored_resource_descriptors(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_monitored_resource_descriptors' not in self._stubs: - self._stubs['list_monitored_resource_descriptors'] = self.grpc_channel.unary_unary( + self._stubs['list_monitored_resource_descriptors'] = self._logged_channel.unary_unary( '/google.logging.v2.LoggingServiceV2/ListMonitoredResourceDescriptors', request_serializer=logging.ListMonitoredResourceDescriptorsRequest.serialize, response_deserializer=logging.ListMonitoredResourceDescriptorsResponse.deserialize, @@ -379,7 +455,7 @@ def list_logs(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_logs' not in self._stubs: - self._stubs['list_logs'] = self.grpc_channel.unary_unary( + self._stubs['list_logs'] = self._logged_channel.unary_unary( '/google.logging.v2.LoggingServiceV2/ListLogs', request_serializer=logging.ListLogsRequest.serialize, response_deserializer=logging.ListLogsResponse.deserialize, @@ -407,7 +483,7 @@ def tail_log_entries(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'tail_log_entries' not in self._stubs: - self._stubs['tail_log_entries'] = self.grpc_channel.stream_stream( + self._stubs['tail_log_entries'] = self._logged_channel.stream_stream( '/google.logging.v2.LoggingServiceV2/TailLogEntries', request_serializer=logging.TailLogEntriesRequest.serialize, response_deserializer=logging.TailLogEntriesResponse.deserialize, @@ -536,7 +612,7 @@ def _wrap_method(self, func, *args, **kwargs): return gapic_v1.method_async.wrap_method(func, *args, **kwargs) def close(self): - return self.grpc_channel.close() + return self._logged_channel.close() @property def kind(self) -> str: @@ -553,7 +629,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -571,7 +647,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -589,7 +665,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py index 9f6330cbb3..f393fd814e 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import logging as std_logging import pickle import warnings @@ -53,7 +54,12 @@ def intercept_unary_unary(self, continuation, client_call_details, request): elif isinstance(request, google.protobuf.message.Message): request_payload = MessageToJson(request) else: - request_payload = f"{type(result).__name__}: {pickle.dumps(request)}" + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } grpc_request = { "payload": request_payload, "requestMethod": "grpc", @@ -73,7 +79,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request): if logging_enabled: # pragma: NO COVER response_metadata = response.trailing_metadata() # Convert gRPC metadata `` to list of tuples - metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None result = response.result() if isinstance(result, proto.Message): response_payload = type(result).to_json(result) @@ -448,7 +454,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -466,7 +472,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -484,7 +490,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py index 2707c88cf5..b382f48506 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py @@ -14,6 +14,9 @@ # limitations under the License. # import inspect +import json +import pickle +import logging as std_logging import warnings from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union @@ -23,8 +26,11 @@ from google.api_core import retry_async as retries from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore +from google.protobuf.json_format import MessageToJson +import google.protobuf.message import grpc # type: ignore +import proto # type: ignore from grpc.experimental import aio # type: ignore from google.cloud.logging_v2.types import logging_metrics @@ -33,6 +39,73 @@ from .base import MetricsServiceV2Transport, DEFAULT_CLIENT_INFO from .grpc import MetricsServiceV2GrpcTransport +try: + from google.api_core import client_logging # type: ignore + CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER +except ImportError: # pragma: NO COVER + CLIENT_LOGGING_SUPPORTED = False + +_LOGGER = std_logging.getLogger(__name__) + + +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER + async def intercept_unary_unary(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response = await continuation(client_call_details, request) + if logging_enabled: # pragma: NO COVER + response_metadata = await response.trailing_metadata() + # Convert gRPC metadata `` to list of tuples + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None + result = await response + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = MessageToJson(result) + else: + response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" + grpc_response = { + "payload": response_payload, + "metadata": metadata, + "status": "OK", + } + _LOGGER.debug( + f"Received response to rpc {client_call_details.method}.", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "response": grpc_response, + "metadata": grpc_response["metadata"], + }, + ) + return response + class MetricsServiceV2GrpcAsyncIOTransport(MetricsServiceV2Transport): """gRPC AsyncIO backend transport for MetricsServiceV2. @@ -225,8 +298,11 @@ def __init__(self, *, ], ) - # Wrap messages. This must be done after self._grpc_channel exists + self._interceptor = _LoggingClientAIOInterceptor() + self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters + # Wrap messages. This must be done after self._logged_channel exists self._prep_wrapped_messages(client_info) @property @@ -258,7 +334,7 @@ def list_log_metrics(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_log_metrics' not in self._stubs: - self._stubs['list_log_metrics'] = self.grpc_channel.unary_unary( + self._stubs['list_log_metrics'] = self._logged_channel.unary_unary( '/google.logging.v2.MetricsServiceV2/ListLogMetrics', request_serializer=logging_metrics.ListLogMetricsRequest.serialize, response_deserializer=logging_metrics.ListLogMetricsResponse.deserialize, @@ -284,7 +360,7 @@ def get_log_metric(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_log_metric' not in self._stubs: - self._stubs['get_log_metric'] = self.grpc_channel.unary_unary( + self._stubs['get_log_metric'] = self._logged_channel.unary_unary( '/google.logging.v2.MetricsServiceV2/GetLogMetric', request_serializer=logging_metrics.GetLogMetricRequest.serialize, response_deserializer=logging_metrics.LogMetric.deserialize, @@ -310,7 +386,7 @@ def create_log_metric(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_log_metric' not in self._stubs: - self._stubs['create_log_metric'] = self.grpc_channel.unary_unary( + self._stubs['create_log_metric'] = self._logged_channel.unary_unary( '/google.logging.v2.MetricsServiceV2/CreateLogMetric', request_serializer=logging_metrics.CreateLogMetricRequest.serialize, response_deserializer=logging_metrics.LogMetric.deserialize, @@ -336,7 +412,7 @@ def update_log_metric(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_log_metric' not in self._stubs: - self._stubs['update_log_metric'] = self.grpc_channel.unary_unary( + self._stubs['update_log_metric'] = self._logged_channel.unary_unary( '/google.logging.v2.MetricsServiceV2/UpdateLogMetric', request_serializer=logging_metrics.UpdateLogMetricRequest.serialize, response_deserializer=logging_metrics.LogMetric.deserialize, @@ -362,7 +438,7 @@ def delete_log_metric(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_log_metric' not in self._stubs: - self._stubs['delete_log_metric'] = self.grpc_channel.unary_unary( + self._stubs['delete_log_metric'] = self._logged_channel.unary_unary( '/google.logging.v2.MetricsServiceV2/DeleteLogMetric', request_serializer=logging_metrics.DeleteLogMetricRequest.serialize, response_deserializer=empty_pb2.Empty.FromString, @@ -464,7 +540,7 @@ def _wrap_method(self, func, *args, **kwargs): return gapic_v1.method_async.wrap_method(func, *args, **kwargs) def close(self): - return self.grpc_channel.close() + return self._logged_channel.close() @property def kind(self) -> str: @@ -481,7 +557,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -499,7 +575,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -517,7 +593,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, diff --git a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py index cf5ec9a7fe..76c07f74bb 100755 --- a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py +++ b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import logging as std_logging import pickle import warnings @@ -54,7 +55,12 @@ def intercept_unary_unary(self, continuation, client_call_details, request): elif isinstance(request, google.protobuf.message.Message): request_payload = MessageToJson(request) else: - request_payload = f"{type(result).__name__}: {pickle.dumps(request)}" + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } grpc_request = { "payload": request_payload, "requestMethod": "grpc", @@ -74,7 +80,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request): if logging_enabled: # pragma: NO COVER response_metadata = response.trailing_metadata() # Convert gRPC metadata `` to list of tuples - metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None result = response.result() if isinstance(result, proto.Message): response_payload = type(result).to_json(result) @@ -690,7 +696,7 @@ def delete_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "delete_operation" not in self._stubs: - self._stubs["delete_operation"] = self.grpc_channel.unary_unary( + self._stubs["delete_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/DeleteOperation", request_serializer=operations_pb2.DeleteOperationRequest.SerializeToString, response_deserializer=None, @@ -708,7 +714,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -726,7 +732,7 @@ def wait_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "wait_operation" not in self._stubs: - self._stubs["wait_operation"] = self.grpc_channel.unary_unary( + self._stubs["wait_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/WaitOperation", request_serializer=operations_pb2.WaitOperationRequest.SerializeToString, response_deserializer=None, @@ -744,7 +750,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -762,7 +768,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, @@ -780,7 +786,7 @@ def list_locations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_locations" not in self._stubs: - self._stubs["list_locations"] = self.grpc_channel.unary_unary( + self._stubs["list_locations"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/ListLocations", request_serializer=locations_pb2.ListLocationsRequest.SerializeToString, response_deserializer=locations_pb2.ListLocationsResponse.FromString, @@ -798,7 +804,7 @@ def get_location( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_location" not in self._stubs: - self._stubs["get_location"] = self.grpc_channel.unary_unary( + self._stubs["get_location"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/GetLocation", request_serializer=locations_pb2.GetLocationRequest.SerializeToString, response_deserializer=locations_pb2.Location.FromString, diff --git a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py index 78b7c4d028..aa699fc707 100755 --- a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py +++ b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py @@ -14,6 +14,9 @@ # limitations under the License. # import inspect +import json +import pickle +import logging as std_logging import warnings from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union @@ -24,8 +27,11 @@ from google.api_core import operations_v1 from google.auth import credentials as ga_credentials # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore +from google.protobuf.json_format import MessageToJson +import google.protobuf.message import grpc # type: ignore +import proto # type: ignore from grpc.experimental import aio # type: ignore from google.cloud.location import locations_pb2 # type: ignore @@ -34,6 +40,73 @@ from .base import CloudRedisTransport, DEFAULT_CLIENT_INFO from .grpc import CloudRedisGrpcTransport +try: + from google.api_core import client_logging # type: ignore + CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER +except ImportError: # pragma: NO COVER + CLIENT_LOGGING_SUPPORTED = False + +_LOGGER = std_logging.getLogger(__name__) + + +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER + async def intercept_unary_unary(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response = await continuation(client_call_details, request) + if logging_enabled: # pragma: NO COVER + response_metadata = await response.trailing_metadata() + # Convert gRPC metadata `` to list of tuples + metadata = dict([(k, str(v)) for k, v in response_metadata]) if response_metadata else None + result = await response + if isinstance(result, proto.Message): + response_payload = type(result).to_json(result) + elif isinstance(result, google.protobuf.message.Message): + response_payload = MessageToJson(result) + else: + response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" + grpc_response = { + "payload": response_payload, + "metadata": metadata, + "status": "OK", + } + _LOGGER.debug( + f"Received response to rpc {client_call_details.method}.", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "response": grpc_response, + "metadata": grpc_response["metadata"], + }, + ) + return response + class CloudRedisGrpcAsyncIOTransport(CloudRedisTransport): """gRPC AsyncIO backend transport for CloudRedis. @@ -247,8 +320,11 @@ def __init__(self, *, ], ) - # Wrap messages. This must be done after self._grpc_channel exists + self._interceptor = _LoggingClientAIOInterceptor() + self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters + # Wrap messages. This must be done after self._logged_channel exists self._prep_wrapped_messages(client_info) @property @@ -271,7 +347,7 @@ def operations_client(self) -> operations_v1.OperationsAsyncClient: # Quick check: Only create a new client if we do not already have one. if self._operations_client is None: self._operations_client = operations_v1.OperationsAsyncClient( - self.grpc_channel + self._logged_channel ) # Return the client from cache. @@ -305,7 +381,7 @@ def list_instances(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'list_instances' not in self._stubs: - self._stubs['list_instances'] = self.grpc_channel.unary_unary( + self._stubs['list_instances'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/ListInstances', request_serializer=cloud_redis.ListInstancesRequest.serialize, response_deserializer=cloud_redis.ListInstancesResponse.deserialize, @@ -331,7 +407,7 @@ def get_instance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_instance' not in self._stubs: - self._stubs['get_instance'] = self.grpc_channel.unary_unary( + self._stubs['get_instance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/GetInstance', request_serializer=cloud_redis.GetInstanceRequest.serialize, response_deserializer=cloud_redis.Instance.deserialize, @@ -360,7 +436,7 @@ def get_instance_auth_string(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'get_instance_auth_string' not in self._stubs: - self._stubs['get_instance_auth_string'] = self.grpc_channel.unary_unary( + self._stubs['get_instance_auth_string'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/GetInstanceAuthString', request_serializer=cloud_redis.GetInstanceAuthStringRequest.serialize, response_deserializer=cloud_redis.InstanceAuthString.deserialize, @@ -399,7 +475,7 @@ def create_instance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'create_instance' not in self._stubs: - self._stubs['create_instance'] = self.grpc_channel.unary_unary( + self._stubs['create_instance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/CreateInstance', request_serializer=cloud_redis.CreateInstanceRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -430,7 +506,7 @@ def update_instance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'update_instance' not in self._stubs: - self._stubs['update_instance'] = self.grpc_channel.unary_unary( + self._stubs['update_instance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/UpdateInstance', request_serializer=cloud_redis.UpdateInstanceRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -457,7 +533,7 @@ def upgrade_instance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'upgrade_instance' not in self._stubs: - self._stubs['upgrade_instance'] = self.grpc_channel.unary_unary( + self._stubs['upgrade_instance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/UpgradeInstance', request_serializer=cloud_redis.UpgradeInstanceRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -491,7 +567,7 @@ def import_instance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'import_instance' not in self._stubs: - self._stubs['import_instance'] = self.grpc_channel.unary_unary( + self._stubs['import_instance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/ImportInstance', request_serializer=cloud_redis.ImportInstanceRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -522,7 +598,7 @@ def export_instance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'export_instance' not in self._stubs: - self._stubs['export_instance'] = self.grpc_channel.unary_unary( + self._stubs['export_instance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/ExportInstance', request_serializer=cloud_redis.ExportInstanceRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -550,7 +626,7 @@ def failover_instance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'failover_instance' not in self._stubs: - self._stubs['failover_instance'] = self.grpc_channel.unary_unary( + self._stubs['failover_instance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/FailoverInstance', request_serializer=cloud_redis.FailoverInstanceRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -577,7 +653,7 @@ def delete_instance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'delete_instance' not in self._stubs: - self._stubs['delete_instance'] = self.grpc_channel.unary_unary( + self._stubs['delete_instance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/DeleteInstance', request_serializer=cloud_redis.DeleteInstanceRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -604,7 +680,7 @@ def reschedule_maintenance(self) -> Callable[ # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if 'reschedule_maintenance' not in self._stubs: - self._stubs['reschedule_maintenance'] = self.grpc_channel.unary_unary( + self._stubs['reschedule_maintenance'] = self._logged_channel.unary_unary( '/google.cloud.redis.v1.CloudRedis/RescheduleMaintenance', request_serializer=cloud_redis.RescheduleMaintenanceRequest.serialize, response_deserializer=operations_pb2.Operation.FromString, @@ -712,7 +788,7 @@ def _wrap_method(self, func, *args, **kwargs): return gapic_v1.method_async.wrap_method(func, *args, **kwargs) def close(self): - return self.grpc_channel.close() + return self._logged_channel.close() @property def kind(self) -> str: @@ -729,7 +805,7 @@ def delete_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "delete_operation" not in self._stubs: - self._stubs["delete_operation"] = self.grpc_channel.unary_unary( + self._stubs["delete_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/DeleteOperation", request_serializer=operations_pb2.DeleteOperationRequest.SerializeToString, response_deserializer=None, @@ -747,7 +823,7 @@ def cancel_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "cancel_operation" not in self._stubs: - self._stubs["cancel_operation"] = self.grpc_channel.unary_unary( + self._stubs["cancel_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/CancelOperation", request_serializer=operations_pb2.CancelOperationRequest.SerializeToString, response_deserializer=None, @@ -765,7 +841,7 @@ def wait_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "wait_operation" not in self._stubs: - self._stubs["wait_operation"] = self.grpc_channel.unary_unary( + self._stubs["wait_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/WaitOperation", request_serializer=operations_pb2.WaitOperationRequest.SerializeToString, response_deserializer=None, @@ -783,7 +859,7 @@ def get_operation( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_operation" not in self._stubs: - self._stubs["get_operation"] = self.grpc_channel.unary_unary( + self._stubs["get_operation"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/GetOperation", request_serializer=operations_pb2.GetOperationRequest.SerializeToString, response_deserializer=operations_pb2.Operation.FromString, @@ -801,7 +877,7 @@ def list_operations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_operations" not in self._stubs: - self._stubs["list_operations"] = self.grpc_channel.unary_unary( + self._stubs["list_operations"] = self._logged_channel.unary_unary( "/google.longrunning.Operations/ListOperations", request_serializer=operations_pb2.ListOperationsRequest.SerializeToString, response_deserializer=operations_pb2.ListOperationsResponse.FromString, @@ -819,7 +895,7 @@ def list_locations( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "list_locations" not in self._stubs: - self._stubs["list_locations"] = self.grpc_channel.unary_unary( + self._stubs["list_locations"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/ListLocations", request_serializer=locations_pb2.ListLocationsRequest.SerializeToString, response_deserializer=locations_pb2.ListLocationsResponse.FromString, @@ -837,7 +913,7 @@ def get_location( # gRPC handles serialization and deserialization, so we just need # to pass in the functions for each. if "get_location" not in self._stubs: - self._stubs["get_location"] = self.grpc_channel.unary_unary( + self._stubs["get_location"] = self._logged_channel.unary_unary( "/google.cloud.location.Locations/GetLocation", request_serializer=locations_pb2.GetLocationRequest.SerializeToString, response_deserializer=locations_pb2.Location.FromString,