Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Sep 19, 2024
1 parent aa12ba8 commit 0327dcf
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 156 deletions.
2 changes: 1 addition & 1 deletion crt/aws-crt-cpp
222 changes: 185 additions & 37 deletions samples/shadow/v2/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,51 @@
#include <aws/iot/Mqtt5Client.h>

#include <aws/iotshadow/ErrorResponse.h>
#include <aws/iotshadow/IotShadowClientv2.h>
#include <aws/iotshadow/IotShadowClientV2.h>
#include <aws/iotshadow/DeleteNamedShadowRequest.h>
#include <aws/iotshadow/DeleteShadowResponse.h>
#include <aws/iotshadow/GetNamedShadowRequest.h>
#include <aws/iotshadow/GetShadowResponse.h>
#include <aws/iotshadow/NamedShadowDeltaUpdatedSubscriptionRequest.h>
#include <aws/iotshadow/NamedShadowUpdatedSubscriptionRequest.h>
#include <aws/iotshadow/ShadowDeltaUpdatedEvent.h>
#include <aws/iotshadow/ShadowUpdatedEvent.h>
#include <aws/iotshadow/UpdateNamedShadowRequest.h>
#include <aws/iotshadow/UpdateShadowResponse.h>

#include <algorithm>
#include <condition_variable>
#include <cinttypes>
#include <iostream>
#include <mutex>
#include <thread>

#include "../../utils/CommandLineUtils.h"

using namespace Aws::Crt;
using namespace Aws::Iotshadow;

struct StreamingOperationWrapper {
Aws::Crt::String m_thingName;

Aws::Crt::String m_shadowName;

Aws::Crt::String m_type;

std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> m_stream;
};

struct ApplicationContext {

std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> m_protocolClient;

std::shared_ptr<Aws::Iotshadow::IClientV2> m_shadowClient;

uint64_t m_nextStreamId;

std::unordered_map<uint64_t, StreamingOperationWrapper> m_streams;
};


static void s_onConnectionSuccess(const Mqtt5::OnConnectionSuccessEventData &eventData) {
fprintf(
stdout,
Expand All @@ -40,7 +66,6 @@ static void s_onStopped(const Mqtt5::OnStoppedEventData &event) {
fprintf(stdout, "Protocol client stopped.\n");
}


static Aws::Crt::String s_nibbleNextToken(Aws::Crt::String &input) {
Aws::Crt::String token;
Aws::Crt::String remaining;
Expand Down Expand Up @@ -68,17 +93,21 @@ static void s_printHelp() {
fprintf(stdout, "\nShadow sandbox:\n\n");
fprintf(stdout, " quit -- quits the program\n");
fprintf(stdout, " start -- starts the protocol client\n");
fprintf(stdout, " stop -- stops the protocol client\n");
fprintf(stdout, " stop -- stops the protocol client\n\n");
fprintf(stdout, " get <thing-name> <shadow-name> -- gets the state of a named shadow belonging to the specified thing\n");
fprintf(stdout, " delete <thing-name> <shadow-name> -- deletes a named shadow belonging to the specified thing\n");
fprintf(stdout, " update-desired <thing-name> <shadow-name> <desired-state-JSON> -- updates the desired state of a named shadow belonging to the specified thing\n");
fprintf(stdout, " update-reported <thing-name> <shadow-name> <reported-state-JSON> -- updates the reported state a named shadow belonging to the specified thing\n");
fprintf(stdout, " update-reported <thing-name> <shadow-name> <reported-state-JSON> -- updates the reported state a named shadow belonging to the specified thing\n\n");
fprintf(stdout, " list-streams -- lists all open streaming operations\n");
fprintf(stdout, " open-delta-stream <thing-name> <shadow-name> -- opens a new streaming operation that receives delta events about changes to a particular shadow belonging to a thing\n");
fprintf(stdout, " open-document-stream <thing-name> <shadow-name> -- opens a new streaming operation that receives document events about changes to a particular shadow belonging to a thing\n");
fprintf(stdout, " close-stream <stream-id> -- closes a streaming operation\n");
}

static void s_onServiceError(const Aws::Iotshadow::ServiceErrorV2<Aws::Iotshadow::ErrorResponse> &serviceError, Aws::Crt::String operationName) {
fprintf(stdout, "%s failed with error code: %s\n", operationName.c_str(), aws_error_debug_str(serviceError.getErrorCode()));
if (serviceError.hasModeledError()) {
const auto &modeledError = serviceError.getModeledError();
fprintf(stdout, "%s failed with error code: %s\n", operationName.c_str(), aws_error_debug_str(serviceError.GetErrorCode()));
if (serviceError.HasModeledError()) {
const auto &modeledError = serviceError.GetModeledError();

Aws::Crt::JsonObject jsonObject;
modeledError.SerializeToObject(jsonObject);
Expand All @@ -88,15 +117,15 @@ static void s_onServiceError(const Aws::Iotshadow::ServiceErrorV2<Aws::Iotshadow
}

static void s_onGetShadowResult(GetShadowResult &&result) {
if (result.isSuccess()) {
const auto &response = result.getResponse();
if (result.IsSuccess()) {
const auto &response = result.GetResponse();

Aws::Crt::JsonObject jsonObject;
response.SerializeToObject(jsonObject);
Aws::Crt::String outgoingJson = jsonObject.View().WriteCompact(true);
fprintf(stdout, "get result: %s\n", outgoingJson.c_str());
} else {
s_onServiceError(result.getError(), "get");
s_onServiceError(result.GetError(), "get");
}
}

Expand All @@ -121,15 +150,15 @@ static void s_handleGetNamedShadow(const Aws::Crt::String params, const std::sha
}

static void s_onDeleteShadowResult(DeleteShadowResult &&result) {
if (result.isSuccess()) {
const auto &response = result.getResponse();
if (result.IsSuccess()) {
const auto &response = result.GetResponse();

Aws::Crt::JsonObject jsonObject;
response.SerializeToObject(jsonObject);
Aws::Crt::String outgoingJson = jsonObject.View().WriteCompact(true);
fprintf(stdout, "delete result: %s\n", outgoingJson.c_str());
} else {
s_onServiceError(result.getError(), "delete");
s_onServiceError(result.GetError(), "delete");
}
}

Expand All @@ -154,15 +183,15 @@ static void s_handleDeleteNamedShadow(const Aws::Crt::String params, const std::
}

static void s_onUpdateShadowResult(UpdateShadowResult &&result, Aws::Crt::String operationName) {
if (result.isSuccess()) {
const auto &response = result.getResponse();
if (result.IsSuccess()) {
const auto &response = result.GetResponse();

Aws::Crt::JsonObject jsonObject;
response.SerializeToObject(jsonObject);
Aws::Crt::String outgoingJson = jsonObject.View().WriteCompact(true);
fprintf(stdout, "%s result: %s\n", operationName.c_str(), outgoingJson.c_str());
} else {
s_onServiceError(result.getError(), operationName);
s_onServiceError(result.GetError(), operationName);
}
}

Expand Down Expand Up @@ -224,7 +253,123 @@ static void s_handleUpdateReportedNamedShadow(const Aws::Crt::String params, con
});
}

static bool s_handleInput(const Aws::Crt::String &input, const std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> &protocolClient, const std::shared_ptr<Aws::Iotshadow::IClientV2> &shadowClient) {
static void s_handleListStreams(const ApplicationContext &context) {
fprintf(stdout, "Streams:\n");
for (const auto &iter : context.m_streams) {
uint64_t streamId = iter.first;
const StreamingOperationWrapper &wrapper = iter.second;
fprintf(stdout, " %" PRIu64": type '%s', thing '%s', shadow '%s'\n", streamId, wrapper.m_type.c_str(), wrapper.m_thingName.c_str(), wrapper.m_shadowName.c_str());
}
}

static void s_handleCloseStream(const Aws::Crt::String params, ApplicationContext &context) {
Aws::Crt::String remaining = params;
Aws::Crt::String streamId = s_nibbleNextToken(remaining);

if (streamId.length() == 0) {
fprintf(stdout, "Invalid arguments to close-stream command!\n\n");
s_printHelp();
return;
}

uint64_t id = std::stoull(streamId.c_str());
fprintf(stdout, "Closing stream %" PRIu64 "\n", id);
context.m_streams.erase(id);
}

static void s_registerStream(ApplicationContext &context, uint64_t id, std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> operation, Aws::Crt::String type, Aws::Crt::String thing, Aws::Crt::String shadow) {
StreamingOperationWrapper wrapper;
wrapper.m_stream = operation;
wrapper.m_type = type;
wrapper.m_thingName = thing;
wrapper.m_shadowName = shadow;

context.m_streams[id] = wrapper;

operation->Open();
}

static void s_onSubscriptionStatusEvent(uint64_t id, Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) {
fprintf(stdout, "Stream %" PRIu64 ": subscription status event with type %d and error %s\n", id, event.GetErrorCode(), Aws::Crt::ErrorDebugString(event.GetErrorCode()));
}

static void s_onShadowDeltaUpdatedEvent(uint64_t id, Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) {
fprintf(stdout, "Stream %" PRIu64 ": received shadow delta updated event:\n", id);

Aws::Crt::JsonObject jsonObject;
event.SerializeToObject(jsonObject);
Aws::Crt::String json = jsonObject.View().WriteCompact(true);
fprintf(stdout, " %s\n", json.c_str());
}

static void s_handleOpenDeltaStream(const Aws::Crt::String params, ApplicationContext &context) {
Aws::Crt::String remaining = params;
Aws::Crt::String thing = s_nibbleNextToken(remaining);
Aws::Crt::String shadow = s_nibbleNextToken(remaining);

if (thing.length() == 0 || shadow.length() == 0) {
fprintf(stdout, "Invalid arguments to open-delta-stream command!\n\n");
s_printHelp();
return;
}

uint64_t streamId = context.m_nextStreamId++;

Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest request;
request.ThingName = thing;
request.ShadowName = shadow;

Aws::Iot::RequestResponse::StreamingOperationOptions<Aws::Iotshadow::ShadowDeltaUpdatedEvent> options;
options.WithSubscriptionStatusEventHandler([streamId](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) {
s_onSubscriptionStatusEvent(streamId, std::move(event));
});
options.WithStreamHandler([streamId](Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) {
s_onShadowDeltaUpdatedEvent(streamId, std::move(event));
});

auto operation = context.m_shadowClient->CreateNamedShadowDeltaUpdatedStream(request, options);
s_registerStream(context, streamId, operation, "Delta", thing, shadow);
}

static void s_onShadowUpdatedEvent(uint64_t id, Aws::Iotshadow::ShadowUpdatedEvent &&event) {
fprintf(stdout, "Stream %" PRIu64 ": received shadow updated event:\n", id);

Aws::Crt::JsonObject jsonObject;
event.SerializeToObject(jsonObject);
Aws::Crt::String json = jsonObject.View().WriteCompact(true);
fprintf(stdout, " %s\n", json.c_str());
}

static void s_handleOpenDocumentStream(const Aws::Crt::String params, ApplicationContext &context) {
Aws::Crt::String remaining = params;
Aws::Crt::String thing = s_nibbleNextToken(remaining);
Aws::Crt::String shadow = s_nibbleNextToken(remaining);

if (thing.length() == 0 || shadow.length() == 0) {
fprintf(stdout, "Invalid arguments to open-document-stream command!\n\n");
s_printHelp();
return;
}

uint64_t streamId = context.m_nextStreamId++;

Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest request;
request.ThingName = thing;
request.ShadowName = shadow;

Aws::Iot::RequestResponse::StreamingOperationOptions<Aws::Iotshadow::ShadowUpdatedEvent> options;
options.WithSubscriptionStatusEventHandler([streamId](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) {
s_onSubscriptionStatusEvent(streamId, std::move(event));
});
options.WithStreamHandler([streamId](Aws::Iotshadow::ShadowUpdatedEvent &&event) {
s_onShadowUpdatedEvent(streamId, std::move(event));
});

auto operation = context.m_shadowClient->CreateNamedShadowUpdatedStream(request, options);
s_registerStream(context, streamId, operation, "Document", thing, shadow);
}

static bool s_handleInput(const Aws::Crt::String &input, ApplicationContext &context) {
Aws::Crt::String remaining = input;
Aws::Crt::String command = s_nibbleNextToken(remaining);

Expand All @@ -233,18 +378,26 @@ static bool s_handleInput(const Aws::Crt::String &input, const std::shared_ptr<A
return true;
} else if (command == "start") {
fprintf(stdout, "Starting protocol client!\n");
protocolClient->Start();
context.m_protocolClient->Start();
} else if (command == "stop") {
fprintf(stdout, "Stopping protocol client!\n");
protocolClient->Stop();
context.m_protocolClient->Stop();
} else if (command == "get") {
s_handleGetNamedShadow(remaining, shadowClient);
s_handleGetNamedShadow(remaining, context.m_shadowClient);
} else if (command == "delete") {
s_handleDeleteNamedShadow(remaining, shadowClient);
s_handleDeleteNamedShadow(remaining, context.m_shadowClient);
} else if (command == "update-desired") {
s_handleUpdateDesiredNamedShadow(remaining, shadowClient);
s_handleUpdateDesiredNamedShadow(remaining, context.m_shadowClient);
} else if (command == "update-reported") {
s_handleUpdateReportedNamedShadow(remaining, shadowClient);
s_handleUpdateReportedNamedShadow(remaining, context.m_shadowClient);
} else if (command == "list-streams") {
s_handleListStreams(context);
} else if (command == "open-delta-stream") {
s_handleOpenDeltaStream(remaining, context);
} else if (command == "open-document-stream") {
s_handleOpenDocumentStream(remaining, context);
} else if (command == "close-stream") {
s_handleCloseStream(remaining, context);
} else {
s_printHelp();
}
Expand Down Expand Up @@ -273,25 +426,20 @@ int main(int argc, char *argv[])
return -1;
}

if (cmdData.input_port != 0)
{
builder->WithPort(static_cast<uint32_t>(cmdData.input_port));
}

// Setup lifecycle callbacks
builder->WithClientConnectionSuccessCallback(s_onConnectionSuccess);
builder->WithClientConnectionFailureCallback(s_onConnectionFailure);
builder->WithClientStoppedCallback(s_onStopped);

// Create Mqtt5Client
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client = builder->Build();

Aws::Iot::RequestResponse::RequestResponseClientOptions requestResponseOptions;
requestResponseOptions.maxRequestResponseSubscriptions = 4;
requestResponseOptions.maxStreamingSubscriptions = 10;
requestResponseOptions.operationTimeoutInSeconds = 30;
requestResponseOptions.WithMaxRequestResponseSubscriptions(4);
requestResponseOptions.WithMaxStreamingSubscriptions(10);
requestResponseOptions.WithOperationTimeoutInSeconds(30);

std::shared_ptr<Aws::Iotshadow::IClientV2> shadowClient = Aws::Iotshadow::IClientV2::newFrom5(*client, requestResponseOptions);
ApplicationContext context;
context.m_protocolClient = builder->Build();;
context.m_shadowClient = Aws::Iotshadow::NewClientFrom5(*context.m_protocolClient, requestResponseOptions);
context.m_nextStreamId = 1;

while (true)
{
Expand All @@ -300,7 +448,7 @@ int main(int argc, char *argv[])
String input;
std::getline(std::cin, input);

if (s_handleInput(input, client, shadowClient))
if (s_handleInput(input, context))
{
fprintf(stdout, "Exiting...");
break;
Expand Down
Loading

0 comments on commit 0327dcf

Please sign in to comment.