Skip to content

Commit

Permalink
Fix stream handling (#109)
Browse files Browse the repository at this point in the history
* Fix stream handling

* Update

* Uncrustified

Co-authored-by: GitHub Actions Bot <>
  • Loading branch information
pablogs9 authored Mar 9, 2021
1 parent 68adec1 commit 8a4b7f6
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 64 deletions.
4 changes: 2 additions & 2 deletions rmw_microxrcedds_c/src/callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void on_request(
while (service_item != NULL)
{
rmw_uxrce_service_t* custom_service = (rmw_uxrce_service_t*)service_item->data;
if (custom_service->request_id == request_id)
if (custom_service->service_data_resquest == request_id)
{
custom_service->micro_buffer_lenght[custom_service->history_write_index] = length;
ucdr_deserialize_array_uint8_t(
Expand Down Expand Up @@ -151,7 +151,7 @@ void on_reply(
while (client_item != NULL)
{
rmw_uxrce_client_t* custom_client = (rmw_uxrce_client_t*)client_item->data;
if (custom_client->request_id == request_id)
if (custom_client->client_data_request == request_id)
{
custom_client->micro_buffer_lenght[custom_client->history_write_index] = length;
ucdr_deserialize_array_uint8_t(
Expand Down
18 changes: 12 additions & 6 deletions rmw_microxrcedds_c/src/rmw_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ rmw_create_client(
const rmw_qos_profile_t* qos_policies)
{
EPROS_PRINT_TRACE()
rmw_client_t * rmw_client = NULL;
rmw_client_t* rmw_client = NULL;
if (!node)
{
RMW_SET_ERROR_MSG("node handle is null");
Expand Down Expand Up @@ -145,12 +145,13 @@ rmw_create_client(
}
client_req = uxr_buffer_create_requester_xml(
&custom_node->context->session,
custom_node->context->reliable_output, custom_client->client_id,
*custom_node->context->creation_destroy_stream,
custom_client->client_id,
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
// TODO(pablogs9): Is possible to instantiate a replier by ref?
// client_req = uxr_buffer_create_replier_ref(&custom_node->context->session,
// custom_node->context->reliable_output, custom_service->subscriber_id,
// *custom_node->context->creation_destroy_stream, custom_service->subscriber_id,
// custom_node->participant_id, "", UXR_REPLACE);
#endif

Expand All @@ -169,14 +170,19 @@ rmw_create_client(
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;

custom_client->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_output :
custom_node->context->reliable_output;

uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;

custom_client->request_id = uxr_buffer_request_data(
custom_client->client_data_request = uxr_buffer_request_data(
&custom_node->context->session,
custom_node->context->reliable_output, custom_client->client_id,
custom_client->stream_id, &delivery_control);
*custom_node->context->creation_destroy_stream, custom_client->client_id,
data_request_stream_id, &delivery_control);
}
return(rmw_client);

Expand Down
10 changes: 5 additions & 5 deletions rmw_microxrcedds_c/src/rmw_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ rmw_ret_t rmw_graph_init(

uint16_t participant_req = uxr_buffer_create_participant_xml(
&context->session,
context->reliable_output,
*context->creation_destroy_stream,
graph_info->participant_id, (int16_t)microros_domain_id,
rmw_uxrce_xml_buffer, UXR_REPLACE);

Expand Down Expand Up @@ -84,7 +84,7 @@ rmw_ret_t rmw_graph_init(
}

uint16_t subscriber_req = uxr_buffer_create_subscriber_xml(
&context->session, context->reliable_output, graph_info->subscriber_id,
&context->session, *context->creation_destroy_stream, graph_info->subscriber_id,
graph_info->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);

graph_info->datareader_id = uxr_object_id(context->id_datareader++, UXR_DATAREADER_ID);
Expand All @@ -105,7 +105,7 @@ rmw_ret_t rmw_graph_init(
}

uint16_t topic_req = uxr_buffer_create_topic_xml(
&context->session, context->reliable_output, graph_info->topic_id,
&context->session, *context->creation_destroy_stream, graph_info->topic_id,
graph_info->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);

// Create graph datareader request
Expand All @@ -120,7 +120,7 @@ rmw_ret_t rmw_graph_init(
}

uint16_t datareader_req = uxr_buffer_create_datareader_xml(
&context->session, context->reliable_output, graph_info->datareader_id,
&context->session, *context->creation_destroy_stream, graph_info->datareader_id,
graph_info->subscriber_id, rmw_uxrce_xml_buffer, UXR_REPLACE);

// Run session
Expand All @@ -142,7 +142,7 @@ rmw_ret_t rmw_graph_init(
delivery_control.max_elapsed_time = UXR_MAX_ELAPSED_TIME_UNLIMITED;
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;

graph_info->subscription_request = uxr_buffer_request_data(
uxr_buffer_request_data(
&context->session,
context->reliable_output, graph_info->datareader_id,
context->reliable_input, &delivery_control);
Expand Down
2 changes: 1 addition & 1 deletion rmw_microxrcedds_c/src/rmw_guard_condition.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ rmw_create_guard_condition(rmw_context_t* context)
(void)context;
EPROS_PRINT_TRACE()

rmw_guard_condition_t * rmw_guard_condition = (rmw_guard_condition_t*)rmw_allocate(
rmw_guard_condition_t* rmw_guard_condition = (rmw_guard_condition_t*)rmw_allocate(
sizeof(rmw_guard_condition_t));

rmw_guard_condition->context = context;
Expand Down
4 changes: 2 additions & 2 deletions rmw_microxrcedds_c/src/rmw_microxrcedds_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ create_topic(

topic_req = uxr_buffer_create_topic_xml(
&custom_node->context->session,
custom_node->context->reliable_output, custom_topic->topic_id,
*custom_node->context->creation_destroy_stream, custom_topic->topic_id,
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
(void)qos_policies;
Expand All @@ -79,7 +79,7 @@ create_topic(

topic_req = uxr_buffer_create_topic_ref(
&custom_node->context->session,
custom_node->context->reliable_output, custom_topic->topic_id,
*custom_node->context->creation_destroy_stream, custom_topic->topic_id,
custom_node->participant_id, rmw_uxrce_profile_name, UXR_REPLACE);
#endif

Expand Down
6 changes: 3 additions & 3 deletions rmw_microxrcedds_c/src/rmw_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ rmw_node_t* create_node(
participant_req =
uxr_buffer_create_participant_xml(
&node_info->context->session,
node_info->context->reliable_output,
*node_info->context->creation_destroy_stream,
node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
if (!build_participant_profile(rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name)))
Expand All @@ -102,7 +102,7 @@ rmw_node_t* create_node(
participant_req =
uxr_buffer_create_participant_ref(
&node_info->context->session,
node_info->context->reliable_output,
*node_info->context->creation_destroy_stream,
node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_profile_name, UXR_REPLACE);
#endif

Expand Down Expand Up @@ -134,7 +134,7 @@ rmw_create_node(
(void)context;
(void)localhost_only;
EPROS_PRINT_TRACE()
rmw_node_t * rmw_node = NULL;
rmw_node_t* rmw_node = NULL;
if (!name || strlen(name) == 0)
{
RMW_SET_ERROR_MSG("name is null");
Expand Down
14 changes: 7 additions & 7 deletions rmw_microxrcedds_c/src/rmw_publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ rmw_create_publisher(
(void)publisher_options;

EPROS_PRINT_TRACE()
rmw_publisher_t * rmw_publisher = NULL;
rmw_publisher_t* rmw_publisher = NULL;
if (!node)
{
RMW_SET_ERROR_MSG("node handle is null");
Expand Down Expand Up @@ -119,8 +119,8 @@ rmw_create_publisher(

custom_publisher->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;
custom_node->context->best_effort_output :
custom_node->context->reliable_output;

custom_publisher->cs_cb_size = NULL;
custom_publisher->cs_cb_serialization = NULL;
Expand Down Expand Up @@ -185,13 +185,13 @@ rmw_create_publisher(
}
publisher_req = uxr_buffer_create_publisher_xml(
&custom_publisher->owner_node->context->session,
custom_node->context->reliable_output,
*custom_node->context->creation_destroy_stream,
custom_publisher->publisher_id,
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
publisher_req = uxr_buffer_create_publisher_xml(
&custom_publisher->owner_node->context->session,
custom_node->context->reliable_output,
*custom_node->context->creation_destroy_stream,
custom_publisher->publisher_id,
custom_node->participant_id, "", UXR_REPLACE);
#endif
Expand Down Expand Up @@ -220,7 +220,7 @@ rmw_create_publisher(

datawriter_req = uxr_buffer_create_datawriter_xml(
&custom_publisher->owner_node->context->session,
custom_node->context->reliable_output,
*custom_node->context->creation_destroy_stream,
custom_publisher->datawriter_id,
custom_publisher->publisher_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
Expand All @@ -232,7 +232,7 @@ rmw_create_publisher(

datawriter_req = uxr_buffer_create_datawriter_ref(
&custom_publisher->owner_node->context->session,
custom_node->context->reliable_output,
*custom_node->context->creation_destroy_stream,
custom_publisher->datawriter_id,
custom_publisher->publisher_id, rmw_uxrce_profile_name, UXR_REPLACE);
#endif
Expand Down
10 changes: 5 additions & 5 deletions rmw_microxrcedds_c/src/rmw_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ rmw_send_request(
(const message_type_support_callbacks_t*)req_members->data;

ucdrBuffer mb;
uint32_t request_length = functions->get_serialized_size(ros_request);
uint32_t request_length = functions->get_serialized_size(ros_request);
*sequence_id = uxr_prepare_output_stream(
&custom_node->context->session,
custom_client->stream_id, custom_client->client_id, &mb,
request_length);
&custom_node->context->session,
custom_client->stream_id, custom_client->client_id, &mb,
request_length);

functions->cdr_serialize(ros_request, &mb);

if (UXR_INVALID_REQUEST_ID == *sequence_id)
Expand Down
12 changes: 6 additions & 6 deletions rmw_microxrcedds_c/src/rmw_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ rmw_send_response(
(const message_type_support_callbacks_t*)res_members->data;

ucdrBuffer mb;
uint32_t response_length = functions->get_serialized_size(ros_response) + 24; // Adding sample indentity size
uint16_t rc = uxr_prepare_output_stream(
&custom_node->context->session,
custom_service->stream_id, custom_service->service_id, &mb,
response_length);
uint32_t response_length = functions->get_serialized_size(ros_response) + 24; // Adding sample indentity size
uint16_t rc = uxr_prepare_output_stream(
&custom_node->context->session,
custom_service->stream_id, custom_service->service_id, &mb,
response_length);

uxr_serialize_SampleIdentity(&mb, &sample_id);
functions->cdr_serialize(ros_response, &mb);

Expand Down
17 changes: 11 additions & 6 deletions rmw_microxrcedds_c/src/rmw_service.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ rmw_create_service(
const rmw_qos_profile_t* qos_policies)
{
EPROS_PRINT_TRACE()
rmw_service_t * rmw_service = NULL;
rmw_service_t* rmw_service = NULL;
if (!node)
{
RMW_SET_ERROR_MSG("node handle is null");
Expand Down Expand Up @@ -142,12 +142,12 @@ rmw_create_service(
}
service_req = uxr_buffer_create_replier_xml(
&custom_node->context->session,
custom_node->context->reliable_output, custom_service->service_id,
*custom_node->context->creation_destroy_stream, custom_service->service_id,
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
// CHECK IF THIS IS NECESSARY
// service_req = uxr_buffer_create_replier_ref(&custom_node->context->session,
// custom_node->context->reliable_output, custom_service->subscriber_id,
// *custom_node->context->creation_destroy_stream, custom_service->subscriber_id,
// custom_node->participant_id, "", UXR_REPLACE);
#endif

Expand All @@ -167,14 +167,19 @@ rmw_create_service(
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;

custom_service->stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_output :
custom_node->context->reliable_output;

uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;

custom_service->request_id = uxr_buffer_request_data(
custom_service->service_data_resquest = uxr_buffer_request_data(
&custom_node->context->session,
custom_node->context->reliable_output, custom_service->service_id,
custom_service->stream_id, &delivery_control);
*custom_node->context->creation_destroy_stream, custom_service->service_id,
data_request_stream_id, &delivery_control);
}
return(rmw_service);

Expand Down
18 changes: 9 additions & 9 deletions rmw_microxrcedds_c/src/rmw_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ rmw_create_subscription(
(void)subscription_options;

EPROS_PRINT_TRACE()
rmw_subscription_t * rmw_subscription = NULL;
rmw_subscription_t* rmw_subscription = NULL;
if (!node)
{
RMW_SET_ERROR_MSG("node handle is null");
Expand Down Expand Up @@ -181,14 +181,14 @@ rmw_create_subscription(
}
subscriber_req = uxr_buffer_create_subscriber_xml(
&custom_node->context->session,
custom_node->context->reliable_output, custom_subscription->subscriber_id,
*custom_node->context->creation_destroy_stream, custom_subscription->subscriber_id,
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
// TODO(BORJA) Publisher by reference does not make sense in
// current micro XRCE-DDS implementation.
subscriber_req = uxr_buffer_create_subscriber_xml(
&custom_node->context->session,
custom_node->context->reliable_output, custom_subscription->subscriber_id,
*custom_node->context->creation_destroy_stream, custom_subscription->subscriber_id,
custom_node->participant_id, "", UXR_REPLACE);
#endif

Expand All @@ -215,7 +215,7 @@ rmw_create_subscription(

datareader_req = uxr_buffer_create_datareader_xml(
&custom_node->context->session,
custom_node->context->reliable_output, custom_subscription->datareader_id,
*custom_node->context->creation_destroy_stream, custom_subscription->datareader_id,
custom_subscription->subscriber_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
if (!build_datareader_profile(topic_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name)))
Expand All @@ -226,7 +226,7 @@ rmw_create_subscription(

datareader_req = uxr_buffer_create_datareader_ref(
&custom_node->context->session,
custom_node->context->reliable_output, custom_subscription->datareader_id,
*custom_node->context->creation_destroy_stream, custom_subscription->datareader_id,
custom_subscription->subscriber_id, rmw_uxrce_profile_name, UXR_REPLACE);
#endif

Expand All @@ -245,15 +245,15 @@ rmw_create_subscription(
delivery_control.max_elapsed_time = UXR_MAX_ELAPSED_TIME_UNLIMITED;
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;

custom_subscription->stream_id =
uxrStreamId data_request_stream_id =
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
custom_node->context->best_effort_input :
custom_node->context->reliable_input;

custom_subscription->subscription_request = uxr_buffer_request_data(
uxr_buffer_request_data(
&custom_node->context->session,
custom_node->context->reliable_output, custom_subscription->datareader_id,
custom_subscription->stream_id, &delivery_control);
*custom_node->context->creation_destroy_stream, custom_subscription->datareader_id,
data_request_stream_id, &delivery_control);
}
return(rmw_subscription);

Expand Down
12 changes: 6 additions & 6 deletions rmw_microxrcedds_c/src/rmw_uros_options.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ rmw_ret_t rmw_uros_ping_agent(const int timeout_ms, const uint8_t attempts)
uxrUDPTransport transport;
#elif defined(RMW_UXRCE_TRANSPORT_CUSTOM)
uxrCustomTransport transport;
transport.framing = rmw_uxrce_transport_default_params.framing;
transport.args = rmw_uxrce_transport_default_params.args;
transport.open = rmw_uxrce_transport_default_params.open_cb;
transport.close = rmw_uxrce_transport_default_params.close_cb;
transport.write = rmw_uxrce_transport_default_params.write_cb;
transport.read = rmw_uxrce_transport_default_params.read_cb;
transport.framing = rmw_uxrce_transport_default_params.framing;
transport.args = rmw_uxrce_transport_default_params.args;
transport.open = rmw_uxrce_transport_default_params.open_cb;
transport.close = rmw_uxrce_transport_default_params.close_cb;
transport.write = rmw_uxrce_transport_default_params.write_cb;
transport.read = rmw_uxrce_transport_default_params.read_cb;
#endif
rmw_ret_t ret = rmw_uxrce_transport_init(NULL, NULL, (void*)&transport);

Expand Down
2 changes: 1 addition & 1 deletion rmw_microxrcedds_c/src/rmw_wait_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ rmw_create_wait_set(rmw_context_t* context, size_t max_conditions)
(void)max_conditions;
EPROS_PRINT_TRACE()

rmw_wait_set_t * rmw_wait_set = (rmw_wait_set_t*)rmw_allocate(
rmw_wait_set_t* rmw_wait_set = (rmw_wait_set_t*)rmw_allocate(
sizeof(rmw_wait_set_t));

return(rmw_wait_set);
Expand Down
Loading

0 comments on commit 8a4b7f6

Please sign in to comment.