Skip to content

Commit

Permalink
[core] Reduce allocation / deallocations necessary for Registration. (#…
Browse files Browse the repository at this point in the history
…1744)

Provide `Util::CExpandingVector` (and unit tests) to be used as a replacement to std::vector / std::list occurences in the `Registration::SampleList` and sub-objects.
This vector does not destruct elements upon calling clear(), rather calls their clear() methods.
  • Loading branch information
KerstinKeller authored Sep 26, 2024
1 parent 3a51a0d commit 6df2a8e
Show file tree
Hide file tree
Showing 26 changed files with 696 additions and 141 deletions.
1 change: 1 addition & 0 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ endif()
set(ecal_util_src
src/util/ecal_expmap.h
src/util/ecal_thread.h
src/util/expanding_vector.h
src/util/frequency_calculator.h
src/util/getenvvar.h
)
Expand Down
7 changes: 7 additions & 0 deletions ecal/core/include/ecal/ecal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ namespace eCAL
{
return std::tie(name, encoding, descriptor) < std::tie(rhs.name, rhs.encoding, rhs.descriptor);
}

void clear()
{
name.clear();
encoding.clear();
descriptor.clear();
}
//!< @endcond
};

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ namespace eCAL
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
for (const auto& iter : m_topic_name_datawriter_map)
{
reg_sample_list_.samples.emplace_back(iter.second->GetRegistration());
iter.second->GetRegistration(reg_sample_list_.push_back());
}
}
}
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ namespace eCAL
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
for (const auto& iter : m_topic_name_datareader_map)
{
reg_sample_list_.samples.emplace_back(iter.second->GetRegistration());
iter.second->GetRegistration(reg_sample_list_.push_back());
}
}
}
24 changes: 10 additions & 14 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,9 @@ namespace eCAL
void CDataReader::Register()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample());
Registration::Sample sample;
GetRegistrationSample(sample);
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(sample);

#ifndef NDEBUG
// log it
Expand All @@ -606,7 +608,9 @@ namespace eCAL
void CDataReader::Unregister()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample());
Registration::Sample sample;
GetUnregistrationSample(sample);
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(sample);

#ifndef NDEBUG
// log it
Expand All @@ -615,10 +619,10 @@ namespace eCAL
#endif // ECAL_CORE_REGISTRATION
}

Registration::Sample CDataReader::GetRegistration()
void CDataReader::GetRegistration(Registration::Sample& sample)
{
// return registration
return GetRegistrationSample();
return GetRegistrationSample(sample);
}

bool CDataReader::IsPublished() const
Expand All @@ -631,10 +635,8 @@ namespace eCAL
return m_connection_count;
}

Registration::Sample CDataReader::GetRegistrationSample()
void CDataReader::GetRegistrationSample(Registration::Sample& ecal_reg_sample)
{
// create registration sample
Registration::Sample ecal_reg_sample;
ecal_reg_sample.cmd_type = bct_reg_subscriber;

auto& ecal_reg_sample_identifier = ecal_reg_sample.identifier;
Expand Down Expand Up @@ -706,14 +708,10 @@ namespace eCAL
// we do not know the number of connections ..
ecal_reg_sample_topic.connections_loc = 0;
ecal_reg_sample_topic.connections_ext = 0;

return ecal_reg_sample;
}

Registration::Sample CDataReader::GetUnregistrationSample()
void CDataReader::GetUnregistrationSample(Registration::Sample& ecal_unreg_sample)
{
// create unregistration sample
Registration::Sample ecal_unreg_sample;
ecal_unreg_sample.cmd_type = bct_unreg_subscriber;

auto& ecal_reg_sample_identifier = ecal_unreg_sample.identifier;
Expand All @@ -726,8 +724,6 @@ namespace eCAL
ecal_reg_sample_topic.pname = m_pname;
ecal_reg_sample_topic.tname = m_topic_name;
ecal_reg_sample_topic.uname = Process::GetUnitName();

return ecal_unreg_sample;
}

void CDataReader::StartTransportLayer()
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ namespace eCAL

void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_);

Registration::Sample GetRegistration();
void GetRegistration(Registration::Sample& sample);
bool IsCreated() const { return(m_created); }

bool IsPublished() const;
Expand Down Expand Up @@ -115,8 +115,8 @@ namespace eCAL
void Register();
void Unregister();

Registration::Sample GetRegistrationSample();
Registration::Sample GetUnregistrationSample();
void GetRegistrationSample(Registration::Sample& sample);
void GetUnregistrationSample(Registration::Sample& sample);

void StartTransportLayer();
void StopTransportLayer();
Expand Down
24 changes: 10 additions & 14 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,9 @@ namespace eCAL
void CDataWriter::Register()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample());
Registration::Sample registration_sample;
GetRegistrationSample(registration_sample);
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(registration_sample);

#ifndef NDEBUG
// log it
Expand All @@ -640,7 +642,9 @@ namespace eCAL
void CDataWriter::Unregister()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample());
Registration::Sample unregistration_sample;
GetUnregistrationSample(unregistration_sample);
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(unregistration_sample);

#ifndef NDEBUG
// log it
Expand All @@ -649,15 +653,13 @@ namespace eCAL
#endif // ECAL_CORE_REGISTRATION
}

Registration::Sample CDataWriter::GetRegistration()
void CDataWriter::GetRegistration(Registration::Sample& sample)
{
return GetRegistrationSample();
GetRegistrationSample(sample);
}

Registration::Sample CDataWriter::GetRegistrationSample()
void CDataWriter::GetRegistrationSample(Registration::Sample& ecal_reg_sample)
{
// create registration sample
Registration::Sample ecal_reg_sample;
ecal_reg_sample.cmd_type = bct_reg_publisher;

auto& ecal_reg_sample_identifier = ecal_reg_sample.identifier;
Expand Down Expand Up @@ -748,14 +750,10 @@ namespace eCAL
}
ecal_reg_sample_topic.connections_loc = static_cast<int32_t>(loc_connections);
ecal_reg_sample_topic.connections_ext = static_cast<int32_t>(ext_connections);

return ecal_reg_sample;
}

Registration::Sample CDataWriter::GetUnregistrationSample()
void CDataWriter::GetUnregistrationSample(Registration::Sample& ecal_unreg_sample)
{
// create unregistration sample
Registration::Sample ecal_unreg_sample;
ecal_unreg_sample.cmd_type = bct_unreg_publisher;

auto& ecal_reg_sample_identifier = ecal_unreg_sample.identifier;
Expand All @@ -768,8 +766,6 @@ namespace eCAL
ecal_reg_sample_topic.pname = m_pname;
ecal_reg_sample_topic.tname = m_topic_name;
ecal_reg_sample_topic.uname = Process::GetUnitName();

return ecal_unreg_sample;
}

void CDataWriter::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_)
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ namespace eCAL
void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_);
void RemoveSubscription(const SSubscriptionInfo& subscription_info_);

Registration::Sample GetRegistration();
void GetRegistration(Registration::Sample& sample);
void RefreshSendCounter();

bool IsCreated() const { return(m_created); }
Expand All @@ -119,8 +119,8 @@ namespace eCAL
void Register();
void Unregister();

Registration::Sample GetRegistrationSample();
Registration::Sample GetUnregistrationSample();
void GetRegistrationSample(Registration::Sample& sample);
void GetUnregistrationSample(Registration::Sample& sample);

bool StartUdpLayer();
bool StartShmLayer();
Expand Down
22 changes: 12 additions & 10 deletions ecal/core/src/registration/ecal_registration_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <atomic>
#include <chrono>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -136,46 +137,47 @@ namespace eCAL
void CRegistrationProvider::AddSingleSample(const Registration::Sample& sample_)
{
const std::lock_guard<std::mutex> lock(m_applied_sample_list_mtx);
m_applied_sample_list.samples.push_back(sample_);
m_applied_sample_list.push_back(sample_);
}

void CRegistrationProvider::RegisterSendThread()
{
// collect all registrations and send them out cyclic
{
// create sample list
Registration::SampleList sample_list;
m_send_thread_sample_list.clear();

// and add process registration sample
sample_list.samples.push_back(Registration::GetProcessRegisterSample());
m_send_thread_sample_list.push_back(Registration::GetProcessRegisterSample());

#if ECAL_CORE_SUBSCRIBER
// add subscriber registrations
if (g_subgate() != nullptr) g_subgate()->GetRegistrations(sample_list);
if (g_subgate() != nullptr) g_subgate()->GetRegistrations(m_send_thread_sample_list);
#endif

#if ECAL_CORE_PUBLISHER
// add publisher registrations
if (g_pubgate() != nullptr) g_pubgate()->GetRegistrations(sample_list);
if (g_pubgate() != nullptr) g_pubgate()->GetRegistrations(m_send_thread_sample_list);
#endif

#if ECAL_CORE_SERVICE
// add server registrations
if (g_servicegate() != nullptr) g_servicegate()->GetRegistrations(sample_list);
if (g_servicegate() != nullptr) g_servicegate()->GetRegistrations(m_send_thread_sample_list);

// add client registrations
if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(sample_list);
if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(m_send_thread_sample_list);
#endif

// append applied samples list to sample list
if (!m_applied_sample_list.samples.empty())
if (!m_applied_sample_list.empty())
{
const std::lock_guard<std::mutex> lock(m_applied_sample_list_mtx);
sample_list.samples.splice(sample_list.samples.end(), m_applied_sample_list.samples);
std::copy(m_applied_sample_list.begin(), m_applied_sample_list.end(), std::back_inserter(m_send_thread_sample_list));
m_applied_sample_list.clear();
}

// send collected registration sample list
m_reg_sender->SendSampleList(sample_list);
m_reg_sender->SendSampleList(m_send_thread_sample_list);
}
}
}
2 changes: 2 additions & 0 deletions ecal/core/src/registration/ecal_registration_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ namespace eCAL
std::mutex m_applied_sample_list_mtx;
Registration::SampleList m_applied_sample_list;

Registration::SampleList m_send_thread_sample_list;

Registration::SAttributes m_attributes;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,17 @@ namespace eCAL

void CRegistrationReceiverSHM::Receive()
{
// At the moment this function is called synchronously by a dedicated thread.
// If this changes, we need to protect the sample list member variable
MemfileBroadcastMessageListT message_list;
if (m_memfile_broadcast_reader->Read(message_list, 0))
{
eCAL::Registration::SampleList sample_list;
m_sample_list.clear();
for (const auto& message : message_list)
{
if (DeserializeFromBuffer(static_cast<const char*>(message.data), message.size, sample_list))
if (DeserializeFromBuffer(static_cast<const char*>(message.data), message.size, m_sample_list))
{
for (const auto& sample : sample_list.samples)
for (const auto& sample : m_sample_list)
{
m_apply_sample_callback(sample);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ namespace eCAL
std::unique_ptr<CMemoryFileBroadcastReader> m_memfile_broadcast_reader;
std::unique_ptr<CCallbackThread> m_memfile_broadcast_reader_thread;

eCAL::Registration::SampleList m_sample_list;

RegistrationApplySampleCallbackT m_apply_sample_callback;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace eCAL
bool CRegistrationSenderUDP::SendSampleList(const Registration::SampleList& sample_list)
{
bool return_value{ true };
for (const auto& sample : sample_list.samples)
for (const auto& sample : sample_list)
{
return_value &= SendSample(sample);
}
Expand Down
Loading

0 comments on commit 6df2a8e

Please sign in to comment.