Skip to content

Commit c99f5a1

Browse files
committed
support multi thread for tcp io
1 parent c426bd4 commit c99f5a1

22 files changed

+244
-158
lines changed

include/phxpaxos/network.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ class NetWork
4444
//If paxoslib call this function, network need to stop receive any message.
4545
virtual void StopNetWork() = 0;
4646

47-
virtual int SendMessageTCP(const std::string & sIp, const int iPort, const std::string & sMessage) = 0;
47+
virtual int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0;
4848

49-
virtual int SendMessageUDP(const std::string & sIp, const int iPort, const std::string & sMessage) = 0;
49+
virtual int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage) = 0;
5050

5151
//When receive a message, call this funtion.
5252
//This funtion is async, just enqueue an return.

include/phxpaxos/options.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ class Options
152152
//Message size under iUDPMaxSize we use udp to send.
153153
//Default is 4096.
154154
size_t iUDPMaxSize;
155+
156+
//optional
157+
//Our default network io thread count.
158+
//Default is 1.
159+
int iIOThreadCount;
155160

156161
//optional
157162
//We support to run multi phxpaxos on one process.

src/algorithm/base.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ int Base :: SendMessage(const nodeid_t iSendtoNodeID, const CheckpointMsg & oChe
203203
return ret;
204204
}
205205

206-
return m_poMsgTransport->SendMessage(iSendtoNodeID, sBuffer, iSendType);
206+
return m_poMsgTransport->SendMessage(m_poConfig->GetMyGroupIdx(), iSendtoNodeID, sBuffer, iSendType);
207207
}
208208

209209
int Base :: SendMessage(const nodeid_t iSendtoNodeID, const PaxosMsg & oPaxosMsg, const int iSendType)
@@ -228,7 +228,7 @@ int Base :: SendMessage(const nodeid_t iSendtoNodeID, const PaxosMsg & oPaxosMsg
228228
return ret;
229229
}
230230

231-
return m_poMsgTransport->SendMessage(iSendtoNodeID, sBuffer, iSendType);
231+
return m_poMsgTransport->SendMessage(m_poConfig->GetMyGroupIdx(), iSendtoNodeID, sBuffer, iSendType);
232232
}
233233

234234
int Base :: BroadcastMessage(const PaxosMsg & oPaxosMsg, const int iRunType, const int iSendType)
@@ -255,7 +255,7 @@ int Base :: BroadcastMessage(const PaxosMsg & oPaxosMsg, const int iRunType, con
255255
return ret;
256256
}
257257

258-
ret = m_poMsgTransport->BroadcastMessage(sBuffer, iSendType);
258+
ret = m_poMsgTransport->BroadcastMessage(m_poConfig->GetMyGroupIdx(), sBuffer, iSendType);
259259

260260
if (iRunType == BroadcastMessage_Type_RunSelf_Final)
261261
{
@@ -274,7 +274,7 @@ int Base :: BroadcastMessageToFollower(const PaxosMsg & oPaxosMsg, const int iSe
274274
return ret;
275275
}
276276

277-
return m_poMsgTransport->BroadcastMessageFollower(sBuffer, iSendType);
277+
return m_poMsgTransport->BroadcastMessageFollower(m_poConfig->GetMyGroupIdx(), sBuffer, iSendType);
278278
}
279279

280280
int Base :: BroadcastMessageToTempNode(const PaxosMsg & oPaxosMsg, const int iSendType)
@@ -286,7 +286,7 @@ int Base :: BroadcastMessageToTempNode(const PaxosMsg & oPaxosMsg, const int iSe
286286
return ret;
287287
}
288288

289-
return m_poMsgTransport->BroadcastMessageTempNode(sBuffer, iSendType);
289+
return m_poMsgTransport->BroadcastMessageTempNode(m_poConfig->GetMyGroupIdx(), sBuffer, iSendType);
290290
}
291291

292292
///////////////////////////

src/algorithm/checkpoint_receiver.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,12 @@ int CheckpointReceiver :: InitFilePath(const std::string & sFilePath, std::strin
161161
}
162162
}
163163

164-
sFormatFilePath = "/";
164+
sFormatFilePath = "";
165+
if (vecDirList.size() > 0 && vecDirList[0].size() > 0 && vecDirList[0][0] != '.')
166+
{
167+
sFormatFilePath += "/";
168+
}
169+
165170
for (size_t i = 0; i < vecDirList.size(); i++)
166171
{
167172
if (i + 1 == vecDirList.size())

src/comm/msg_transport.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,16 @@ class MsgTransport
3737
public:
3838
virtual ~MsgTransport() {}
3939

40-
virtual int SendMessage(const nodeid_t iSendtoNodeID, const std::string & sBuffer,
41-
const int iSendType = Message_SendType_UDP) = 0;
40+
virtual int SendMessage(const int iGroupIdx, const nodeid_t iSendtoNodeID,
41+
const std::string & sBuffer, const int iSendType = Message_SendType_UDP) = 0;
4242

43-
virtual int BroadcastMessage(const std::string & sBuffer,
43+
virtual int BroadcastMessage(const int iGroupIdx, const std::string & sBuffer,
4444
const int iSendType = Message_SendType_UDP) = 0;
4545

46-
virtual int BroadcastMessageFollower(const std::string & sBuffer,
46+
virtual int BroadcastMessageFollower(const int iGroupIdx, const std::string & sBuffer,
4747
const int iSendType = Message_SendType_UDP) = 0;
4848

49-
virtual int BroadcastMessageTempNode(const std::string & sBuffer,
49+
virtual int BroadcastMessageTempNode(const int iGroupIdx, const std::string & sBuffer,
5050
const int iSendType = Message_SendType_UDP) = 0;
5151
};
5252

src/comm/options.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ Options :: Options()
114114
iSyncInterval = 0;
115115
poNetWork = nullptr;
116116
iUDPMaxSize = 4096;
117+
iIOThreadCount = 1;
117118
iGroupCount = 1;
118119
bUseMembership = false;
119120
pMembershipChangeCallback = nullptr;

src/communicate/communicate.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ Communicate :: ~Communicate()
3939
{
4040
}
4141

42-
int Communicate :: Send(const nodeid_t iNodeID, const NodeInfo & oNodeInfo,
43-
const std::string & sMessage, const int iSendType)
42+
int Communicate :: Send(const int iGroupIdx, const nodeid_t iNodeID,
43+
const NodeInfo & oNodeInfo, const std::string & sMessage, const int iSendType)
4444
{
4545
if ((int)sMessage.size() > MAX_VALUE_SIZE)
4646
{
@@ -55,44 +55,44 @@ int Communicate :: Send(const nodeid_t iNodeID, const NodeInfo & oNodeInfo,
5555
if (sMessage.size() > m_iUDPMaxSize || iSendType == Message_SendType_TCP)
5656
{
5757
BP->GetNetworkBP()->SendTcp(sMessage);
58-
return m_poNetwork->SendMessageTCP(oNodeInfo.GetIP(), oNodeInfo.GetPort(), sMessage);
58+
return m_poNetwork->SendMessageTCP(iGroupIdx, oNodeInfo.GetIP(), oNodeInfo.GetPort(), sMessage);
5959
}
6060
else
6161
{
6262
BP->GetNetworkBP()->SendUdp(sMessage);
63-
return m_poNetwork->SendMessageUDP(oNodeInfo.GetIP(), oNodeInfo.GetPort(), sMessage);
63+
return m_poNetwork->SendMessageUDP(iGroupIdx, oNodeInfo.GetIP(), oNodeInfo.GetPort(), sMessage);
6464
}
6565
}
6666

67-
int Communicate :: SendMessage(const nodeid_t iSendtoNodeID, const std::string & sMessage, const int iSendType)
67+
int Communicate :: SendMessage(const int iGroupIdx, const nodeid_t iSendtoNodeID, const std::string & sMessage, const int iSendType)
6868
{
69-
return Send(iSendtoNodeID, NodeInfo(iSendtoNodeID), sMessage, iSendType);
69+
return Send(iGroupIdx, iSendtoNodeID, NodeInfo(iSendtoNodeID), sMessage, iSendType);
7070
}
7171

72-
int Communicate :: BroadcastMessage(const std::string & sMessage, const int iSendType)
72+
int Communicate :: BroadcastMessage(const int iGroupIdx, const std::string & sMessage, const int iSendType)
7373
{
7474
const std::set<nodeid_t> & setNodeInfo = m_poConfig->GetSystemVSM()->GetMembershipMap();
7575

7676
for (auto & it : setNodeInfo)
7777
{
7878
if (it != m_iMyNodeID)
7979
{
80-
Send(it, NodeInfo(it), sMessage, iSendType);
80+
Send(iGroupIdx, it, NodeInfo(it), sMessage, iSendType);
8181
}
8282
}
8383

8484
return 0;
8585
}
8686

87-
int Communicate :: BroadcastMessageFollower(const std::string & sMessage, const int iSendType)
87+
int Communicate :: BroadcastMessageFollower(const int iGroupIdx, const std::string & sMessage, const int iSendType)
8888
{
8989
const std::map<nodeid_t, uint64_t> & mapFollowerNodeInfo = m_poConfig->GetMyFollowerMap();
9090

9191
for (auto & it : mapFollowerNodeInfo)
9292
{
9393
if (it.first != m_iMyNodeID)
9494
{
95-
Send(it.first, NodeInfo(it.first), sMessage, iSendType);
95+
Send(iGroupIdx, it.first, NodeInfo(it.first), sMessage, iSendType);
9696
}
9797
}
9898

@@ -101,15 +101,15 @@ int Communicate :: BroadcastMessageFollower(const std::string & sMessage, const
101101
return 0;
102102
}
103103

104-
int Communicate :: BroadcastMessageTempNode(const std::string & sMessage, const int iSendType)
104+
int Communicate :: BroadcastMessageTempNode(const int iGroupIdx, const std::string & sMessage, const int iSendType)
105105
{
106106
const std::map<nodeid_t, uint64_t> & mapTempNode = m_poConfig->GetTmpNodeMap();
107107

108108
for (auto & it : mapTempNode)
109109
{
110110
if (it.first != m_iMyNodeID)
111111
{
112-
Send(it.first, NodeInfo(it.first), sMessage, iSendType);
112+
Send(iGroupIdx, it.first, NodeInfo(it.first), sMessage, iSendType);
113113
}
114114
}
115115

src/communicate/communicate.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,24 @@ class Communicate : public MsgTransport
4040
NetWork * poNetwork);
4141
~Communicate();
4242

43-
int SendMessage(const nodeid_t iSendtoNodeID, const std::string & sMessage,
43+
int SendMessage(const int iGroupIdx, const nodeid_t iSendtoNodeID, const std::string & sMessage,
4444
const int iSendType = Message_SendType_UDP);
4545

46-
int BroadcastMessage(const std::string & sMessage,
46+
int BroadcastMessage(const int iGroupIdx, const std::string & sMessage,
4747
const int iSendType = Message_SendType_UDP);
4848

49-
int BroadcastMessageFollower(const std::string & sMessage,
49+
int BroadcastMessageFollower(const int iGroupIdx, const std::string & sMessage,
5050
const int iSendType = Message_SendType_UDP);
5151

52-
int BroadcastMessageTempNode(const std::string & sMessage,
52+
int BroadcastMessageTempNode(const int iGroupIdx, const std::string & sMessage,
5353
const int iSendType = Message_SendType_UDP);
5454

5555
public:
5656
void SetUDPMaxSize(const size_t iUDPMaxSize);
5757

5858
private:
59-
int Send(const nodeid_t iNodeID, const NodeInfo & tNodeInfo, const std::string & sMessage, const int iSendType);
59+
int Send(const int iGroupIdx, const nodeid_t iNodeID,
60+
const NodeInfo & tNodeInfo, const std::string & sMessage, const int iSendType);
6061

6162
private:
6263
Config * m_poConfig;

src/communicate/dfnetwork.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ void DFNetWork :: StopNetWork()
4141
m_oTcpIOThread.Stop();
4242
}
4343

44-
int DFNetWork :: Init(const std::string & sListenIp, const int iListenPort)
44+
int DFNetWork :: Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount)
4545
{
4646
int ret = m_oUDPSend.Init();
4747
if (ret != 0)
@@ -55,7 +55,7 @@ int DFNetWork :: Init(const std::string & sListenIp, const int iListenPort)
5555
return ret;
5656
}
5757

58-
ret = m_oTcpIOThread.Init(sListenIp, iListenPort);
58+
ret = m_oTcpIOThread.Init(sListenIp, iListenPort, iIOThreadCount);
5959
if (ret != 0)
6060
{
6161
PLErr("m_oTcpIOThread Init fail, ret %d", ret);
@@ -72,12 +72,12 @@ void DFNetWork :: RunNetWork()
7272
m_oTcpIOThread.Start();
7373
}
7474

75-
int DFNetWork :: SendMessageTCP(const std::string & sIp, const int iPort, const std::string & sMessage)
75+
int DFNetWork :: SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage)
7676
{
77-
return m_oTcpIOThread.AddMessage(sIp, iPort, sMessage);
77+
return m_oTcpIOThread.AddMessage(iGroupIdx, sIp, iPort, sMessage);
7878
}
7979

80-
int DFNetWork :: SendMessageUDP(const std::string & sIp, const int iPort, const std::string & sMessage)
80+
int DFNetWork :: SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage)
8181
{
8282
return m_oUDPSend.AddMessage(sIp, iPort, sMessage);
8383
}

src/communicate/dfnetwork.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ class DFNetWork : public NetWork
3535
DFNetWork();
3636
virtual ~DFNetWork();
3737

38-
int Init(const std::string & sListenIp, const int iListenPort);
38+
int Init(const std::string & sListenIp, const int iListenPort, const int iIOThreadCount);
3939

4040
void RunNetWork();
4141

4242
void StopNetWork();
4343

44-
int SendMessageTCP(const std::string & sIp, const int iPort, const std::string & sMessage);
44+
int SendMessageTCP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage);
4545

46-
int SendMessageUDP(const std::string & sIp, const int iPort, const std::string & sMessage);
46+
int SendMessageUDP(const int iGroupIdx, const std::string & sIp, const int iPort, const std::string & sMessage);
4747

4848
private:
4949
UDPRecv m_oUDPRecv;

src/communicate/tcp/event_loop.cpp

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,36 +24,34 @@ See the AUTHORS file for names of contributors.
2424
#include "tcp_acceptor.h"
2525
#include "tcp_client.h"
2626
#include "comm_include.h"
27+
#include "message_event.h"
28+
#include "phxpaxos/network.h"
2729

2830
using namespace std;
2931

3032
namespace phxpaxos
3133
{
3234

33-
EventLoop :: EventLoop()
35+
EventLoop :: EventLoop(NetWork * poNetWork)
3436
{
3537
m_iEpollFd = -1;
3638
m_bIsEnd = false;
37-
m_poTcpAcceptor = nullptr;
39+
m_poNetWork = poNetWork;
3840
m_poTcpClient = nullptr;
3941
m_poNotify = nullptr;
4042
memset(m_EpollEvents, 0, sizeof(m_EpollEvents));
4143
}
4244

4345
EventLoop :: ~EventLoop()
4446
{
47+
ClearEvent();
4548
}
4649

4750
void EventLoop :: JumpoutEpollWait()
4851
{
4952
m_poNotify->SendNotify();
5053
}
5154

52-
void EventLoop :: SetTcpAcceptor(TcpAcceptor * poTcpAcceptor)
53-
{
54-
m_poTcpAcceptor = poTcpAcceptor;
55-
}
56-
5755
void EventLoop :: SetTcpClient(TcpClient * poTcpClient)
5856
{
5957
m_poTcpClient = poTcpClient;
@@ -157,11 +155,7 @@ void EventLoop :: StartLoop()
157155

158156
OneLoop(iNextTimeout);
159157

160-
//deal with accept fds
161-
if (m_poTcpAcceptor != nullptr)
162-
{
163-
m_poTcpAcceptor->CreateEvent();
164-
}
158+
CreateEvent();
165159

166160
if (m_poTcpClient != nullptr)
167161
{
@@ -332,6 +326,61 @@ void EventLoop :: DealwithTimeout(int & iNextTimeout)
332326
}
333327
}
334328

329+
void EventLoop :: AddEvent(int iFD, SocketAddress oAddr)
330+
{
331+
std::lock_guard<std::mutex> oLockGuard(m_oMutex);
332+
m_oFDQueue.push(make_pair(iFD, oAddr));
333+
}
334+
335+
void EventLoop :: CreateEvent()
336+
{
337+
std::lock_guard<std::mutex> oLockGuard(m_oMutex);
338+
339+
if (m_oFDQueue.empty())
340+
{
341+
return;
342+
}
343+
344+
ClearEvent();
345+
346+
int iCreatePerTime = 200;
347+
while ((!m_oFDQueue.empty()) && iCreatePerTime--)
348+
{
349+
auto oData = m_oFDQueue.front();
350+
m_oFDQueue.pop();
351+
352+
//create event for this fd
353+
MessageEvent * poMessageEvent = new MessageEvent(MessageEventType_RECV, oData.first,
354+
oData.second, this, m_poNetWork);
355+
poMessageEvent->AddEvent(EPOLLIN);
356+
357+
m_vecCreatedEvent.push_back(poMessageEvent);
358+
}
359+
}
360+
361+
void EventLoop :: ClearEvent()
362+
{
363+
for (auto it = m_vecCreatedEvent.begin(); it != end(m_vecCreatedEvent);)
364+
{
365+
if ((*it)->IsDestroy())
366+
{
367+
delete (*it);
368+
it = m_vecCreatedEvent.erase(it);
369+
}
370+
else
371+
{
372+
it++;
373+
}
374+
}
375+
}
376+
377+
int EventLoop :: GetActiveEventCount()
378+
{
379+
std::lock_guard<std::mutex> oLockGuard(m_oMutex);
380+
ClearEvent();
381+
return (int)m_vecCreatedEvent.size();
382+
}
383+
335384
}
336385

337386

0 commit comments

Comments
 (0)