Skip to content

Commit

Permalink
feat: adds LRO mixin. (#1304)
Browse files Browse the repository at this point in the history
Adds support for LRO to existing MixIns implementation. It extends client interface with get_operation, wait_operation, cancel_operation, delete_operation, list_operations methods (if they were defined in service yaml).
  • Loading branch information
atulep authored May 24, 2022
1 parent ebbf1f9 commit 18af90a
Show file tree
Hide file tree
Showing 19 changed files with 2,300 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,272 @@
{# LRO mixins #}
{% if api.has_operations_mixin %}
{% if "ListOperations" in api.mixin_api_methods %}
def list_operations(
self,
request: operations_pb2.ListOperationsRequest = None,
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> operations_pb2.ListOperationsResponse:
r"""Lists operations that match the specified filter in the request.

Args:
request (:class:`~.operations_pb2.ListOperationsRequest`):
The request object. Request message for
`ListOperations` method.
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
~.operations_pb2.ListOperationsResponse:
Response message for ``ListOperations`` method.
"""
# Create or coerce a protobuf request object.
# The request isn't a proto-plus wrapped type,
# so it must be constructed via keyword expansion.
if isinstance(request, dict):
request = operations_pb2.ListOperationsRequest(**request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method.wrap_method(
self._transport.list_operations,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata(
(("name", request.name),)),
)

# Send the request.
response = rpc(
request, retry=retry, timeout=timeout, metadata=metadata,)

# Done; return the response.
return response

{% endif %}

{% if "GetOperation" in api.mixin_api_methods %}
def get_operation(
self,
request: operations_pb2.GetOperationRequest = None,
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> operations_pb2.Operation:
r"""Gets the latest state of a long-running operation.

Args:
request (:class:`~.operations_pb2.GetOperationRequest`):
The request object. Request message for
`GetOperation` method.
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
~.operations_pb2.Operation:
An ``Operation`` object.
"""
# Create or coerce a protobuf request object.
# The request isn't a proto-plus wrapped type,
# so it must be constructed via keyword expansion.
if isinstance(request, dict):
request = operations_pb2.GetOperationRequest(**request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method.wrap_method(
self._transport.get_operation,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata(
(("name", request.name),)),
)

# Send the request.
response = rpc(
request, retry=retry, timeout=timeout, metadata=metadata,)

# Done; return the response.
return response
{% endif %}

{% if "DeleteOperation" in api.mixin_api_methods %}
def delete_operation(
self,
request: operations_pb2.DeleteOperationRequest = None,
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> None:
r"""Deletes a long-running operation.

This method indicates that the client is no longer interested
in the operation result. It does not cancel the operation.
If the server doesn't support this method, it returns
`google.rpc.Code.UNIMPLEMENTED`.

Args:
request (:class:`~.operations_pb2.DeleteOperationRequest`):
The request object. Request message for
`DeleteOperation` method.
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
None
"""
# Create or coerce a protobuf request object.
# The request isn't a proto-plus wrapped type,
# so it must be constructed via keyword expansion.
if isinstance(request, dict):
request = operations_pb2.DeleteOperationRequest(**request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method.wrap_method(
self._transport.delete_operation,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata(
(("name", request.name),)),
)

# Send the request.
rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
{% endif %}

{% if "CancelOperation" in api.mixin_api_methods %}
def cancel_operation(
self,
request: operations_pb2.CancelOperationRequest = None,
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> None:
r"""Starts asynchronous cancellation on a long-running operation.

The server makes a best effort to cancel the operation, but success
is not guaranteed. If the server doesn't support this method, it returns
`google.rpc.Code.UNIMPLEMENTED`.

Args:
request (:class:`~.operations_pb2.CancelOperationRequest`):
The request object. Request message for
`CancelOperation` method.
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
None
"""
# Create or coerce a protobuf request object.
# The request isn't a proto-plus wrapped type,
# so it must be constructed via keyword expansion.
if isinstance(request, dict):
request = operations_pb2.CancelOperationRequest(**request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method.wrap_method(
self._transport.cancel_operation,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata(
(("name", request.name),)),
)

# Send the request.
rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
{% endif %}

{% if "WaitOperation" in api.mixin_api_methods %}
def wait_operation(
self,
request: operations_pb2.WaitOperationRequest = None,
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> operations_pb2.Operation:
r"""Waits until the specified long-running operation is done or reaches at most
a specified timeout, returning the latest state.

If the operation is already done, the latest state is immediately returned.
If the timeout specified is greater than the default HTTP/RPC timeout, the HTTP/RPC
timeout is used. If the server does not support this method, it returns
`google.rpc.Code.UNIMPLEMENTED`.

Args:
request (:class:`~.operations_pb2.WaitOperationRequest`):
The request object. Request message for
`WaitOperation` method.
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Returns:
~.operations_pb2.Operation:
An ``Operation`` object.
"""
# Create or coerce a protobuf request object.
# The request isn't a proto-plus wrapped type,
# so it must be constructed via keyword expansion.
if isinstance(request, dict):
request = operations_pb2.WaitOperationRequest(**request)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method.wrap_method(
self._transport.wait_operation,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)

metadata = tuple(metadata)

# Send the request.
response = rpc(
request, retry=retry, timeout=timeout, metadata=metadata,)

# Done; return the response.
return response
{% endif %}
{% endif %} {# LRO #}

{# IAM mixins #}

{# TODO: Remove after https://github.com/googleapis/gapic-generator-python/pull/1240 is merged. #}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ from google.iam.v1 import policy_pb2 # type: ignore
{% if api.has_location_mixin %}
from google.cloud.location import locations_pb2 # type: ignore
{% endif %}
{% if api.has_operations_mixin %}
from google.longrunning import operations_pb2
{% endif %}
{% endfilter %}
from .transports.base import {{ service.name }}Transport, DEFAULT_CLIENT_INFO
{% if 'grpc' in opts.transport %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,109 @@
{% if "grpc" in opts.transport %}

{% if api.has_operations_mixin %}

{% if "DeleteOperation" in api.mixin_api_methods %}
@property
def delete_operation(
self,
) -> Callable[[operations_pb2.DeleteOperationRequest], None]:
r"""Return a callable for the delete_operation method over gRPC.
"""
# Generate a "stub function" on-the-fly which will actually make
# the request.
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "delete_operation" not in self._stubs:
self._stubs["delete_operation"] = self.grpc_channel.unary_unary(
"/google.longrunning.Operations/DeleteOperation",
request_serializer=operations_pb2.DeleteOperationRequest.SerializeToString,
response_deserializer=None,
)
return self._stubs["delete_operation"]
{% endif %}

{% if "CancelOperation" in api.mixin_api_methods %}
@property
def cancel_operation(
self,
) -> Callable[[operations_pb2.CancelOperationRequest], None]:
r"""Return a callable for the cancel_operation method over gRPC.
"""
# Generate a "stub function" on-the-fly which will actually make
# the request.
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "cancel_operation" not in self._stubs:
self._stubs["cancel_operation"] = self.grpc_channel.unary_unary(
"/google.longrunning.Operations/CancelOperation",
request_serializer=operations_pb2.CancelOperationRequest.SerializeToString,
response_deserializer=None,
)
return self._stubs["cancel_operation"]
{% endif %}

{% if "WaitOperation" in api.mixin_api_methods %}
@property
def wait_operation(
self,
) -> Callable[[operations_pb2.WaitOperationRequest], None]:
r"""Return a callable for the wait_operation method over gRPC.
"""
# Generate a "stub function" on-the-fly which will actually make
# the request.
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "delete_operation" not in self._stubs:
self._stubs["wait_operation"] = self.grpc_channel.unary_unary(
"/google.longrunning.Operations/WaitOperation",
request_serializer=operations_pb2.WaitOperationRequest.SerializeToString,
response_deserializer=None,
)
return self._stubs["wait_operation"]
{% endif %}

{% if "GetOperation" in api.mixin_api_methods %}
@property
def get_operation(
self,
) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
r"""Return a callable for the get_operation method over gRPC.
"""
# Generate a "stub function" on-the-fly which will actually make
# the request.
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "get_operation" not in self._stubs:
self._stubs["get_operation"] = self.grpc_channel.unary_unary(
"/google.longrunning.Operations/GetOperation",
request_serializer=operations_pb2.GetOperationRequest.SerializeToString,
response_deserializer=operations_pb2.Operation.FromString,
)
return self._stubs["get_operation"]
{% endif %}

{% if "ListOperations" in api.mixin_api_methods %}
@property
def list_operations(
self,
) -> Callable[[operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse]:
r"""Return a callable for the list_operations method over gRPC.
"""
# Generate a "stub function" on-the-fly which will actually make
# the request.
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_operations" not in self._stubs:
self._stubs["list_operations"] = self.grpc_channel.unary_unary(
"/google.longrunning.Operations/ListOperations",
request_serializer=operations_pb2.ListOperationsRequest.SerializeToString,
response_deserializer=operations_pb2.ListOperationsResponse.FromString,
)
return self._stubs["list_operations"]
{% endif %}

{% endif %} {# LRO #}

{% if api.has_location_mixin %}

{% if "ListLocations" in api.mixin_api_methods %}
Expand Down
Loading

0 comments on commit 18af90a

Please sign in to comment.