From 3fda28936348a4c52a788e90bfdef0aea20abc8b Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Mon, 1 Apr 2024 14:52:56 -0700 Subject: [PATCH] Add comments and readme file for eventstream module (#700) --- eventstream_rpc/README.md | 40 ++++ .../aws/eventstreamrpc/EventStreamClient.h | 188 +++++++++++++++++- 2 files changed, 226 insertions(+), 2 deletions(-) create mode 100644 eventstream_rpc/README.md diff --git a/eventstream_rpc/README.md b/eventstream_rpc/README.md new file mode 100644 index 000000000..db057c527 --- /dev/null +++ b/eventstream_rpc/README.md @@ -0,0 +1,40 @@ +# Eventstream RPC + +This module implements client for eventstream RPC. The eventstream RPC client provides a base for defining custom RPC +operations and implements interaction with RPC server. + +## Client Connection + +`ClientConnection` class represents the connection to the RPC server. It's responsible for connection lifetime and +can send connection-level messages (e.g. sending PING or opening a new stream). + +## Client Stream/Continuation + +Application-level messages are supposed to be sent over streams. `ClientConnection::NewStream` creates a new stream which is represented +by the `ClientContinuation` class. +The new stream must be activated with `ClientContinuation::Activate` method. This method initiates sending messages over the wire. + +Application-level messages can be sent using `ClientContinuation::SendMessage` method. + +## Client Operation + +Based upon a client continuation, this entity represents an abstract operation that the client can perform. Concrete operations +should be implemented as classes inherited from the `ClientOperation` class. + +Each operation has input data (referred as **Request**) and output data (referred as **Response**). +Requests are sent by the `Activate` method. + +### Example of a Client Operation + +[DeleteThingShadowOperation](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html) +defines an RPC operation for deleting a thing shadow. + +[DeleteThingShadowOperation::Activate](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html#a7aca4de69329780dfa9ff17315e68b23) +takes [DeleteThingShadowRequest](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_request.html) +instance as an argument and sends it to the server. + +After the request is done, you can use [DeleteThingShadowOperation::GetResult](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html#a261fa97489a6e2015e11f7c25aa13ee9) +to get an instance of the [DeleteThingShadowResult](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_result.html) +class, an auxiliary wrapper class returning a response in case of success or an error in case of failure. To obtain +an actual response, use [DeleteThingShadowResult::GetOperationResponse](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_result.html#a6f4c4987311e016d20cf8e82808181fd) +method. diff --git a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h index 34240a7ee..95d914e97 100644 --- a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h +++ b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h @@ -223,6 +223,9 @@ namespace Aws Crt::String StatusToString(); }; + /** + * Handler interface for connection lifecycle events. + */ class AWS_EVENTSTREAMRPC_API ConnectionLifecycleHandler { public: @@ -255,7 +258,9 @@ namespace Aws const Crt::Optional &payload); }; - /* User data passed to callbacks for a new stream. */ + /** + * User data passed to callbacks for a new stream. + */ class AWS_EVENTSTREAMRPC_API ContinuationCallbackData { public: @@ -273,6 +278,9 @@ namespace Aws Crt::Allocator *allocator; }; + /** + * Handler interface for continuation events. + */ class AWS_EVENTSTREAMRPC_API ClientContinuationHandler { public: @@ -299,14 +307,38 @@ namespace Aws ContinuationCallbackData *m_callbackData; }; + /** + * A wrapper for event-stream-rpc client continuation. + */ class AWS_EVENTSTREAMRPC_API ClientContinuation final { public: + /** + * Create a new continuation. + * + * @note continuation_option's callbacks will not be invoked, and nothing will be sent across + * the wire until Activate() is invoked. + * @param connection Connection on which open a new stream. + * @param continuationHandler A set of callbacks that will be invoked for continuation events. + * @param allocator Allocator to use. + */ ClientContinuation( ClientConnection *connection, ClientContinuationHandler &continuationHandler, Crt::Allocator *allocator) noexcept; ~ClientContinuation() noexcept; + + /** + * Initiate a new client stream. Send new message for the new stream. + * @param operation Name for the operation to be invoked by the peer endpoint. + * @param headers Headers for the eventstream message. + * @param payload Payload for the eventstream message. + * @param messageType Message type for the message. + * @param messageFlags Bitmask of aws_event_stream_rpc_message_flag values. + * @param onMessageFlushCallback Callback to be invoked upon the message being flushed to the underlying + * transport. + * @return Future that will be resolved when the message has either been written to the wire or it fails. + */ std::future Activate( const Crt::String &operation, const Crt::List &headers, @@ -314,8 +346,24 @@ namespace Aws MessageType messageType, uint32_t messageFlags, OnMessageFlushCallback onMessageFlushCallback) noexcept; + + /** + * Check if the continuation has been closed. + * @return True if the continuation has been closed, false otherwise. + */ bool IsClosed() noexcept; void Release() noexcept; + + /** + * Send message on the continuation. + * @param headers List of additional event stream headers to include on the message. + * @param payload Message payload. + * @param messageType Message type for the message. + * @param messageFlags Bitmask of aws_event_stream_rpc_message_flag values. + * @param onMessageFlushCallback Callback to be invoked upon the message being flushed to the underlying + * transport. + * @return Future that will be resolved when the message has either been written to the wire or it fails. + */ std::future SendMessage( const Crt::List &headers, const Crt::Optional &payload, @@ -329,6 +377,7 @@ namespace Aws ClientContinuationHandler &m_continuationHandler; struct aws_event_stream_rpc_client_continuation_token *m_continuationToken; ContinuationCallbackData *m_callbackData; + static void s_onContinuationMessage( struct aws_event_stream_rpc_client_continuation_token *continuationToken, const struct aws_event_stream_rpc_message_args *messageArgs, @@ -338,6 +387,9 @@ namespace Aws void *userData) noexcept; }; + /** + * Base class for types used by operations. + */ class AWS_EVENTSTREAMRPC_API AbstractShapeBase { public: @@ -351,6 +403,9 @@ namespace Aws Crt::Allocator *m_allocator; }; + /** + * Base class for errors used by operations. + */ class AWS_EVENTSTREAMRPC_API OperationError : public AbstractShapeBase { public: @@ -394,6 +449,9 @@ namespace Aws RPC_ERROR }; + /** + * A wrapper for operation result. + */ class AWS_EVENTSTREAMRPC_API TaggedResult { public: @@ -410,9 +468,28 @@ namespace Aws */ operator bool() const noexcept; + /** + * Get operation result. + * @return A pointer to the resulting object in case of success, nullptr otherwise. + */ AbstractShapeBase *GetOperationResponse() const noexcept; + + /** + * Get error for a failed operation. + * @return A pointer to the error object in case of failure, nullptr otherwise. + */ OperationError *GetOperationError() const noexcept; + + /** + * Get RPC-level error. + * @return A pointer to the error object in case of RPC-level failure, nullptr otherwise. + */ RpcError GetRpcError() const noexcept; + + /** + * Get the type of the result with which the operation has completed. + * @return Result type. + */ ResultType GetResultType() const noexcept { return m_responseType; } private: @@ -463,20 +540,67 @@ namespace Aws Crt::Allocator *allocator) const noexcept = 0; }; + /** + * All generated model types implement this interface, including errors. + */ class AWS_EVENTSTREAMRPC_API OperationModelContext { public: OperationModelContext(const ServiceModel &serviceModel) noexcept; + + /** + * Parse the given string into an initial response object. + * @param stringView String to parse the response from. + * @param allocator Allocator to use. + * @return The initial response object. + */ virtual Crt::ScopedResource AllocateInitialResponseFromPayload( Crt::StringView stringView, Crt::Allocator *allocator) const noexcept = 0; + + /** + * Parse the given string into a streaming response object. + * @param stringView String to parse the response from. + * @param allocator Allocator to use. + * @return The streaming response object. + */ virtual Crt::ScopedResource AllocateStreamingResponseFromPayload( Crt::StringView stringView, Crt::Allocator *allocator) const noexcept = 0; + + /** + * Get the initial response type name. + * @return The initial response type name. + */ virtual Crt::String GetInitialResponseModelName() const noexcept = 0; + + /** + * Get the request type name. + * @return The request type name. + */ virtual Crt::String GetRequestModelName() const noexcept = 0; + + /** + * Get the streaming response type name. + * @return The streaming response type name. + */ virtual Crt::Optional GetStreamingResponseModelName() const noexcept = 0; + + /** + * Returns the canonical operation name associated with this context across any client language. + * Namespace included. + * Example: aws.greengrass#SubscribeToTopic + * @return The canonical operation name associated with this context across any client language. + */ virtual Crt::String GetOperationName() const noexcept = 0; + + /** + * Parse the given string into an operation error. + * @param errorModelName The model name. + * @param stringView String to parse the error from. + * @param allocator Allocator to use. + * @return The operation error. + */ Crt::ScopedResource AllocateOperationErrorFromPayload( const Crt::String &errorModelName, Crt::StringView stringView, @@ -489,6 +613,9 @@ namespace Aws const ServiceModel &m_serviceModel; }; + /** + * Interface for an RPC operation. + */ class AWS_EVENTSTREAMRPC_API ClientOperation : public ClientContinuationHandler { public: @@ -504,18 +631,48 @@ namespace Aws bool operator=(const ClientOperation &clientOperation) noexcept = delete; bool operator=(ClientOperation &&clientOperation) noexcept = delete; + /** + * Close the stream on which operation is sent. + * @note This function sends a message with the message flag set to terminate the stream. + * @param onMessageFlushCallback Callback to invoke when the closing message is flushed to the underlying + * transport. + * @return Future which will be resolved once the message is sent. + */ std::future Close(OnMessageFlushCallback onMessageFlushCallback = nullptr) noexcept; + + /** + * Get an operation result. + * @return Future which will be resolved when the corresponding RPC request completes. + */ std::future GetOperationResult() noexcept; + + /** + * Set the launch mode for executing operations. The mode is set to std::launch::deferred by default. + * @param mode The launch mode to use. + */ void WithLaunchMode(std::launch mode) noexcept; protected: + /** + * Initiate a new client stream. Send the shape for the new stream. + * @param shape A parameter for RPC operation. + * @param onMessageFlushCallback Callback to invoke when the shape is flushed to the underlying transport. + * @return Future which will be resolved once the message is sent. + */ std::future Activate( const AbstractShapeBase *shape, OnMessageFlushCallback onMessageFlushCallback) noexcept; std::future SendStreamEvent( AbstractShapeBase *shape, OnMessageFlushCallback onMessageFlushCallback) noexcept; + + /** + * Returns the canonical model name associated with this operation across any client language. + * Namespace included. + * @return The model name. + */ virtual Crt::String GetModelName() const noexcept = 0; + const OperationModelContext &m_operationModelContext; std::launch m_asyncLaunchMode; @@ -566,6 +723,9 @@ namespace Aws std::condition_variable m_closeReady; }; + /** + * Class representing a connection to an RPC server. + */ class AWS_EVENTSTREAMRPC_API ClientConnection final { public: @@ -576,6 +736,13 @@ namespace Aws ClientConnection(ClientConnection &&) noexcept; ClientConnection &operator=(ClientConnection &&) noexcept; + /** + * Initiates a new outgoing event-stream-rpc connection. + * @param connectionOptions Connection options. + * @param connectionLifecycleHandler Handler to process connection lifecycle events. + * @param clientBootstrap ClientBootstrap object to run the connection on. + * @return Future that will be resolved when connection either succeeds or fails. + */ std::future Connect( const ConnectionConfig &connectionOptions, ConnectionLifecycleHandler *connectionLifecycleHandler, @@ -591,10 +758,23 @@ namespace Aws const Crt::Optional &payload, OnMessageFlushCallback onMessageFlushCallback) noexcept; + /** + * Create a new stream. + * @note Activate() must be called on the stream for it to actually initiate the new stream. + * @param clientContinuationHandler Handler to process continuation events. + * @return A newly created continuation. + */ ClientContinuation NewStream(ClientContinuationHandler &clientContinuationHandler) noexcept; + /** + * Close the connection. + */ void Close() noexcept; + /** + * Check if the connection is open. + * @return True if the connection is open, false otherwise. + */ bool IsOpen() const noexcept { if (this->m_underlyingConnection == nullptr) @@ -608,7 +788,7 @@ namespace Aws } /** - * @return true if the instance is in a valid state, false otherwise. + * @return true if the connection is open, false otherwise. */ operator bool() const noexcept { return IsOpen(); } @@ -660,6 +840,10 @@ namespace Aws void *userData) noexcept; static void s_protocolMessageCallback(int errorCode, void *userData) noexcept; + + /** + * Sends a message on the connection. These must be connection level messages (not application messages). + */ static std::future s_sendProtocolMessage( ClientConnection *connection, const Crt::List &headers,