Skip to content

Commit

Permalink
feat: add client logging support for sync gRPC (#2284)
Browse files Browse the repository at this point in the history
  • Loading branch information
ohmayr authored Dec 11, 2024
1 parent b10cc21 commit dddf797
Show file tree
Hide file tree
Showing 8 changed files with 700 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

{% block content %}

import logging as std_logging
import pickle
import warnings
from typing import Callable, Dict, Optional, Sequence, Tuple, Union

Expand All @@ -13,8 +15,11 @@ from google.api_core import gapic_v1
import google.auth # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.auth.transport.grpc import SslCredentials # type: ignore
from google.protobuf.json_format import MessageToJson
import google.protobuf.message

import grpc # type: ignore
import proto # type: ignore

{% filter sort_lines %}
{% set import_ns = namespace(has_operations_mixin=false) %}
Expand Down Expand Up @@ -42,6 +47,77 @@ from google.longrunning import operations_pb2 # type: ignore
{% endfilter %}
from .base import {{ service.name }}Transport, DEFAULT_CLIENT_INFO

# google-api-core gained the `client_logging` helper only in recent releases;
# degrade gracefully (logging disabled) when running against an older version.
try:
    from google.api_core import client_logging # type: ignore
    CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
except ImportError: # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

# Module-level logger; DEBUG must be enabled on it for the interceptor below
# to emit request/response payloads.
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    """Sync gRPC client interceptor that DEBUG-logs unary-unary calls.

    Payloads and metadata are logged only when google-api-core's
    ``client_logging`` module is importable AND this module's logger is
    enabled for DEBUG; otherwise the call passes through untouched.
    """

    def intercept_unary_unary(self, continuation, client_call_details, request):
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
        if logging_enabled:  # pragma: NO COVER
            request_metadata = client_call_details.metadata
            if isinstance(request, proto.Message):
                {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2293): Investigate if we can improve this logic
                    or wait for next gen protobuf.
                #}
                request_payload = type(request).to_json(request)
            elif isinstance(request, google.protobuf.message.Message):
                request_payload = MessageToJson(request)
            else:
                # Fall back to pickled bytes for payloads that are neither
                # proto-plus nor raw protobuf messages.
                # Fixed: original referenced `result`, which is not defined
                # until the response path below, raising NameError here.
                request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": dict(request_metadata),
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra = {
                    "serviceName": "{{ service.meta.address.proto }}",
                    "rpcName": client_call_details.method,
                    "request": grpc_request,
                    {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2275): logging `metadata` seems repetitive and may need to be cleaned up. We're including it within "request" for consistency with REST transport. #}
                    "metadata": grpc_request["metadata"],
                },
            )

        response = continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Convert the gRPC trailing metadata (a sequence of key/value
            # pairs) into a plain dict for structured logging; None when absent.
            metadata = dict([(k, v) for k, v in response_metadata]) if response_metadata else None
            result = response.result()
            if isinstance(result, proto.Message):
                {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2293): Investigate if we can improve this logic
                    or wait for next gen protobuf.
                #}
                response_payload = type(result).to_json(result)
            elif isinstance(result, google.protobuf.message.Message):
                response_payload = MessageToJson(result)
            else:
                response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
            grpc_response = {
                "payload": response_payload,
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra = {
                    "serviceName": "{{ service.meta.address.proto }}",
                    "rpcName": client_call_details.method,
                    "response": grpc_response,
                    {# TODO(https://github.com/googleapis/gapic-generator-python/issues/2275): logging `metadata` seems repetitive and may need to be cleaned up. We're including it within "request" for consistency with REST transport. #}
                    "metadata": grpc_response["metadata"],
                },
            )
        return response


class {{ service.name }}GrpcTransport({{ service.name }}Transport):
"""gRPC backend transport for {{ service.name }}.
Expand Down Expand Up @@ -195,7 +271,10 @@ class {{ service.name }}GrpcTransport({{ service.name }}Transport):
],
)

# Wrap messages. This must be done after self._grpc_channel exists
self._interceptor = _LoggingClientInterceptor()
self._logged_channel = grpc.intercept_channel(self._grpc_channel, self._interceptor)

# Wrap messages. This must be done after self._logged_channel exists
self._prep_wrapped_messages(client_info)


Expand Down Expand Up @@ -262,7 +341,7 @@ class {{ service.name }}GrpcTransport({{ service.name }}Transport):
# Quick check: Only create a new client if we do not already have one.
if self._operations_client is None:
self._operations_client = operations_v1.OperationsClient(
self.grpc_channel
self._logged_channel
)

# Return the client from cache.
Expand Down Expand Up @@ -292,7 +371,7 @@ class {{ service.name }}GrpcTransport({{ service.name }}Transport):
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if '{{ method.transport_safe_name|snake_case }}' not in self._stubs:
self._stubs['{{ method.transport_safe_name|snake_case }}'] = self.grpc_channel.{{ method.grpc_stub_type }}(
self._stubs['{{ method.transport_safe_name|snake_case }}'] = self._logged_channel.{{ method.grpc_stub_type }}(
'/{{ '.'.join(method.meta.address.package) }}.{{ service.name }}/{{ method.name }}',
request_serializer={{ method.input.ident }}.{% if method.input.ident.python_import.module.endswith('_pb2') %}SerializeToString{% else %}serialize{% endif %},
response_deserializer={{ method.output.ident }}.{% if method.output.ident.python_import.module.endswith('_pb2') %}FromString{% else %}deserialize{% endif %},
Expand Down Expand Up @@ -320,7 +399,7 @@ class {{ service.name }}GrpcTransport({{ service.name }}Transport):
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "set_iam_policy" not in self._stubs:
self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary(
self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/SetIamPolicy",
request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
response_deserializer=policy_pb2.Policy.FromString,
Expand All @@ -346,7 +425,7 @@ class {{ service.name }}GrpcTransport({{ service.name }}Transport):
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "get_iam_policy" not in self._stubs:
self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary(
self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/GetIamPolicy",
request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
response_deserializer=policy_pb2.Policy.FromString,
Expand Down Expand Up @@ -374,7 +453,7 @@ class {{ service.name }}GrpcTransport({{ service.name }}Transport):
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "test_iam_permissions" not in self._stubs:
self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary(
self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/TestIamPermissions",
request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
Expand All @@ -383,7 +462,7 @@ class {{ service.name }}GrpcTransport({{ service.name }}Transport):
{% endif %}

def close(self):
self.grpc_channel.close()
self._logged_channel.close()

{% include '%namespace/%name_%version/%sub/services/%service/transports/_mixins.py.j2' %}

Expand Down
Loading

0 comments on commit dddf797

Please sign in to comment.