Skip to content

Commit

Permalink
Merge branch 'main' into add-sanitizer-ci-job
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred2g authored Apr 2, 2024
2 parents c6f2980 + 3fda289 commit 40080ac
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 2 deletions.
40 changes: 40 additions & 0 deletions eventstream_rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Eventstream RPC

This module implements client for eventstream RPC. The eventstream RPC client provides a base for defining custom RPC
operations and implements interaction with RPC server.

## Client Connection

`ClientConnection` class represents the connection to the RPC server. It's responsible for connection lifetime and
can send connection-level messages (e.g. sending PING or opening a new stream).

## Client Stream/Continuation

Application-level messages are supposed to be sent over streams. `ClientConnection::NewStream` creates a new stream which is represented
by the `ClientContinuation` class.
The new stream must be activated with `ClientContinuation::Activate` method. This method initiates sending messages over the wire.

Application-level messages can be sent using `ClientContinuation::SendMessage` method.

## Client Operation

Based upon a client continuation, this entity represents an abstract operation that the client can perform. Concrete operations
should be implemented as classes inherited from the `ClientOperation` class.

Each operation has input data (referred as **Request**) and output data (referred as **Response**).
Requests are sent by the `Activate` method.

### Example of a Client Operation

[DeleteThingShadowOperation](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html)
defines an RPC operation for deleting a thing shadow.

[DeleteThingShadowOperation::Activate](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html#a7aca4de69329780dfa9ff17315e68b23)
takes [DeleteThingShadowRequest](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_request.html)
instance as an argument and sends it to the server.

After the request is done, you can use [DeleteThingShadowOperation::GetResult](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html#a261fa97489a6e2015e11f7c25aa13ee9)
to get an instance of the [DeleteThingShadowResult](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_result.html)
class, an auxiliary wrapper class returning a response in case of success or an error in case of failure. To obtain
an actual response, use [DeleteThingShadowResult::GetOperationResponse](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_result.html#a6f4c4987311e016d20cf8e82808181fd)
method.
188 changes: 186 additions & 2 deletions eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ namespace Aws
Crt::String StatusToString();
};

/**
* Handler interface for connection lifecycle events.
*/
class AWS_EVENTSTREAMRPC_API ConnectionLifecycleHandler
{
public:
Expand Down Expand Up @@ -255,7 +258,9 @@ namespace Aws
const Crt::Optional<Crt::ByteBuf> &payload);
};

/* User data passed to callbacks for a new stream. */
/**
* User data passed to callbacks for a new stream.
*/
class AWS_EVENTSTREAMRPC_API ContinuationCallbackData
{
public:
Expand All @@ -273,6 +278,9 @@ namespace Aws
Crt::Allocator *allocator;
};

/**
* Handler interface for continuation events.
*/
class AWS_EVENTSTREAMRPC_API ClientContinuationHandler
{
public:
Expand All @@ -299,23 +307,63 @@ namespace Aws
ContinuationCallbackData *m_callbackData;
};

/**
* A wrapper for event-stream-rpc client continuation.
*/
class AWS_EVENTSTREAMRPC_API ClientContinuation final
{
public:
/**
* Create a new continuation.
*
* @note continuation_option's callbacks will not be invoked, and nothing will be sent across
* the wire until Activate() is invoked.
* @param connection Connection on which open a new stream.
* @param continuationHandler A set of callbacks that will be invoked for continuation events.
* @param allocator Allocator to use.
*/
ClientContinuation(
ClientConnection *connection,
ClientContinuationHandler &continuationHandler,
Crt::Allocator *allocator) noexcept;
~ClientContinuation() noexcept;

/**
* Initiate a new client stream. Send new message for the new stream.
* @param operation Name for the operation to be invoked by the peer endpoint.
* @param headers Headers for the eventstream message.
* @param payload Payload for the eventstream message.
* @param messageType Message type for the message.
* @param messageFlags Bitmask of aws_event_stream_rpc_message_flag values.
* @param onMessageFlushCallback Callback to be invoked upon the message being flushed to the underlying
* transport.
* @return Future that will be resolved when the message has either been written to the wire or it fails.
*/
std::future<RpcError> Activate(
const Crt::String &operation,
const Crt::List<EventStreamHeader> &headers,
const Crt::Optional<Crt::ByteBuf> &payload,
MessageType messageType,
uint32_t messageFlags,
OnMessageFlushCallback onMessageFlushCallback) noexcept;

/**
* Check if the continuation has been closed.
* @return True if the continuation has been closed, false otherwise.
*/
bool IsClosed() noexcept;
void Release() noexcept;

/**
* Send message on the continuation.
* @param headers List of additional event stream headers to include on the message.
* @param payload Message payload.
* @param messageType Message type for the message.
* @param messageFlags Bitmask of aws_event_stream_rpc_message_flag values.
* @param onMessageFlushCallback Callback to be invoked upon the message being flushed to the underlying
* transport.
* @return Future that will be resolved when the message has either been written to the wire or it fails.
*/
std::future<RpcError> SendMessage(
const Crt::List<EventStreamHeader> &headers,
const Crt::Optional<Crt::ByteBuf> &payload,
Expand All @@ -329,6 +377,7 @@ namespace Aws
ClientContinuationHandler &m_continuationHandler;
struct aws_event_stream_rpc_client_continuation_token *m_continuationToken;
ContinuationCallbackData *m_callbackData;

static void s_onContinuationMessage(
struct aws_event_stream_rpc_client_continuation_token *continuationToken,
const struct aws_event_stream_rpc_message_args *messageArgs,
Expand All @@ -338,6 +387,9 @@ namespace Aws
void *userData) noexcept;
};

/**
* Base class for types used by operations.
*/
class AWS_EVENTSTREAMRPC_API AbstractShapeBase
{
public:
Expand All @@ -351,6 +403,9 @@ namespace Aws
Crt::Allocator *m_allocator;
};

/**
* Base class for errors used by operations.
*/
class AWS_EVENTSTREAMRPC_API OperationError : public AbstractShapeBase
{
public:
Expand Down Expand Up @@ -394,6 +449,9 @@ namespace Aws
RPC_ERROR
};

/**
* A wrapper for operation result.
*/
class AWS_EVENTSTREAMRPC_API TaggedResult
{
public:
Expand All @@ -410,9 +468,28 @@ namespace Aws
*/
operator bool() const noexcept;

/**
* Get operation result.
* @return A pointer to the resulting object in case of success, nullptr otherwise.
*/
AbstractShapeBase *GetOperationResponse() const noexcept;

/**
* Get error for a failed operation.
* @return A pointer to the error object in case of failure, nullptr otherwise.
*/
OperationError *GetOperationError() const noexcept;

/**
* Get RPC-level error.
* @return A pointer to the error object in case of RPC-level failure, nullptr otherwise.
*/
RpcError GetRpcError() const noexcept;

/**
* Get the type of the result with which the operation has completed.
* @return Result type.
*/
ResultType GetResultType() const noexcept { return m_responseType; }

private:
Expand Down Expand Up @@ -463,20 +540,67 @@ namespace Aws
Crt::Allocator *allocator) const noexcept = 0;
};

/**
* All generated model types implement this interface, including errors.
*/
class AWS_EVENTSTREAMRPC_API OperationModelContext
{
public:
OperationModelContext(const ServiceModel &serviceModel) noexcept;

/**
* Parse the given string into an initial response object.
* @param stringView String to parse the response from.
* @param allocator Allocator to use.
* @return The initial response object.
*/
virtual Crt::ScopedResource<AbstractShapeBase> AllocateInitialResponseFromPayload(
Crt::StringView stringView,
Crt::Allocator *allocator) const noexcept = 0;

/**
* Parse the given string into a streaming response object.
* @param stringView String to parse the response from.
* @param allocator Allocator to use.
* @return The streaming response object.
*/
virtual Crt::ScopedResource<AbstractShapeBase> AllocateStreamingResponseFromPayload(
Crt::StringView stringView,
Crt::Allocator *allocator) const noexcept = 0;

/**
* Get the initial response type name.
* @return The initial response type name.
*/
virtual Crt::String GetInitialResponseModelName() const noexcept = 0;

/**
* Get the request type name.
* @return The request type name.
*/
virtual Crt::String GetRequestModelName() const noexcept = 0;

/**
* Get the streaming response type name.
* @return The streaming response type name.
*/
virtual Crt::Optional<Crt::String> GetStreamingResponseModelName() const noexcept = 0;

/**
* Returns the canonical operation name associated with this context across any client language.
* Namespace included.
* Example: aws.greengrass#SubscribeToTopic
* @return The canonical operation name associated with this context across any client language.
*/
virtual Crt::String GetOperationName() const noexcept = 0;

/**
* Parse the given string into an operation error.
* @param errorModelName The model name.
* @param stringView String to parse the error from.
* @param allocator Allocator to use.
* @return The operation error.
*/
Crt::ScopedResource<OperationError> AllocateOperationErrorFromPayload(
const Crt::String &errorModelName,
Crt::StringView stringView,
Expand All @@ -489,6 +613,9 @@ namespace Aws
const ServiceModel &m_serviceModel;
};

/**
* Interface for an RPC operation.
*/
class AWS_EVENTSTREAMRPC_API ClientOperation : public ClientContinuationHandler
{
public:
Expand All @@ -504,18 +631,48 @@ namespace Aws
bool operator=(const ClientOperation &clientOperation) noexcept = delete;
bool operator=(ClientOperation &&clientOperation) noexcept = delete;

/**
* Close the stream on which operation is sent.
* @note This function sends a message with the message flag set to terminate the stream.
* @param onMessageFlushCallback Callback to invoke when the closing message is flushed to the underlying
* transport.
* @return Future which will be resolved once the message is sent.
*/
std::future<RpcError> Close(OnMessageFlushCallback onMessageFlushCallback = nullptr) noexcept;

/**
* Get an operation result.
* @return Future which will be resolved when the corresponding RPC request completes.
*/
std::future<TaggedResult> GetOperationResult() noexcept;

/**
* Set the launch mode for executing operations. The mode is set to std::launch::deferred by default.
* @param mode The launch mode to use.
*/
void WithLaunchMode(std::launch mode) noexcept;

protected:
/**
* Initiate a new client stream. Send the shape for the new stream.
* @param shape A parameter for RPC operation.
* @param onMessageFlushCallback Callback to invoke when the shape is flushed to the underlying transport.
* @return Future which will be resolved once the message is sent.
*/
std::future<RpcError> Activate(
const AbstractShapeBase *shape,
OnMessageFlushCallback onMessageFlushCallback) noexcept;
std::future<RpcError> SendStreamEvent(
AbstractShapeBase *shape,
OnMessageFlushCallback onMessageFlushCallback) noexcept;

/**
* Returns the canonical model name associated with this operation across any client language.
* Namespace included.
* @return The model name.
*/
virtual Crt::String GetModelName() const noexcept = 0;

const OperationModelContext &m_operationModelContext;
std::launch m_asyncLaunchMode;

Expand Down Expand Up @@ -566,6 +723,9 @@ namespace Aws
std::condition_variable m_closeReady;
};

/**
* Class representing a connection to an RPC server.
*/
class AWS_EVENTSTREAMRPC_API ClientConnection final
{
public:
Expand All @@ -576,6 +736,13 @@ namespace Aws
ClientConnection(ClientConnection &&) noexcept;
ClientConnection &operator=(ClientConnection &&) noexcept;

/**
* Initiates a new outgoing event-stream-rpc connection.
* @param connectionOptions Connection options.
* @param connectionLifecycleHandler Handler to process connection lifecycle events.
* @param clientBootstrap ClientBootstrap object to run the connection on.
* @return Future that will be resolved when connection either succeeds or fails.
*/
std::future<RpcError> Connect(
const ConnectionConfig &connectionOptions,
ConnectionLifecycleHandler *connectionLifecycleHandler,
Expand All @@ -591,10 +758,23 @@ namespace Aws
const Crt::Optional<Crt::ByteBuf> &payload,
OnMessageFlushCallback onMessageFlushCallback) noexcept;

/**
* Create a new stream.
* @note Activate() must be called on the stream for it to actually initiate the new stream.
* @param clientContinuationHandler Handler to process continuation events.
* @return A newly created continuation.
*/
ClientContinuation NewStream(ClientContinuationHandler &clientContinuationHandler) noexcept;

/**
* Close the connection.
*/
void Close() noexcept;

/**
* Check if the connection is open.
* @return True if the connection is open, false otherwise.
*/
bool IsOpen() const noexcept
{
if (this->m_underlyingConnection == nullptr)
Expand All @@ -608,7 +788,7 @@ namespace Aws
}

/**
* @return true if the instance is in a valid state, false otherwise.
* @return true if the connection is open, false otherwise.
*/
operator bool() const noexcept { return IsOpen(); }

Expand Down Expand Up @@ -660,6 +840,10 @@ namespace Aws
void *userData) noexcept;

static void s_protocolMessageCallback(int errorCode, void *userData) noexcept;

/**
* Sends a message on the connection. These must be connection level messages (not application messages).
*/
static std::future<RpcError> s_sendProtocolMessage(
ClientConnection *connection,
const Crt::List<EventStreamHeader> &headers,
Expand Down

0 comments on commit 40080ac

Please sign in to comment.