diff --git a/samples/device_defender/mqtt5_basic_report/main.cpp b/samples/device_defender/mqtt5_basic_report/main.cpp index 61f2bf522..6bbddf391 100644 --- a/samples/device_defender/mqtt5_basic_report/main.cpp +++ b/samples/device_defender/mqtt5_basic_report/main.cpp @@ -73,8 +73,19 @@ int main(int argc, char *argv[]) Utils::cmdData cmdData = Utils::parseSampleInputDeviceDefender(argc, argv, &apiHandle); // Create the MQTT builder and populate it with data from cmdData. - auto clientConfigBuilder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + auto clientConfigBuilder = std::unique_ptr( + Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); + if (clientConfigBuilder == nullptr) + { + fprintf( + stdout, + "Failed to setup MQTT5 client builder with error code %d: %s", + LastError(), + ErrorDebugString(LastError())); + return -1; + } + if (cmdData.input_ca != "") { clientConfigBuilder->WithCertificateAuthority(cmdData.input_ca.c_str()); @@ -119,9 +130,6 @@ int main(int argc, char *argv[]) // Create Mqtt5Client std::shared_ptr client = clientConfigBuilder->Build(); - // Clean up the builder - delete clientConfigBuilder; - if (client == nullptr) { fprintf( diff --git a/samples/fleet_provisioning/fleet_provisioning/main.cpp b/samples/fleet_provisioning/fleet_provisioning/main.cpp index 02b307f7e..cc1f5bcaf 100644 --- a/samples/fleet_provisioning/fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/fleet_provisioning/main.cpp @@ -426,7 +426,7 @@ int main(int argc, char *argv[]) { /************************ Setup ****************************/ - // Do the global initialization for the API + // Do the global initialization for the API ApiHandle apiHandle; /** diff --git a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp index 3ea7ef274..f065fcfd8 100644 --- a/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp +++ b/samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp @@ -79,11 +79,12 @@ struct RegisterThingContext /** * Create MQTT5 client. */ -std::shared_ptr createMqtt5Client(Mqtt5ClientContext &ctx, const Utils::cmdData &cmdData) +std::shared_ptr createMqtt5Client(const Utils::cmdData &cmdData, Mqtt5ClientContext &ctx) { // Create the MQTT5 builder and populate it with data from cmdData. - Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + auto builder = std::unique_ptr( + Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); // Check if the builder setup correctly. if (builder == nullptr) @@ -128,7 +129,6 @@ std::shared_ptr createMqtt5Client(Mqtt5ClientConte // Create Mqtt5Client std::shared_ptr client = builder->Build(); - delete builder; return client; } @@ -418,7 +418,7 @@ int main(int argc, char *argv[]) { /************************ Setup ****************************/ - // Do the global initialization for the API + // Do the global initialization for the API ApiHandle apiHandle; /** @@ -429,7 +429,7 @@ int main(int argc, char *argv[]) Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle); Mqtt5ClientContext mqtt5ClientContext; - auto client = createMqtt5Client(mqtt5ClientContext, cmdData); + auto client = createMqtt5Client(cmdData, mqtt5ClientContext); /************************ Run the sample ****************************/ diff --git a/samples/jobs/mqtt5_job_execution/main.cpp b/samples/jobs/mqtt5_job_execution/main.cpp index 65e91d520..d3e3dbcd8 100644 --- a/samples/jobs/mqtt5_job_execution/main.cpp +++ b/samples/jobs/mqtt5_job_execution/main.cpp @@ -57,8 +57,9 @@ int main(int argc, char *argv[]) Utils::cmdData cmdData = Utils::parseSampleInputJobs(argc, argv, &apiHandle); // Create the MQTT5 builder and populate it with data from cmdData. - Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + auto builder = std::unique_ptr( + Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); // Check if the builder setup correctly. if (builder == nullptr) @@ -101,7 +102,6 @@ int main(int argc, char *argv[]) // Create Mqtt5Client std::shared_ptr client = builder->Build(); - delete builder; /************************ Run the sample ****************************/ fprintf(stdout, "Connecting...\n"); diff --git a/samples/mqtt5/mqtt5_pubsub/main.cpp b/samples/mqtt5/mqtt5_pubsub/main.cpp index 9f65cae4e..479146c5a 100644 --- a/samples/mqtt5/mqtt5_pubsub/main.cpp +++ b/samples/mqtt5/mqtt5_pubsub/main.cpp @@ -34,8 +34,9 @@ int main(int argc, char *argv[]) Utils::cmdData cmdData = Utils::parseSampleInputPubSub(argc, argv, &apiHandle, "mqtt5-pubsub"); // Create the MQTT5 builder and populate it with data from cmdData. - Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + auto builder = std::unique_ptr( + Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); // Check if the builder setup correctly. if (builder == nullptr) @@ -107,9 +108,6 @@ int main(int argc, char *argv[]) // Create Mqtt5Client std::shared_ptr client = builder->Build(); - // Clean up the builder - delete builder; - if (client == nullptr) { fprintf( diff --git a/samples/mqtt5/mqtt5_shared_subscription/main.cpp b/samples/mqtt5/mqtt5_shared_subscription/main.cpp index 998792ae9..6f8085699 100644 --- a/samples/mqtt5/mqtt5_shared_subscription/main.cpp +++ b/samples/mqtt5/mqtt5_shared_subscription/main.cpp @@ -45,8 +45,9 @@ class sample_mqtt5_client std::shared_ptr result = std::make_shared(); result->name = input_clientName; - Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - input_endpoint, input_cert.c_str(), input_key.c_str()); + auto builder = std::unique_ptr( + Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + input_endpoint, input_cert.c_str(), input_key.c_str())); if (builder == nullptr) { return nullptr; @@ -136,7 +137,6 @@ class sample_mqtt5_client }); result->client = builder->Build(); - delete builder; return result; } }; diff --git a/samples/secure_tunneling/secure_tunnel/main.cpp b/samples/secure_tunneling/secure_tunnel/main.cpp index 4f39c3c10..cbae47690 100644 --- a/samples/secure_tunneling/secure_tunnel/main.cpp +++ b/samples/secure_tunneling/secure_tunnel/main.cpp @@ -133,6 +133,7 @@ int main(int argc, char *argv[]) * In a real world application you probably don't want to enforce synchronous behavior * but this is a sample console application, so we'll just do that with a condition variable. */ + std::promise clientConnectedPromise; std::promise clientStoppedPromise; // service id storage for use in sample @@ -285,6 +286,7 @@ int main(int argc, char *argv[]) fprintf(stdout, "Sending Stream Start request\n"); secureTunnel->SendStreamStart(); } + clientConnectedPromise.set_value(); } }); @@ -345,6 +347,8 @@ int main(int argc, char *argv[]) exit(-1); } + clientConnectedPromise.get_future().wait_for(std::chrono::seconds(5)); + /* * In Destination mode the Secure Tunnel Client will remain open and echo messages that come in. * In Source mode the Secure Tunnel Client will send 4 messages and then disconnect and terminate. diff --git a/samples/shadow/mqtt5_shadow_sync/main.cpp b/samples/shadow/mqtt5_shadow_sync/main.cpp index 12bc00353..b55b1d1cf 100644 --- a/samples/shadow/mqtt5_shadow_sync/main.cpp +++ b/samples/shadow/mqtt5_shadow_sync/main.cpp @@ -104,9 +104,9 @@ int main(int argc, char *argv[]) Utils::cmdData cmdData = Utils::parseSampleInputShadow(argc, argv, &apiHandle); // Create the MQTT5 builder and populate it with data from cmdData. - Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str()); - + auto builder = std::unique_ptr( + Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); // Check if the builder setup correctly. if (builder == nullptr) { @@ -148,7 +148,6 @@ int main(int argc, char *argv[]) // Create Mqtt5Client std::shared_ptr client = builder->Build(); - delete builder; /************************ Run the sample ****************************/ fprintf(stdout, "Connecting...\n"); diff --git a/servicetests/tests/FleetProvisioning/main.cpp b/servicetests/tests/FleetProvisioning/main.cpp index ff4c042aa..f99bd17e7 100644 --- a/servicetests/tests/FleetProvisioning/main.cpp +++ b/servicetests/tests/FleetProvisioning/main.cpp @@ -2,11 +2,9 @@ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ + #include #include -#include -#include - #include #include #include @@ -23,515 +21,500 @@ #include #include -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include #include "../../../samples/utils/CommandLineUtils.h" using namespace Aws::Crt; using namespace Aws::Iotidentity; -static std::promise gotResponse; - -static std::string getFileData(std::string const &fileName) +static String getFileData(const String &fileName) { - std::ifstream ifs(fileName); + std::ifstream ifs(fileName.c_str()); std::string str; getline(ifs, str, (char)ifs.eof()); - return str; + return str.c_str(); } -std::shared_ptr build_mqtt3_client( - Utils::cmdData &cmdData, - std::shared_ptr &connection, - std::promise &connectionCompletedPromise, - std::promise &connectionClosedPromise) +/** + * Auxiliary structure for holding data used by MQTT3 connection. + */ +struct Mqtt3ConnectionContext { - Aws::Iot::MqttClientConnectionConfigBuilder clientConfigBuilder; - // Create the MQTT builder and populate it with data from cmdData. - clientConfigBuilder = - Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); - clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); - if (cmdData.input_ca != "") + std::promise connectionCompletedPromise; + std::promise connectionClosedPromise; +}; + +/** + * Auxiliary structure for holding data used by MQTT5 connection. + */ +struct Mqtt5ClientContext +{ + std::promise connectionPromise; + std::promise stoppedPromise; + std::promise disconnectPromise; + std::promise subscribeSuccess; +}; + +/** + * Auxiliary structure for holding data used when creating a certificate. + */ +struct CreateCertificateContext +{ + std::promise pubAckPromise; + std::promise acceptedSubAckPromise; + std::promise rejectedSubAckPromise; + std::promise tokenPromise; +}; + +/** + * Auxiliary structure for holding data used when registering a thing. + */ +struct RegisterThingContext +{ + std::promise pubAckPromise; + std::promise acceptedSubAckPromise; + std::promise rejectedSubAckPromise; + std::promise thingCreatedPromise; +}; + +/** + * Create MQTT5 client. + */ +std::shared_ptr createMqtt5Client(const Utils::cmdData &cmdData, Mqtt5ClientContext &ctx) +{ + // Create the MQTT5 builder and populate it with data from cmdData. + auto builder = std::unique_ptr( + Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); + + // Check if the builder setup correctly. + if (builder == nullptr) { - clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); + printf( + "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); + return nullptr; } - // Create the MQTT connection from the MQTT builder - auto clientConfig = clientConfigBuilder.Build(); - if (!clientConfig) + // Setup connection options + std::shared_ptr connectOptions = std::make_shared(); + connectOptions->WithClientId(cmdData.input_clientId); + builder->WithConnectOptions(connectOptions); + if (cmdData.input_port != 0) { + builder->WithPort(static_cast(cmdData.input_port)); + } + + // Setup lifecycle callbacks + builder->WithClientConnectionSuccessCallback([&ctx](const Mqtt5::OnConnectionSuccessEventData &eventData) { fprintf( - stderr, - "Client Configuration initialization failed with error %s\n", - Aws::Crt::ErrorDebugString(clientConfig.LastError())); - exit(-1); + stdout, + "Mqtt5 Client connection succeed, clientid: %s.\n", + eventData.negotiatedSettings->getClientId().c_str()); + ctx.connectionPromise.set_value(true); + }); + builder->WithClientConnectionFailureCallback([&ctx](const Mqtt5::OnConnectionFailureEventData &eventData) { + fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); + ctx.connectionPromise.set_value(false); + }); + builder->WithClientStoppedCallback([&ctx](const Mqtt5::OnStoppedEventData &) { + fprintf(stdout, "Mqtt5 Client stopped.\n"); + ctx.stoppedPromise.set_value(); + }); + builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) { + fprintf(stdout, "Mqtt5 Client attempting connection...\n"); + }); + builder->WithClientDisconnectionCallback([&ctx](const Mqtt5::OnDisconnectionEventData &eventData) { + fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode)); + ctx.disconnectPromise.set_value(); + }); + + // Create Mqtt5Client + auto client = builder->Build(); + + fprintf(stdout, "Connecting...\n"); + if (!client->Start()) + { + fprintf(stderr, "MQTT5 Connection failed to start"); + return nullptr; } - Aws::Iot::MqttClient client3 = Aws::Iot::MqttClient(); - connection = client3.NewConnection(clientConfig); - if (!*connection) + if (!ctx.connectionPromise.get_future().get()) { - fprintf( - stderr, - "MQTT Connection Creation failed with error %s\n", - Aws::Crt::ErrorDebugString(connection->LastError())); - exit(-1); + return nullptr; } + return client; +} + +/** + * Create MQTT3 connection. + */ +std::shared_ptr createMqtt3Connection(const Utils::cmdData &cmdData, Mqtt3ConnectionContext &ctx) +{ + /** + * In a real world application you probably don't want to enforce synchronous behavior + * but this is a sample console application, so we'll just do that with a promise. + */ + // Invoked when a MQTT connect has completed or failed - auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) { + auto onConnectionCompleted = [&ctx](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) { if (errorCode) { fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode)); - connectionCompletedPromise.set_value(false); + ctx.connectionCompletedPromise.set_value(false); } else { fprintf(stdout, "Connection completed with return code %d\n", returnCode); - connectionCompletedPromise.set_value(true); + ctx.connectionCompletedPromise.set_value(true); } }; // Invoked when a disconnect has been completed - auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) { + auto onDisconnect = [&ctx](Mqtt::MqttConnection & /*conn*/) { { fprintf(stdout, "Disconnect completed\n"); - connectionClosedPromise.set_value(); + ctx.connectionClosedPromise.set_value(); } }; - connection->OnConnectionCompleted = std::move(onConnectionCompleted); - connection->OnDisconnect = std::move(onDisconnect); - - if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) + // Create the MQTT builder and populate it with data from cmdData. + auto clientConfigBuilder = + Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); + if (cmdData.input_ca != "") { - fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); - exit(-1); + clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); } - return std::make_shared(connection); -} -std::shared_ptr build_mqtt5_client( - Utils::cmdData &cmdData, - std::shared_ptr &client5, - std::promise &connectionCompletedPromise, - std::promise &connectionClosedPromise) -{ - std::shared_ptr builder( - Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); - - // Check if the builder setup correctly. - if (builder == nullptr) + // Create the MQTT connection from the MQTT builder + auto clientConfig = clientConfigBuilder.Build(); + if (!clientConfig) { - printf( - "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); + fprintf( + stderr, + "Client Configuration initialization failed with error %s\n", + Aws::Crt::ErrorDebugString(clientConfig.LastError())); return nullptr; } - // Create the MQTT5 builder and populate it with data from cmdData. - // Setup connection options - std::shared_ptr connectOptions = std::make_shared(); - connectOptions->WithClientId(cmdData.input_clientId); - builder->WithConnectOptions(connectOptions); - if (cmdData.input_port != 0) + Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); + auto connection = client.NewConnection(clientConfig); + if (!*connection) { - builder->WithPort(static_cast(cmdData.input_port)); + fprintf( + stderr, + "MQTT Connection Creation failed with error %s\n", + Aws::Crt::ErrorDebugString(connection->LastError())); + return nullptr; } - // Setup lifecycle callbacks - builder->WithClientConnectionSuccessCallback( - [&connectionCompletedPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) { - fprintf( - stdout, - "Mqtt5 Client connection succeed, clientid: %s.\n", - eventData.negotiatedSettings->getClientId().c_str()); - connectionCompletedPromise.set_value(true); - }); - builder->WithClientConnectionFailureCallback([&connectionCompletedPromise]( - const Mqtt5::OnConnectionFailureEventData &eventData) { - fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); - connectionCompletedPromise.set_value(false); - }); - builder->WithClientStoppedCallback([&connectionClosedPromise](const Mqtt5::OnStoppedEventData &) { - fprintf(stdout, "Mqtt5 Client stopped.\n"); - connectionClosedPromise.set_value(); - }); - client5 = builder->Build(); - if (client5 == nullptr) + connection->OnConnectionCompleted = std::move(onConnectionCompleted); + connection->OnDisconnect = std::move(onDisconnect); + + fprintf(stdout, "Connecting...\n"); + if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) { - fprintf( - stdout, "Failed to Init Mqtt5Client with error code %d: %s.\n", LastError(), ErrorDebugString(LastError())); - exit(-1); + fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); + return nullptr; } - if (!client5->Start()) + if (!ctx.connectionCompletedPromise.get_future().get()) { - fprintf(stderr, "MQTT5 Connection failed to start"); - exit(-1); + return nullptr; } - return std::make_shared(client5); + + return connection; } -void SubscribeToRegisterThing(String input_templateName, std::shared_ptr iotIdentityClient) +/** + * Keys-and-Certificate workflow. + * + * @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the + * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of + * CreateCertificateContext is used to store variables used by the callbacks. + */ +void createKeysAndCertificate(const std::shared_ptr &identityClient, CreateCertificateContext &ctx) { - std::promise onSubAckPromise; - std::promise registerRejectedCompletedPromise; - std::promise registerAcceptedCompletedPromise; - - auto onRegisterThingAccepted = [&](RegisterThingResponse *response, int ioErr) { - if (ioErr) + auto onKeysPublishPubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error: onSuback callback error %d\n", ioErr); + fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); exit(-1); } - if (response) + ctx.pubAckPromise.set_value(); + }; + + auto onKeysAcceptedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) { - fprintf(stdout, "Register thing: %s\n", response->ThingName.value().c_str()); + fprintf(stderr, "Error subscribing to CreateKeysAndCertificate accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); } + ctx.acceptedSubAckPromise.set_value(); }; - RegisterThingSubscriptionRequest registerThingSubscriptionRequest; - registerThingSubscriptionRequest.TemplateName = input_templateName; - - auto onRegisterAcceptedSubAck = [&](int ioErr) { + auto onKeysRejectedSubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - registerAcceptedCompletedPromise.set_value(); + ctx.rejectedSubAckPromise.set_value(); }; - iotIdentityClient->SubscribeToRegisterThingAccepted( - registerThingSubscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - onRegisterThingAccepted, - onRegisterAcceptedSubAck); - - auto rejectedHandler = [&](ErrorResponse *error, int ioErr) { - if (ioErr) + auto onKeysAccepted = [&ctx](CreateKeysAndCertificateResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) { - fprintf(stderr, "Error: rejectedHandler callback error %d\n", ioErr); - exit(-1); + fprintf(stdout, "CreateKeysAndCertificateResponse certificateId: %s.\n", response->CertificateId->c_str()); + ctx.tokenPromise.set_value(*response->CertificateOwnershipToken); } else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + }; + + auto onKeysRejected = [](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) { fprintf( stdout, - "RegisterThing failed with statusCode %d, errorMessage %s and errorCode %s.", + "CreateKeysAndCertificate failed with statusCode %d, errorMessage %s and errorCode %s.", *error->StatusCode, error->ErrorMessage->c_str(), error->ErrorCode->c_str()); + exit(-1); } - }; - auto onRegisterRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) + else { - fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - registerRejectedCompletedPromise.set_value(); }; - iotIdentityClient->SubscribeToRegisterThingRejected( - registerThingSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, rejectedHandler, onRegisterRejectedSubAck); - - registerAcceptedCompletedPromise.get_future().wait(); - registerRejectedCompletedPromise.get_future().wait(); -} -void createKeysAndCertificateWorkflow( - String &input_templateName, - String &input_templateParameters, - std::shared_ptr iotIdentityClient) -{ - std::promise onSubAckPromise; + fprintf(stdout, "Subscribing to CreateKeysAndCertificate Accepted and Rejected topics\n"); + CreateKeysAndCertificateSubscriptionRequest keySubscriptionRequest; + identityClient->SubscribeToCreateKeysAndCertificateAccepted( + keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysAccepted, onKeysAcceptedSubAck); + identityClient->SubscribeToCreateKeysAndCertificateRejected( + keySubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysRejected, onKeysRejectedSubAck); - std::promise keysPublishCompletedPromise; - std::promise keysAcceptedCompletedPromise; - std::promise keysRejectedCompletedPromise; + // Wait for the subscriptions to the accept and reject keys-and-certificate topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); - std::promise registerPublishCompletedPromise; - std::promise registerAcceptedCompletedPromise; - - CreateKeysAndCertificateResponse *createKeysAndCertificateResponse = nullptr; - String token; + // Now, when we subscribed to the keys and certificate topics, we can make a request for a certificate. + fprintf(stdout, "Publishing to CreateKeysAndCertificate topic\n"); + CreateKeysAndCertificateRequest createKeysAndCertificateRequest; + identityClient->PublishCreateKeysAndCertificate( + createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishPubAck); + ctx.pubAckPromise.get_future().wait(); +} - auto acceptedHandler = [&createKeysAndCertificateResponse, - &token](CreateKeysAndCertificateResponse *response, int ioErr) { - fprintf(stderr, "acceptedhandler is called\n"); - if (ioErr) +/** + * Certificate-from-CSR workflow. + * + * @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the + * callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of + * CreateCertificateContext is used to store variables used by the callbacks. + */ +void createCertificateFromCsr( + const std::shared_ptr &identityClient, + CreateCertificateContext &ctx, + const String &csrFile) +{ + auto onCsrPublishPubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error: acceptedHandler callback error %d\n", ioErr); + fprintf(stderr, "Error publishing to CreateCertificateFromCsr: %s\n", ErrorDebugString(ioErr)); exit(-1); } - if (response) + ctx.pubAckPromise.set_value(); + }; + + auto onCsrAcceptedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) { - if (createKeysAndCertificateResponse == nullptr) - { - createKeysAndCertificateResponse = response; - token = *response->CertificateOwnershipToken; - } + fprintf(stderr, "Error subscribing to CreateCertificateFromCsr accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); } - gotResponse.set_value(); + ctx.acceptedSubAckPromise.set_value(); }; - auto onKeysAcceptedSubAck = [&](int ioErr) { + auto onCsrRejectedSubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error subscribing to CreateKeysAndCertificate accepted: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error subscribing to CreateCertificateFromCsr rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - - keysAcceptedCompletedPromise.set_value(); + ctx.rejectedSubAckPromise.set_value(); }; - CreateKeysAndCertificateSubscriptionRequest createKeysAndCertificateSubscriptionRequest; - iotIdentityClient->SubscribeToCreateKeysAndCertificateAccepted( - createKeysAndCertificateSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, acceptedHandler, onKeysAcceptedSubAck); - - auto rejectedHandler = [&](Aws::Iotidentity::ErrorResponse *error, int ioErr) { - if (ioErr == AWS_OP_ERR) + auto onCsrAccepted = [&ctx](CreateCertificateFromCsrResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) { - fprintf(stderr, "Error: rejectedHandler callback error %s\n", ErrorDebugString(ioErr)); + fprintf(stdout, "CreateCertificateFromCsrResponse certificateId: %s.\n", response->CertificateId->c_str()); + ctx.tokenPromise.set_value(*response->CertificateOwnershipToken); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - if (error) + }; + + auto onCsrRejected = [](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) { fprintf( stdout, - "CreateKeysAndCertificate failed with statusCode %d, errorMessage %s and errorCode %s.", + "CreateCertificateFromCsr failed with statusCode %d, errorMessage %s and errorCode %s.", *error->StatusCode, error->ErrorMessage->c_str(), error->ErrorCode->c_str()); - } - exit(-1); - }; - - auto onKeysRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - keysRejectedCompletedPromise.set_value(); - }; - iotIdentityClient->SubscribeToCreateKeysAndCertificateRejected( - createKeysAndCertificateSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, rejectedHandler, onKeysRejectedSubAck); - - auto onKeysPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) + else { - fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - - keysPublishCompletedPromise.set_value(); }; - CreateKeysAndCertificateRequest createKeysAndCertificateRequest; - iotIdentityClient->PublishCreateKeysAndCertificate( - createKeysAndCertificateRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishSubAck); - - SubscribeToRegisterThing(input_templateName, iotIdentityClient); - std::this_thread::sleep_for(std::chrono::seconds(1)); - - // Verify the response is good - if (createKeysAndCertificateResponse == nullptr) - { - fprintf(stderr, "Error: 1 createKeysAndCertificateResponse is null\n"); - exit(-1); - } - // reset gotResponse future - gotResponse = std::promise(); + // CreateCertificateFromCsr workflow + fprintf(stdout, "Subscribing to CreateCertificateFromCsr Accepted and Rejected topics\n"); + CreateCertificateFromCsrSubscriptionRequest csrSubscriptionRequest; + identityClient->SubscribeToCreateCertificateFromCsrAccepted( + csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrAccepted, onCsrAcceptedSubAck); - RegisterThingRequest registerThingRequest; - registerThingRequest.TemplateName = input_templateName; + identityClient->SubscribeToCreateCertificateFromCsrRejected( + csrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrRejected, onCsrRejectedSubAck); - if (!input_templateParameters.empty()) - { - const Aws::Crt::String jsonValue = input_templateParameters; - Aws::Crt::JsonObject value(jsonValue); - Map pm = value.View().GetAllObjects(); - Aws::Crt::Map params = Aws::Crt::Map(); + // Wait for the subscriptions to the accept and reject certificates topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); - for (const auto &x : pm) - { - params.emplace(x.first, x.second.AsString()); - } - registerThingRequest.Parameters = params; - } - registerThingRequest.CertificateOwnershipToken = token; + // Now, when we subscribed to the certificates topics, we can make a request for a certificate. + fprintf(stdout, "Publishing to CreateCertificateFromCsr topic\n"); + CreateCertificateFromCsrRequest createCertificateFromCsrRequest; + createCertificateFromCsrRequest.CertificateSigningRequest = csrFile; + identityClient->PublishCreateCertificateFromCsr( + createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onCsrPublishPubAck); + ctx.pubAckPromise.get_future().wait(); +} - auto onRegisterPublishSubAck = [&](int ioErr) { +/** + * Provision an AWS IoT thing using a pre-defined template. + */ +void registerThing( + const std::shared_ptr &identityClient, + RegisterThingContext &ctx, + const Utils::cmdData &cmdData, + const String &token) +{ + auto onRegisterAcceptedSubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr)); exit(-1); } - - registerPublishCompletedPromise.set_value(); + ctx.acceptedSubAckPromise.set_value(); }; - iotIdentityClient->PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck); - std::this_thread::sleep_for(std::chrono::seconds(1)); - - keysPublishCompletedPromise.get_future().wait(); - keysAcceptedCompletedPromise.get_future().wait(); - keysRejectedCompletedPromise.get_future().wait(); - registerPublishCompletedPromise.get_future().wait(); -} -void createCertificateFromCsrWorkflow( - String input_templateName, - String input_templateParameters, - String input_csrPath, - std::shared_ptr iotIdentityClient) -{ - std::promise keysPublishCompletedPromise; - std::promise keysAcceptedCompletedPromise; - std::promise keysRejectedCompletedPromise; - - std::promise registerPublishCompletedPromise; - std::promise registerAcceptedCompletedPromise; - CreateCertificateFromCsrResponse *createCertificateFromCsrResponse = nullptr; - String token; - - gotResponse = std::promise(); - - auto onCreateCertificateFromCsrResponseAccepted = [&createCertificateFromCsrResponse, - &token](CreateCertificateFromCsrResponse *response, int ioErr) { - if (ioErr) + auto onRegisterRejectedSubAck = [&ctx](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error: onCreateCertificateFromCsrResponseAccepted callback error %d\n", ioErr); + fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr)); exit(-1); } - if (response != nullptr) - { - if (createCertificateFromCsrResponse == nullptr) - { - createCertificateFromCsrResponse = response; - token = *response->CertificateOwnershipToken; - } - } - gotResponse.set_value(); + ctx.rejectedSubAckPromise.set_value(); }; - auto onSubAck = [&](int ioErr) { - if (ioErr) + auto onRegisterAccepted = [&ctx](RegisterThingResponse *response, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) { - fprintf(stderr, "Error: onSubAck callback error %d\n", ioErr); - exit(-1); + fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str()); + ctx.thingCreatedPromise.set_value(); } - keysAcceptedCompletedPromise.set_value(); - }; - - CreateCertificateFromCsrSubscriptionRequest createCertificateFromCsrSubscriptionRequest; - iotIdentityClient->SubscribeToCreateCertificateFromCsrAccepted( - createCertificateFromCsrSubscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - onCreateCertificateFromCsrResponseAccepted, - onSubAck); - - auto onRejectedCsr = [&](ErrorResponse *response, int ioErr) { - (void)response; - if (ioErr) + else { - fprintf(stderr, "Error: onSubAck callback error %d\n", ioErr); + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - exit(1); }; - auto onKeysRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) + auto onRegisterRejected = [](ErrorResponse *error, int ioErr) { + if (ioErr == AWS_OP_SUCCESS) { - fprintf(stderr, "Error subscribing to CreateKeysAndCertificate rejected: %s\n", ErrorDebugString(ioErr)); + fprintf( + stdout, + "RegisterThing failed with statusCode %d, errorMessage %s and errorCode %s.", + *error->StatusCode, + error->ErrorMessage->c_str(), + error->ErrorCode->c_str()); + } + else + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); exit(-1); } - keysRejectedCompletedPromise.set_value(); }; - iotIdentityClient->SubscribeToCreateCertificateFromCsrRejected( - createCertificateFromCsrSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRejectedCsr, onKeysRejectedSubAck); - - auto onKeysPublishSubAck = [&](int ioErr) { + auto onRegisterPublishPubAck = [&ctx](int ioErr) { if (ioErr != AWS_OP_SUCCESS) { - fprintf(stderr, "Error publishing to CreateKeysAndCertificate: %s\n", ErrorDebugString(ioErr)); + fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); exit(-1); } - keysPublishCompletedPromise.set_value(); + ctx.pubAckPromise.set_value(); }; - std::string csrContents = getFileData(input_csrPath.c_str()); - CreateCertificateFromCsrRequest createCertificateFromCsrRequest; - createCertificateFromCsrRequest.CertificateSigningRequest = csrContents.c_str(); + fprintf(stdout, "Subscribing to RegisterThing Accepted and Rejected topics\n"); + RegisterThingSubscriptionRequest registerSubscriptionRequest; + registerSubscriptionRequest.TemplateName = cmdData.input_templateName; - iotIdentityClient->PublishCreateCertificateFromCsr( - createCertificateFromCsrRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onKeysPublishSubAck); + identityClient->SubscribeToRegisterThingAccepted( + registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterAccepted, onRegisterAcceptedSubAck); - // Subscribes to the register thing accepted and rejected topics - SubscribeToRegisterThing(input_templateName, iotIdentityClient); - std::this_thread::sleep_for(std::chrono::seconds(1)); + identityClient->SubscribeToRegisterThingRejected( + registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck); - // Verify the response is good - if (createCertificateFromCsrResponse == nullptr) - { - fprintf(stderr, "Error: 1 createCertificateFromCsrResponse is null\n"); - exit(-1); - } - // reset gotResponse future - gotResponse = std::promise(); + // Wait for the subscriptions to the accept and reject RegisterThing topics to be established. + ctx.acceptedSubAckPromise.get_future().wait(); + ctx.rejectedSubAckPromise.get_future().wait(); + fprintf(stdout, "Publishing to RegisterThing topic\n"); RegisterThingRequest registerThingRequest; - registerThingRequest.CertificateOwnershipToken = token; - registerThingRequest.TemplateName = input_templateName; + registerThingRequest.TemplateName = cmdData.input_templateName; - if (!input_templateParameters.empty()) - { - const Aws::Crt::String jsonValue = input_templateParameters; - Aws::Crt::JsonObject value(jsonValue); - Map pm = value.View().GetAllObjects(); - Aws::Crt::Map params = Aws::Crt::Map(); + const Aws::Crt::String jsonValue = cmdData.input_templateParameters; + Aws::Crt::JsonObject value(jsonValue); + Map pm = value.View().GetAllObjects(); + Aws::Crt::Map params = Aws::Crt::Map(); - for (const auto &x : pm) - { - params.emplace(x.first, x.second.AsString()); - } - registerThingRequest.Parameters = params; + for (const auto &x : pm) + { + params.emplace(x.first, x.second.AsString()); } - auto onRegisterPublishSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - - registerPublishCompletedPromise.set_value(); - }; - iotIdentityClient->PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishSubAck); + registerThingRequest.Parameters = params; + // NOTE: In a real application creating multiple certificates you'll probably need to protect token var with + // a critical section. This sample makes only one request for a certificate, so no data race is possible. + registerThingRequest.CertificateOwnershipToken = token; - std::this_thread::sleep_for(std::chrono::seconds(1)); + identityClient->PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishPubAck); + ctx.pubAckPromise.get_future().wait(); - keysPublishCompletedPromise.get_future().wait(); - keysAcceptedCompletedPromise.get_future().wait(); - keysRejectedCompletedPromise.get_future().wait(); - registerPublishCompletedPromise.get_future().wait(); + // Wait for registering a thing to succeed. + ctx.thingCreatedPromise.get_future().wait(); } int main(int argc, char *argv[]) { + /************************ Setup ****************************/ + // Do the global initialization for the API ApiHandle apiHandle; @@ -542,62 +525,78 @@ int main(int argc, char *argv[]) */ Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle); - /** - * In a real world application you probably don't want to enforce synchronous behavior - * but this is a sample console application, so we'll just do that with a condition variable. - */ - std::promise connectionCompletedPromise; - std::promise connectionClosedPromise; - std::shared_ptr connection = nullptr; - std::shared_ptr client5 = nullptr; - std::shared_ptr iotIdentityClient = nullptr; + Mqtt5ClientContext mqtt5ClientContext; + std::shared_ptr mqtt5Client; + + Mqtt3ConnectionContext mqtt3ConnectionContext; + std::shared_ptr mqtt3Connection; + + std::shared_ptr identityClient; if (cmdData.input_mqtt_version == 5UL) { - iotIdentityClient = build_mqtt5_client(cmdData, client5, connectionCompletedPromise, connectionClosedPromise); + mqtt5Client = createMqtt5Client(cmdData, mqtt5ClientContext); + if (!mqtt5Client) + { + fprintf(stderr, "MQTT5 Connection failed to start"); + return -1; + } + identityClient = std::make_shared(mqtt5Client); } else if (cmdData.input_mqtt_version == 3UL) { - iotIdentityClient = - build_mqtt3_client(cmdData, connection, connectionCompletedPromise, connectionClosedPromise); + mqtt3Connection = createMqtt3Connection(cmdData, mqtt3ConnectionContext); + if (!mqtt3Connection) + { + fprintf(stderr, "MQTT3 Connection failed to start"); + return -1; + } + identityClient = std::make_shared(mqtt3Connection); } else { fprintf(stderr, "MQTT Version not supported\n"); - exit(-1); + return -1; } - if (connectionCompletedPromise.get_future().get()) + + /************************ Run the sample ****************************/ + + // Create fleet provisioning client. + + // Create certificate. + CreateCertificateContext certificateContext; + if (cmdData.input_csrPath != "") { - if (cmdData.input_csrPath.empty()) - { - createKeysAndCertificateWorkflow( - cmdData.input_templateName, cmdData.input_templateParameters, iotIdentityClient); - } - else - { - createCertificateFromCsrWorkflow( - cmdData.input_templateName, cmdData.input_templateParameters, cmdData.input_csrPath, iotIdentityClient); - } + auto csrFile = getFileData(cmdData.input_csrPath); + createCertificateFromCsr(identityClient, certificateContext, csrFile); + } + else + { + createKeysAndCertificate(identityClient, certificateContext); } - // Wait just a little bit to let the console print - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + // Wait for a certificate token to be obtained. + auto token = certificateContext.tokenPromise.get_future().get(); + + // After certificate is obtained, it's time to register a thing. + RegisterThingContext registerThingContext; + registerThing(identityClient, registerThingContext, cmdData, token); + + // Disconnect if (cmdData.input_mqtt_version == 5UL) { - // Disconnect - if (client5->Stop() == true) + if (mqtt5Client->Stop()) { - connectionClosedPromise.get_future().wait(); + mqtt5ClientContext.stoppedPromise.get_future().wait(); } } - else - { // mqtt3 - - // Disconnect - if (connection->Disconnect() == true) + else if (cmdData.input_mqtt_version == 3UL) + { + if (mqtt3Connection->Disconnect()) { - connectionClosedPromise.get_future().wait(); + mqtt3ConnectionContext.connectionClosedPromise.get_future().wait(); } } + return 0; }