From 0327dcfe6fcf4ebbaff12ca1798cf99beda993af Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 19 Sep 2024 12:50:31 -0700 Subject: [PATCH] Updates --- crt/aws-crt-cpp | 2 +- samples/shadow/v2/main.cpp | 222 +++++++++++++++--- ...otShadowClientv2.h => IotShadowClientV2.h} | 29 +-- ...adowClientv2.cpp => IotShadowClientV2.cpp} | 201 ++++++++-------- 4 files changed, 298 insertions(+), 156 deletions(-) rename shadow/include/aws/iotshadow/{IotShadowClientv2.h => IotShadowClientV2.h} (71%) rename shadow/source/{IotShadowClientv2.cpp => IotShadowClientV2.cpp} (64%) diff --git a/crt/aws-crt-cpp b/crt/aws-crt-cpp index 806b0694f..dfdd59a78 160000 --- a/crt/aws-crt-cpp +++ b/crt/aws-crt-cpp @@ -1 +1 @@ -Subproject commit 806b0694f6cf4a354872f41fb64070f428e0844b +Subproject commit dfdd59a7896ccab047d630cd6f38a588857fbcae diff --git a/samples/shadow/v2/main.cpp b/samples/shadow/v2/main.cpp index 47e3178cf..575ded785 100644 --- a/samples/shadow/v2/main.cpp +++ b/samples/shadow/v2/main.cpp @@ -7,18 +7,22 @@ #include #include -#include +#include #include #include #include #include +#include +#include +#include +#include #include #include #include #include +#include #include -#include #include #include "../../utils/CommandLineUtils.h" @@ -26,6 +30,28 @@ using namespace Aws::Crt; using namespace Aws::Iotshadow; +struct StreamingOperationWrapper { + Aws::Crt::String m_thingName; + + Aws::Crt::String m_shadowName; + + Aws::Crt::String m_type; + + std::shared_ptr m_stream; +}; + +struct ApplicationContext { + + std::shared_ptr m_protocolClient; + + std::shared_ptr m_shadowClient; + + uint64_t m_nextStreamId; + + std::unordered_map m_streams; +}; + + static void s_onConnectionSuccess(const Mqtt5::OnConnectionSuccessEventData &eventData) { fprintf( stdout, @@ -40,7 +66,6 @@ static void s_onStopped(const Mqtt5::OnStoppedEventData &event) { fprintf(stdout, "Protocol client stopped.\n"); } - static Aws::Crt::String s_nibbleNextToken(Aws::Crt::String &input) { Aws::Crt::String token; Aws::Crt::String remaining; @@ -68,17 +93,21 @@ static void s_printHelp() { fprintf(stdout, "\nShadow sandbox:\n\n"); fprintf(stdout, " quit -- quits the program\n"); fprintf(stdout, " start -- starts the protocol client\n"); - fprintf(stdout, " stop -- stops the protocol client\n"); + fprintf(stdout, " stop -- stops the protocol client\n\n"); fprintf(stdout, " get -- gets the state of a named shadow belonging to the specified thing\n"); fprintf(stdout, " delete -- deletes a named shadow belonging to the specified thing\n"); fprintf(stdout, " update-desired -- updates the desired state of a named shadow belonging to the specified thing\n"); - fprintf(stdout, " update-reported -- updates the reported state a named shadow belonging to the specified thing\n"); + fprintf(stdout, " update-reported -- updates the reported state a named shadow belonging to the specified thing\n\n"); + fprintf(stdout, " list-streams -- lists all open streaming operations\n"); + fprintf(stdout, " open-delta-stream -- opens a new streaming operation that receives delta events about changes to a particular shadow belonging to a thing\n"); + fprintf(stdout, " open-document-stream -- opens a new streaming operation that receives document events about changes to a particular shadow belonging to a thing\n"); + fprintf(stdout, " close-stream -- closes a streaming operation\n"); } static void s_onServiceError(const Aws::Iotshadow::ServiceErrorV2 &serviceError, Aws::Crt::String operationName) { - fprintf(stdout, "%s failed with error code: %s\n", operationName.c_str(), aws_error_debug_str(serviceError.getErrorCode())); - if (serviceError.hasModeledError()) { - const auto &modeledError = serviceError.getModeledError(); + fprintf(stdout, "%s failed with error code: %s\n", operationName.c_str(), aws_error_debug_str(serviceError.GetErrorCode())); + if (serviceError.HasModeledError()) { + const auto &modeledError = serviceError.GetModeledError(); Aws::Crt::JsonObject jsonObject; modeledError.SerializeToObject(jsonObject); @@ -88,15 +117,15 @@ static void s_onServiceError(const Aws::Iotshadow::ServiceErrorV2 &protocolClient, const std::shared_ptr &shadowClient) { +static void s_handleListStreams(const ApplicationContext &context) { + fprintf(stdout, "Streams:\n"); + for (const auto &iter : context.m_streams) { + uint64_t streamId = iter.first; + const StreamingOperationWrapper &wrapper = iter.second; + fprintf(stdout, " %" PRIu64": type '%s', thing '%s', shadow '%s'\n", streamId, wrapper.m_type.c_str(), wrapper.m_thingName.c_str(), wrapper.m_shadowName.c_str()); + } +} + +static void s_handleCloseStream(const Aws::Crt::String params, ApplicationContext &context) { + Aws::Crt::String remaining = params; + Aws::Crt::String streamId = s_nibbleNextToken(remaining); + + if (streamId.length() == 0) { + fprintf(stdout, "Invalid arguments to close-stream command!\n\n"); + s_printHelp(); + return; + } + + uint64_t id = std::stoull(streamId.c_str()); + fprintf(stdout, "Closing stream %" PRIu64 "\n", id); + context.m_streams.erase(id); +} + +static void s_registerStream(ApplicationContext &context, uint64_t id, std::shared_ptr operation, Aws::Crt::String type, Aws::Crt::String thing, Aws::Crt::String shadow) { + StreamingOperationWrapper wrapper; + wrapper.m_stream = operation; + wrapper.m_type = type; + wrapper.m_thingName = thing; + wrapper.m_shadowName = shadow; + + context.m_streams[id] = wrapper; + + operation->Open(); +} + +static void s_onSubscriptionStatusEvent(uint64_t id, Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { + fprintf(stdout, "Stream %" PRIu64 ": subscription status event with type %d and error %s\n", id, event.GetErrorCode(), Aws::Crt::ErrorDebugString(event.GetErrorCode())); +} + +static void s_onShadowDeltaUpdatedEvent(uint64_t id, Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) { + fprintf(stdout, "Stream %" PRIu64 ": received shadow delta updated event:\n", id); + + Aws::Crt::JsonObject jsonObject; + event.SerializeToObject(jsonObject); + Aws::Crt::String json = jsonObject.View().WriteCompact(true); + fprintf(stdout, " %s\n", json.c_str()); +} + +static void s_handleOpenDeltaStream(const Aws::Crt::String params, ApplicationContext &context) { + Aws::Crt::String remaining = params; + Aws::Crt::String thing = s_nibbleNextToken(remaining); + Aws::Crt::String shadow = s_nibbleNextToken(remaining); + + if (thing.length() == 0 || shadow.length() == 0) { + fprintf(stdout, "Invalid arguments to open-delta-stream command!\n\n"); + s_printHelp(); + return; + } + + uint64_t streamId = context.m_nextStreamId++; + + Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest request; + request.ThingName = thing; + request.ShadowName = shadow; + + Aws::Iot::RequestResponse::StreamingOperationOptions options; + options.WithSubscriptionStatusEventHandler([streamId](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { + s_onSubscriptionStatusEvent(streamId, std::move(event)); + }); + options.WithStreamHandler([streamId](Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) { + s_onShadowDeltaUpdatedEvent(streamId, std::move(event)); + }); + + auto operation = context.m_shadowClient->CreateNamedShadowDeltaUpdatedStream(request, options); + s_registerStream(context, streamId, operation, "Delta", thing, shadow); +} + +static void s_onShadowUpdatedEvent(uint64_t id, Aws::Iotshadow::ShadowUpdatedEvent &&event) { + fprintf(stdout, "Stream %" PRIu64 ": received shadow updated event:\n", id); + + Aws::Crt::JsonObject jsonObject; + event.SerializeToObject(jsonObject); + Aws::Crt::String json = jsonObject.View().WriteCompact(true); + fprintf(stdout, " %s\n", json.c_str()); +} + +static void s_handleOpenDocumentStream(const Aws::Crt::String params, ApplicationContext &context) { + Aws::Crt::String remaining = params; + Aws::Crt::String thing = s_nibbleNextToken(remaining); + Aws::Crt::String shadow = s_nibbleNextToken(remaining); + + if (thing.length() == 0 || shadow.length() == 0) { + fprintf(stdout, "Invalid arguments to open-document-stream command!\n\n"); + s_printHelp(); + return; + } + + uint64_t streamId = context.m_nextStreamId++; + + Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest request; + request.ThingName = thing; + request.ShadowName = shadow; + + Aws::Iot::RequestResponse::StreamingOperationOptions options; + options.WithSubscriptionStatusEventHandler([streamId](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { + s_onSubscriptionStatusEvent(streamId, std::move(event)); + }); + options.WithStreamHandler([streamId](Aws::Iotshadow::ShadowUpdatedEvent &&event) { + s_onShadowUpdatedEvent(streamId, std::move(event)); + }); + + auto operation = context.m_shadowClient->CreateNamedShadowUpdatedStream(request, options); + s_registerStream(context, streamId, operation, "Document", thing, shadow); +} + +static bool s_handleInput(const Aws::Crt::String &input, ApplicationContext &context) { Aws::Crt::String remaining = input; Aws::Crt::String command = s_nibbleNextToken(remaining); @@ -233,18 +378,26 @@ static bool s_handleInput(const Aws::Crt::String &input, const std::shared_ptrStart(); + context.m_protocolClient->Start(); } else if (command == "stop") { fprintf(stdout, "Stopping protocol client!\n"); - protocolClient->Stop(); + context.m_protocolClient->Stop(); } else if (command == "get") { - s_handleGetNamedShadow(remaining, shadowClient); + s_handleGetNamedShadow(remaining, context.m_shadowClient); } else if (command == "delete") { - s_handleDeleteNamedShadow(remaining, shadowClient); + s_handleDeleteNamedShadow(remaining, context.m_shadowClient); } else if (command == "update-desired") { - s_handleUpdateDesiredNamedShadow(remaining, shadowClient); + s_handleUpdateDesiredNamedShadow(remaining, context.m_shadowClient); } else if (command == "update-reported") { - s_handleUpdateReportedNamedShadow(remaining, shadowClient); + s_handleUpdateReportedNamedShadow(remaining, context.m_shadowClient); + } else if (command == "list-streams") { + s_handleListStreams(context); + } else if (command == "open-delta-stream") { + s_handleOpenDeltaStream(remaining, context); + } else if (command == "open-document-stream") { + s_handleOpenDocumentStream(remaining, context); + } else if (command == "close-stream") { + s_handleCloseStream(remaining, context); } else { s_printHelp(); } @@ -273,25 +426,20 @@ int main(int argc, char *argv[]) return -1; } - if (cmdData.input_port != 0) - { - builder->WithPort(static_cast(cmdData.input_port)); - } - // Setup lifecycle callbacks builder->WithClientConnectionSuccessCallback(s_onConnectionSuccess); builder->WithClientConnectionFailureCallback(s_onConnectionFailure); builder->WithClientStoppedCallback(s_onStopped); - // Create Mqtt5Client - std::shared_ptr client = builder->Build(); - Aws::Iot::RequestResponse::RequestResponseClientOptions requestResponseOptions; - requestResponseOptions.maxRequestResponseSubscriptions = 4; - requestResponseOptions.maxStreamingSubscriptions = 10; - requestResponseOptions.operationTimeoutInSeconds = 30; + requestResponseOptions.WithMaxRequestResponseSubscriptions(4); + requestResponseOptions.WithMaxStreamingSubscriptions(10); + requestResponseOptions.WithOperationTimeoutInSeconds(30); - std::shared_ptr shadowClient = Aws::Iotshadow::IClientV2::newFrom5(*client, requestResponseOptions); + ApplicationContext context; + context.m_protocolClient = builder->Build();; + context.m_shadowClient = Aws::Iotshadow::NewClientFrom5(*context.m_protocolClient, requestResponseOptions); + context.m_nextStreamId = 1; while (true) { @@ -300,7 +448,7 @@ int main(int argc, char *argv[]) String input; std::getline(std::cin, input); - if (s_handleInput(input, client, shadowClient)) + if (s_handleInput(input, context)) { fprintf(stdout, "Exiting..."); break; diff --git a/shadow/include/aws/iotshadow/IotShadowClientv2.h b/shadow/include/aws/iotshadow/IotShadowClientV2.h similarity index 71% rename from shadow/include/aws/iotshadow/IotShadowClientv2.h rename to shadow/include/aws/iotshadow/IotShadowClientV2.h index 6850ddd8c..46d61e2f9 100644 --- a/shadow/include/aws/iotshadow/IotShadowClientv2.h +++ b/shadow/include/aws/iotshadow/IotShadowClientV2.h @@ -10,8 +10,6 @@ #include #include - - #include namespace Aws @@ -49,10 +47,10 @@ namespace Aws ServiceErrorV2 &operator =(const ServiceErrorV2 &rhs) = default; ServiceErrorV2 &operator =(ServiceErrorV2 &&rhs) = default; - int getErrorCode() const { return m_errorCode; } + int GetErrorCode() const { return m_errorCode; } - bool hasModeledError() const { return m_modeledError.has_value(); } - const E &getModeledError() const { return m_modeledError.value(); } + bool HasModeledError() const { return m_modeledError.has_value(); } + const E &GetModeledError() const { return m_modeledError.value(); } private: @@ -83,24 +81,23 @@ namespace Aws virtual ~IClientV2() = default; - static std::shared_ptr newFrom5(const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, - const Aws::Iot::RequestResponse::RequestResponseClientOptions &options, - Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()); - - static std::shared_ptr newFrom311(const Aws::Crt::Mqtt::MqttConnection &protocolClient, - const Aws::Iot::RequestResponse::RequestResponseClientOptions &options, - Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()); - virtual bool DeleteNamedShadow(const Aws::Iotshadow::DeleteNamedShadowRequest &request, const std::function &handler) = 0; virtual bool GetNamedShadow(const Aws::Iotshadow::GetNamedShadowRequest &request, const std::function &handler) = 0; virtual bool UpdateNamedShadow(const Aws::Iotshadow::UpdateNamedShadowRequest &request, const std::function &handler) = 0; - virtual std::shared_ptr createNamedShadowDeltaUpdatedStream(const Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options, const std::function &handler) = 0; - - virtual std::shared_ptr createNamedShadowUpdatedStream(const Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options, const std::function &handler) = 0; + virtual std::shared_ptr CreateNamedShadowDeltaUpdatedStream(const Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options) = 0; + virtual std::shared_ptr CreateNamedShadowUpdatedStream(const Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options) = 0; }; + + AWS_IOTSHADOW_API std::shared_ptr NewClientFrom5(const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, + const Aws::Iot::RequestResponse::RequestResponseClientOptions &options, + Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()); + + AWS_IOTSHADOW_API std::shared_ptr NewClientFrom311(const Aws::Crt::Mqtt::MqttConnection &protocolClient, + const Aws::Iot::RequestResponse::RequestResponseClientOptions &options, + Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()); } } \ No newline at end of file diff --git a/shadow/source/IotShadowClientv2.cpp b/shadow/source/IotShadowClientV2.cpp similarity index 64% rename from shadow/source/IotShadowClientv2.cpp rename to shadow/source/IotShadowClientV2.cpp index ccf693d8c..1e722d1de 100644 --- a/shadow/source/IotShadowClientv2.cpp +++ b/shadow/source/IotShadowClientV2.cpp @@ -4,7 +4,7 @@ * This file is generated */ -#include +#include #include #include @@ -25,91 +25,100 @@ namespace Aws namespace Iotshadow { - class Clientv2 : public IClientV2 + class ClientV2 : public IClientV2 { public: + ClientV2(Aws::Crt::Allocator *allocator, std::shared_ptr bindingClient); + virtual ~ClientV2() = default; - Clientv2(Aws::Crt::Allocator *allocator, Aws::Iot::RequestResponse::IMqttRequestResponseClient *bindingClient); - virtual ~Clientv2() = default; + bool DeleteNamedShadow(const Aws::Iotshadow::DeleteNamedShadowRequest &request, const std::function &handler) override; - virtual bool DeleteNamedShadow(const Aws::Iotshadow::DeleteNamedShadowRequest &request, const std::function &handler); + bool GetNamedShadow(const Aws::Iotshadow::GetNamedShadowRequest &request, const std::function &handler) override; - virtual bool GetNamedShadow(const Aws::Iotshadow::GetNamedShadowRequest &request, const std::function &handler); + bool UpdateNamedShadow(const Aws::Iotshadow::UpdateNamedShadowRequest &request, const std::function &handler) override; - virtual bool UpdateNamedShadow(const Aws::Iotshadow::UpdateNamedShadowRequest &request, const std::function &handler); + std::shared_ptr CreateNamedShadowDeltaUpdatedStream(const Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options) override; - virtual std::shared_ptr createNamedShadowDeltaUpdatedStream(const Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options, const std::function &handler); - - virtual std::shared_ptr createNamedShadowUpdatedStream(const Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options, const std::function &handler); + std::shared_ptr CreateNamedShadowUpdatedStream(const Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options) override; private: Aws::Crt::Allocator *m_allocator; - Aws::Crt::ScopedResource m_bindingClient; + std::shared_ptr m_bindingClient; }; - Clientv2::Clientv2(Aws::Crt::Allocator *allocator, Aws::Iot::RequestResponse::IMqttRequestResponseClient *bindingClient) : + ClientV2::ClientV2(Aws::Crt::Allocator *allocator, std::shared_ptr bindingClient) : m_allocator(allocator), - m_bindingClient(bindingClient, [allocator](Aws::Iot::RequestResponse::IMqttRequestResponseClient *bindingClient) { Aws::Crt::Delete(bindingClient, allocator); }) + m_bindingClient(std::move(bindingClient)) { } - static void s_applyUnmodeledErrorToDeleteNamedShadowHandler(const std::function &handler, int errorCode) { + template + static void s_applyUnmodeledErrorToHandler(const std::function &handler, int errorCode) { ServiceErrorV2 error(errorCode); - Aws::Iotshadow::DeleteShadowResult finalResult(std::move(error)); + R finalResult(std::move(error)); + handler(std::move(finalResult)); + } + + template + static void s_applyModeledErrorToHandler(const std::function &handler, ErrorResponse &&modeledError) { + ServiceErrorV2 error(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR, std::move(modeledError)); // TBI: MQTT Service error + R finalResult(std::move(error)); handler(std::move(finalResult)); } static void s_DeleteNamedShadowResponseHandler(Aws::Iot::RequestResponse::UnmodeledResult &&result, const std::function &handler, const Aws::Crt::String &successPathTopic, const Aws::Crt::String &failurePathTopic) { - if (!result.isSuccess()) { - s_applyUnmodeledErrorToDeleteNamedShadowHandler(handler, result.getError()); + if (!result.IsSuccess()) { + s_applyUnmodeledErrorToHandler(handler, result.GetError()); return; } - auto response = result.getResponse(); - Aws::Crt::String objectStr(reinterpret_cast(response.payload.ptr), response.payload.len); + auto response = result.GetResponse(); + const auto &payload = response.GetPayload(); + Aws::Crt::String objectStr(reinterpret_cast(payload.ptr), payload.len); Aws::Crt::JsonObject jsonObject(objectStr); if (!jsonObject.WasParseSuccessful()) { - s_applyUnmodeledErrorToDeleteNamedShadowHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: Payload Parse failure + s_applyUnmodeledErrorToHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: Payload Parse failure return; } - auto responseTopic = Aws::Crt::String((const char *)response.topic.ptr, response.topic.len); + const auto &topic = response.GetTopic(); + auto responseTopic = Aws::Crt::String((const char *)topic.ptr, topic.len); if (responseTopic == successPathTopic) { Aws::Iotshadow::DeleteShadowResponse modeledResponse(jsonObject); Aws::Iotshadow::DeleteShadowResult finalResult(std::move(modeledResponse)); handler(std::move(finalResult)); } else if (responseTopic == failurePathTopic) { - Aws::Iotshadow::ErrorResponse modeledResponse(jsonObject); - - Aws::Iotshadow::ServiceErrorV2 modeledError(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR, std::move(modeledResponse)); // TBI: MQTT Service error - Aws::Iotshadow::DeleteShadowResult finalResult(std::move(modeledError)); - handler(std::move(finalResult)); + Aws::Iotshadow::ErrorResponse modeledError(jsonObject); + s_applyModeledErrorToHandler(handler, std::move(modeledError)); } else { - s_applyUnmodeledErrorToDeleteNamedShadowHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: invalid response path + s_applyUnmodeledErrorToHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: invalid response path } } - bool Clientv2::DeleteNamedShadow(const Aws::Iotshadow::DeleteNamedShadowRequest &request, const std::function &handler) { + bool ClientV2::DeleteNamedShadow(const Aws::Iotshadow::DeleteNamedShadowRequest &request, const std::function &handler) + { Aws::Crt::StringStream publishTopicStream; - publishTopicStream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName << "/delete"; + publishTopicStream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName + << "/delete"; Aws::Crt::String publishTopic = publishTopicStream.str(); Aws::Crt::StringStream subscriptionTopicStream; - subscriptionTopicStream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName << "/delete/+"; + subscriptionTopicStream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName + << "/delete/+"; Aws::Crt::String subscriptionTopic = subscriptionTopicStream.str(); - struct aws_byte_cursor subscriptionTopicFilters[1] = { - Aws::Crt::ByteCursorFromString(subscriptionTopic) - }; + struct aws_byte_cursor subscriptionTopicFilters[1] = {Aws::Crt::ByteCursorFromString(subscriptionTopic),}; Aws::Crt::StringStream responsePathTopic1Stream; - responsePathTopic1Stream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName << "/delete/accepted"; + responsePathTopic1Stream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName + << "/delete/accepted"; Aws::Crt::String responsePathTopic1 = responsePathTopic1Stream.str(); Aws::Crt::StringStream responsePathTopic2Stream; - responsePathTopic2Stream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName << "/delete/rejected"; + responsePathTopic2Stream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName + << "/delete/rejected"; Aws::Crt::String responsePathTopic2 = responsePathTopic2Stream.str(); struct aws_mqtt_request_operation_response_path responsePaths[2]; @@ -133,55 +142,49 @@ namespace Aws options.response_paths = responsePaths; options.response_path_count = 2; options.publish_topic = Aws::Crt::ByteCursorFromString(publishTopic); - options.serialized_request = Aws::Crt::ByteCursorFromArray((uint8_t *)outgoingJson.data(), outgoingJson.length()); + options.serialized_request = + Aws::Crt::ByteCursorFromArray((uint8_t *)outgoingJson.data(), outgoingJson.length()); options.correlation_token = Aws::Crt::ByteCursorFromString(uuid); - auto resultHandler = [handler, responsePathTopic1, responsePathTopic2](Aws::Iot::RequestResponse::UnmodeledResult &&result){ - s_DeleteNamedShadowResponseHandler(std::move(result), handler, responsePathTopic1, responsePathTopic2); - }; + auto resultHandler = + [handler, responsePathTopic1, responsePathTopic2](Aws::Iot::RequestResponse::UnmodeledResult &&result) + { s_DeleteNamedShadowResponseHandler(std::move(result), handler, responsePathTopic1, responsePathTopic2); }; - int submitResult = m_bindingClient->submitRequest(options, std::move(resultHandler)); + int submitResult = m_bindingClient->SubmitRequest(options, std::move(resultHandler)); return submitResult == AWS_OP_SUCCESS; } - static void s_applyUnmodeledErrorToGetNamedShadowHandler(const std::function &handler, int errorCode) { - ServiceErrorV2 error(errorCode); - Aws::Iotshadow::GetShadowResult finalResult(std::move(error)); - handler(std::move(finalResult)); - } - static void s_GetNamedShadowResponseHandler(Aws::Iot::RequestResponse::UnmodeledResult &&result, const std::function &handler, const Aws::Crt::String &successPathTopic, const Aws::Crt::String &failurePathTopic) { - if (!result.isSuccess()) { - s_applyUnmodeledErrorToGetNamedShadowHandler(handler, result.getError()); + if (!result.IsSuccess()) { + s_applyUnmodeledErrorToHandler(handler, result.GetError()); return; } - auto response = result.getResponse(); - Aws::Crt::String objectStr(reinterpret_cast(response.payload.ptr), response.payload.len); + auto response = result.GetResponse(); + const auto &payload = response.GetPayload(); + Aws::Crt::String objectStr(reinterpret_cast(payload.ptr), payload.len); Aws::Crt::JsonObject jsonObject(objectStr); if (!jsonObject.WasParseSuccessful()) { - s_applyUnmodeledErrorToGetNamedShadowHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: Payload Parse failure + s_applyUnmodeledErrorToHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: Payload Parse failure return; } - auto responseTopic = Aws::Crt::String((const char *)response.topic.ptr, response.topic.len); + const auto &topic = response.GetTopic(); + auto responseTopic = Aws::Crt::String((const char *)topic.ptr, topic.len); if (responseTopic == successPathTopic) { Aws::Iotshadow::GetShadowResponse modeledResponse(jsonObject); Aws::Iotshadow::GetShadowResult finalResult(std::move(modeledResponse)); handler(std::move(finalResult)); } else if (responseTopic == failurePathTopic) { - Aws::Iotshadow::ErrorResponse modeledResponse(jsonObject); - - Aws::Iotshadow::ServiceErrorV2 modeledError(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR, std::move(modeledResponse)); // TBI: MQTT Service error - Aws::Iotshadow::GetShadowResult finalResult(std::move(modeledError)); - handler(std::move(finalResult)); + Aws::Iotshadow::ErrorResponse modeledError(jsonObject); + s_applyModeledErrorToHandler(handler, std::move(modeledError)); } else { - s_applyUnmodeledErrorToGetNamedShadowHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: invalid response path + s_applyUnmodeledErrorToHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: invalid response path } } - bool Clientv2::GetNamedShadow(const Aws::Iotshadow::GetNamedShadowRequest &request, const std::function &handler) { + bool ClientV2::GetNamedShadow(const Aws::Iotshadow::GetNamedShadowRequest &request, const std::function &handler) { Aws::Crt::StringStream publishTopicStream; publishTopicStream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName << "/get"; Aws::Crt::String publishTopic = publishTopicStream.str(); @@ -191,7 +194,7 @@ namespace Aws Aws::Crt::String subscriptionTopic = subscriptionTopicStream.str(); struct aws_byte_cursor subscriptionTopicFilters[1] = { - Aws::Crt::ByteCursorFromString(subscriptionTopic) + Aws::Crt::ByteCursorFromString(subscriptionTopic), }; Aws::Crt::StringStream responsePathTopic1Stream; @@ -230,48 +233,41 @@ namespace Aws s_GetNamedShadowResponseHandler(std::move(result), handler, responsePathTopic1, responsePathTopic2); }; - int submitResult = m_bindingClient->submitRequest(options, std::move(resultHandler)); + int submitResult = m_bindingClient->SubmitRequest(options, std::move(resultHandler)); return submitResult == AWS_OP_SUCCESS; } - static void s_applyUnmodeledErrorToUpdateNamedShadowHandler(const std::function &handler, int errorCode) { - ServiceErrorV2 error(errorCode); - Aws::Iotshadow::UpdateShadowResult finalResult(std::move(error)); - handler(std::move(finalResult)); - } - static void s_UpdateNamedShadowResponseHandler(Aws::Iot::RequestResponse::UnmodeledResult &&result, const std::function &handler, const Aws::Crt::String &successPathTopic, const Aws::Crt::String &failurePathTopic) { - if (!result.isSuccess()) { - s_applyUnmodeledErrorToUpdateNamedShadowHandler(handler, result.getError()); + if (!result.IsSuccess()) { + s_applyUnmodeledErrorToHandler(handler, result.GetError()); return; } - auto response = result.getResponse(); - Aws::Crt::String objectStr(reinterpret_cast(response.payload.ptr), response.payload.len); + auto response = result.GetResponse(); + const auto &payload = response.GetPayload(); + Aws::Crt::String objectStr(reinterpret_cast(payload.ptr), payload.len); Aws::Crt::JsonObject jsonObject(objectStr); if (!jsonObject.WasParseSuccessful()) { - s_applyUnmodeledErrorToUpdateNamedShadowHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: Payload Parse failure + s_applyUnmodeledErrorToHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: Payload Parse failure return; } - auto responseTopic = Aws::Crt::String((const char *)response.topic.ptr, response.topic.len); + const auto &topic = response.GetTopic(); + auto responseTopic = Aws::Crt::String((const char *)topic.ptr, topic.len); if (responseTopic == successPathTopic) { Aws::Iotshadow::UpdateShadowResponse modeledResponse(jsonObject); Aws::Iotshadow::UpdateShadowResult finalResult(std::move(modeledResponse)); handler(std::move(finalResult)); } else if (responseTopic == failurePathTopic) { - Aws::Iotshadow::ErrorResponse modeledResponse(jsonObject); - - Aws::Iotshadow::ServiceErrorV2 modeledError(AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR, std::move(modeledResponse)); // TBI: MQTT Service error - Aws::Iotshadow::UpdateShadowResult finalResult(std::move(modeledError)); - handler(std::move(finalResult)); + Aws::Iotshadow::ErrorResponse modeledError(jsonObject); + s_applyModeledErrorToHandler(handler, std::move(modeledError)); } else { - s_applyUnmodeledErrorToUpdateNamedShadowHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: invalid response path + s_applyUnmodeledErrorToHandler(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INTERNAL_ERROR); // TBI: invalid response path } } - bool Clientv2::UpdateNamedShadow(const Aws::Iotshadow::UpdateNamedShadowRequest &request, const std::function &handler) { + bool ClientV2::UpdateNamedShadow(const Aws::Iotshadow::UpdateNamedShadowRequest &request, const std::function &handler) { Aws::Crt::StringStream publishTopicStream; publishTopicStream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName << "/update"; Aws::Crt::String publishTopic = publishTopicStream.str(); @@ -286,7 +282,7 @@ namespace Aws struct aws_byte_cursor subscriptionTopicFilters[2] = { Aws::Crt::ByteCursorFromString(subscriptionTopic1), - Aws::Crt::ByteCursorFromString(subscriptionTopic2) + Aws::Crt::ByteCursorFromString(subscriptionTopic2), }; Aws::Crt::StringStream responsePathTopic1Stream; @@ -325,7 +321,7 @@ namespace Aws s_UpdateNamedShadowResponseHandler(std::move(result), handler, responsePathTopic1, responsePathTopic2); }; - int submitResult = m_bindingClient->submitRequest(options, std::move(resultHandler)); + int submitResult = m_bindingClient->SubmitRequest(options, std::move(resultHandler)); return submitResult == AWS_OP_SUCCESS; } @@ -338,25 +334,26 @@ namespace Aws { } - static std::shared_ptr create(Aws::Crt::Allocator *allocator, Aws::Iot::RequestResponse::IMqttRequestResponseClient *bindingClient, const Aws::Crt::String &subscriptionTopicFilter, const Aws::Iot::RequestResponse::StreamingOperationOptions &options, const std::function &handler) { + static std::shared_ptr Create(Aws::Crt::Allocator *allocator, const std::shared_ptr &bindingClient, const Aws::Crt::String &subscriptionTopicFilter, const Aws::Iot::RequestResponse::StreamingOperationOptions &options) { - std::function unmodeledHandler = [handler](Aws::Iot::RequestResponse::IncomingPublishEvent &&event){ - Aws::Crt::String objectStr(reinterpret_cast(event.payload.ptr), event.payload.len); + std::function unmodeledHandler = [options](Aws::Iot::RequestResponse::IncomingPublishEvent &&event){ + const auto &payload = event.GetPayload(); + Aws::Crt::String objectStr(reinterpret_cast(payload.ptr), payload.len); Aws::Crt::JsonObject jsonObject(objectStr); if (!jsonObject.WasParseSuccessful()) { return; } T modeledEvent(jsonObject); - handler(std::move(modeledEvent)); + options.GetStreamHandler()(std::move(modeledEvent)); }; Aws::Iot::RequestResponse::StreamingOperationOptionsInternal internalOptions; internalOptions.subscriptionTopicFilter = Aws::Crt::ByteCursorFromString(subscriptionTopicFilter); - internalOptions.subscriptionStatusEventHandler = options.subscriptionStatusEventHandler; + internalOptions.subscriptionStatusEventHandler = options.GetSubscriptionStatusEventHandler(); internalOptions.incomingPublishEventHandler = unmodeledHandler; - auto unmodeledStream = bindingClient->createStream(internalOptions); + auto unmodeledStream = bindingClient->CreateStream(internalOptions); if (!unmodeledStream) { return nullptr; } @@ -364,8 +361,8 @@ namespace Aws return Aws::Crt::MakeShared(allocator, unmodeledStream); } - virtual void activate() { - m_stream->activate(); + void Open() override { + m_stream->Open(); } private: @@ -373,44 +370,44 @@ namespace Aws std::shared_ptr m_stream; }; - std::shared_ptr Clientv2::createNamedShadowDeltaUpdatedStream(const Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options, const std::function &handler) { + std::shared_ptr ClientV2::CreateNamedShadowDeltaUpdatedStream(const Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options) { Aws::Crt::StringStream topicStream; topicStream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName << "/update/delta"; Aws::Crt::String topic = topicStream.str(); - return ShadowStreamingOperation::create(m_allocator, m_bindingClient.get(), topic, options, handler); + return ShadowStreamingOperation::Create(m_allocator, m_bindingClient, topic, options); } - std::shared_ptr Clientv2::createNamedShadowUpdatedStream(const Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options, const std::function &handler) { + std::shared_ptr ClientV2::CreateNamedShadowUpdatedStream(const Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest &request, const Aws::Iot::RequestResponse::StreamingOperationOptions &options) { Aws::Crt::StringStream topicStream; topicStream << "$aws/things/" << *request.ThingName << "/shadow/name/" << *request.ShadowName << "/update/documents"; Aws::Crt::String topic = topicStream.str(); - return ShadowStreamingOperation::create(m_allocator, m_bindingClient.get(), topic, options, handler); + return ShadowStreamingOperation::Create(m_allocator, m_bindingClient, topic, options); } - std::shared_ptr IClientV2::newFrom5(const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, + std::shared_ptr NewClientFrom5(const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient, const Aws::Iot::RequestResponse::RequestResponseClientOptions &options, Aws::Crt::Allocator *allocator) { - Aws::Iot::RequestResponse::IMqttRequestResponseClient *bindingClient = Aws::Iot::RequestResponse::IMqttRequestResponseClient::newFrom5(protocolClient, options, allocator); - if (!bindingClient) { + std::shared_ptr bindingClient = Aws::Iot::RequestResponse::NewClientFrom5(protocolClient, options, allocator); + if (nullptr == bindingClient) { return nullptr; } - return Aws::Crt::MakeShared(allocator, allocator, bindingClient); + return Aws::Crt::MakeShared(allocator, allocator, bindingClient); } - std::shared_ptr IClientV2::newFrom311(const Aws::Crt::Mqtt::MqttConnection &protocolClient, + std::shared_ptr NewClientFrom311(const Aws::Crt::Mqtt::MqttConnection &protocolClient, const Aws::Iot::RequestResponse::RequestResponseClientOptions &options, Aws::Crt::Allocator *allocator) { - Aws::Iot::RequestResponse::IMqttRequestResponseClient *bindingClient = Aws::Iot::RequestResponse::IMqttRequestResponseClient::newFrom311(protocolClient, options, allocator); - if (!bindingClient) { + std::shared_ptr bindingClient = Aws::Iot::RequestResponse::NewClientFrom311(protocolClient, options, allocator); + if (nullptr == bindingClient) { return nullptr; } - return Aws::Crt::MakeShared(allocator, allocator, bindingClient); + return Aws::Crt::MakeShared(allocator, allocator, bindingClient); } }