From 27a70259f0699623fd776af793a10c38d1389ac6 Mon Sep 17 00:00:00 2001 From: Sonic Build Admin Date: Thu, 9 Oct 2025 16:19:21 +0000 Subject: [PATCH] Add pop batch size support for ZMQ Consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **What I did** Add pop batch size support to ZmqConsumerStateTable to reduce memory usage and speed up the updating of CRM counters/DASH feedback when applying DASH configuration at scale. Example: Let's say we have a GNMI server which pushes X entries to orchagent. With the current logic, ZmqConsumerStateTable moves all X entries to the m_toSync map, and Dashorch then creates X entries in the bulker. However, the max_bulk size is often limited (currently 1000) and is definitely much smaller than the size of m_toSync in this scale scenario. So, the effective memory during this time is `2 * X * size per object` (1 copy in m_toSync + 1 copy in the bulker) until all those entries are applied to the ASIC. 1. With this change, only pop-batch-size entries are popped out to m_toSync and added to the bulker at a time. Thus peak memory utilization is cut in half in the DASH scale case. 2. Another side effect of this change is that the post-processing for each batch of pop-batch-size items is done immediately in orchagent, so there is no delay in updating CRM or the GNMI feedback loop. 
Without this change, post-processing starts only after all the entries in m_toSync are applied to syncd, and that number of entries is not capped in the current design. **How I verified** UT, plus applying a DASH config and making sure everything works. Before the update: ``` [ RUN  ] ZmqConsumerStateTablePopSize.test Consumer thread started Entering select Producer sent 150 elements pops: 150 Consumer thread joined tests/zmq_state_ut.cpp:636: Failure Expected equality of these values: popCount Which is: 1 4 popCount: 1, expected: 4 tests/zmq_state_ut.cpp:639: Failure Expected equality of these values: recvdSizes[i] Which is: 150 expectedSizes[i] Which is: 40 recvdSizes[0]: 150, expected: 40 [  FAILED ] ZmqConsumerStateTablePopSize.test (16017 ms) ``` After the update: ``` [----------] 1 test from ZmqConsumerStateTablePopSize [ RUN ] ZmqConsumerStateTablePopSize.test Consumer thread started Entering select Producer sent 150 elements pops: 40 Entering select pops: 40 Entering select pops: 40 Entering select pops: 30 Consumer thread joined [ OK ] ZmqConsumerStateTablePopSize.test (2008 ms) [----------] 1 test from ZmqConsumerStateTablePopSize (2008 ms total) [----------] Global test environment tear-down [==========] 1 test from 1 test suite ran. (2012 ms total) [ PASSED ] 1 test. 
``` --- common/zmqconsumerstatetable.cpp | 19 ++++++- common/zmqconsumerstatetable.h | 2 + tests/zmq_state_ut.cpp | 91 ++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index 5f58482..5900927 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -23,6 +23,16 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string , m_db(db) , m_zmqServer(zmqServer) { + if (popBatchSize > 0) + { + m_popBatchSize = (size_t)popBatchSize; + } + else + { + m_popBatchSize = DEFAULT_POP_BATCH_SIZE; + SWSS_LOG_ERROR("Invalid pop batch size: Setting it to %d", DEFAULT_POP_BATCH_SIZE); + } + if (dbPersistence) { SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); @@ -80,7 +90,8 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const } vkco.clear(); - for (size_t ie = 0; ie < count; ie++) + auto pop_limit = min(count, m_popBatchSize); + for (size_t ie = 0; ie < pop_limit; ie++) { auto& kco = *(m_receivedOperationQueue.front()); vkco.push_back(std::move(kco)); @@ -90,6 +101,12 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const m_receivedOperationQueue.pop(); } } + + if (count > m_popBatchSize) + { + // Notify epoll to wake up and continue to pop. 
+ m_selectableEvent.notify(); + } } size_t ZmqConsumerStateTable::dbUpdaterQueueSize() diff --git a/common/zmqconsumerstatetable.h b/common/zmqconsumerstatetable.h index dece60b..10441ac 100644 --- a/common/zmqconsumerstatetable.h +++ b/common/zmqconsumerstatetable.h @@ -83,6 +83,8 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes ZmqServer& m_zmqServer; std::unique_ptr m_asyncDBUpdater; + + size_t m_popBatchSize; }; } diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 6b2d9a2..509c1f7 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -612,3 +612,94 @@ TEST(ZmqServerLazzyBind, test) EXPECT_EQ(received, 1); } +// Parameterized test structure for ZmqConsumerStateTablePopSize +struct PopSizeTestParams +{ + int batchSize; + int numElements; + int expectedPopCount; + vector expectedSizes; +}; + +class ZmqConsumerStateTablePopSize : public ::testing::TestWithParam +{ +}; + +TEST_P(ZmqConsumerStateTablePopSize, test) +{ + auto params = GetParam(); + std::string testTableName = "ZMQ_BATCH_SIZE_UT"; + std::string pushEndpoint = "tcp://localhost:1235"; + std::string pullEndpoint = "tcp://*:1235"; + int popCount = 0; + vector recvdSizes; + + // Start consumer first + thread *consumerThread = new thread([&]() { + cout << "Consumer thread started" << endl; + DBConnector db(TEST_DB, 0, true); + ZmqServer server(pullEndpoint); + Selectable* c = new ZmqConsumerStateTable(&db, testTableName, server, params.batchSize, 0, false); + Select cs; + cs.addSelectable(c); + + Selectable *selectcs; + std::deque vkco; + + const auto timeout = std::chrono::seconds(15); + auto startTime = std::chrono::steady_clock::now(); + + while (popCount < params.expectedPopCount && (std::chrono::steady_clock::now() - startTime < timeout)) + { + cout << "Entering select" << endl; + if (cs.select(&selectcs, 1000, true) == Select::OBJECT) + { + ((ZmqConsumerStateTable*)c)->pops(vkco); + cout << "pops: " << vkco.size() << endl; + 
recvdSizes.push_back((int)vkco.size()); + popCount++; + vkco.clear(); + } + } + delete c; + }); + + // Wait for consumer to start + sleep(1); + + // Producer sends elements + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint, 3000); + ZmqProducerStateTable p(&db, testTableName, client, false); + + std::vector kcos; + for (int i = 0; i < params.numElements; i++) + { + kcos.push_back(KeyOpFieldsValuesTuple{ + "key_" + to_string(i), + SET_COMMAND, + std::vector{FieldValueTuple{"field", "value"}} + }); + } + p.send(kcos); + cout << "Producer sent " << kcos.size() << " elements" << endl; + + consumerThread->join(); + delete consumerThread; + + cout << "Consumer thread joined" << endl; + EXPECT_EQ(popCount, params.expectedPopCount) << "popCount: " << popCount << ", expected: " << params.expectedPopCount; + for (int i = 0; i < popCount; i++) + { + EXPECT_EQ(recvdSizes[i], params.expectedSizes[i]) << "recvdSizes[" << i << "]: " << recvdSizes[i] << ", expected: " << params.expectedSizes[i]; + } +} + +INSTANTIATE_TEST_SUITE_P( + BatchSizeTests, + ZmqConsumerStateTablePopSize, + ::testing::Values( + PopSizeTestParams{40, 150, 4, {40, 40, 40, 30}}, + PopSizeTestParams{-1, 384, 3, {128, 128, 128}} + ) +);