diff --git a/ecal/core/include/ecal/ecal_callback.h b/ecal/core/include/ecal/ecal_callback.h index 00dbd5d3b3..97015dcbc0 100644 --- a/ecal/core/include/ecal/ecal_callback.h +++ b/ecal/core/include/ecal/ecal_callback.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -131,6 +131,13 @@ namespace eCAL **/ using ReceiveCallbackT = std::function; + /** + * @brief Raw data receive callback function type. + * + * @param data_ Data struct containing payload, sender_id, timestamp and publication clock. + **/ + using ReceiveCallbackTV6 = std::function; + /** * @brief Timer callback function type. **/ diff --git a/ecal/core/include/ecal/ecal_subscriber.h b/ecal/core/include/ecal/ecal_subscriber.h index 5ba5610d46..910f9dfda4 100644 --- a/ecal/core/include/ecal/ecal_subscriber.h +++ b/ecal/core/include/ecal/ecal_subscriber.h @@ -208,6 +208,15 @@ namespace eCAL **/ ECAL_API bool AddReceiveCallback(ReceiveCallbackT callback_); + /** + * @brief Add callback function for incoming receives. + * + * @param callback_ The callback function to add. + * + * @return True if succeeded, false if not. + **/ + ECAL_API bool AddReceiveCallback(ReceiveCallbackTV6 callback_); + /** * @brief Remove callback function for incoming receives. * diff --git a/ecal/core/src/io/shm/ecal_memfile_pool.cpp b/ecal/core/src/io/shm/ecal_memfile_pool.cpp index e2a1ed816b..9a982e48ae 100644 --- a/ecal/core/src/io/shm/ecal_memfile_pool.cpp +++ b/ecal/core/src/io/shm/ecal_memfile_pool.cpp @@ -96,7 +96,7 @@ namespace eCAL return true; } - bool CMemFileObserver::Start(const std::string& topic_name_, const std::string& topic_id_, const int timeout_, const MemFileDataCallbackT& callback_) + bool CMemFileObserver::Start(const int timeout_, const MemFileDataCallbackT& callback_) { if (!m_created) return false; if (m_is_observing) return false; @@ -108,11 +108,11 @@ namespace eCAL m_is_observing = true; // start observer thread - m_thread = std::thread(&CMemFileObserver::Observe, this, topic_name_, topic_id_, timeout_); + m_thread = std::thread(&CMemFileObserver::Observe, this, timeout_); #ifndef NDEBUG // log it - Logging::Log(log_level_debug2, std::string("CMemFileObserver started (" + topic_name_ + ", " + topic_id_ + ")")); + Logging::Log(log_level_debug2, std::string("CMemFileObserver started.")); #endif return true; @@ -147,7 +147,7 @@ namespace eCAL return true; } - void CMemFileObserver::Observe(const std::string& topic_name_, const std::string& topic_id_, const int timeout_) + void CMemFileObserver::Observe(const int timeout_) { // internal clock sample update checking uint64_t last_sample_clock(0); @@ -223,13 +223,13 @@ namespace eCAL // calculate user payload address data_buf = static_cast(buf) + mfile_hdr.hdr_size; // call user callback function - m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash); + m_data_callback(data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash); } } else { // call user callback function - m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash); + m_data_callback(data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash); } } } @@ -264,7 +264,7 @@ namespace eCAL if (post_process_buffer) { // add sample to data reader (and call user callback function) - if (m_data_callback) m_data_callback(topic_name_, topic_id_, receive_buffer.data(), receive_buffer.size(), (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash); + if (m_data_callback) m_data_callback(receive_buffer.data(), receive_buffer.size(), (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash); } // send acknowledge event @@ -365,7 +365,7 @@ namespace eCAL m_created = false; } - bool CMemFileThreadPool::ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, const std::string& topic_name_, const std::string& topic_id_, int timeout_observation_ms, const MemFileDataCallbackT& callback_) + bool CMemFileThreadPool::ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, int timeout_observation_ms, const MemFileDataCallbackT& callback_) { if(!m_created) return(false); if(memfile_name_.empty()) return(false); @@ -388,7 +388,7 @@ namespace eCAL else { observer->Stop(); - observer->Start(topic_name_, topic_id_, timeout_observation_ms, callback_); + observer->Start(timeout_observation_ms, callback_); } return(true); @@ -398,7 +398,7 @@ namespace eCAL { auto observer = std::make_shared(); observer->Create(memfile_name_, memfile_event_); - observer->Start(topic_name_, topic_id_, timeout_observation_ms, callback_); + observer->Start(timeout_observation_ms, callback_); m_observer_pool[memfile_name_] = observer; #ifndef NDEBUG // log it diff --git a/ecal/core/src/io/shm/ecal_memfile_pool.h b/ecal/core/src/io/shm/ecal_memfile_pool.h index 7d83e7570a..cf64438df8 100644 --- a/ecal/core/src/io/shm/ecal_memfile_pool.h +++ b/ecal/core/src/io/shm/ecal_memfile_pool.h @@ -42,7 +42,7 @@ namespace eCAL { - using MemFileDataCallbackT = std::function; + using MemFileDataCallbackT = std::function; //////////////////////////////////////// // CMemFileObserver @@ -61,14 +61,14 @@ namespace eCAL bool Create(const std::string& memfile_name_, const std::string& memfile_event_); bool Destroy(); - bool Start(const std::string& topic_name_, const std::string& topic_id_, int timeout_, const MemFileDataCallbackT& callback_); + bool Start(int timeout_, const MemFileDataCallbackT& callback_); bool Stop(); bool IsObserving() {return(m_is_observing);}; bool ResetTimeout(); protected: - void Observe(const std::string& topic_name_, const std::string& topic_id_, int timeout_); + void Observe(int timeout_); bool ReadFileHeader(SMemFileHeader& memfile_hdr); std::atomic m_created; @@ -97,7 +97,7 @@ namespace eCAL void Start(); void Stop(); - bool ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, const std::string& topic_name_, const std::string& topic_id_, int timeout_observation_ms, const MemFileDataCallbackT& callback_); + bool ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, int timeout_observation_ms, const MemFileDataCallbackT& callback_); protected: void CleanupPoolThread(); diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 314d4b005a..bb356603fe 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -163,7 +163,7 @@ namespace eCAL for (const auto& reader : readers_to_apply) { applied_size = reader->ApplySample( - ecal_sample.topic.tid, + ecal_sample.topic, payload_addr, payload_size, ecal_sample_content.id, @@ -182,7 +182,7 @@ namespace eCAL return (applied_size > 0); } - bool CSubGate::ApplySample(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) + bool CSubGate::ApplySample(const Payload::Topic& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) { if (!m_created) return false; @@ -194,7 +194,7 @@ namespace eCAL // Apply the samples to the readers afterwards. { const std::shared_lock lock(m_topic_name_datareader_sync); - auto res = m_topic_name_datareader_map.equal_range(topic_name_); + auto res = m_topic_name_datareader_map.equal_range(topic_id_.tname); std::transform( res.first, res.second, std::back_inserter(readers_to_apply), [](const auto& match) { return match.second; } ); diff --git a/ecal/core/src/pubsub/ecal_subgate.h b/ecal/core/src/pubsub/ecal_subgate.h index cc1b589c76..8e1d79e0c2 100644 --- a/ecal/core/src/pubsub/ecal_subgate.h +++ b/ecal/core/src/pubsub/ecal_subgate.h @@ -49,7 +49,7 @@ namespace eCAL bool HasSample(const std::string& sample_name_); bool ApplySample(const char* serialized_sample_data_, size_t serialized_sample_size_, eTLayerType layer_); - bool ApplySample(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); + bool ApplySample(const Payload::Topic& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); void ApplyPubRegistration(const Registration::Sample& ecal_sample_); void ApplyPubUnregistration(const Registration::Sample& ecal_sample_); diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index 4e4366050b..5d18cf6e7d 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -148,7 +148,16 @@ namespace eCAL bool CSubscriber::AddReceiveCallback(ReceiveCallbackT callback_) { - if(m_datareader == nullptr) return(false); + auto v6_callback = [callback_](const Registration::STopicId& topic_id_, const SReceiveCallbackData& data_) + { + callback_(topic_id_.topic_name.c_str(), &data_); + }; + return AddReceiveCallback(v6_callback); + } + + bool CSubscriber::AddReceiveCallback(ReceiveCallbackTV6 callback_) + { + if (m_datareader == nullptr) return(false); RemReceiveCallback(); return(m_datareader->AddReceiveCallback(std::move(callback_))); } diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 122db1b657..0082a82221 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -180,7 +180,7 @@ namespace eCAL return(false); } - bool CDataReader::AddReceiveCallback(ReceiveCallbackT callback_) + bool CDataReader::AddReceiveCallback(ReceiveCallbackTV6 callback_) { if (!m_created) return(false); @@ -361,7 +361,7 @@ namespace eCAL #endif } - size_t CDataReader::ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) + size_t CDataReader::ApplySample(const Payload::Topic& topic_id_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) { // ensure thread safety const std::lock_guard lock(m_receive_callback_mtx); @@ -419,7 +419,7 @@ namespace eCAL // - a dropped message // - an out-of-order message // - a multiple sent message - if (!CheckMessageClock(tid_, clock_)) + if (!CheckMessageClock(topic_id_.tid, clock_)) { // we will not process that message return(0); @@ -463,8 +463,15 @@ namespace eCAL cb_data.id = id_; cb_data.time = time_; cb_data.clock = clock_; + + Registration::STopicId topic_id; + topic_id.topic_name = topic_id_.tname; + topic_id.topic_id.host_name = topic_id_.hname; + topic_id.topic_id.entity_id = topic_id_.tid; + topic_id.topic_id.process_id = topic_id_.pid; + // execute it - (m_receive_callback)(m_topic_name.c_str(), &cb_data); + (m_receive_callback)(topic_id, cb_data); processed = true; } } diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index 594b62bd74..2095dce39a 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -71,7 +71,7 @@ namespace eCAL bool Read(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ms_ = 0); - bool AddReceiveCallback(ReceiveCallbackT callback_); + bool AddReceiveCallback(ReceiveCallbackTV6 callback_); bool RemReceiveCallback(); bool AddEventCallback(eCAL_Subscriber_Event type_, SubEventCallbackT callback_); @@ -107,7 +107,7 @@ namespace eCAL SDataTypeInformation GetDataTypeInformation() const { return(m_topic_info); } void InitializeLayers(); - size_t ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); + size_t ApplySample(const Payload::Topic& topic_id_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); std::string Dump(const std::string& indent_ = ""); @@ -153,7 +153,7 @@ namespace eCAL long long m_read_time = 0; std::mutex m_receive_callback_mtx; - ReceiveCallbackT m_receive_callback; + ReceiveCallbackTV6 m_receive_callback; std::atomic m_receive_time; std::deque m_sample_hash_queue; diff --git a/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp b/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp index a20c0db7c5..27a8ff9907 100644 --- a/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp @@ -47,25 +47,27 @@ namespace eCAL { const std::string process_id = std::to_string(Process::GetProcessID()); const std::string memfile_event = memfile_name + "_" + process_id; - const MemFileDataCallbackT memfile_data_callback = std::bind(&CSHMReaderLayer::OnNewShmFileContent, this, - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3, - std::placeholders::_4, - std::placeholders::_5, - std::placeholders::_6, - std::placeholders::_7, - std::placeholders::_8); - g_memfile_pool()->ObserveFile(memfile_name, memfile_event, par_.topic_name, par_.topic_id, Config::GetRegistrationTimeoutMs(), memfile_data_callback); + + Payload::Topic topic; + topic.tname = par_.topic_name; + topic.hname = par_.host_name; + topic.tid = par_.topic_id; + topic.pid = par_.process_id; + + auto data_callback = [this, topic](const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_)->size_t + { + return OnNewShmFileContent(topic, buf_, len_, id_, clock_, time_, hash_); + }; + g_memfile_pool()->ObserveFile(memfile_name, memfile_event, Config::GetRegistrationTimeoutMs(), data_callback); } } } - size_t CSHMReaderLayer::OnNewShmFileContent(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_) + size_t CSHMReaderLayer::OnNewShmFileContent(const Payload::Topic& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_) { if (g_subgate() != nullptr) { - if (g_subgate()->ApplySample(topic_name_, topic_id_, buf_, len_, id_, clock_, time_, hash_, tl_ecal_shm)) + if (g_subgate()->ApplySample(topic_id_, buf_, len_, id_, clock_, time_, hash_, tl_ecal_shm)) { return len_; } diff --git a/ecal/core/src/readwrite/shm/ecal_reader_shm.h b/ecal/core/src/readwrite/shm/ecal_reader_shm.h index e3c5cb3926..920c72f535 100644 --- a/ecal/core/src/readwrite/shm/ecal_reader_shm.h +++ b/ecal/core/src/readwrite/shm/ecal_reader_shm.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ #include "ecal_def.h" #include "readwrite/ecal_reader_layer.h" +#include "serialization/ecal_struct_sample_payload.h" #include #include @@ -48,6 +49,6 @@ namespace eCAL void SetConnectionParameter(SReaderLayerPar& par_) override; private: - size_t OnNewShmFileContent(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_); + size_t OnNewShmFileContent(const Payload::Topic& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_); }; } diff --git a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp index e916fcdc11..7423a80fff 100644 --- a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp +++ b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp @@ -107,10 +107,9 @@ namespace eCAL // use this intermediate variables as optimization const auto& ecal_header_topic = m_ecal_header.topic; const auto& ecal_header_content = m_ecal_header.content; - // apply sample + g_subgate()->ApplySample( - ecal_header_topic.tname, - ecal_header_topic.tid, + ecal_header_topic, data_payload, static_cast(ecal_header_content.size), ecal_header_content.id, diff --git a/ecal/core/src/serialization/ecal_struct_sample_payload.h b/ecal/core/src/serialization/ecal_struct_sample_payload.h index 47b033ac7d..3219906ec9 100644 --- a/ecal/core/src/serialization/ecal_struct_sample_payload.h +++ b/ecal/core/src/serialization/ecal_struct_sample_payload.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ namespace eCAL std::string hname; // host name std::string tid; // topic id std::string tname; // topic name + int32_t pid; // process id }; // Topic content payload diff --git a/ecal/samples/cpp/benchmarks/many_connections_rec/src/many_connections_rec.cpp b/ecal/samples/cpp/benchmarks/many_connections_rec/src/many_connections_rec.cpp index 6f6682a59a..5912fcba6d 100644 --- a/ecal/samples/cpp/benchmarks/many_connections_rec/src/many_connections_rec.cpp +++ b/ecal/samples/cpp/benchmarks/many_connections_rec/src/many_connections_rec.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,11 @@ class SubscriberCreator std::ostringstream tname; tname << std::setw(5) << std::setfill('0') << i; subscribers.emplace_back("Topic" + tname.str(), eCAL::SDataTypeInformation{ ttype, "", tdesc }); - subscribers.at(i).AddReceiveCallback(std::bind(&SubscriberCreator::Receive, this)); + auto on_receive = [this](const eCAL::Registration::STopicId&, const eCAL::SReceiveCallbackData&) + { + Receive(); + }; + subscribers.at(i).AddReceiveCallback(on_receive); } }