Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add comments and readme file for eventstream module #700

Merged
merged 3 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions eventstream_rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Eventstream RPC

This module implements client for eventstream RPC. The eventstream RPC client provides a base for defining custom RPC
operations and implements interaction with RPC server.

## Client Connection

`ClientConnection` class represents the connection to the RPC server. It's responsible for connection lifetime and
can send connection-level messages (e.g. sending PING or opening a new stream).

## Client Stream/Continuation

Application-level messages are supposed to be sent over streams. `ClientConnection::NewStream` creates a new stream which is represented
by the `ClientContinuation` class.
The new stream must be activated with `ClientContinuation::Activate` method. This method initiates sending messages over the wire.

Application-level messages can be sent using `ClientContinuation::SendMessage` method.

## Client Operation

Based upon a client continuation, this entity represents an abstract operation that the client can perform. Concrete operations
should be implemented as classes inherited from the `ClientOperation` class.

Each operation has input data (referred as **Request**) and output data (referred as **Response**).
Requests are sent by the `Activate` method.

### Example of a Client Operation

[DeleteThingShadowOperation](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html)
defines an RPC operation for deleting a thing shadow.

[DeleteThingShadowOperation::Activate](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html#a7aca4de69329780dfa9ff17315e68b23)
takes [DeleteThingShadowRequest](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_request.html)
instance as an argument and sends it to the server.

After the request is done, you can use [DeleteThingShadowOperation::GetResult](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_operation.html#a261fa97489a6e2015e11f7c25aa13ee9)
to get an instance of the [DeleteThingShadowResult](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_result.html)
class, an auxiliary wrapper class returning a response in case of success or an error in case of failure. To obtain
an actual response, use [DeleteThingShadowResult::GetOperationResponse](https://aws.github.io/aws-iot-device-sdk-cpp-v2/class_aws_1_1_greengrass_1_1_delete_thing_shadow_result.html#a6f4c4987311e016d20cf8e82808181fd)
method.
188 changes: 186 additions & 2 deletions eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ namespace Aws
Crt::String StatusToString();
};

/**
* Handler interface for connection lifecycle events.
*/
class AWS_EVENTSTREAMRPC_API ConnectionLifecycleHandler
{
public:
Expand Down Expand Up @@ -255,7 +258,9 @@ namespace Aws
const Crt::Optional<Crt::ByteBuf> &payload);
};

/* User data passed to callbacks for a new stream. */
/**
* User data passed to callbacks for a new stream.
*/
class AWS_EVENTSTREAMRPC_API ContinuationCallbackData
{
public:
Expand All @@ -273,6 +278,9 @@ namespace Aws
Crt::Allocator *allocator;
};

/**
* Handler interface for continuation events.
*/
class AWS_EVENTSTREAMRPC_API ClientContinuationHandler
{
public:
Expand All @@ -299,23 +307,63 @@ namespace Aws
ContinuationCallbackData *m_callbackData;
};

/**
* A wrapper for event-stream-rpc client continuation.
*/
class AWS_EVENTSTREAMRPC_API ClientContinuation final
{
public:
/**
* Create a new continuation.
*
* @note continuation_option's callbacks will not be invoked, and nothing will be sent across
* the wire until Activate() is invoked.
* @param connection Connection on which open a new stream.
* @param continuationHandler A set of callbacks that will be invoked for continuation events.
* @param allocator Allocator to use.
*/
ClientContinuation(
ClientConnection *connection,
ClientContinuationHandler &continuationHandler,
Crt::Allocator *allocator) noexcept;
~ClientContinuation() noexcept;

/**
* Initiate a new client stream. Send new message for the new stream.
* @param operation Name for the operation to be invoked by the peer endpoint.
* @param headers Headers for the eventstream message.
* @param payload Payload for the eventstream message.
* @param messageType Message type for the message.
* @param messageFlags Bitmask of aws_event_stream_rpc_message_flag values.
* @param onMessageFlushCallback Callback to be invoked upon the message being flushed to the underlying
* transport.
* @return Future that will be resolved when the message has either been written to the wire or it fails.
*/
std::future<RpcError> Activate(
const Crt::String &operation,
const Crt::List<EventStreamHeader> &headers,
const Crt::Optional<Crt::ByteBuf> &payload,
MessageType messageType,
uint32_t messageFlags,
OnMessageFlushCallback onMessageFlushCallback) noexcept;

/**
* Check if the continuation has been closed.
* @return True if the continuation has been closed, false otherwise.
*/
bool IsClosed() noexcept;
void Release() noexcept;
bretambrose marked this conversation as resolved.
Show resolved Hide resolved

/**
* Send message on the continuation.
* @param headers List of additional event stream headers to include on the message.
* @param payload Message payload.
* @param messageType Message type for the message.
* @param messageFlags Bitmask of aws_event_stream_rpc_message_flag values.
* @param onMessageFlushCallback Callback to be invoked upon the message being flushed to the underlying
* transport.
* @return Future that will be resolved when the message has either been written to the wire or it fails.
*/
std::future<RpcError> SendMessage(
const Crt::List<EventStreamHeader> &headers,
const Crt::Optional<Crt::ByteBuf> &payload,
Expand All @@ -329,6 +377,7 @@ namespace Aws
ClientContinuationHandler &m_continuationHandler;
struct aws_event_stream_rpc_client_continuation_token *m_continuationToken;
ContinuationCallbackData *m_callbackData;

static void s_onContinuationMessage(
struct aws_event_stream_rpc_client_continuation_token *continuationToken,
const struct aws_event_stream_rpc_message_args *messageArgs,
Expand All @@ -338,6 +387,9 @@ namespace Aws
void *userData) noexcept;
};

/**
* Base class for types used by operations.
*/
class AWS_EVENTSTREAMRPC_API AbstractShapeBase
{
public:
Expand All @@ -351,6 +403,9 @@ namespace Aws
Crt::Allocator *m_allocator;
};

/**
* Base class for errors used by operations.
*/
class AWS_EVENTSTREAMRPC_API OperationError : public AbstractShapeBase
{
public:
Expand Down Expand Up @@ -394,6 +449,9 @@ namespace Aws
RPC_ERROR
};

/**
* A wrapper for operation result.
*/
class AWS_EVENTSTREAMRPC_API TaggedResult
{
public:
Expand All @@ -410,9 +468,28 @@ namespace Aws
*/
operator bool() const noexcept;

/**
* Get operation result.
* @return A pointer to the resulting object in case of success, nullptr otherwise.
*/
AbstractShapeBase *GetOperationResponse() const noexcept;

/**
* Get error for a failed operation.
* @return A pointer to the error object in case of failure, nullptr otherwise.
*/
OperationError *GetOperationError() const noexcept;

/**
* Get RPC-level error.
* @return A pointer to the error object in case of RPC-level failure, nullptr otherwise.
*/
RpcError GetRpcError() const noexcept;

/**
* Get the type of the result with which the operation has completed.
* @return Result type.
*/
ResultType GetResultType() const noexcept { return m_responseType; }

private:
Expand Down Expand Up @@ -463,20 +540,67 @@ namespace Aws
Crt::Allocator *allocator) const noexcept = 0;
};

/**
* All generated model types implement this interface, including errors.
*/
class AWS_EVENTSTREAMRPC_API OperationModelContext
{
public:
OperationModelContext(const ServiceModel &serviceModel) noexcept;

/**
* Parse the given string into an initial response object.
* @param stringView String to parse the response from.
* @param allocator Allocator to use.
* @return The initial response object.
*/
virtual Crt::ScopedResource<AbstractShapeBase> AllocateInitialResponseFromPayload(
Crt::StringView stringView,
Crt::Allocator *allocator) const noexcept = 0;

/**
* Parse the given string into a streaming response object.
* @param stringView String to parse the response from.
* @param allocator Allocator to use.
* @return The streaming response object.
*/
virtual Crt::ScopedResource<AbstractShapeBase> AllocateStreamingResponseFromPayload(
Crt::StringView stringView,
Crt::Allocator *allocator) const noexcept = 0;

/**
* Get the initial response type name.
* @return The initial response type name.
*/
virtual Crt::String GetInitialResponseModelName() const noexcept = 0;

/**
* Get the request type name.
* @return The request type name.
*/
virtual Crt::String GetRequestModelName() const noexcept = 0;

/**
* Get the streaming response type name.
* @return The streaming response type name.
*/
virtual Crt::Optional<Crt::String> GetStreamingResponseModelName() const noexcept = 0;

/**
* Returns the canonical operation name associated with this context across any client language.
* Namespace included.
* Example: aws.greengrass#SubscribeToTopic
* @return The canonical operation name associated with this context across any client language.
*/
virtual Crt::String GetOperationName() const noexcept = 0;

/**
* Parse the given string into an operation error.
* @param errorModelName The model name.
* @param stringView String to parse the error from.
* @param allocator Allocator to use.
* @return The operation error.
*/
Crt::ScopedResource<OperationError> AllocateOperationErrorFromPayload(
const Crt::String &errorModelName,
Crt::StringView stringView,
Expand All @@ -489,6 +613,9 @@ namespace Aws
const ServiceModel &m_serviceModel;
};

/**
* Interface for an RPC operation.
*/
class AWS_EVENTSTREAMRPC_API ClientOperation : public ClientContinuationHandler
{
public:
Expand All @@ -504,18 +631,48 @@ namespace Aws
bool operator=(const ClientOperation &clientOperation) noexcept = delete;
bool operator=(ClientOperation &&clientOperation) noexcept = delete;

/**
* Close the stream on which operation is sent.
* @note This function sends a message with the message flag set to terminate the stream.
* @param onMessageFlushCallback Callback to invoke when the closing message is flushed to the underlying
* transport.
* @return Future which will be resolved once the message is sent.
*/
std::future<RpcError> Close(OnMessageFlushCallback onMessageFlushCallback = nullptr) noexcept;

/**
* Get an operation result.
* @return Future which will be resolved when the corresponding RPC request completes.
*/
std::future<TaggedResult> GetOperationResult() noexcept;

/**
* Set the launch mode for executing operations. The mode is set to std::launch::deferred by default.
* @param mode The launch mode to use.
*/
void WithLaunchMode(std::launch mode) noexcept;

protected:
/**
* Initiate a new client stream. Send the shape for the new stream.
* @param shape A parameter for RPC operation.
* @param onMessageFlushCallback Callback to invoke when the shape is flushed to the underlying transport.
* @return Future which will be resolved once the message is sent.
*/
std::future<RpcError> Activate(
const AbstractShapeBase *shape,
OnMessageFlushCallback onMessageFlushCallback) noexcept;
std::future<RpcError> SendStreamEvent(
AbstractShapeBase *shape,
OnMessageFlushCallback onMessageFlushCallback) noexcept;

/**
* Returns the canonical model name associated with this operation across any client language.
* Namespace included.
* @return The model name.
*/
virtual Crt::String GetModelName() const noexcept = 0;

const OperationModelContext &m_operationModelContext;
std::launch m_asyncLaunchMode;

Expand Down Expand Up @@ -566,6 +723,9 @@ namespace Aws
std::condition_variable m_closeReady;
};

/**
* Class representing a connection to an RPC server.
*/
class AWS_EVENTSTREAMRPC_API ClientConnection final
{
public:
Expand All @@ -576,6 +736,13 @@ namespace Aws
ClientConnection(ClientConnection &&) noexcept;
ClientConnection &operator=(ClientConnection &&) noexcept;

/**
* Initiates a new outgoing event-stream-rpc connection.
* @param connectionOptions Connection options.
* @param connectionLifecycleHandler Handler to process connection lifecycle events.
* @param clientBootstrap ClientBootstrap object to run the connection on.
* @return Future that will be resolved when connection either succeeds or fails.
*/
std::future<RpcError> Connect(
const ConnectionConfig &connectionOptions,
ConnectionLifecycleHandler *connectionLifecycleHandler,
Expand All @@ -591,10 +758,23 @@ namespace Aws
const Crt::Optional<Crt::ByteBuf> &payload,
OnMessageFlushCallback onMessageFlushCallback) noexcept;

/**
* Create a new stream.
* @note Activate() must be called on the stream for it to actually initiate the new stream.
* @param clientContinuationHandler Handler to process continuation events.
* @return A newly created continuation.
*/
ClientContinuation NewStream(ClientContinuationHandler &clientContinuationHandler) noexcept;

/**
* Close the connection.
*/
void Close() noexcept;

/**
* Check if the connection is open.
* @return True if the connection is open, false otherwise.
*/
bool IsOpen() const noexcept
{
if (this->m_underlyingConnection == nullptr)
Expand All @@ -608,7 +788,7 @@ namespace Aws
}

/**
* @return true if the instance is in a valid state, false otherwise.
* @return true if the connection is open, false otherwise.
bretambrose marked this conversation as resolved.
Show resolved Hide resolved
*/
operator bool() const noexcept { return IsOpen(); }

Expand Down Expand Up @@ -660,6 +840,10 @@ namespace Aws
void *userData) noexcept;

static void s_protocolMessageCallback(int errorCode, void *userData) noexcept;

/**
* Sends a message on the connection. These must be connection level messages (not application messages).
*/
static std::future<RpcError> s_sendProtocolMessage(
ClientConnection *connection,
const Crt::List<EventStreamHeader> &headers,
Expand Down
Loading