Skip to content

Commit

Permalink
job client demo
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Sep 27, 2023
1 parent f9b2afc commit 18952e2
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 62 deletions.
2 changes: 2 additions & 0 deletions jobs/include/aws/iotjobs/IotJobsClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <aws/crt/Types.h>

#include <aws/crt/mqtt/MqttClient.h>
#include <aws/crt/mqtt/Mqtt5Client.h>

namespace Aws
{
Expand Down Expand Up @@ -80,6 +81,7 @@ namespace Aws
{
public:
IotJobsClient(const std::shared_ptr<Aws::Crt::Mqtt::MqttConnection> &connection);
IotJobsClient(const std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> &mqtt5client);

operator bool() const noexcept;
int GetLastError() const noexcept;
Expand Down
5 changes: 5 additions & 0 deletions jobs/source/IotJobsClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ namespace Aws
{
}

IotJobsClient::IotJobsClient(const std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> &mqtt5Client)
{
m_connection = Aws::Crt::Mqtt::MqttConnection::NewConnectionFromMqtt5Client(mqtt5Client);
}

IotJobsClient::operator bool() const noexcept { return m_connection && *m_connection; }

int IotJobsClient::GetLastError() const noexcept { return aws_last_error(); }
Expand Down
116 changes: 55 additions & 61 deletions samples/jobs/describe_job_execution/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <aws/crt/io/HostResolver.h>

#include <aws/iot/MqttClient.h>
#include <aws/iot/Mqtt5Client.h>

#include <aws/iotjobs/DescribeJobExecutionRequest.h>
#include <aws/iotjobs/DescribeJobExecutionResponse.h>
Expand Down Expand Up @@ -40,80 +41,73 @@ int main(int argc, char *argv[])
*/
Utils::cmdData cmdData = Utils::parseSampleInputJobs(argc, argv, &apiHandle);

// Create the MQTT builder and populate it with data from cmdData.
auto clientConfigBuilder =
Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str());
clientConfigBuilder.WithEndpoint(cmdData.input_endpoint);
if (cmdData.input_ca != "")
// Create the MQTT5 builder and populate it with data from cmdData.
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str());

// Check if the builder setup correctly.
if (builder == nullptr)
{
clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str());
printf(
"Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError()));
return -1;
}

// Create the MQTT connection from the MQTT builder
auto clientConfig = clientConfigBuilder.Build();
if (!clientConfig)
// Setup connection options
std::shared_ptr<Mqtt5::ConnectPacket> connectOptions = std::make_shared<Mqtt5::ConnectPacket>();
connectOptions->WithClientId(cmdData.input_clientId);
builder->WithConnectOptions(connectOptions);
if (cmdData.input_port != 0)
{
fprintf(
stderr,
"Client Configuration initialization failed with error %s\n",
Aws::Crt::ErrorDebugString(clientConfig.LastError()));
exit(-1);
}
Aws::Iot::MqttClient client = Aws::Iot::MqttClient();
auto connection = client.NewConnection(clientConfig);
if (!*connection)
{
fprintf(
stderr,
"MQTT Connection Creation failed with error %s\n",
Aws::Crt::ErrorDebugString(connection->LastError()));
exit(-1);
builder->WithPort(static_cast<uint16_t>(cmdData.input_port));
}

/**
* In a real world application you probably don't want to enforce synchronous behavior
* but this is a sample console application, so we'll just do that with a condition variable.
*/
std::promise<bool> connectionCompletedPromise;
std::promise<void> connectionClosedPromise;

// Invoked when a MQTT connect has completed or failed
auto onConnectionCompleted = [&](Mqtt::MqttConnection &, int errorCode, Mqtt::ReturnCode returnCode, bool) {
if (errorCode)
{
fprintf(stdout, "Connection failed with error %s\n", ErrorDebugString(errorCode));
connectionCompletedPromise.set_value(false);
}
else
{
fprintf(stdout, "Connection completed with return code %d\n", returnCode);
connectionCompletedPromise.set_value(true);
}
};

// Invoked when a disconnect has been completed
auto onDisconnect = [&](Mqtt::MqttConnection & /*conn*/) {
{
fprintf(stdout, "Disconnect completed\n");
connectionClosedPromise.set_value();
}
};

connection->OnConnectionCompleted = std::move(onConnectionCompleted);
connection->OnDisconnect = std::move(onDisconnect);
std::promise<bool> connectionPromise;
std::promise<void> stoppedPromise;
std::promise<void> disconnectPromise;
std::promise<bool> subscribeSuccess;

// Setup lifecycle callbacks
builder->WithClientConnectionSuccessCallback(
[&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) {
fprintf(
stdout,
"Mqtt5 Client connection succeed, clientid: %s.\n",
eventData.negotiatedSettings->getClientId().c_str());
connectionPromise.set_value(true);
});
builder->WithClientConnectionFailureCallback([&connectionPromise](
const Mqtt5::OnConnectionFailureEventData &eventData) {
fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});
builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) {
fprintf(stdout, "Mqtt5 Client stopped.\n");
stoppedPromise.set_value();
});
builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) {
fprintf(stdout, "Mqtt5 Client attempting connection...\n");
});
builder->WithClientDisconnectionCallback([&disconnectPromise](const Mqtt5::OnDisconnectionEventData &eventData) {
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
disconnectPromise.set_value();
});

// Create Mqtt5Client
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client = builder->Build();

/************************ Run the sample ****************************/

fprintf(stdout, "Connecting...\n");
if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0))
if (!client->Start())
{
fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError()));
fprintf(stderr, "MQTT5 Connection failed with error %s\n", ErrorDebugString(connection->LastError()));
exit(-1);
}

if (connectionCompletedPromise.get_future().get())
if (connectionPromise.get_future().get())
{
IotJobsClient jobsClient(connection);
IotJobsClient jobsClient(client);

DescribeJobExecutionSubscriptionRequest describeJobExecutionSubscriptionRequest;
describeJobExecutionSubscriptionRequest.ThingName = cmdData.input_thingName;
Expand Down Expand Up @@ -187,9 +181,9 @@ int main(int argc, char *argv[])
std::this_thread::sleep_for(std::chrono::milliseconds(500));

// Disconnect
if (connection->Disconnect())
if (client->Stop())
{
connectionClosedPromise.get_future().wait();
stoppedPromise.get_future().wait();
}

return 0;
Expand Down

0 comments on commit 18952e2

Please sign in to comment.