Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string
, m_db(db)
, m_zmqServer(zmqServer)
{
if (popBatchSize > 0)
{
m_popBatchSize = (size_t)popBatchSize;
}
else
{
m_popBatchSize = DEFAULT_POP_BATCH_SIZE;
SWSS_LOG_ERROR("Invalid pop batch size: Setting it to %d", DEFAULT_POP_BATCH_SIZE);
}

if (dbPersistence)
{
SWSS_LOG_DEBUG("Database persistence enabled, tableName: %s", tableName.c_str());
Expand Down Expand Up @@ -80,7 +90,8 @@ void ZmqConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const
}

vkco.clear();
for (size_t ie = 0; ie < count; ie++)
auto pop_limit = min(count, m_popBatchSize);
for (size_t ie = 0; ie < pop_limit; ie++)
{
auto& kco = *(m_receivedOperationQueue.front());
vkco.push_back(std::move(kco));
Expand All @@ -90,6 +101,12 @@ void ZmqConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const
m_receivedOperationQueue.pop();
}
}

if (count > m_popBatchSize)
{
// Notify epoll to wake up and continue to pop.
m_selectableEvent.notify();
}
}

size_t ZmqConsumerStateTable::dbUpdaterQueueSize()
Expand Down
2 changes: 2 additions & 0 deletions common/zmqconsumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
ZmqServer& m_zmqServer;

std::unique_ptr<AsyncDBUpdater> m_asyncDBUpdater;

size_t m_popBatchSize;
};

}
91 changes: 91 additions & 0 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,3 +612,94 @@ TEST(ZmqServerLazzyBind, test)
EXPECT_EQ(received, 1);
}

// Parameterized test structure for ZmqConsumerStateTablePopSize
struct PopSizeTestParams
{
int batchSize;
int numElements;
int expectedPopCount;
vector<int> expectedSizes;
};

class ZmqConsumerStateTablePopSize : public ::testing::TestWithParam<PopSizeTestParams>
{
};

TEST_P(ZmqConsumerStateTablePopSize, test)
{
auto params = GetParam();
std::string testTableName = "ZMQ_BATCH_SIZE_UT";
std::string pushEndpoint = "tcp://localhost:1235";
std::string pullEndpoint = "tcp://*:1235";
int popCount = 0;
vector<int> recvdSizes;

// Start consumer first
thread *consumerThread = new thread([&]() {
cout << "Consumer thread started" << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(pullEndpoint);
Selectable* c = new ZmqConsumerStateTable(&db, testTableName, server, params.batchSize, 0, false);
Select cs;
cs.addSelectable(c);

Selectable *selectcs;
std::deque<KeyOpFieldsValuesTuple> vkco;

const auto timeout = std::chrono::seconds(15);
auto startTime = std::chrono::steady_clock::now();

while (popCount < params.expectedPopCount && (std::chrono::steady_clock::now() - startTime < timeout))
{
cout << "Entering select" << endl;
if (cs.select(&selectcs, 1000, true) == Select::OBJECT)
{
((ZmqConsumerStateTable*)c)->pops(vkco);
cout << "pops: " << vkco.size() << endl;
recvdSizes.push_back((int)vkco.size());
popCount++;
vkco.clear();
}
}
delete c;
});

// Wait for consumer to start
sleep(1);

// Producer sends elements
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint, 3000);
ZmqProducerStateTable p(&db, testTableName, client, false);

std::vector<KeyOpFieldsValuesTuple> kcos;
for (int i = 0; i < params.numElements; i++)
{
kcos.push_back(KeyOpFieldsValuesTuple{
"key_" + to_string(i),
SET_COMMAND,
std::vector<FieldValueTuple>{FieldValueTuple{"field", "value"}}
});
}
p.send(kcos);
cout << "Producer sent " << kcos.size() << " elements" << endl;

consumerThread->join();
delete consumerThread;

cout << "Consumer thread joined" << endl;
EXPECT_EQ(popCount, params.expectedPopCount) << "popCount: " << popCount << ", expected: " << params.expectedPopCount;
for (int i = 0; i < popCount; i++)
{
EXPECT_EQ(recvdSizes[i], params.expectedSizes[i]) << "recvdSizes[" << i << "]: " << recvdSizes[i] << ", expected: " << params.expectedSizes[i];
}
}

INSTANTIATE_TEST_SUITE_P(
BatchSizeTests,
ZmqConsumerStateTablePopSize,
::testing::Values(
PopSizeTestParams{40, 150, 4, {40, 40, 40, 30}},
PopSizeTestParams{-1, 384, 3, {128, 128, 128}}
)
);
Loading