Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions include/courtier/GAsioConsumerT.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ class GAsioConsumerT
std::shared_ptr<processable_type> p;

// Try to retrieve a work item from the broker
m_broker_ptr->get(p, m_timeout);
GBROKER(processable_type)->get(p, m_timeout);

// May be empty, if we ran into a timeout
return p;
Expand All @@ -1112,7 +1112,7 @@ class GAsioConsumerT
);
}

if(not m_broker_ptr->put(p, m_timeout)) {
if(not GBROKER(processable_type)->put(p, m_timeout)) {
glogger
<< "In GAsioConsumerT<>::putPayloadItem():" << std::endl
<< "Work item could not be submitted to the broker" << std::endl
Expand Down Expand Up @@ -1185,7 +1185,6 @@ class GAsioConsumerT
std::atomic<std::size_t> m_n_active_sessions{0};
std::size_t m_n_max_reconnects = GASIOCONSUMERMAXCONNECTIONATTEMPTS;

std::shared_ptr<typename Gem::Courtier::GBrokerT<processable_type>> m_broker_ptr = GBROKER(processable_type); ///< Simplified access to the broker
const std::chrono::duration<double> m_timeout = std::chrono::milliseconds(GBEASTMSTIMEOUT); ///< A timeout for put- and get-operations via the broker

//-------------------------------------------------------------------------
Expand Down
7 changes: 2 additions & 5 deletions include/courtier/GMPIConsumerT.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ namespace Gem::Courtier {
std::shared_ptr<processable_type> p;

// Try to retrieve a work item from the broker
m_brokerPtr->get(p, m_timeout);
GBROKER(processable_type)->get(p, m_timeout);

// May be empty, if we ran into a timeout
return p;
Expand All @@ -1014,7 +1014,7 @@ namespace Gem::Courtier {
<< "Function called with empty work item" << std::endl);
}

if (not m_brokerPtr->put(p, m_timeout)) {
if (not GBROKER(processable_type)->put(p, m_timeout)) {
glogger
<< "In GMPIConsumerMasterNodeT<>::putPayloadItem():" << std::endl
<< "Work item could not be submitted to the broker" << std::endl
Expand Down Expand Up @@ -1048,9 +1048,6 @@ namespace Gem::Courtier {
std::vector<std::shared_ptr<GMPIConsumerSessionT<processable_type>>> m_openSessions{};
// whether a stop request for the GMPIConsumerT has been received
std::atomic_bool m_isToldToStop;
// whether the stop request has been sent to all clients
std::shared_ptr<typename Gem::Courtier::GBrokerT<processable_type>> m_brokerPtr = GBROKER(
processable_type); ///< Simplified access to the broker
const std::chrono::duration<double> m_timeout = std::chrono::milliseconds(
GMPICONSUMERBROKERACCESSBROKERTIMEOUT); ///< A timeout for put- and get-operations via the broker
};
Expand Down
5 changes: 2 additions & 3 deletions include/courtier/GSerialConsumerT.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,14 @@ class GSerialConsumerT
const std::chrono::milliseconds& timeout
) -> std::shared_ptr<processable_type> {
std::shared_ptr<processable_type> p;
m_broker_ptr->get(p, timeout);
GBROKER(processable_type)->get(p, timeout);
return p;
}
//----------------------
, [this](
std::shared_ptr<processable_type> p
, const std::chrono::milliseconds& timeout
) -> void { m_broker_ptr->put(p, timeout); }
) -> void { GBROKER(processable_type)->put(p, timeout); }
//----------------------
, [this]() -> bool { return this->stopped(); }
//----------------------
Expand Down Expand Up @@ -304,7 +304,6 @@ class GSerialConsumerT
std::shared_ptr<GLocalConsumerWorkerT<processable_type>> m_worker; ///< Holds the worker assigned to this consumer
std::shared_ptr<GLocalConsumerWorkerT<processable_type>> m_workerTemplate; ///< Holds an external worker assigned to this consumer

std::shared_ptr<GBrokerT<processable_type>> m_broker_ptr = GBROKER(processable_type); ///< A shortcut to the broker so we do not have to go through the singleton
};

/******************************************************************************/
Expand Down
6 changes: 2 additions & 4 deletions include/courtier/GStdThreadConsumerT.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,14 @@ class GStdThreadConsumerT
const std::chrono::milliseconds& timeout
) -> std::shared_ptr<processable_type> {
std::shared_ptr<processable_type> p;
m_broker_ptr->get(p, timeout);
GBROKER(processable_type)->get(p, timeout);
return p;
}
//----------------------
, [this](
std::shared_ptr<processable_type> p
, const std::chrono::milliseconds& timeout
) -> void { m_broker_ptr->put(p, timeout); }
) -> void { GBROKER(processable_type)->put(p, timeout); }
//----------------------
, [this]() -> bool { return this->stopped(); }
//----------------------
Expand Down Expand Up @@ -398,8 +398,6 @@ class GStdThreadConsumerT

std::vector<std::shared_ptr<GLocalConsumerWorkerT<processable_type>>> m_workers; ///< Holds the current worker objects
std::shared_ptr<GLocalConsumerWorkerT<processable_type>> m_workerTemplate; ///< All workers will be created as a clone of this worker

std::shared_ptr<GBrokerT<processable_type>> m_broker_ptr = GBROKER(processable_type); ///< A shortcut to the broker so we do not have to go through the singleton
};

/******************************************************************************/
Expand Down
5 changes: 2 additions & 3 deletions include/courtier/GWebsocketConsumerT.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ class GWebsocketConsumerT
std::shared_ptr<processable_type> p;

// Try to retrieve a work item from the broker
m_broker_ptr->get(p, m_timeout);
GBROKER(processable_type)->get(p, m_timeout);

// May be empty, if we ran into a timeout
return p;
Expand All @@ -1428,7 +1428,7 @@ class GWebsocketConsumerT
);
}

if(not m_broker_ptr->put(p, m_timeout)) {
if(not GBROKER(processable_type)->put(p, m_timeout)) {
glogger
<< "In GWebsocketConsumerT<>::putPayloadItem():" << std::endl
<< "Work item could not be submitted to the broker" << std::endl
Expand Down Expand Up @@ -1502,7 +1502,6 @@ class GWebsocketConsumerT
std::size_t m_ping_interval = GBEASTCONSUMERPINGINTERVAL;
bool m_verbose_control_frames = false; ///< Whether the control_callback should emit information when a control frame is received

std::shared_ptr<GBrokerT<processable_type>> m_broker_ptr = GBROKER(processable_type); ///< Simplified access to the broker
const std::chrono::duration<double> m_timeout = std::chrono::milliseconds(GBEASTMSTIMEOUT); ///< A timeout for put- and get-operations via the broker

//-------------------------------------------------------------------------
Expand Down