From 09c3b6cba129726b11cd56a72abf6bcdfa6e4fcd Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Mon, 9 Sep 2024 16:11:59 -0700 Subject: [PATCH] Refactor fleet provisioning service test --- .../fleet_provisioning/main.cpp | 2 +- .../mqtt5_fleet_provisioning/main.cpp | 6 +- servicetests/tests/FleetProvisioning/main.cpp | 784 +++++++++--------- 3 files changed, 408 insertions(+), 384 deletions(-) diff --git a/samples/fleet_provisioning/fleet_provisioning/main.cpp b/samples/fleet_provisioning/fleet_provisioning/main.cpp index 02b307f7e..cc1f5bcaf 100644 --- a/samples/fleet_provisioning/fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/fleet_provisioning/main.cpp @@ -426,7 +426,7 @@ int main(int argc, char *argv[]) { /************************ Setup ****************************/ - // Do the global initialization for the API + // Do the global initialization for the API ApiHandle apiHandle; /** diff --git a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp index 3ea7ef274..6c9d0f15a 100644 --- a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp @@ -79,7 +79,7 @@ struct RegisterThingContext /** * Create MQTT5 client. */ -std::shared_ptr createMqtt5Client(Mqtt5ClientContext &ctx, const Utils::cmdData &cmdData) +std::shared_ptr createMqtt5Client(const Utils::cmdData &cmdData, Mqtt5ClientContext &ctx) { // Create the MQTT5 builder and populate it with data from cmdData. Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( @@ -418,7 +418,7 @@ int main(int argc, char *argv[]) { /************************ Setup ****************************/ - // Do the global initialization for the API + // Do the global initialization for the API ApiHandle apiHandle; /** @@ -429,7 +429,7 @@ int main(int argc, char *argv[]) Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle); Mqtt5ClientContext mqtt5ClientContext; - auto client = createMqtt5Client(mqtt5ClientContext, cmdData); + auto client = createMqtt5Client(cmdData, mqtt5ClientContext); /************************ Run the sample ****************************/ diff --git a/servicetests/tests/FleetProvisioning/main.cpp b/servicetests/tests/FleetProvisioning/main.cpp index ff4c042aa..101ee1508 100644 --- a/servicetests/tests/FleetProvisioning/main.cpp +++ b/servicetests/tests/FleetProvisioning/main.cpp @@ -2,11 +2,9 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ + #include #include -#include -#include - #include #include #include @@ -23,515 +21,525 @@ #include #include -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include #include "../../../samples/utils/CommandLineUtils.h" using namespace Aws::Crt; using namespace Aws::Iotidentity; -static std::promise gotResponse; - -static std::string getFileData(std::string const &fileName) +static String getFileData(const String &fileName) { - std::ifstream ifs(fileName); + std::ifstream ifs(fileName.c_str()); std::string str; getline(ifs, str, (char)ifs.eof()); - return str; + return str.c_str(); } -std::shared_ptr build_mqtt3_client( - Utils::cmdData &cmdData, - std::shared_ptr &connection, - std::promise &connectionCompletedPromise, - std::promise &connectionClosedPromise) +/** + * Auxiliary structure for holding data used by MQTT3 connection. + */ +struct Mqtt3ConnectionContext { - Aws::Iot::MqttClientConnectionConfigBuilder clientConfigBuilder; - // Create the MQTT builder and populate it with data from cmdData. - clientConfigBuilder = - Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); - clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); - if (cmdData.input_ca != "") + std::promise connectionCompletedPromise; + std::promise connectionClosedPromise; +}; + +/** + * Auxiliary structure for holding data used by MQTT5 connection. + */ +struct Mqtt5ClientContext +{ + std::promise connectionPromise; + std::promise stoppedPromise; + std::promise disconnectPromise; + std::promise subscribeSuccess; +}; + +/** + * Auxiliary structure for holding data used when creating a certificate. + */ +struct CreateCertificateContext +{ + std::promise pubAckPromise; + std::promise acceptedSubAckPromise; + std::promise rejectedSubAckPromise; + std::promise tokenPromise; +}; + +/** + * Auxiliary structure for holding data used when registering a thing. + */ +struct RegisterThingContext +{ + std::promise pubAckPromise; + std::promise acceptedSubAckPromise; + std::promise rejectedSubAckPromise; + std::promise thingCreatedPromise; +}; + +/** + * Create MQTT5 client. + */ +std::shared_ptr createMqtt5Client(const Utils::cmdData &cmdData, Mqtt5ClientContext &ctx) +{ + // Create the MQTT5 builder and populate it with data from cmdData. + Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + + // Check if the builder setup correctly. + if (builder == nullptr) { - clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); + printf( + "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); + exit(-1); } - // Create the MQTT connection from the MQTT builder - auto clientConfig = clientConfigBuilder.Build(); - if (!clientConfig) + // Setup connection options + std::shared_ptr connectOptions = std::make_shared(); + connectOptions->WithClientId(cmdData.input_clientId); + builder->WithConnectOptions(connectOptions); + if (cmdData.input_port != 0) { - fprintf( - stderr, - "Client Configuration initialization failed with error %s\n", - Aws::Crt::ErrorDebugString(clientConfig.LastError())); - exit(-1); + builder->WithPort(static_cast(cmdData.input_port)); } - Aws::Iot::MqttClient client3 = Aws::Iot::MqttClient(); - connection = client3.NewConnection(clientConfig); - if (!*connection) + // Setup lifecycle callbacks + builder->WithClientConnectionSuccessCallback( + [&ctx](const Mqtt5::OnConnectionSuccessEventData &eventData) + { + fprintf( + stdout, + "Mqtt5 Client connection succeed, clientid: %s.\n", + eventData.negotiatedSettings->getClientId().c_str()); + ctx.connectionPromise.set_value(true); + }); + builder->WithClientConnectionFailureCallback( + [&ctx](const Mqtt5::OnConnectionFailureEventData &eventData) + { + fprintf( + stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); + ctx.connectionPromise.set_value(false); + }); + builder->WithClientStoppedCallback( + [&ctx](const Mqtt5::OnStoppedEventData &) + { + fprintf(stdout, "Mqtt5 Client stopped.\n"); + ctx.stoppedPromise.set_value(); + }); + builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) + { fprintf(stdout, "Mqtt5 Client attempting connection...\n"); }); + builder->WithClientDisconnectionCallback( + [&ctx](const Mqtt5::OnDisconnectionEventData &eventData) + { + fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode)); + ctx.disconnectPromise.set_value(); + }); + + // Create Mqtt5Client + auto client = builder->Build(); + delete builder; + + fprintf(stdout, "Connecting...\n"); + if (!client->Start()) { - fprintf( - stderr, - "MQTT Connection Creation failed with error %s\n", - Aws::Crt::ErrorDebugString(connection->LastError())); + fprintf(stderr, "MQTT5 Connection failed to start"); exit(-1); } + if (!ctx.connectionPromise.get_future().get()) + { + return nullptr; + } + + return client; +} + +/** + * Create MQTT3 connection. + */ +std::shared_ptr createMqtt3Connection(const Utils::cmdData &cmdData, Mqtt3ConnectionContext &ctx) +{ + /** + * In a real world application you probably don't want to enforce synchronous behavior + * but this is a sample console application, so we'll just do that with a promise. + */ + // Invoked when a MQTT connect has completed or failed - auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) { + auto onConnectionCompleted = [&ctx](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) + { if (errorCode) { fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode)); - connectionCompletedPromise.set_value(false); + ctx.connectionCompletedPromise.set_value(false); } else { fprintf(stdout, "Connection completed with return code %d\n", returnCode); - connectionCompletedPromise.set_value(true); + ctx.connectionCompletedPromise.set_value(true); } }; // Invoked when a disconnect has been completed - auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) { + auto onDisconnect = [&ctx](Mqtt::MqttConnection & /*conn*/) + { { fprintf(stdout, "Disconnect completed\n"); - connectionClosedPromise.set_value(); + ctx.connectionClosedPromise.set_value(); } }; - connection->OnConnectionCompleted = std::move(onConnectionCompleted); - connection->OnDisconnect = std::move(onDisconnect); - - if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) + // Create the MQTT builder and populate it with data from cmdData. + auto clientConfigBuilder = + Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); + if (cmdData.input_ca != "") { - fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); - exit(-1); + clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); } - return std::make_shared(connection); -} -std::shared_ptr build_mqtt5_client( - Utils::cmdData &cmdData, - std::shared_ptr &client5, - std::promise &connectionCompletedPromise, - std::promise &connectionClosedPromise) -{ - std::shared_ptr builder( - Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); - - // Check if the builder setup correctly. - if (builder == nullptr) + // Create the MQTT connection from the MQTT builder + auto clientConfig = clientConfigBuilder.Build(); + if (!clientConfig) { - printf( - "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); - return nullptr; + fprintf( + stderr, + "Client Configuration initialization failed with error %s\n", + Aws::Crt::ErrorDebugString(clientConfig.LastError())); + exit(-1); } - // Create the MQTT5 builder and populate it with data from cmdData. - // Setup connection options - std::shared_ptr connectOptions = std::make_shared(); - connectOptions->WithClientId(cmdData.input_clientId); - builder->WithConnectOptions(connectOptions); - if (cmdData.input_port != 0) + Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); + auto connection = client.NewConnection(clientConfig); + if (!*connection) { - builder->WithPort(static_cast(cmdData.input_port)); + fprintf( + stderr, + "MQTT Connection Creation failed with error %s\n", + Aws::Crt::ErrorDebugString(connection->LastError())); + exit(-1); } - // Setup lifecycle callbacks - builder->WithClientConnectionSuccessCallback( - [&connectionCompletedPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) { - fprintf( - stdout, - "Mqtt5 Client connection succeed, clientid: %s.\n", - eventData.negotiatedSettings->getClientId().c_str()); - connectionCompletedPromise.set_value(true); - }); - builder->WithClientConnectionFailureCallback([&connectionCompletedPromise]( - const Mqtt5::OnConnectionFailureEventData &eventData) { - fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); - connectionCompletedPromise.set_value(false); - }); - builder->WithClientStoppedCallback([&connectionClosedPromise](const Mqtt5::OnStoppedEventData &) { - fprintf(stdout, "Mqtt5 Client stopped.\n"); - connectionClosedPromise.set_value(); - }); - - client5 = builder->Build(); - if (client5 == nullptr) + + connection->OnConnectionCompleted = std::move(onConnectionCompleted); + connection->OnDisconnect = std::move(onDisconnect); + + fprintf(stdout, "Connecting...\n"); + if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) { - fprintf( - stdout, "Failed to Init Mqtt5Client with error code %d: %s.\n", LastError(), ErrorDebugString(LastError())); + fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); exit(-1); } - if (!client5->Start()) + if (!ctx.connectionCompletedPromise.get_future().get()) { - fprintf(stderr, "MQTT5 Connection failed to start"); exit(-1); } - return std::make_shared(client5); + + return connection; } -void SubscribeToRegisterThing(String input_templateName, std::shared_ptr iotIdentityClient) +/** + * Keys-and-Certificate workflow. + * + * @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the + * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of + * CreateCertificateContext is used to store variables used by the callbacks. + */ +void createKeysAndCertificate(const std::shared_ptr &identityClient, CreateCertificateContext &ctx) { - std::promise onSubAckPromise; - std::promise registerRejectedCompletedPromise; - std::promise registerAcceptedCompletedPromise; - - auto onRegisterThingAccepted = [&](RegisterThingResponse *response, int ioErr) { - if (ioErr) + auto onKeysPublishPubAck = [&ctx](int ioErr) + { + if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error: onSuback callback error %d\n", ioErr); + fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); exit(-1); } - if (response) + ctx.pubAckPromise.set_value(); + }; + + auto onKeysAcceptedSubAck = [&ctx](int ioErr) + { + if (ioErr != AWS_OP_SUCCESS) { - fprintf(stdout, "Register thing: %s\n", response->ThingName.value().c_str()); + fprintf(stderr, "Error subscribing to CreateKeysAndCertificate accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); } + ctx.acceptedSubAckPromise.set_value(); }; - RegisterThingSubscriptionRequest registerThingSubscriptionRequest; - registerThingSubscriptionRequest.TemplateName = input_templateName; - - auto onRegisterAcceptedSubAck = [&](int ioErr) { + auto onKeysRejectedSubAck = [&ctx](int ioErr) + { if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - registerAcceptedCompletedPromise.set_value(); + ctx.rejectedSubAckPromise.set_value(); }; - iotIdentityClient->SubscribeToRegisterThingAccepted( - registerThingSubscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - onRegisterThingAccepted, - onRegisterAcceptedSubAck); - - auto rejectedHandler = [&](ErrorResponse *error, int ioErr) { - if (ioErr) + auto onKeysAccepted = [&ctx](CreateKeysAndCertificateResponse *response, int ioErr) + { + if (ioErr == AWS_OP_SUCCESS) { - fprintf(stderr, "Error: rejectedHandler callback error %d\n", ioErr); - exit(-1); + fprintf(stdout, "CreateKeysAndCertificateResponse certificateId: %s.\n", response->CertificateId->c_str()); + ctx.tokenPromise.set_value(*response->CertificateOwnershipToken); } else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onKeysRejected = [](ErrorResponse *error, int ioErr) + { + if (ioErr == AWS_OP_SUCCESS) { fprintf( stdout, - "RegisterThing failed with statusCode %d, errorMessage %s and errorCode %s.", + "CreateKeysAndCertificate failed with statusCode %d, errorMessage %s and errorCode %s.", *error->StatusCode, error->ErrorMessage->c_str(), error->ErrorCode->c_str()); + exit(-1); } - }; - auto onRegisterRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) + else { - fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - registerRejectedCompletedPromise.set_value(); }; - iotIdentityClient->SubscribeToRegisterThingRejected( - registerThingSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, rejectedHandler, onRegisterRejectedSubAck); - - registerAcceptedCompletedPromise.get_future().wait(); - registerRejectedCompletedPromise.get_future().wait(); -} - -void createKeysAndCertificateWorkflow( - String &input_templateName, - String &input_templateParameters, - std::shared_ptr iotIdentityClient) -{ - std::promise onSubAckPromise; - std::promise keysPublishCompletedPromise; - std::promise keysAcceptedCompletedPromise; - std::promise keysRejectedCompletedPromise; + fprintf(stdout, "Subscribing to CreateKeysAndCertificate Accepted and Rejected topics\n"); + CreateKeysAndCertificateSubscriptionRequest keySubscriptionRequest; + identityClient->SubscribeToCreateKeysAndCertificateAccepted( + keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysAccepted, onKeysAcceptedSubAck); + identityClient->SubscribeToCreateKeysAndCertificateRejected( + keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysRejected, onKeysRejectedSubAck); - std::promise registerPublishCompletedPromise; - std::promise registerAcceptedCompletedPromise; + // Wait for the subscriptions to the accept and reject keys-and-certificate topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); - CreateKeysAndCertificateResponse *createKeysAndCertificateResponse = nullptr; - String token; + // Now, when we subscribed to the keys and certificate topics, we can make a request for a certificate. + fprintf(stdout, "Publishing to CreateKeysAndCertificate topic\n"); + CreateKeysAndCertificateRequest createKeysAndCertificateRequest; + identityClient->PublishCreateKeysAndCertificate( + createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishPubAck); + ctx.pubAckPromise.get_future().wait(); +} - auto acceptedHandler = [&createKeysAndCertificateResponse, - &token](CreateKeysAndCertificateResponse *response, int ioErr) { - fprintf(stderr, "acceptedhandler is called\n"); - if (ioErr) +/** + * Certificate-from-CSR workflow. + * + * @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the + * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of + * CreateCertificateContext is used to store variables used by the callbacks. + */ +void createCertificateFromCsr( + const std::shared_ptr &identityClient, + CreateCertificateContext &ctx, + const String &csrFile) +{ + auto onCsrPublishPubAck = [&ctx](int ioErr) + { + if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error: acceptedHandler callback error %d\n", ioErr); + fprintf(stderr, "Error publishing to CreateCertificateFromCsr: %s\n", ErrorDebugString(ioErr)); exit(-1); } - if (response) + ctx.pubAckPromise.set_value(); + }; + + auto onCsrAcceptedSubAck = [&ctx](int ioErr) + { + if (ioErr != AWS_OP_SUCCESS) { - if (createKeysAndCertificateResponse == nullptr) - { - createKeysAndCertificateResponse = response; - token = *response->CertificateOwnershipToken; - } + fprintf(stderr, "Error subscribing to CreateCertificateFromCsr accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); } - gotResponse.set_value(); + ctx.acceptedSubAckPromise.set_value(); }; - auto onKeysAcceptedSubAck = [&](int ioErr) { + auto onCsrRejectedSubAck = [&ctx](int ioErr) + { if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error subscribing to CreateKeysAndCertificate accepted: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error subscribing to CreateCertificateFromCsr rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - - keysAcceptedCompletedPromise.set_value(); + ctx.rejectedSubAckPromise.set_value(); }; - CreateKeysAndCertificateSubscriptionRequest createKeysAndCertificateSubscriptionRequest; - iotIdentityClient->SubscribeToCreateKeysAndCertificateAccepted( - createKeysAndCertificateSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, acceptedHandler, onKeysAcceptedSubAck); - - auto rejectedHandler = [&](Aws::Iotidentity::ErrorResponse *error, int ioErr) { - if (ioErr == AWS_OP_ERR) + auto onCsrAccepted = [&ctx](CreateCertificateFromCsrResponse *response, int ioErr) + { + if (ioErr == AWS_OP_SUCCESS) { - fprintf(stderr, "Error: rejectedHandler callback error %s\n", ErrorDebugString(ioErr)); + fprintf(stdout, "CreateCertificateFromCsrResponse certificateId: %s.\n", response->CertificateId->c_str()); + ctx.tokenPromise.set_value(*response->CertificateOwnershipToken); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - if (error) + }; + + auto onCsrRejected = [](ErrorResponse *error, int ioErr) + { + if (ioErr == AWS_OP_SUCCESS) { fprintf( stdout, - "CreateKeysAndCertificate failed with statusCode %d, errorMessage %s and errorCode %s.", + "CreateCertificateFromCsr failed with statusCode %d, errorMessage %s and errorCode %s.", *error->StatusCode, error->ErrorMessage->c_str(), error->ErrorCode->c_str()); - } - exit(-1); - }; - - auto onKeysRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - keysRejectedCompletedPromise.set_value(); - }; - iotIdentityClient->SubscribeToCreateKeysAndCertificateRejected( - createKeysAndCertificateSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, rejectedHandler, onKeysRejectedSubAck); - - auto onKeysPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) + else { - fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - - keysPublishCompletedPromise.set_value(); }; - CreateKeysAndCertificateRequest createKeysAndCertificateRequest; - iotIdentityClient->PublishCreateKeysAndCertificate( - createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishSubAck); + // CreateCertificateFromCsr workflow + fprintf(stdout, "Subscribing to CreateCertificateFromCsr Accepted and Rejected topics\n"); + CreateCertificateFromCsrSubscriptionRequest csrSubscriptionRequest; + identityClient->SubscribeToCreateCertificateFromCsrAccepted( + csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrAccepted, onCsrAcceptedSubAck); - SubscribeToRegisterThing(input_templateName, iotIdentityClient); - std::this_thread::sleep_for(std::chrono::seconds(1)); + identityClient->SubscribeToCreateCertificateFromCsrRejected( + csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrRejected, onCsrRejectedSubAck); - // Verify the response is good - if (createKeysAndCertificateResponse == nullptr) - { - fprintf(stderr, "Error: 1 createKeysAndCertificateResponse is null\n"); - exit(-1); - } - // reset gotResponse future - gotResponse = std::promise(); + // Wait for the subscriptions to the accept and reject certificates topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); - RegisterThingRequest registerThingRequest; - registerThingRequest.TemplateName = input_templateName; + // Now, when we subscribed to the certificates topics, we can make a request for a certificate. + fprintf(stdout, "Publishing to CreateCertificateFromCsr topic\n"); + CreateCertificateFromCsrRequest createCertificateFromCsrRequest; + createCertificateFromCsrRequest.CertificateSigningRequest = csrFile; + identityClient->PublishCreateCertificateFromCsr( + createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrPublishPubAck); + ctx.pubAckPromise.get_future().wait(); +} - if (!input_templateParameters.empty()) +/** + * Provision an AWS IoT thing using a pre-defined template. + */ +void registerThing( + const std::shared_ptr &identityClient, + RegisterThingContext &ctx, + const Utils::cmdData &cmdData, + const String &token) +{ + auto onRegisterAcceptedSubAck = [&ctx](int ioErr) { - const Aws::Crt::String jsonValue = input_templateParameters; - Aws::Crt::JsonObject value(jsonValue); - Map pm = value.View().GetAllObjects(); - Aws::Crt::Map params = Aws::Crt::Map(); - - for (const auto &x : pm) - { - params.emplace(x.first, x.second.AsString()); - } - registerThingRequest.Parameters = params; - } - registerThingRequest.CertificateOwnershipToken = token; - - auto onRegisterPublishSubAck = [&](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); exit(-1); } - - registerPublishCompletedPromise.set_value(); + ctx.acceptedSubAckPromise.set_value(); }; - iotIdentityClient->PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck); - std::this_thread::sleep_for(std::chrono::seconds(1)); - - keysPublishCompletedPromise.get_future().wait(); - keysAcceptedCompletedPromise.get_future().wait(); - keysRejectedCompletedPromise.get_future().wait(); - registerPublishCompletedPromise.get_future().wait(); -} -void createCertificateFromCsrWorkflow( - String input_templateName, - String input_templateParameters, - String input_csrPath, - std::shared_ptr iotIdentityClient) -{ - std::promise keysPublishCompletedPromise; - std::promise keysAcceptedCompletedPromise; - std::promise keysRejectedCompletedPromise; - - std::promise registerPublishCompletedPromise; - std::promise registerAcceptedCompletedPromise; - CreateCertificateFromCsrResponse *createCertificateFromCsrResponse = nullptr; - String token; - - gotResponse = std::promise(); - - auto onCreateCertificateFromCsrResponseAccepted = [&createCertificateFromCsrResponse, - &token](CreateCertificateFromCsrResponse *response, int ioErr) { - if (ioErr) + auto onRegisterRejectedSubAck = [&ctx](int ioErr) + { + if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error: onCreateCertificateFromCsrResponseAccepted callback error %d\n", ioErr); + fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - if (response != nullptr) - { - if (createCertificateFromCsrResponse == nullptr) - { - createCertificateFromCsrResponse = response; - token = *response->CertificateOwnershipToken; - } - } - gotResponse.set_value(); + ctx.rejectedSubAckPromise.set_value(); }; - auto onSubAck = [&](int ioErr) { - if (ioErr) + auto onRegisterAccepted = [&ctx](RegisterThingResponse *response, int ioErr) + { + if (ioErr == AWS_OP_SUCCESS) { - fprintf(stderr, "Error: onSubAck callback error %d\n", ioErr); - exit(-1); + fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); + ctx.thingCreatedPromise.set_value(); } - keysAcceptedCompletedPromise.set_value(); - }; - - CreateCertificateFromCsrSubscriptionRequest createCertificateFromCsrSubscriptionRequest; - iotIdentityClient->SubscribeToCreateCertificateFromCsrAccepted( - createCertificateFromCsrSubscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - onCreateCertificateFromCsrResponseAccepted, - onSubAck); - - auto onRejectedCsr = [&](ErrorResponse *response, int ioErr) { - (void)response; - if (ioErr) + else { - fprintf(stderr, "Error: onSubAck callback error %d\n", ioErr); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - exit(1); }; - auto onKeysRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) + auto onRegisterRejected = [](ErrorResponse *error, int ioErr) + { + if (ioErr == AWS_OP_SUCCESS) { - fprintf(stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); + fprintf( + stdout, + "RegisterThing failed with statusCode %d, errorMessage %s and errorCode %s.", + *error->StatusCode, + error->ErrorMessage->c_str(), + error->ErrorCode->c_str()); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - keysRejectedCompletedPromise.set_value(); }; - iotIdentityClient->SubscribeToCreateCertificateFromCsrRejected( - createCertificateFromCsrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRejectedCsr, onKeysRejectedSubAck); - - auto onKeysPublishSubAck = [&](int ioErr) { + auto onRegisterPublishPubAck = [&ctx](int ioErr) + { if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); exit(-1); } - keysPublishCompletedPromise.set_value(); + ctx.pubAckPromise.set_value(); }; - std::string csrContents = getFileData(input_csrPath.c_str()); - CreateCertificateFromCsrRequest createCertificateFromCsrRequest; - createCertificateFromCsrRequest.CertificateSigningRequest = csrContents.c_str(); + fprintf(stdout, "Subscribing to RegisterThing Accepted and Rejected topics\n"); + RegisterThingSubscriptionRequest registerSubscriptionRequest; + registerSubscriptionRequest.TemplateName = cmdData.input_templateName; - iotIdentityClient->PublishCreateCertificateFromCsr( - createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishSubAck); + identityClient->SubscribeToRegisterThingAccepted( + registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterAccepted, onRegisterAcceptedSubAck); - // Subscribes to the register thing accepted and rejected topics - SubscribeToRegisterThing(input_templateName, iotIdentityClient); - std::this_thread::sleep_for(std::chrono::seconds(1)); + identityClient->SubscribeToRegisterThingRejected( + registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); - // Verify the response is good - if (createCertificateFromCsrResponse == nullptr) - { - fprintf(stderr, "Error: 1 createCertificateFromCsrResponse is null\n"); - exit(-1); - } - // reset gotResponse future - gotResponse = std::promise(); + // Wait for the subscriptions to the accept and reject RegisterThing topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); + fprintf(stdout, "Publishing to RegisterThing topic\n"); RegisterThingRequest registerThingRequest; - registerThingRequest.CertificateOwnershipToken = token; - registerThingRequest.TemplateName = input_templateName; + registerThingRequest.TemplateName = cmdData.input_templateName; - if (!input_templateParameters.empty()) - { - const Aws::Crt::String jsonValue = input_templateParameters; - Aws::Crt::JsonObject value(jsonValue); - Map pm = value.View().GetAllObjects(); - Aws::Crt::Map params = Aws::Crt::Map(); + const Aws::Crt::String jsonValue = cmdData.input_templateParameters; + Aws::Crt::JsonObject value(jsonValue); + Map pm = value.View().GetAllObjects(); + Aws::Crt::Map params = Aws::Crt::Map(); - for (const auto &x : pm) - { - params.emplace(x.first, x.second.AsString()); - } - registerThingRequest.Parameters = params; + for (const auto &x : pm) + { + params.emplace(x.first, x.second.AsString()); } - auto onRegisterPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - registerPublishCompletedPromise.set_value(); - }; - iotIdentityClient->PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck); + registerThingRequest.Parameters = params; + // NOTE: In a real application creating multiple certificates you'll probably need to protect token var with + // a critical section. This sample makes only one request for a certificate, so no data race is possible. + registerThingRequest.CertificateOwnershipToken = token; - std::this_thread::sleep_for(std::chrono::seconds(1)); + identityClient->PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishPubAck); + ctx.pubAckPromise.get_future().wait(); - keysPublishCompletedPromise.get_future().wait(); - keysAcceptedCompletedPromise.get_future().wait(); - keysRejectedCompletedPromise.get_future().wait(); - registerPublishCompletedPromise.get_future().wait(); + // Wait for registering a thing to succeed. + ctx.thingCreatedPromise.get_future().wait(); } int main(int argc, char *argv[]) { + /************************ Setup ****************************/ + // Do the global initialization for the API ApiHandle apiHandle; @@ -542,62 +550,78 @@ int main(int argc, char *argv[]) */ Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle); - /** - * In a real world application you probably don't want to enforce synchronous behavior - * but this is a sample console application, so we'll just do that with a condition variable. - */ - std::promise connectionCompletedPromise; - std::promise connectionClosedPromise; - std::shared_ptr connection = nullptr; - std::shared_ptr client5 = nullptr; - std::shared_ptr iotIdentityClient = nullptr; + Mqtt5ClientContext mqtt5ClientContext; + std::shared_ptr mqtt5Client; + + Mqtt3ConnectionContext mqtt3ConnectionContext; + std::shared_ptr mqtt3Connection; + + std::shared_ptr identityClient; if (cmdData.input_mqtt_version == 5UL) { - iotIdentityClient = build_mqtt5_client(cmdData, client5, connectionCompletedPromise, connectionClosedPromise); + auto client = createMqtt5Client(cmdData, mqtt5ClientContext); + if (!client) + { + fprintf(stderr, "MQTT5 Connection failed to start"); + return -1; + } + identityClient = std::make_shared(client); } else if (cmdData.input_mqtt_version == 3UL) { - iotIdentityClient = - build_mqtt3_client(cmdData, connection, connectionCompletedPromise, connectionClosedPromise); + mqtt3Connection = createMqtt3Connection(cmdData, mqtt3ConnectionContext); + if (!mqtt3Connection) + { + fprintf(stderr, "MQTT3 Connection failed to start"); + return -1; + } + identityClient = std::make_shared(mqtt3Connection); } else { fprintf(stderr, "MQTT Version not supported\n"); exit(-1); } - if (connectionCompletedPromise.get_future().get()) + + /************************ Run the sample ****************************/ + + // Create fleet provisioning client. + + // Create certificate. + CreateCertificateContext certificateContext; + if (cmdData.input_csrPath != "") { - if (cmdData.input_csrPath.empty()) - { - createKeysAndCertificateWorkflow( - cmdData.input_templateName, cmdData.input_templateParameters, iotIdentityClient); - } - else - { - createCertificateFromCsrWorkflow( - cmdData.input_templateName, cmdData.input_templateParameters, cmdData.input_csrPath, iotIdentityClient); - } + auto csrFile = getFileData(cmdData.input_csrPath); + createCertificateFromCsr(identityClient, certificateContext, csrFile); } - // Wait just a little bit to let the console print - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + else + { + createKeysAndCertificate(identityClient, certificateContext); + } + + // Wait for a certificate token to be obtained. + auto token = certificateContext.tokenPromise.get_future().get(); + // After certificate is obtained, it's time to register a thing. + RegisterThingContext registerThingContext; + registerThing(identityClient, registerThingContext, cmdData, token); + + // Disconnect if (cmdData.input_mqtt_version == 5UL) { - // Disconnect - if (client5->Stop() == true) + if (mqtt5Client->Stop()) { - connectionClosedPromise.get_future().wait(); + mqtt5ClientContext.stoppedPromise.get_future().wait(); } } - else - { // mqtt3 - - // Disconnect - if (connection->Disconnect() == true) + else if (cmdData.input_mqtt_version == 3UL) + { + if (mqtt3Connection->Disconnect()) { - connectionClosedPromise.get_future().wait(); + mqtt3ConnectionContext.connectionClosedPromise.get_future().wait(); } } + return 0; }