Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for issue 416: Impossible to retain transfer ids #420

Merged
merged 9 commits into from
Jan 31, 2025
2 changes: 1 addition & 1 deletion include/libcyphal/application/node/heartbeat_producer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class HeartbeatProducer final // NOSONAR cpp:S3624
void publishMessage(const TimePoint approx_now)
{
// Publishing of heartbeats makes sense only if the local node ID is known.
if (presentation_.transport().getLocalNodeId() == cetl::nullopt)
if (!presentation_.transport().getLocalNodeId().has_value())
{
return;
}
Expand Down
8 changes: 4 additions & 4 deletions include/libcyphal/presentation/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ class Client final : public detail::ClientBase
// For request (and the following response) we need to allocate a transfer ID,
// which will be in use to pair the request with the response.
//
auto& shared_client = getSharedClient();
auto opt_transfer_id = shared_client.nextTransferId();
auto& shared_client = getSharedClient();
const auto opt_transfer_id = shared_client.nextTransferId();
if (!opt_transfer_id)
{
return TooManyPendingRequestsError{};
Expand Down Expand Up @@ -325,8 +325,8 @@ class RawServiceClient final : public detail::ClientBase
// 1. For request (and following response) we need to allocate a transfer ID,
// which will be in use to pair the request with the response.
//
auto& shared_client = getSharedClient();
auto opt_transfer_id = shared_client.nextTransferId();
auto& shared_client = getSharedClient();
const auto opt_transfer_id = shared_client.nextTransferId();
if (!opt_transfer_id)
{
return TooManyPendingRequestsError{};
Expand Down
74 changes: 53 additions & 21 deletions include/libcyphal/presentation/client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include "libcyphal/executor.hpp"
#include "libcyphal/transport/errors.hpp"
#include "libcyphal/transport/svc_sessions.hpp"
#include "libcyphal/transport/transfer_id_generators.hpp"
#include "libcyphal/transport/transfer_id_map.hpp"
#include "libcyphal/transport/types.hpp"
#include "libcyphal/types.hpp"

Expand All @@ -34,7 +34,9 @@ namespace presentation
namespace detail
{

class SharedClient : public common::cavl::Node<SharedClient>, public SharedObject
class SharedClient : public common::cavl::Node<SharedClient>,
public SharedObject,
protected transport::detail::ITransferIdStorage
{
public:
using Node::remove;
Expand Down Expand Up @@ -134,17 +136,24 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec
, svc_request_tx_session_{std::move(svc_request_tx_session)}
, svc_response_rx_session_{std::move(svc_response_rx_session)}
, response_rx_params_{svc_response_rx_session_->getParams()}
, next_transfer_id_{0}
, nearest_deadline_{DistantFuture()}
{
CETL_DEBUG_ASSERT(svc_request_tx_session_ != nullptr, "");
CETL_DEBUG_ASSERT(svc_response_rx_session_ != nullptr, "");

if (const auto* const transfer_id_map = delegate.getTransferIdMap())
{
const SessionSpec session_spec{response_rx_params_.service_id, response_rx_params_.server_node_id};
next_transfer_id_ = transfer_id_map->getIdFor(session_spec);
}

// Override the default (2s) timeout value of the response session.
// This is done to allow multiple overlapping responses to be handled properly.
// Otherwise, the responses would be rejected (as "duplicates") if their transfer IDs are in order.
// Real duplicates (f.e. caused by redundant transports) won't cause any issues
// b/c shared RPC client expects/accepts only one response per transfer ID,
// and corresponding promise callback node will be removed after the first response.
// and the corresponding promise callback node will be removed after the first response.
svc_response_rx_session_->setTransferIdTimeout({});

svc_response_rx_session_->setOnReceiveCallback([this](const auto& arg) {
Expand Down Expand Up @@ -199,7 +208,7 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec
{
if (timeout_node.isTimeoutLinked())
{
// Remove previous timeout node (if any),
// Remove the previous timeout node (if any),
// and then reinsert the node with updated/given new deadline time.
//
timeout_nodes_by_deadline_.remove(&timeout_node);
Expand Down Expand Up @@ -240,9 +249,27 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec

void destroy() noexcept override
{
if (auto* const transfer_id_map = delegate_.getTransferIdMap())
{
const SessionSpec session_spec{response_rx_params_.service_id, response_rx_params_.server_node_id};
transfer_id_map->setIdFor(session_spec, next_transfer_id_);
}

delegate_.forgetSharedClient(*this);
}

// MARK: ITransferIdStorage

transport::TransferId load() const noexcept override
{
return next_transfer_id_;
}

void save(const transport::TransferId transfer_id) noexcept override
{
next_transfer_id_ = transfer_id;
}

protected:
virtual void insertNewCallbackNode(CallbackNode& callback_node)
{
Expand Down Expand Up @@ -273,7 +300,8 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec
}

private:
using Schedule = IExecutor::Callback::Schedule;
using Schedule = IExecutor::Callback::Schedule;
using SessionSpec = transport::ITransferIdMap::SessionSpec;

static constexpr TimePoint DistantFuture()
{
Expand Down Expand Up @@ -388,6 +416,7 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec
const UniquePtr<transport::IRequestTxSession> svc_request_tx_session_;
const UniquePtr<transport::IResponseRxSession> svc_response_rx_session_;
const transport::ResponseRxParams response_rx_params_;
transport::TransferId next_transfer_id_;
common::cavl::Tree<CallbackNode> cb_nodes_by_transfer_id_;
TimePoint nearest_deadline_;
common::cavl::Tree<TimeoutNode> timeout_nodes_by_deadline_;
Expand All @@ -399,8 +428,8 @@ class SharedClient : public common::cavl::Node<SharedClient>, public SharedObjec

/// @brief Defines a shared client implementation that uses a generic transfer ID generator.
///
template <typename TransferIdGeneratorMixin>
class ClientImpl final : public SharedClient, private TransferIdGeneratorMixin
template <typename TransferIdGenerator>
class ClientImpl final : public SharedClient
{
public:
ClientImpl(IPresentationDelegate& delegate,
Expand All @@ -409,7 +438,7 @@ class ClientImpl final : public SharedClient, private TransferIdGeneratorMixin
UniquePtr<transport::IResponseRxSession> svc_response_rx_session,
const transport::TransferId transfer_id_modulo)
: SharedClient{delegate, executor, std::move(svc_request_tx_session), std::move(svc_response_rx_session)}
, TransferIdGeneratorMixin{transfer_id_modulo}
, transfer_id_generator_{transfer_id_modulo, *this}
{
}

Expand All @@ -424,40 +453,41 @@ class ClientImpl final : public SharedClient, private TransferIdGeneratorMixin
private:
using Base = SharedClient;

// MARK: SharedClient

CETL_NODISCARD cetl::optional<transport::TransferId> nextTransferId() noexcept override
{
return TransferIdGeneratorMixin::nextTransferId();
}

void insertNewCallbackNode(CallbackNode& callback_node) override
{
SharedClient::insertNewCallbackNode(callback_node);
TransferIdGeneratorMixin::retainTransferId(callback_node.getTransferId());
transfer_id_generator_.retainTransferId(callback_node.getTransferId());
}

void removeCallbackNode(CallbackNode& callback_node) override
{
TransferIdGeneratorMixin::releaseTransferId(callback_node.getTransferId());
transfer_id_generator_.releaseTransferId(callback_node.getTransferId());
SharedClient::removeCallbackNode(callback_node);
}

// MARK: SharedClient

CETL_NODISCARD cetl::optional<transport::TransferId> nextTransferId() noexcept override
{
return transfer_id_generator_.nextTransferId();
}

TransferIdGenerator transfer_id_generator_;

}; // ClientImpl<TransferIdGeneratorMixin>

/// @brief Defines a shared client specialization that uses a trivial transfer ID generator.
///
template <>
class ClientImpl<transport::detail::TrivialTransferIdGenerator> final
: public SharedClient,
private transport::detail::TrivialTransferIdGenerator
class ClientImpl<transport::detail::TrivialTransferIdGenerator> final : public SharedClient
{
public:
ClientImpl(IPresentationDelegate& delegate,
IExecutor& executor,
UniquePtr<transport::IRequestTxSession> svc_request_tx_session,
UniquePtr<transport::IResponseRxSession> svc_response_rx_session)
: Base{delegate, executor, std::move(svc_request_tx_session), std::move(svc_response_rx_session)}
, transfer_id_generator_{*this}
{
}

Expand All @@ -476,9 +506,11 @@ class ClientImpl<transport::detail::TrivialTransferIdGenerator> final

CETL_NODISCARD cetl::optional<transport::TransferId> nextTransferId() noexcept override
{
return TrivialTransferIdGenerator::nextTransferId();
return transfer_id_generator_.nextTransferId();
}

transport::detail::TrivialTransferIdGenerator transfer_id_generator_;

}; // ClientImpl<TrivialTransferIdGenerator>

} // namespace detail
Expand Down
Loading