diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index 5f58482..5900927 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -23,6 +23,16 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string , m_db(db) , m_zmqServer(zmqServer) { + if (popBatchSize > 0) + { + m_popBatchSize = (size_t)popBatchSize; + } + else + { + m_popBatchSize = DEFAULT_POP_BATCH_SIZE; + SWSS_LOG_ERROR("Invalid pop batch size: Setting it to %d", DEFAULT_POP_BATCH_SIZE); + } + if (dbPersistence) { SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str()); @@ -80,7 +90,8 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const } vkco.clear(); - for (size_t ie = 0; ie < count; ie++) + auto pop_limit = min(count, m_popBatchSize); + for (size_t ie = 0; ie < pop_limit; ie++) { auto& kco = *(m_receivedOperationQueue.front()); vkco.push_back(std::move(kco)); @@ -90,6 +101,12 @@ void ZmqConsumerStateTable::pops(std::deque &vkco, const m_receivedOperationQueue.pop(); } } + + if (count > m_popBatchSize) + { + // Notify epoll to wake up and continue to pop. + m_selectableEvent.notify(); + } } size_t ZmqConsumerStateTable::dbUpdaterQueueSize() diff --git a/common/zmqconsumerstatetable.h b/common/zmqconsumerstatetable.h index dece60b..10441ac 100644 --- a/common/zmqconsumerstatetable.h +++ b/common/zmqconsumerstatetable.h @@ -83,6 +83,8 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes ZmqServer& m_zmqServer; std::unique_ptr m_asyncDBUpdater; + + size_t m_popBatchSize; }; } diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 6b2d9a2..509c1f7 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -612,3 +612,94 @@ TEST(ZmqServerLazzyBind, test) EXPECT_EQ(received, 1); } +// Parameterized test structure for ZmqConsumerStateTablePopSize +struct PopSizeTestParams +{ + int batchSize; + int numElements; + int expectedPopCount; + vector expectedSizes; +}; + +class ZmqConsumerStateTablePopSize : public ::testing::TestWithParam +{ +}; + +TEST_P(ZmqConsumerStateTablePopSize, test) +{ + auto params = GetParam(); + std::string testTableName = "ZMQ_BATCH_SIZE_UT"; + std::string pushEndpoint = "tcp://localhost:1235"; + std::string pullEndpoint = "tcp://*:1235"; + int popCount = 0; + vector recvdSizes; + + // Start consumer first + thread *consumerThread = new thread([&]() { + cout << "Consumer thread started" << endl; + DBConnector db(TEST_DB, 0, true); + ZmqServer server(pullEndpoint); + Selectable* c = new ZmqConsumerStateTable(&db, testTableName, server, params.batchSize, 0, false); + Select cs; + cs.addSelectable(c); + + Selectable *selectcs; + std::deque vkco; + + const auto timeout = std::chrono::seconds(15); + auto startTime = std::chrono::steady_clock::now(); + + while (popCount < params.expectedPopCount && (std::chrono::steady_clock::now() - startTime < timeout)) + { + cout << "Entering select" << endl; + if (cs.select(&selectcs, 1000, true) == Select::OBJECT) + { + ((ZmqConsumerStateTable*)c)->pops(vkco); + cout << "pops: " << vkco.size() << endl; + recvdSizes.push_back((int)vkco.size()); + popCount++; + vkco.clear(); + } + } + delete c; + }); + + // Wait for consumer to start + sleep(1); + + // Producer sends elements + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint, 3000); + ZmqProducerStateTable p(&db, testTableName, client, false); + + std::vector kcos; + for (int i = 0; i < params.numElements; i++) + { + kcos.push_back(KeyOpFieldsValuesTuple{ + "key_" + to_string(i), + SET_COMMAND, + std::vector{FieldValueTuple{"field", "value"}} + }); + } + p.send(kcos); + cout << "Producer sent " << kcos.size() << " elements" << endl; + + consumerThread->join(); + delete consumerThread; + + cout << "Consumer thread joined" << endl; + EXPECT_EQ(popCount, params.expectedPopCount) << "popCount: " << popCount << ", expected: " << params.expectedPopCount; + for (int i = 0; i < popCount; i++) + { + EXPECT_EQ(recvdSizes[i], params.expectedSizes[i]) << "recvdSizes[" << i << "]: " << recvdSizes[i] << ", expected: " << params.expectedSizes[i]; + } +} + +INSTANTIATE_TEST_SUITE_P( + BatchSizeTests, + ZmqConsumerStateTablePopSize, + ::testing::Values( + PopSizeTestParams{40, 150, 4, {40, 40, 40, 30}}, + PopSizeTestParams{-1, 384, 3, {128, 128, 128}} + ) +);