Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions lib/RedisRemoteSaiInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include <inttypes.h>

#define SAI_ZMQ_DEFAULT_RESPONSE_BUFFER_SIZE (64*1024*1024)

using namespace sairedis;
using namespace saimeta;
using namespace sairediscommon;
Expand Down Expand Up @@ -77,13 +79,15 @@ sai_status_t RedisRemoteSaiInterface::apiInitialize(
m_useTempView = false;
m_syncMode = false;
m_redisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
m_zmqResponseBufferSize = SAI_ZMQ_DEFAULT_RESPONSE_BUFFER_SIZE;

if (m_contextConfig->m_zmqEnable)
{
m_communicationChannel = std::make_shared<ZeroMQChannel>(
m_contextConfig->m_zmqEndpoint,
m_contextConfig->m_zmqNtfEndpoint,
std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3));
std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3),
m_zmqResponseBufferSize);

SWSS_LOG_NOTICE("zmq enabled, forcing sync mode");

Expand Down Expand Up @@ -420,7 +424,8 @@ sai_status_t RedisRemoteSaiInterface::setRedisExtensionAttribute(
m_communicationChannel = std::make_shared<ZeroMQChannel>(
m_contextConfig->m_zmqEndpoint,
m_contextConfig->m_zmqNtfEndpoint,
std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3));
std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3),
m_zmqResponseBufferSize);

m_communicationChannel->setResponseTimeout(m_responseTimeoutMs);

Expand Down
2 changes: 2 additions & 0 deletions lib/RedisRemoteSaiInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ namespace sairedis

uint64_t m_responseTimeoutMs;

size_t m_zmqResponseBufferSize;

std::function<sai_switch_notifications_t(std::shared_ptr<Notification>)> m_notificationCallback;

std::map<sai_object_id_t, swss::TableDump> m_tableDump;
Expand Down
31 changes: 20 additions & 11 deletions lib/ZeroMQChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,33 @@

using namespace sairedis;

#define ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024)
#define ZMQ_MAX_RETRY 10

ZeroMQChannel::ZeroMQChannel(
_In_ const std::string& endpoint,
_In_ const std::string& ntfEndpoint,
_In_ Channel::Callback callback):
_In_ Channel::Callback callback,
_In_ long zmqResponseBufferSize):
Channel(callback),
m_endpoint(endpoint),
m_ntfEndpoint(ntfEndpoint),
m_context(nullptr),
m_socket(nullptr),
m_ntfContext(nullptr),
m_ntfSocket(nullptr)
m_ntfSocket(nullptr),
m_zmqResponseBufferSize(zmqResponseBufferSize)
{
SWSS_LOG_ENTER();
if (m_zmqResponseBufferSize != ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE)
{
SWSS_LOG_NOTICE("setting zmq response buffer size to %ld bytes", m_zmqResponseBufferSize);
}
else
{
SWSS_LOG_NOTICE("using default zmq response buffer size of %ld bytes", ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE);
}

m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE);
m_buffer.resize(m_zmqResponseBufferSize);

// configure ZMQ for main communication

Expand Down Expand Up @@ -129,14 +138,14 @@ void ZeroMQChannel::notificationThreadFunction()

std::vector<uint8_t> buffer;

buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE);
buffer.resize(m_zmqResponseBufferSize);

while (m_runNotificationThread)
{
// NOTE: this entire loop internal could be encapsulated into separate class
// which will inherit from Selectable class, and name this as ntf receiver

int rc = zmq_recv(m_ntfSocket, buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);
int rc = zmq_recv(m_ntfSocket, buffer.data(), m_zmqResponseBufferSize, 0);

if (!m_runNotificationThread)
break;
Expand All @@ -156,10 +165,10 @@ void ZeroMQChannel::notificationThreadFunction()
continue;
}

if (rc >= ZMQ_RESPONSE_BUFFER_SIZE)
if (rc >= m_zmqResponseBufferSize)
{
SWSS_LOG_WARN("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
ZMQ_RESPONSE_BUFFER_SIZE,
m_zmqResponseBufferSize,
rc);

continue;
Expand Down Expand Up @@ -291,7 +300,7 @@ sai_status_t ZeroMQChannel::wait(

for (int i = 0; true ; ++i)
{
rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);
rc = zmq_recv(m_socket, m_buffer.data(), m_zmqResponseBufferSize, 0);

if (rc < 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY)
{
Expand All @@ -301,10 +310,10 @@ sai_status_t ZeroMQChannel::wait(
{
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
}
if (rc >= ZMQ_RESPONSE_BUFFER_SIZE)
if (rc >= m_zmqResponseBufferSize)
{
SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
ZMQ_RESPONSE_BUFFER_SIZE,
m_zmqResponseBufferSize,
rc);
}
break;
Expand Down
7 changes: 6 additions & 1 deletion lib/ZeroMQChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <memory>
#include <functional>

#define ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE (4*1024*1024)

namespace sairedis
{
class ZeroMQChannel:
Expand All @@ -20,7 +22,8 @@ namespace sairedis
ZeroMQChannel(
_In_ const std::string& endpoint,
_In_ const std::string& ntfEndpoint,
_In_ Channel::Callback callback);
_In_ Channel::Callback callback,
_In_ long zmqResponseBufferSize = ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE);

virtual ~ZeroMQChannel();

Expand Down Expand Up @@ -63,5 +66,7 @@ namespace sairedis
void* m_ntfContext;

void* m_ntfSocket;

long m_zmqResponseBufferSize;
};
}
25 changes: 17 additions & 8 deletions meta/ZeroMQSelectableChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,36 @@
#include <zmq.h>
#include <unistd.h>

#define ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024)

//#define ZMQ_POLL_TIMEOUT (2*60*1000)
#define ZMQ_POLL_TIMEOUT (1000)

using namespace sairedis;

ZeroMQSelectableChannel::ZeroMQSelectableChannel(
_In_ const std::string& endpoint):
_In_ const std::string& endpoint,
_In_ long zmqResponseBufferSize):
m_endpoint(endpoint),
m_context(nullptr),
m_socket(nullptr),
m_fd(0),
m_allowZmqPoll(false),
m_runThread(true)
m_runThread(true),
m_zmqResponseBufferSize(zmqResponseBufferSize)
{
SWSS_LOG_ENTER();

SWSS_LOG_NOTICE("binding on %s", endpoint.c_str());

m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE);
if (m_zmqResponseBufferSize != ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE)
{
SWSS_LOG_NOTICE("setting zmq response buffer size to %ld bytes", m_zmqResponseBufferSize);
}
else
{
SWSS_LOG_NOTICE("using default zmq response buffer size of %ld bytes", ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE);
}

m_buffer.resize(m_zmqResponseBufferSize);

m_context = zmq_ctx_new();;

Expand Down Expand Up @@ -230,17 +239,17 @@ uint64_t ZeroMQSelectableChannel::readData()
// clear selectable event so it could be triggered in next select()
m_selectableEvent.readData();

int rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);
int rc = zmq_recv(m_socket, m_buffer.data(), m_zmqResponseBufferSize, 0);

if (rc < 0)
{
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
}

if (rc >= ZMQ_RESPONSE_BUFFER_SIZE)
if (rc >= m_zmqResponseBufferSize)
{
SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
ZMQ_RESPONSE_BUFFER_SIZE,
m_zmqResponseBufferSize,
rc);
}

Expand Down
7 changes: 6 additions & 1 deletion meta/ZeroMQSelectableChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <thread>
#include <memory>

#define ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE (4*1024*1024)

namespace sairedis
{
class ZeroMQSelectableChannel:
Expand All @@ -17,7 +19,8 @@ namespace sairedis
public:

ZeroMQSelectableChannel(
_In_ const std::string& endpoint);
_In_ const std::string& endpoint,
_In_ long zmqResponseBufferSize = ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE);

virtual ~ZeroMQSelectableChannel();

Expand Down Expand Up @@ -75,5 +78,7 @@ namespace sairedis
std::shared_ptr<std::thread> m_zmlPollThread;

swss::SelectableEvent m_selectableEvent;

long m_zmqResponseBufferSize;
};
}
3 changes: 2 additions & 1 deletion syncd/Syncd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

#define DEF_SAI_WARM_BOOT_DATA_FILE "/var/warmboot/sai-warmboot.bin"
#define SAI_FAILURE_DUMP_SCRIPT "/usr/bin/sai_failure_dump.sh"
#define SYNCD_ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024)

using namespace syncd;
using namespace saimeta;
Expand Down Expand Up @@ -135,7 +136,7 @@ Syncd::Syncd(

m_enableSyncMode = true;

m_selectableChannel = std::make_shared<sairedis::ZeroMQSelectableChannel>(m_contextConfig->m_zmqEndpoint);
m_selectableChannel = std::make_shared<sairedis::ZeroMQSelectableChannel>(m_contextConfig->m_zmqEndpoint, SYNCD_ZMQ_RESPONSE_BUFFER_SIZE);
}
else
{
Expand Down
Loading