Skip to content

Commit

Permalink
[core] Make STopicId available in subscriber callbacks
Browse files Browse the repository at this point in the history
- Add new Callback Type which includes STopicId (ecal_callback.h)
- Remove topic_name and topic_id from memfile_pool
- Subgate applies Payload::Topic

TODO: take care of PID for TCP and UDP layer.
  • Loading branch information
KerstinKeller committed Aug 22, 2024
1 parent bfea308 commit 13f952b
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 47 deletions.
9 changes: 8 additions & 1 deletion ecal/core/include/ecal/ecal_callback.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -131,6 +131,13 @@ namespace eCAL
**/
using ReceiveCallbackT = std::function<void (const char *, const struct SReceiveCallbackData *)>;

/**
* @brief Raw data receive callback function type.
*
* @param data_ Data struct containing payload, sender_id, timestamp and publication clock.
**/
using ReceiveCallbackTV6 = std::function<void(const Registration::STopicId&, const SReceiveCallbackData&)>;

/**
* @brief Timer callback function type.
**/
Expand Down
9 changes: 9 additions & 0 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ namespace eCAL
**/
ECAL_API bool AddReceiveCallback(ReceiveCallbackT callback_);

/**
* @brief Add callback function for incoming receives.
*
* @param callback_ The callback function to add.
*
* @return True if succeeded, false if not.
**/
ECAL_API bool AddReceiveCallback(ReceiveCallbackTV6 callback_);

/**
* @brief Remove callback function for incoming receives.
*
Expand Down
20 changes: 10 additions & 10 deletions ecal/core/src/io/shm/ecal_memfile_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ namespace eCAL
return true;
}

bool CMemFileObserver::Start(const std::string& topic_name_, const std::string& topic_id_, const int timeout_, const MemFileDataCallbackT& callback_)
bool CMemFileObserver::Start(const int timeout_, const MemFileDataCallbackT& callback_)
{
if (!m_created) return false;
if (m_is_observing) return false;
Expand All @@ -108,11 +108,11 @@ namespace eCAL
m_is_observing = true;

// start observer thread
m_thread = std::thread(&CMemFileObserver::Observe, this, topic_name_, topic_id_, timeout_);
m_thread = std::thread(&CMemFileObserver::Observe, this, timeout_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug2, std::string("CMemFileObserver started (" + topic_name_ + ", " + topic_id_ + ")"));
Logging::Log(log_level_debug2, std::string("CMemFileObserver started."));
#endif

return true;
Expand Down Expand Up @@ -147,7 +147,7 @@ namespace eCAL
return true;
}

void CMemFileObserver::Observe(const std::string& topic_name_, const std::string& topic_id_, const int timeout_)
void CMemFileObserver::Observe(const int timeout_)
{
// internal clock sample update checking
uint64_t last_sample_clock(0);
Expand Down Expand Up @@ -223,13 +223,13 @@ namespace eCAL
// calculate user payload address
data_buf = static_cast<const char*>(buf) + mfile_hdr.hdr_size;
// call user callback function
m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
m_data_callback(data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}
}
else
{
// call user callback function
m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
m_data_callback(data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}
}
}
Expand Down Expand Up @@ -264,7 +264,7 @@ namespace eCAL
if (post_process_buffer)
{
// add sample to data reader (and call user callback function)
if (m_data_callback) m_data_callback(topic_name_, topic_id_, receive_buffer.data(), receive_buffer.size(), (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
if (m_data_callback) m_data_callback(receive_buffer.data(), receive_buffer.size(), (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}

// send acknowledge event
Expand Down Expand Up @@ -365,7 +365,7 @@ namespace eCAL
m_created = false;
}

bool CMemFileThreadPool::ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, const std::string& topic_name_, const std::string& topic_id_, int timeout_observation_ms, const MemFileDataCallbackT& callback_)
bool CMemFileThreadPool::ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, int timeout_observation_ms, const MemFileDataCallbackT& callback_)
{
if(!m_created) return(false);
if(memfile_name_.empty()) return(false);
Expand All @@ -388,7 +388,7 @@ namespace eCAL
else
{
observer->Stop();
observer->Start(topic_name_, topic_id_, timeout_observation_ms, callback_);
observer->Start(timeout_observation_ms, callback_);
}

return(true);
Expand All @@ -398,7 +398,7 @@ namespace eCAL
{
auto observer = std::make_shared<CMemFileObserver>();
observer->Create(memfile_name_, memfile_event_);
observer->Start(topic_name_, topic_id_, timeout_observation_ms, callback_);
observer->Start(timeout_observation_ms, callback_);
m_observer_pool[memfile_name_] = observer;
#ifndef NDEBUG
// log it
Expand Down
8 changes: 4 additions & 4 deletions ecal/core/src/io/shm/ecal_memfile_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

namespace eCAL
{
using MemFileDataCallbackT = std::function<size_t (const std::string &, const std::string &, const char *, size_t, long long, long long, long long, size_t)>;
using MemFileDataCallbackT = std::function<size_t (const char *, size_t, long long, long long, long long, size_t)>;

////////////////////////////////////////
// CMemFileObserver
Expand All @@ -61,14 +61,14 @@ namespace eCAL
bool Create(const std::string& memfile_name_, const std::string& memfile_event_);
bool Destroy();

bool Start(const std::string& topic_name_, const std::string& topic_id_, int timeout_, const MemFileDataCallbackT& callback_);
bool Start(int timeout_, const MemFileDataCallbackT& callback_);
bool Stop();
bool IsObserving() {return(m_is_observing);};

bool ResetTimeout();

protected:
void Observe(const std::string& topic_name_, const std::string& topic_id_, int timeout_);
void Observe(int timeout_);
bool ReadFileHeader(SMemFileHeader& memfile_hdr);

std::atomic<bool> m_created;
Expand Down Expand Up @@ -97,7 +97,7 @@ namespace eCAL
void Start();
void Stop();

bool ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, const std::string& topic_name_, const std::string& topic_id_, int timeout_observation_ms, const MemFileDataCallbackT& callback_);
bool ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, int timeout_observation_ms, const MemFileDataCallbackT& callback_);

protected:
void CleanupPoolThread();
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ namespace eCAL
for (const auto& reader : readers_to_apply)
{
applied_size = reader->ApplySample(
ecal_sample.topic.tid,
ecal_sample.topic,
payload_addr,
payload_size,
ecal_sample_content.id,
Expand All @@ -182,7 +182,7 @@ namespace eCAL
return (applied_size > 0);
}

bool CSubGate::ApplySample(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_)
bool CSubGate::ApplySample(const Payload::Topic& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_)
{
if (!m_created) return false;

Expand All @@ -194,7 +194,7 @@ namespace eCAL
// Apply the samples to the readers afterwards.
{
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
auto res = m_topic_name_datareader_map.equal_range(topic_name_);
auto res = m_topic_name_datareader_map.equal_range(topic_id_.tname);
std::transform(
res.first, res.second, std::back_inserter(readers_to_apply), [](const auto& match) { return match.second; }
);
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_subgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace eCAL
bool HasSample(const std::string& sample_name_);

bool ApplySample(const char* serialized_sample_data_, size_t serialized_sample_size_, eTLayerType layer_);
bool ApplySample(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_);
bool ApplySample(const Payload::Topic& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_);

void ApplyPubRegistration(const Registration::Sample& ecal_sample_);
void ApplyPubUnregistration(const Registration::Sample& ecal_sample_);
Expand Down
11 changes: 10 additions & 1 deletion ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,16 @@ namespace eCAL

bool CSubscriber::AddReceiveCallback(ReceiveCallbackT callback_)
{
if(m_datareader == nullptr) return(false);
auto v6_callback = [callback_](const Registration::STopicId& topic_id_, const SReceiveCallbackData& data_)
{
callback_(topic_id_.topic_name.c_str(), &data_);
};
return AddReceiveCallback(v6_callback);
}

bool CSubscriber::AddReceiveCallback(ReceiveCallbackTV6 callback_)
{
if (m_datareader == nullptr) return(false);
RemReceiveCallback();
return(m_datareader->AddReceiveCallback(std::move(callback_)));
}
Expand Down
15 changes: 11 additions & 4 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ namespace eCAL
return(false);
}

bool CDataReader::AddReceiveCallback(ReceiveCallbackT callback_)
bool CDataReader::AddReceiveCallback(ReceiveCallbackTV6 callback_)
{
if (!m_created) return(false);

Expand Down Expand Up @@ -361,7 +361,7 @@ namespace eCAL
#endif
}

size_t CDataReader::ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_)
size_t CDataReader::ApplySample(const Payload::Topic& topic_id_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_)
{
// ensure thread safety
const std::lock_guard<std::mutex> lock(m_receive_callback_mtx);
Expand Down Expand Up @@ -419,7 +419,7 @@ namespace eCAL
// - a dropped message
// - an out-of-order message
// - a multiple sent message
if (!CheckMessageClock(tid_, clock_))
if (!CheckMessageClock(topic_id_.tid, clock_))
{
// we will not process that message
return(0);
Expand Down Expand Up @@ -463,8 +463,15 @@ namespace eCAL
cb_data.id = id_;
cb_data.time = time_;
cb_data.clock = clock_;

Registration::STopicId topic_id;
topic_id.topic_name = topic_id_.tname;
topic_id.topic_id.host_name = topic_id_.hname;
topic_id.topic_id.entity_id = topic_id_.tid;
topic_id.topic_id.process_id = topic_id_.pid;

// execute it
(m_receive_callback)(m_topic_name.c_str(), &cb_data);
(m_receive_callback)(topic_id, cb_data);
processed = true;
}
}
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ namespace eCAL

bool Read(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ms_ = 0);

bool AddReceiveCallback(ReceiveCallbackT callback_);
bool AddReceiveCallback(ReceiveCallbackTV6 callback_);
bool RemReceiveCallback();

bool AddEventCallback(eCAL_Subscriber_Event type_, SubEventCallbackT callback_);
Expand Down Expand Up @@ -107,7 +107,7 @@ namespace eCAL
SDataTypeInformation GetDataTypeInformation() const { return(m_topic_info); }

void InitializeLayers();
size_t ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_);
size_t ApplySample(const Payload::Topic& topic_id_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_);

std::string Dump(const std::string& indent_ = "");

Expand Down Expand Up @@ -153,7 +153,7 @@ namespace eCAL
long long m_read_time = 0;

std::mutex m_receive_callback_mtx;
ReceiveCallbackT m_receive_callback;
ReceiveCallbackTV6 m_receive_callback;
std::atomic<int> m_receive_time;

std::deque<size_t> m_sample_hash_queue;
Expand Down
26 changes: 14 additions & 12 deletions ecal/core/src/readwrite/shm/ecal_reader_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,27 @@ namespace eCAL
{
const std::string process_id = std::to_string(Process::GetProcessID());
const std::string memfile_event = memfile_name + "_" + process_id;
const MemFileDataCallbackT memfile_data_callback = std::bind(&CSHMReaderLayer::OnNewShmFileContent, this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5,
std::placeholders::_6,
std::placeholders::_7,
std::placeholders::_8);
g_memfile_pool()->ObserveFile(memfile_name, memfile_event, par_.topic_name, par_.topic_id, Config::GetRegistrationTimeoutMs(), memfile_data_callback);

Payload::Topic topic;
topic.tname = par_.topic_name;
topic.hname = par_.host_name;
topic.tid = par_.topic_id;
topic.pid = par_.process_id;

auto data_callback = [this, topic](const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_)->size_t
{
return OnNewShmFileContent(topic, buf_, len_, id_, clock_, time_, hash_);
};
g_memfile_pool()->ObserveFile(memfile_name, memfile_event, Config::GetRegistrationTimeoutMs(), data_callback);
}
}
}

size_t CSHMReaderLayer::OnNewShmFileContent(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_)
size_t CSHMReaderLayer::OnNewShmFileContent(const Payload::Topic& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_)
{
if (g_subgate() != nullptr)
{
if (g_subgate()->ApplySample(topic_name_, topic_id_, buf_, len_, id_, clock_, time_, hash_, tl_ecal_shm))
if (g_subgate()->ApplySample(topic_id_, buf_, len_, id_, clock_, time_, hash_, tl_ecal_shm))
{
return len_;
}
Expand Down
5 changes: 3 additions & 2 deletions ecal/core/src/readwrite/shm/ecal_reader_shm.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@

#include "ecal_def.h"
#include "readwrite/ecal_reader_layer.h"
#include "serialization/ecal_struct_sample_payload.h"

#include <cstddef>
#include <memory>
Expand All @@ -48,6 +49,6 @@ namespace eCAL
void SetConnectionParameter(SReaderLayerPar& par_) override;

private:
size_t OnNewShmFileContent(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_);
size_t OnNewShmFileContent(const Payload::Topic& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_);
};
}
5 changes: 2 additions & 3 deletions ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,9 @@ namespace eCAL
// use this intermediate variables as optimization
const auto& ecal_header_topic = m_ecal_header.topic;
const auto& ecal_header_content = m_ecal_header.content;
// apply sample

g_subgate()->ApplySample(
ecal_header_topic.tname,
ecal_header_topic.tid,
ecal_header_topic,
data_payload,
static_cast<size_t>(ecal_header_content.size),
ecal_header_content.id,
Expand Down
3 changes: 2 additions & 1 deletion ecal/core/src/serialization/ecal_struct_sample_payload.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,7 @@ namespace eCAL
std::string hname; // host name
std::string tid; // topic id
std::string tname; // topic name
int32_t pid; // process id
};

// Topic content payload
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,11 @@ class SubscriberCreator
std::ostringstream tname;
tname << std::setw(5) << std::setfill('0') << i;
subscribers.emplace_back("Topic" + tname.str(), eCAL::SDataTypeInformation{ ttype, "", tdesc });
subscribers.at(i).AddReceiveCallback(std::bind(&SubscriberCreator::Receive, this));
auto on_receive = [this](const eCAL::Registration::STopicId&, const eCAL::SReceiveCallbackData&)
{
Receive();
};
subscribers.at(i).AddReceiveCallback(on_receive);
}
}

Expand Down

0 comments on commit 13f952b

Please sign in to comment.