Skip to content

Commit

Permalink
[core] Make STopicId available in subscriber callbacks
Browse files Browse the repository at this point in the history
- Add new Callback Type which includes STopicId (ecal_callback.h)
- Remove topic_name and topic_id from memfile_pool
- Subgate applies Payload::TopicInfo
- Provide new GetId function for publishers and subscribers (should add also for client / servers in different PR)
- Add testcases to check correct IDs are received in callbacks.
- Do not modify info being sent over the wire (for compatibility reasons). Instead modify == and < operators for STopicId.
  • Loading branch information
KerstinKeller committed Sep 12, 2024
1 parent 173132a commit ab4f097
Show file tree
Hide file tree
Showing 25 changed files with 388 additions and 91 deletions.
13 changes: 11 additions & 2 deletions ecal/core/include/ecal/ecal_callback.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ========================= eCAL LICENSE =================================
*
* Copyright (C) 2016 - 2019 Continental Corporation
* Copyright (C) 2016 - 2024 Continental Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -124,13 +124,22 @@ namespace eCAL
};

/**
* @brief Raw data receive callback function type.
* @brief Receive callback function type with topic name and data struct.
*
* @param topic_name_ The topic name of the received message.
* @param data_ Data struct containing payload, timestamp and publication clock.
**/
using ReceiveCallbackT = std::function<void (const char *, const struct SReceiveCallbackData *)>;

/**
* @brief Receive callback function type with topic id and data struct. The topic id contains the topic name, the process
* name, the host name and a uniques topic identifier.
*
* @param topic_id_ The topic id struct of the received message.
* @param data_ Data struct containing payload, timestamp and publication clock.
**/
using ReceiveIDCallbackT = std::function<void(const Registration::STopicId&, const SReceiveCallbackData&)>;

/**
* @brief Timer callback function type.
**/
Expand Down
7 changes: 7 additions & 0 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ namespace eCAL
**/
ECAL_API std::string GetTopicName() const;

/**
* @brief Gets a unique ID of this Publisher
*
* @return The topic id.
**/
ECAL_API Registration::STopicId GetId() const;

/**
* @brief Gets description of the connected topic.
*
Expand Down
16 changes: 16 additions & 0 deletions ecal/core/include/ecal/ecal_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ namespace eCAL
**/
ECAL_API bool AddReceiveCallback(ReceiveCallbackT callback_);

/**
* @brief Add callback function for incoming receives.
*
* @param callback_ The callback function to add.
*
* @return True if succeeded, false if not.
**/
ECAL_API bool AddReceiveCallback(ReceiveIDCallbackT callback_);

/**
* @brief Remove callback function for incoming receives.
*
Expand Down Expand Up @@ -262,6 +271,13 @@ namespace eCAL
**/
ECAL_API std::string GetTopicName() const;

/**
* @brief Gets a unique ID of this Subscriber
*
* @return The topic id.
**/
ECAL_API Registration::STopicId GetId() const;

/**
* @brief Gets description of the connected topic.
*
Expand Down
36 changes: 30 additions & 6 deletions ecal/core/include/ecal/ecal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#pragma once
#include <string>
#include <tuple>
#include <iostream>

namespace eCAL
{
Expand Down Expand Up @@ -87,40 +88,63 @@ namespace eCAL
{
struct SEntityId
{
std::string entity_id; // unique id within that process
std::string entity_id; // unique id within that process (it should already be unique within the whole system)
int32_t process_id = 0; // process id which produced the sample
std::string host_name; // host which produced the sample

bool operator==(const SEntityId& other) const {
return entity_id == other.entity_id &&
process_id == other.process_id &&
host_name == other.host_name;
return entity_id == other.entity_id;
}

bool operator<(const SEntityId& other) const
{
return std::tie(process_id, entity_id, host_name)
< std::tie(other.process_id, other.entity_id, other.host_name);
return entity_id < other.entity_id;
}
};

// Overload the << operator for SEntityId
inline std::ostream& operator<<(std::ostream& os, const SEntityId& id)
{
os << "SEntityId(entity_id: " << id.entity_id
<< ", process_id: " << id.process_id
<< ", host_name: " << id.host_name << ")";
return os;
}

struct STopicId
{
SEntityId topic_id;
std::string topic_name;

bool operator==(const STopicId& other) const
{
return topic_id == other.topic_id && topic_name == other.topic_name;
}

bool operator<(const STopicId& other) const
{
return std::tie(topic_id, topic_name) < std::tie(other.topic_id, other.topic_name);
}
};

inline std::ostream& operator<<(std::ostream& os, const STopicId& id)
{
os << "STopicId(topic_id: " << id.topic_id
<< ", topic_name: " << id.topic_name << ")";
return os;
}

struct SServiceId
{
SEntityId service_id;
std::string service_name;
std::string method_name;

bool operator==(const SServiceId& other) const
{
return service_id == other.service_id && service_name == other.service_name && method_name == other.method_name;
}

bool operator<(const SServiceId& other) const
{
return std::tie(service_id, service_name, method_name) < std::tie(other.service_id, other.service_name, other.method_name);
Expand Down
20 changes: 10 additions & 10 deletions ecal/core/src/io/shm/ecal_memfile_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ namespace eCAL
return true;
}

bool CMemFileObserver::Start(const std::string& topic_name_, const std::string& topic_id_, const int timeout_, const MemFileDataCallbackT& callback_)
bool CMemFileObserver::Start(const int timeout_, const MemFileDataCallbackT& callback_)
{
if (!m_created) return false;
if (m_is_observing) return false;
Expand All @@ -108,11 +108,11 @@ namespace eCAL
m_is_observing = true;

// start observer thread
m_thread = std::thread(&CMemFileObserver::Observe, this, topic_name_, topic_id_, timeout_);
m_thread = std::thread(&CMemFileObserver::Observe, this, timeout_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug2, std::string("CMemFileObserver started (" + topic_name_ + ", " + topic_id_ + ")"));
Logging::Log(log_level_debug2, std::string("CMemFileObserver started."));
#endif

return true;
Expand Down Expand Up @@ -147,7 +147,7 @@ namespace eCAL
return true;
}

void CMemFileObserver::Observe(const std::string& topic_name_, const std::string& topic_id_, const int timeout_)
void CMemFileObserver::Observe(const int timeout_)
{
// internal clock sample update checking
uint64_t last_sample_clock(0);
Expand Down Expand Up @@ -223,13 +223,13 @@ namespace eCAL
// calculate user payload address
data_buf = static_cast<const char*>(buf) + mfile_hdr.hdr_size;
// call user callback function
m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
m_data_callback(data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}
}
else
{
// call user callback function
m_data_callback(topic_name_, topic_id_, data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
m_data_callback(data_buf, mfile_hdr.data_size, (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}
}
}
Expand Down Expand Up @@ -264,7 +264,7 @@ namespace eCAL
if (post_process_buffer)
{
// add sample to data reader (and call user callback function)
if (m_data_callback) m_data_callback(topic_name_, topic_id_, receive_buffer.data(), receive_buffer.size(), (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
if (m_data_callback) m_data_callback(receive_buffer.data(), receive_buffer.size(), (long long)mfile_hdr.id, (long long)mfile_hdr.clock, (long long)mfile_hdr.time, (size_t)mfile_hdr.hash);
}

// send acknowledge event
Expand Down Expand Up @@ -365,7 +365,7 @@ namespace eCAL
m_created = false;
}

bool CMemFileThreadPool::ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, const std::string& topic_name_, const std::string& topic_id_, int timeout_observation_ms, const MemFileDataCallbackT& callback_)
bool CMemFileThreadPool::ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, int timeout_observation_ms, const MemFileDataCallbackT& callback_)
{
if(!m_created) return(false);
if(memfile_name_.empty()) return(false);
Expand All @@ -388,7 +388,7 @@ namespace eCAL
else
{
observer->Stop();
observer->Start(topic_name_, topic_id_, timeout_observation_ms, callback_);
observer->Start(timeout_observation_ms, callback_);
}

return(true);
Expand All @@ -398,7 +398,7 @@ namespace eCAL
{
auto observer = std::make_shared<CMemFileObserver>();
observer->Create(memfile_name_, memfile_event_);
observer->Start(topic_name_, topic_id_, timeout_observation_ms, callback_);
observer->Start(timeout_observation_ms, callback_);
m_observer_pool[memfile_name_] = observer;
#ifndef NDEBUG
// log it
Expand Down
8 changes: 4 additions & 4 deletions ecal/core/src/io/shm/ecal_memfile_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

namespace eCAL
{
using MemFileDataCallbackT = std::function<size_t (const std::string &, const std::string &, const char *, size_t, long long, long long, long long, size_t)>;
using MemFileDataCallbackT = std::function<size_t (const char *, size_t, long long, long long, long long, size_t)>;

////////////////////////////////////////
// CMemFileObserver
Expand All @@ -61,14 +61,14 @@ namespace eCAL
bool Create(const std::string& memfile_name_, const std::string& memfile_event_);
bool Destroy();

bool Start(const std::string& topic_name_, const std::string& topic_id_, int timeout_, const MemFileDataCallbackT& callback_);
bool Start(int timeout_, const MemFileDataCallbackT& callback_);
bool Stop();
bool IsObserving() {return(m_is_observing);};

bool ResetTimeout();

protected:
void Observe(const std::string& topic_name_, const std::string& topic_id_, int timeout_);
void Observe(int timeout_);
bool ReadFileHeader(SMemFileHeader& memfile_hdr);

std::atomic<bool> m_created;
Expand Down Expand Up @@ -97,7 +97,7 @@ namespace eCAL
void Start();
void Stop();

bool ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, const std::string& topic_name_, const std::string& topic_id_, int timeout_observation_ms, const MemFileDataCallbackT& callback_);
bool ObserveFile(const std::string& memfile_name_, const std::string& memfile_event_, int timeout_observation_ms, const MemFileDataCallbackT& callback_);

protected:
void CleanupPoolThread();
Expand Down
6 changes: 6 additions & 0 deletions ecal/core/src/pubsub/ecal_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ namespace eCAL
return(m_datawriter->GetTopicName());
}

Registration::STopicId CPublisher::GetId() const
{
if (m_datawriter == nullptr) return{};
return(m_datawriter->GetId());
}

SDataTypeInformation CPublisher::GetDataTypeInformation() const
{
if (m_datawriter == nullptr) return(SDataTypeInformation{});
Expand Down
12 changes: 6 additions & 6 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ namespace eCAL
if (layer_ == eTLayerType::tl_none)
{
// log it
Logging::Log(log_level_error, ecal_sample.topic.tname + " : payload received without layer definition !");
Logging::Log(log_level_error, ecal_sample.topic_info.tname + " : payload received without layer definition !");
}
#endif

Expand Down Expand Up @@ -153,7 +153,7 @@ namespace eCAL
{
// apply sample to data reader
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
auto res = m_topic_name_datareader_map.equal_range(ecal_sample.topic.tname);
auto res = m_topic_name_datareader_map.equal_range(ecal_sample.topic_info.tname);
std::transform(
res.first, res.second, std::back_inserter(readers_to_apply), [](const auto& match) { return match.second; }
);
Expand All @@ -163,7 +163,7 @@ namespace eCAL
for (const auto& reader : readers_to_apply)
{
applied_size = reader->ApplySample(
ecal_sample.topic.tid,
ecal_sample.topic_info,
payload_addr,
payload_size,
ecal_sample_content.id,
Expand All @@ -182,7 +182,7 @@ namespace eCAL
return (applied_size > 0);
}

bool CSubGate::ApplySample(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_)
bool CSubGate::ApplySample(const Payload::TopicInfo& topic_info_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_)
{
if (!m_created) return false;

Expand All @@ -194,7 +194,7 @@ namespace eCAL
// Apply the samples to the readers afterwards.
{
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
auto res = m_topic_name_datareader_map.equal_range(topic_name_);
auto res = m_topic_name_datareader_map.equal_range(topic_info_.tname);
std::transform(
res.first, res.second, std::back_inserter(readers_to_apply), [](const auto& match) { return match.second; }
);
Expand All @@ -203,7 +203,7 @@ namespace eCAL

for (const auto& reader : readers_to_apply)
{
applied_size = reader->ApplySample(topic_id_, buf_, len_, id_, clock_, time_, hash_, layer_);
applied_size = reader->ApplySample(topic_info_, buf_, len_, id_, clock_, time_, hash_, layer_);
}

return (applied_size > 0);
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_subgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace eCAL
bool HasSample(const std::string& sample_name_);

bool ApplySample(const char* serialized_sample_data_, size_t serialized_sample_size_, eTLayerType layer_);
bool ApplySample(const std::string& topic_name_, const std::string& topic_id_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_);
bool ApplySample(const Payload::TopicInfo& topic_info_, const char* buf_, size_t len_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_);

void ApplyPubRegistration(const Registration::Sample& ecal_sample_);
void ApplyPubUnregistration(const Registration::Sample& ecal_sample_);
Expand Down
17 changes: 16 additions & 1 deletion ecal/core/src/pubsub/ecal_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,16 @@ namespace eCAL

bool CSubscriber::AddReceiveCallback(ReceiveCallbackT callback_)
{
if(m_datareader == nullptr) return(false);
auto id_callback = [callback_](const Registration::STopicId& topic_id_, const SReceiveCallbackData& data_)
{
callback_(topic_id_.topic_name.c_str(), &data_);
};
return AddReceiveCallback(id_callback);
}

bool CSubscriber::AddReceiveCallback(ReceiveIDCallbackT callback_)
{
if (m_datareader == nullptr) return(false);
RemReceiveCallback();
return(m_datareader->AddReceiveCallback(std::move(callback_)));
}
Expand Down Expand Up @@ -189,6 +198,12 @@ namespace eCAL
if(m_datareader == nullptr) return("");
return(m_datareader->GetTopicName());
}

Registration::STopicId CSubscriber::GetId() const
{
if (m_datareader == nullptr) return{};
return(m_datareader->GetId());
}

SDataTypeInformation CSubscriber::GetDataTypeInformation() const
{
Expand Down
Loading

0 comments on commit ab4f097

Please sign in to comment.