Skip to content

Commit

Permalink
Fix memory and data race issues
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed May 1, 2024
1 parent 91d09b7 commit 72b2582
Showing 1 changed file with 113 additions and 105 deletions.
218 changes: 113 additions & 105 deletions servicetests/tests/JobsExecution/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,114 @@
using namespace Aws::Crt;
using namespace Aws::Iotjobs;

void getAvailableJobs(
Aws::Crt::String thingName,
IotJobsClient &jobsClient,
std::vector<Aws::Crt::String> &availableJobs);
class AvailableJobsHandler
{
public:
void receiveAvailableJobs(const Aws::Crt::String &thingName, IotJobsClient &jobsClient)
{
GetPendingJobExecutionsSubscriptionRequest subscriptionRequest;
subscriptionRequest.ThingName = thingName;

auto handler = [this](Aws::Iotjobs::GetPendingJobExecutionsResponse *response, int ioErr) {
fprintf(stderr, "running the jobs handler\n");
if (ioErr)
{
fprintf(stderr, "Error %d occurred\n", ioErr);
exit(1);
}
if (response)
{
if (response->InProgressJobs.has_value())
{
for (const JobExecutionSummary &job : response->InProgressJobs.value())
{
std::lock_guard<std::mutex> lock(m_jobsMutex);
m_availableJobs.push_back(job.JobId.value());
fprintf(stderr, "In Progress jobs %s\n", job.JobId->c_str());
}
}
else
{
fprintf(stderr, "In Progress jobs: empty\n");
}
if (response->QueuedJobs.has_value())
{
for (const JobExecutionSummary &job : response->QueuedJobs.value())
{
std::lock_guard<std::mutex> lock(m_jobsMutex);
m_availableJobs.push_back(job.JobId.value());
fprintf(stderr, "Queued jobs %s\n", job.JobId->c_str());
}
}
else
{
fprintf(stderr, "Queued jobs: empty\n");
}
}
m_getResponse.set_value();
};

auto err_handler = [](Aws::Iotjobs::RejectedError *rejectedError, int ioErr) {
if (ioErr)
{
fprintf(stderr, "Error %d occurred\n", ioErr);
exit(1);
}
if (rejectedError)
{
fprintf(
stderr,
"Service Error %d occurred. Message %s\n",
(int)rejectedError->Code.value(),
rejectedError->Message->c_str());
}
fprintf(stderr, "Error handler\n");
exit(-1);
};

auto publishHandler = [this](int ioErr) {
if (ioErr)
{
fprintf(stderr, "Error %d occurred\n", ioErr);
exit(1);
}
m_publishDescribeJobExeCompletedPromise.set_value();
};

jobsClient.SubscribeToGetPendingJobExecutionsAccepted(
subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, handler, publishHandler);
m_publishDescribeJobExeCompletedPromise.get_future().wait();

m_publishDescribeJobExeCompletedPromise = std::promise<void>();
jobsClient.SubscribeToGetPendingJobExecutionsRejected(
subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, err_handler, publishHandler);
m_publishDescribeJobExeCompletedPromise.get_future().wait();

m_publishDescribeJobExeCompletedPromise = std::promise<void>();
GetPendingJobExecutionsRequest publishRequest;
publishRequest.ThingName = thingName;
jobsClient.PublishGetPendingJobExecutions(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler);
m_publishDescribeJobExeCompletedPromise.get_future().wait();

if (m_getResponse.get_future().wait_for(std::chrono::seconds(10)) == std::future_status::timeout)
{
fprintf(stderr, "get available jobs error timedout\n");
exit(-1);
}
}

std::vector<Aws::Crt::String> getAvailableJobs() const
{
std::lock_guard<std::mutex> lock(m_jobsMutex);
return m_availableJobs;
}

private:
std::vector<Aws::Crt::String> m_availableJobs;
mutable std::mutex m_jobsMutex;
std::promise<void> m_getResponse;
std::promise<void> m_publishDescribeJobExeCompletedPromise;
};

std::shared_ptr<IotJobsClient> build_mqtt3_client(
Utils::cmdData &cmdData,
Expand Down Expand Up @@ -178,7 +282,7 @@ std::shared_ptr<IotJobsClient> build_mqtt5_client(
int main(int argc, char *argv[])
{

fprintf(stdout, "Starting the jobs execution programm\n");
fprintf(stdout, "Starting the jobs execution program\n");
/************************ Setup ****************************/

// Do the global initialization for the API
Expand Down Expand Up @@ -215,10 +319,11 @@ int main(int argc, char *argv[])
exit(-1);
}
/************************ Run the sample ****************************/
AvailableJobsHandler handler;
if (connectionCompletedPromise.get_future().get())
{
std::vector<Aws::Crt::String> availableJobs;
getAvailableJobs(cmdData.input_thingName, *jobsClient, availableJobs);
handler.receiveAvailableJobs(cmdData.input_thingName, *jobsClient);
auto availableJobs = handler.getAvailableJobs();
for (auto jobid : availableJobs)
{
DescribeJobExecutionSubscriptionRequest describeJobExecutionSubscriptionRequest;
Expand Down Expand Up @@ -298,7 +403,7 @@ int main(int argc, char *argv[])
};

jobsClient->PublishDescribeJobExecution(
std::move(describeJobExecutionRequest), AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler);
describeJobExecutionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler);
publishDescribeJobExeCompletedPromise.get_future().wait();

Aws::Crt::String currentJobId;
Expand Down Expand Up @@ -466,100 +571,3 @@ int main(int argc, char *argv[])
}
return 0;
}

void getAvailableJobs(
Aws::Crt::String thingName,
IotJobsClient &jobsClient,
std::vector<Aws::Crt::String> &availableJobs)
{
std::promise<void> getResponse;
std::promise<void> publishDescribeJobExeCompletedPromise;

GetPendingJobExecutionsSubscriptionRequest subscriptionRequest;
subscriptionRequest.ThingName = thingName;

auto handler = [&](Aws::Iotjobs::GetPendingJobExecutionsResponse *response, int ioErr) {
fprintf(stderr, "running the jobs handler\n");
if (ioErr)
{
fprintf(stderr, "Error %d occurred\n", ioErr);
exit(1);
}
if (response)
{
if (response->InProgressJobs.has_value())
{
for (JobExecutionSummary job : response->InProgressJobs.value())
{
availableJobs.push_back(job.JobId.value());
fprintf(stderr, "In Progress jobs %s\n", job.JobId->c_str());
}
}
else
{
fprintf(stderr, "In Progress jobs: empty\n");
}
if (response->QueuedJobs.has_value())
{
for (JobExecutionSummary job : response->QueuedJobs.value())
{
availableJobs.push_back(job.JobId.value());
fprintf(stderr, "Queued jobs %s\n", job.JobId->c_str());
}
}
else
{
fprintf(stderr, "Queued jobs: empty\n");
}
}
getResponse.set_value();
};

auto err_handler = [&](Aws::Iotjobs::RejectedError *rejectedError, int ioErr) {
if (ioErr)
{
fprintf(stderr, "Error %d occurred\n", ioErr);
exit(1);
}
if (rejectedError)
{
fprintf(
stderr,
"Service Error %d occurred. Message %s\n",
(int)rejectedError->Code.value(),
rejectedError->Message->c_str());
}
fprintf(stderr, "Error handler\n");
exit(-1);
};

auto publishHandler = [&](int ioErr) {
if (ioErr)
{
fprintf(stderr, "Error %d occurred\n", ioErr);
exit(1);
}
publishDescribeJobExeCompletedPromise.set_value();
};

jobsClient.SubscribeToGetPendingJobExecutionsAccepted(
subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, handler, publishHandler);
publishDescribeJobExeCompletedPromise.get_future().wait();

publishDescribeJobExeCompletedPromise = std::promise<void>();
jobsClient.SubscribeToGetPendingJobExecutionsRejected(
subscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, err_handler, publishHandler);
publishDescribeJobExeCompletedPromise.get_future().wait();

publishDescribeJobExeCompletedPromise = std::promise<void>();
GetPendingJobExecutionsRequest publishRequest;
publishRequest.ThingName = thingName;
jobsClient.PublishGetPendingJobExecutions(publishRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, publishHandler);
publishDescribeJobExeCompletedPromise.get_future().wait();

if (getResponse.get_future().wait_for(std::chrono::seconds(10)) == std::future_status::timeout)
{
fprintf(stderr, "get available jobs error timedout\n");
exit(-1);
}
}

0 comments on commit 72b2582

Please sign in to comment.