diff --git a/lib/RedisRemoteSaiInterface.cpp b/lib/RedisRemoteSaiInterface.cpp index 2c0381fb..b47e7de6 100644 --- a/lib/RedisRemoteSaiInterface.cpp +++ b/lib/RedisRemoteSaiInterface.cpp @@ -20,6 +20,8 @@ #include +#define SAI_ZMQ_DEFAULT_RESPONSE_BUFFER_SIZE (64*1024*1024) + using namespace sairedis; using namespace saimeta; using namespace sairediscommon; @@ -77,13 +79,15 @@ sai_status_t RedisRemoteSaiInterface::apiInitialize( m_useTempView = false; m_syncMode = false; m_redisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC; + m_zmqResponseBufferSize = SAI_ZMQ_DEFAULT_RESPONSE_BUFFER_SIZE; if (m_contextConfig->m_zmqEnable) { m_communicationChannel = std::make_shared( m_contextConfig->m_zmqEndpoint, m_contextConfig->m_zmqNtfEndpoint, - std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3)); + std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3), + m_zmqResponseBufferSize); SWSS_LOG_NOTICE("zmq enabled, forcing sync mode"); @@ -420,7 +424,8 @@ sai_status_t RedisRemoteSaiInterface::setRedisExtensionAttribute( m_communicationChannel = std::make_shared( m_contextConfig->m_zmqEndpoint, m_contextConfig->m_zmqNtfEndpoint, - std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3)); + std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3), + m_zmqResponseBufferSize); m_communicationChannel->setResponseTimeout(m_responseTimeoutMs); diff --git a/lib/RedisRemoteSaiInterface.h b/lib/RedisRemoteSaiInterface.h index b800eaf7..d0cf173b 100644 --- a/lib/RedisRemoteSaiInterface.h +++ b/lib/RedisRemoteSaiInterface.h @@ -460,6 +460,8 @@ namespace sairedis uint64_t m_responseTimeoutMs; + size_t m_zmqResponseBufferSize; + std::function)> m_notificationCallback; std::map m_tableDump; diff --git a/lib/ZeroMQChannel.cpp b/lib/ZeroMQChannel.cpp index 04d91cbe..1b50f183 100644 --- a/lib/ZeroMQChannel.cpp +++ b/lib/ZeroMQChannel.cpp @@ -12,24 +12,33 @@ using namespace sairedis; -#define ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024) #define ZMQ_MAX_RETRY 10 ZeroMQChannel::ZeroMQChannel( _In_ const std::string& endpoint, _In_ const std::string& ntfEndpoint, - _In_ Channel::Callback callback): + _In_ Channel::Callback callback, + _In_ long zmqResponseBufferSize): Channel(callback), m_endpoint(endpoint), m_ntfEndpoint(ntfEndpoint), m_context(nullptr), m_socket(nullptr), m_ntfContext(nullptr), - m_ntfSocket(nullptr) + m_ntfSocket(nullptr), + m_zmqResponseBufferSize(zmqResponseBufferSize) { SWSS_LOG_ENTER(); + if (m_zmqResponseBufferSize != ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE) + { + SWSS_LOG_NOTICE("setting zmq response buffer size to %ld bytes", m_zmqResponseBufferSize); + } + else + { + SWSS_LOG_NOTICE("using default zmq response buffer size of %ld bytes", ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE); + } - m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE); + m_buffer.resize(m_zmqResponseBufferSize); // configure ZMQ for main communication @@ -129,14 +138,14 @@ void ZeroMQChannel::notificationThreadFunction() std::vector buffer; - buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE); + buffer.resize(m_zmqResponseBufferSize); while (m_runNotificationThread) { // NOTE: this entire loop internal could be encapsulated into separate class // which will inherit from Selectable class, and name this as ntf receiver - int rc = zmq_recv(m_ntfSocket, buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); + int rc = zmq_recv(m_ntfSocket, buffer.data(), m_zmqResponseBufferSize, 0); if (!m_runNotificationThread) break; @@ -156,10 +165,10 @@ void ZeroMQChannel::notificationThreadFunction() continue; } - if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) + if (rc >= m_zmqResponseBufferSize) { SWSS_LOG_WARN("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", - ZMQ_RESPONSE_BUFFER_SIZE, + m_zmqResponseBufferSize, rc); continue; @@ -291,7 +300,7 @@ sai_status_t ZeroMQChannel::wait( for (int i = 0; true ; ++i) { - rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); + rc = zmq_recv(m_socket, m_buffer.data(), m_zmqResponseBufferSize, 0); if (rc < 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY) { @@ -301,10 +310,10 @@ sai_status_t ZeroMQChannel::wait( { SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); } - if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) + if (rc >= m_zmqResponseBufferSize) { SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", - ZMQ_RESPONSE_BUFFER_SIZE, + m_zmqResponseBufferSize, rc); } break; diff --git a/lib/ZeroMQChannel.h b/lib/ZeroMQChannel.h index 89fd70ee..47d76f5e 100644 --- a/lib/ZeroMQChannel.h +++ b/lib/ZeroMQChannel.h @@ -10,6 +10,8 @@ #include #include +#define ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE (4*1024*1024) + namespace sairedis { class ZeroMQChannel: @@ -20,7 +22,8 @@ namespace sairedis ZeroMQChannel( _In_ const std::string& endpoint, _In_ const std::string& ntfEndpoint, - _In_ Channel::Callback callback); + _In_ Channel::Callback callback, + _In_ long zmqResponseBufferSize = ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE); virtual ~ZeroMQChannel(); @@ -63,5 +66,7 @@ namespace sairedis void* m_ntfContext; void* m_ntfSocket; + + long m_zmqResponseBufferSize; }; } diff --git a/meta/ZeroMQSelectableChannel.cpp b/meta/ZeroMQSelectableChannel.cpp index a53c53ca..344992fb 100644 --- a/meta/ZeroMQSelectableChannel.cpp +++ b/meta/ZeroMQSelectableChannel.cpp @@ -6,27 +6,36 @@ #include #include -#define ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024) - //#define ZMQ_POLL_TIMEOUT (2*60*1000) #define ZMQ_POLL_TIMEOUT (1000) using namespace sairedis; ZeroMQSelectableChannel::ZeroMQSelectableChannel( - _In_ const std::string& endpoint): + _In_ const std::string& endpoint, + _In_ long zmqResponseBufferSize): m_endpoint(endpoint), m_context(nullptr), m_socket(nullptr), m_fd(0), m_allowZmqPoll(false), - m_runThread(true) + m_runThread(true), + m_zmqResponseBufferSize(zmqResponseBufferSize) { SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("binding on %s", endpoint.c_str()); - m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE); + if (m_zmqResponseBufferSize != ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE) + { + SWSS_LOG_NOTICE("setting zmq response buffer size to %ld bytes", m_zmqResponseBufferSize); + } + else + { + SWSS_LOG_NOTICE("using default zmq response buffer size of %ld bytes", ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE); + } + + m_buffer.resize(m_zmqResponseBufferSize); m_context = zmq_ctx_new();; @@ -230,17 +239,17 @@ uint64_t ZeroMQSelectableChannel::readData() // clear selectable event so it could be triggered in next select() m_selectableEvent.readData(); - int rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0); + int rc = zmq_recv(m_socket, m_buffer.data(), m_zmqResponseBufferSize, 0); if (rc < 0) { SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno()); } - if (rc >= ZMQ_RESPONSE_BUFFER_SIZE) + if (rc >= m_zmqResponseBufferSize) { SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED", - ZMQ_RESPONSE_BUFFER_SIZE, + m_zmqResponseBufferSize, rc); } diff --git a/meta/ZeroMQSelectableChannel.h b/meta/ZeroMQSelectableChannel.h index 3ef3fe3b..952e8f38 100644 --- a/meta/ZeroMQSelectableChannel.h +++ b/meta/ZeroMQSelectableChannel.h @@ -9,6 +9,8 @@ #include #include +#define ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE (4*1024*1024) + namespace sairedis { class ZeroMQSelectableChannel: @@ -17,7 +19,8 @@ namespace sairedis public: ZeroMQSelectableChannel( - _In_ const std::string& endpoint); + _In_ const std::string& endpoint, + _In_ long zmqResponseBufferSize = ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE); virtual ~ZeroMQSelectableChannel(); @@ -75,5 +78,7 @@ namespace sairedis std::shared_ptr m_zmlPollThread; swss::SelectableEvent m_selectableEvent; + + long m_zmqResponseBufferSize; }; } diff --git a/syncd/Syncd.cpp b/syncd/Syncd.cpp index 1aeec2ee..d6ef8bf4 100644 --- a/syncd/Syncd.cpp +++ b/syncd/Syncd.cpp @@ -41,6 +41,7 @@ #define DEF_SAI_WARM_BOOT_DATA_FILE "/var/warmboot/sai-warmboot.bin" #define SAI_FAILURE_DUMP_SCRIPT "/usr/bin/sai_failure_dump.sh" +#define SYNCD_ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024) using namespace syncd; using namespace saimeta; @@ -135,7 +136,7 @@ Syncd::Syncd( m_enableSyncMode = true; - m_selectableChannel = std::make_shared(m_contextConfig->m_zmqEndpoint); + m_selectableChannel = std::make_shared(m_contextConfig->m_zmqEndpoint, SYNCD_ZMQ_RESPONSE_BUFFER_SIZE); } else {