Skip to content

Commit

Permalink
Add topic argument to publish callback in request-response stream client
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Jan 29, 2025
1 parent 6a55ecc commit 7627b48
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
28 changes: 26 additions & 2 deletions include/aws/iot/MqttRequestResponseClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,28 @@ namespace Aws
/**
* Default constructor
*/
IncomingPublishEvent() : m_payload() { AWS_ZERO_STRUCT(m_payload); }
IncomingPublishEvent() : m_topic(), m_payload()
{
AWS_ZERO_STRUCT(m_topic);
AWS_ZERO_STRUCT(m_payload);
}

/**
* Sets the message response topic associated with this event. The event does not own this topic.
*
* @param topic the message response topic associated with this event
* @return reference to this
*/
IncomingPublishEvent &WithTopic(Aws::Crt::ByteCursor topic)
{
m_topic = topic;
return *this;
}

/**
* Sets the message payload associated with this event. The event does not own this payload.
*
* @param payload he message payload associated with this event
* @param payload the message payload associated with this event
* @return reference to this
*/
IncomingPublishEvent &WithPayload(Aws::Crt::ByteCursor payload)
Expand All @@ -135,6 +151,13 @@ namespace Aws
return *this;
}

/**
* Gets the message response topic associated with this event.
*
* @return the message response topic associated with this event
*/
Aws::Crt::ByteCursor GetTopic() const { return m_topic; }

/**
* Gets the message payload associated with this event.
*
Expand All @@ -143,6 +166,7 @@ namespace Aws
Aws::Crt::ByteCursor GetPayload() const { return m_payload; }

private:
Aws::Crt::ByteCursor m_topic;
Aws::Crt::ByteCursor m_payload;
};

Expand Down
12 changes: 9 additions & 3 deletions source/iot/MqttRequestResponseClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ namespace Aws
int error_code,
void *user_data);

static void OnIncomingPublishCallback(struct aws_byte_cursor payload, void *user_data);
static void OnIncomingPublishCallback(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data);

static void OnTerminatedCallback(void *user_data);

Expand Down Expand Up @@ -187,7 +190,10 @@ namespace Aws
}
}

void StreamingOperationImpl::OnIncomingPublishCallback(struct aws_byte_cursor payload, void *user_data)
void StreamingOperationImpl::OnIncomingPublishCallback(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data)
{
auto *handle = static_cast<StreamingOperationImplHandle *>(user_data);
StreamingOperationImpl *impl = handle->m_impl.get();
Expand All @@ -198,7 +204,7 @@ namespace Aws
if (!impl->m_closed && impl->m_config.incomingPublishEventHandler)
{
IncomingPublishEvent event;
event.WithPayload(payload);
event.WithTopic(topic).WithPayload(payload);

impl->m_config.incomingPublishEventHandler(std::move(event));
}
Expand Down
21 changes: 16 additions & 5 deletions tests/MqttRequestResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ struct ResponseTracker
bool complete;
};

struct TestPublishEvent
{
Aws::Crt::String topic;
Aws::Crt::String payload;
};

struct TestState
{
TestState(Aws::Crt::Allocator *allocator) : allocator(allocator) {}
Expand All @@ -54,7 +60,7 @@ struct TestState
Aws::Crt::Vector<std::shared_ptr<ResponseTracker>> responseTrackers;

Aws::Crt::Vector<Aws::Iot::RequestResponse::SubscriptionStatusEvent> subscriptionStatusEvents;
Aws::Crt::Vector<Aws::Crt::String> incomingPublishEvents;
Aws::Crt::Vector<TestPublishEvent> incomingPublishEvents;
};

static void s_waitForConnected(struct TestState *state)
Expand Down Expand Up @@ -168,17 +174,20 @@ static void s_onIncomingPublishEvent(Aws::Iot::RequestResponse::IncomingPublishE
{
std::unique_lock<std::mutex> lock(state->lock);

auto topicCursor = event.GetTopic();
Aws::Crt::String topicAsString((const char *)topicCursor.ptr, topicCursor.len);

auto payloadCursor = event.GetPayload();
Aws::Crt::String payloadAsString((const char *)payloadCursor.ptr, payloadCursor.len);

state->incomingPublishEvents.push_back(payloadAsString);
state->incomingPublishEvents.push_back({std::move(topicAsString), std::move(payloadAsString)});
}
state->signal.notify_one();
}

static void s_waitForIncomingPublishWithPredicate(
TestState *state,
const std::function<bool(const Aws::Crt::String &)> &predicate)
const std::function<bool(const TestPublishEvent &)> &predicate)
{
{
std::unique_lock<std::mutex> lock(state->lock);
Expand All @@ -189,7 +198,7 @@ static void s_waitForIncomingPublishWithPredicate(
return std::any_of(
state->incomingPublishEvents.cbegin(),
state->incomingPublishEvents.cend(),
[=](const Aws::Crt::String &payload) { return predicate(payload); });
[=](const TestPublishEvent &publishEvent) { return predicate(publishEvent); });
});
}
}
Expand Down Expand Up @@ -1077,7 +1086,9 @@ static int s_doShadowUpdatedStreamIncomingPublishTest(Aws::Crt::Allocator *alloc
s_publishToProtocolClient(context, uuid, s_publishPayload, allocator);

s_waitForIncomingPublishWithPredicate(
&state, [](const Aws::Crt::String &payload) { return payload == Aws::Crt::String(s_publishPayload); });
&state,
[&uuid](const TestPublishEvent &publishEvent)
{ return publishEvent.topic == uuid && publishEvent.payload == Aws::Crt::String(s_publishPayload); });

return AWS_OP_SUCCESS;
}
Expand Down

0 comments on commit 7627b48

Please sign in to comment.