Skip to content

Commit 42dc179

Browse files
glevkovichtbierwiaczonhshomroni
authored
Memory management dev1 final (#2944)
* README: improve instructions on how to push images to Docker Hub
* Change the requestsOfNonPrimary field from a raw ClientRequestMsg pointer to std::unique_ptr
* Add a monitor for incoming message buffer allocations and releases:
  - MessageBase: add structures and functionality to gather stats about incoming msgs
  - Every time an external incoming message arrives and a buffer is allocated, monitoring structures are updated
  - Every time an external incoming message buffer is released, monitoring structures are updated
  - A user can get the information in real time from the diagnostics server:
    - Run "./concord-ctl status get messages_communicator" from inside the concord container

---------

Co-authored-by: tbierwiaczon <tbierwiaczon@vmware.com>
Co-authored-by: hshomroni <hshomroni@vmware.com>
1 parent 385922f commit 42dc179

16 files changed

+219
-51
lines changed

README.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ The CI builds and runs tests in a docker container. To add a new dependency or t
141141
[Makefile](https://github.com/vmware/concord-bft/blob/master/Makefile#L5)
142142
and
143143
[Makefile](https://github.com/vmware/concord-bft/blob/master/Makefile#L10)
144-
* Tag the new images:
144+
* Tag the new images (at least one):
145145
* For release: `docker tag concord-bft:latest concordbft/concord-bft:<version>`
146146
* For debug: `docker tag concord-bft-debug:latest concordbft/concord-bft-debug:<version>`
147147
<br>where version is `current version + 1`.
@@ -150,7 +150,9 @@ The CI builds and runs tests in a docker container. To add a new dependency or t
150150
clean build test`
151151
* Ask one of the maintainers for a temporary write permission to Docker Hub
152152
repository (you need to have a [Docker ID](https://docs.docker.com/docker-id/))
153-
* Push the image: `docker push concordbft/concord-bft:<version>`
153+
* Push the images (at least one):
154+
* For release: `docker push concordbft/concord-bft:<version>`
155+
* For debug: `docker push concordbft/concord-bft-debug:<version>`
154156
* Create a PR for the update:
155157
* The PR must contain only changes related to the updates in the image
156158
* PR's summary has to be similar to `Docker update to version release=<new version> debug=<new version>`

bftengine/src/bcstatetransfer/BCStateTran.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -1138,6 +1138,9 @@ void BCStateTran::setReconfigurationEngineImpl(std::shared_ptr<ClientReconfigura
11381138
template <typename MSG>
11391139
void BCStateTran::freeStateTransferMsg(const MSG *m) {
11401140
const char *p_to_delete = (reinterpret_cast<const char *>(m) - sizeof(MessageBase::Header));
1141+
if (m->isIncomingMsg_) {
1142+
MessageBase::Statistics::updateDiagnosticsCountersOnBufRelease(MsgCode::StateTransfer);
1143+
}
11411144
std::free(const_cast<char *>(p_to_delete));
11421145
}
11431146

@@ -1148,6 +1151,7 @@ void BCStateTran::handleStateTransferMessageImpl(char *msg,
11481151
LocalTimePoint incomingEventsQPushTime) {
11491152
// msgHeader is now the owner of msg. Once the true type of msg is known, ownership passes to the onMessage functions.
11501153
auto msgHeader = STMessageUptr<BCStateTranBaseMsg>(reinterpret_cast<BCStateTranBaseMsg *>(msg));
1154+
msgHeader.get()->isIncomingMsg_ = true;
11511155
if (!running_) {
11521156
return;
11531157
}

bftengine/src/bcstatetransfer/Messages.hpp

+7-3
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,12 @@ class MsgType {
4040
};
4141

4242
struct BCStateTranBaseMsg {
43-
BCStateTranBaseMsg(uint16_t type) : type(type) {}
43+
BCStateTranBaseMsg(uint16_t type, bool isIncomingMsg = false) : type(type), isIncomingMsg_(isIncomingMsg) {}
44+
4445
uint16_t type;
46+
// This flag is used to monitor message buffer allocations and releases, so it is mutable even though incoming ST msg
47+
// objects are const during handling.
48+
mutable uint8_t isIncomingMsg_;
4549
// This struct and its derived structs are used to de/serialize message buffers sent over communication channels.
4650
// Since virtual methods modify the memory layout of a struct, there cannot be any for both this base struct and its
4751
// inheritors.
@@ -134,7 +138,7 @@ struct CheckpointSummaryMsg : public BCStateTranBaseMsg {
134138
}
135139

136140
static bool equivalent(const CheckpointSummaryMsg* a, const CheckpointSummaryMsg* b) {
137-
static_assert((sizeof(CheckpointSummaryMsg) - sizeof(requestMsgSeqNum) == 87),
141+
static_assert((sizeof(CheckpointSummaryMsg) - sizeof(requestMsgSeqNum) == 88),
138142
"Should newly added field be compared below?");
139143
bool cmp1 =
140144
((a->maxBlockId == b->maxBlockId) && (a->checkpointNum == b->checkpointNum) &&
@@ -152,7 +156,7 @@ struct CheckpointSummaryMsg : public BCStateTranBaseMsg {
152156
}
153157

154158
static bool equivalent(const CheckpointSummaryMsg* a, uint16_t a_id, const CheckpointSummaryMsg* b, uint16_t b_id) {
155-
static_assert((sizeof(CheckpointSummaryMsg) - sizeof(requestMsgSeqNum) == 87),
159+
static_assert((sizeof(CheckpointSummaryMsg) - sizeof(requestMsgSeqNum) == 88),
156160
"Should newly added field be compared below?");
157161
if ((a->maxBlockId != b->maxBlockId) || (a->checkpointNum != b->checkpointNum) ||
158162
(a->digestOfMaxBlockId != b->digestOfMaxBlockId) ||

bftengine/src/bftengine/IncomingMsgsStorage.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <functional>
2222
#include <memory>
23+
#include <string>
2324

2425
namespace bftEngine::impl {
2526

@@ -30,6 +31,8 @@ class IncomingMsgsStorage {
3031

3132
virtual void start() = 0;
3233
virtual void stop() = 0;
34+
// returns a string with status. To be used by diagnostics server
35+
virtual std::string status() const = 0;
3336

3437
virtual bool isRunning() const = 0;
3538

bftengine/src/bftengine/IncomingMsgsStorageImp.cpp

100755100644
+16-1
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313

1414
#include "IncomingMsgsStorageImp.hpp"
1515
#include "messages/InternalMessage.hpp"
16-
#include <future>
1716
#include "log/logger.hpp"
1817

18+
#include <future>
19+
#include <sstream>
20+
1921
using std::queue;
2022
using namespace std::chrono;
2123
using namespace concord::diagnostics;
@@ -199,4 +201,17 @@ void IncomingMsgsStorageImp::dispatchMessages(std::promise<void>& signalStarted)
199201
}
200202
}
201203

204+
std::string IncomingMsgsStorageImp::status() const {
205+
std::ostringstream oss;
206+
207+
bool is_running = isRunning();
208+
209+
oss << KVLOG(is_running) << std::endl;
210+
oss << MessageBase::Statistics::getNumBuffsAllocatedForExtrnIncomingMsgs() << std::endl;
211+
oss << MessageBase::Statistics::getNumBuffsFreedForExtrnIncomingMsgs() << std::endl;
212+
oss << MessageBase::Statistics::getNumAliveExtrnIncomingMsgsObjsPerType() << std::endl;
213+
214+
return oss.str();
215+
}
216+
202217
} // namespace bftEngine::impl

bftengine/src/bftengine/IncomingMsgsStorageImp.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <condition_variable>
2727
#include <future>
2828
#include <utility>
29+
#include <string>
2930

3031
namespace bftEngine::impl {
3132

@@ -38,6 +39,8 @@ class IncomingMsgsStorageImp : public IncomingMsgsStorage {
3839

3940
void start() override;
4041
void stop() override;
42+
// returns a string with status. To be used by diagnostics server
43+
std::string status() const override;
4144

4245
// Can be called by any thread
4346
bool pushExternalMsg(std::unique_ptr<MessageBase> msg) override;

bftengine/src/bftengine/MsgReceiver.cpp

100755100644
+2-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ void MsgReceiver::onNewMessage(NodeNum sourceNode,
3636

3737
auto *msgBody = (MessageBase::Header *)std::malloc(messageLength);
3838
memcpy(msgBody, message, messageLength);
39-
auto pMsg = std::make_unique<MessageBase>(sourceNode, msgBody, messageLength, true);
40-
39+
auto pMsg = std::make_unique<MessageBase>(sourceNode, msgBody, messageLength, true, true);
40+
MessageBase::Statistics::updateDiagnosticsCountersOnBufAlloc(static_cast<MsgCode::Type>(pMsg->type()));
4141
incomingMsgsStorage_->pushExternalMsg(std::move(pMsg));
4242
}
4343

bftengine/src/bftengine/MsgsCommunicator.cpp

+9-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "MsgsCommunicator.hpp"
1313
#include "util/assertUtils.hpp"
1414
#include "communication/CommDefs.hpp"
15+
#include "diagnostics.h"
1516

1617
namespace bftEngine::impl {
1718

@@ -21,7 +22,14 @@ using namespace bft::communication;
2122
MsgsCommunicator::MsgsCommunicator(ICommunication* comm,
2223
shared_ptr<IncomingMsgsStorage> incomingMsgsStorage,
2324
shared_ptr<IReceiver> msgReceiver)
24-
: incomingMsgsStorage_(incomingMsgsStorage), msgReceiver_(msgReceiver), communication_(comm) {}
25+
: incomingMsgsStorage_(incomingMsgsStorage), msgReceiver_(msgReceiver), communication_(comm) {
26+
auto& registrar = concord::diagnostics::RegistrarSingleton::getInstance();
27+
28+
concord::diagnostics::StatusHandler handler(
29+
"messages_communicator", "Msg Comm", [this]() { return incomingMsgsStorage_->status(); });
30+
31+
registrar.status.registerHandler(handler);
32+
}
2533

2634
int MsgsCommunicator::startCommunication(uint16_t replicaId) {
2735
replicaId_ = replicaId;

bftengine/src/bftengine/ReplicaImp.cpp

+2-19
Original file line numberDiff line numberDiff line change
@@ -500,10 +500,7 @@ void ReplicaImp::onMessage<ClientRequestMsg>(std::unique_ptr<ClientRequestMsg> m
500500
auto m = msg.get();
501501
// Adding the message to a queue for future retransmission.
502502
if (requestsOfNonPrimary.size() < NonPrimaryCombinedReqSize) {
503-
if (requestsOfNonPrimary.count(msg->requestSeqNum())) {
504-
delete std::get<1>(requestsOfNonPrimary.at(msg->requestSeqNum()));
505-
}
506-
requestsOfNonPrimary[m->requestSeqNum()] = std::make_pair(getMonotonicTime(), msg.release());
503+
requestsOfNonPrimary[m->requestSeqNum()] = std::make_pair(getMonotonicTime(), std::move(msg));
507504
}
508505

509506
send(m, currentPrimary());
@@ -1121,7 +1118,6 @@ void ReplicaImp::onMessage<PrePrepareMsg>(PrePrepareMsgUPtr message) {
11211118
if (clientsManager->canBecomePending(req.clientProxyId(), req.requestSeqNum()))
11221119
clientsManager->addPendingRequest(req.clientProxyId(), req.requestSeqNum(), req.getCid());
11231120
if (requestsOfNonPrimary.count(req.requestSeqNum())) {
1124-
delete std::get<1>(requestsOfNonPrimary.at(req.requestSeqNum()));
11251121
requestsOfNonPrimary.erase(req.requestSeqNum());
11261122
}
11271123
}
@@ -2984,11 +2980,6 @@ void ReplicaImp::MoveToHigherView(ViewNum nextView) {
29842980
ConcordAssert(viewChangeProtocolEnabled);
29852981
ConcordAssertLT(getCurrentView(), nextView);
29862982

2987-
// Once we move to higher view we would prefer tp avoid retransmitting clients request from previous view
2988-
for (auto &[_, msg] : requestsOfNonPrimary) {
2989-
(void)_;
2990-
delete std::get<1>(msg);
2991-
}
29922983
requestsOfNonPrimary.clear();
29932984

29942985
const bool wasInPrevViewNumber = viewsManager->viewIsActive(getCurrentView());
@@ -3210,11 +3201,6 @@ void ReplicaImp::onNewView(const std::vector<PrePrepareMsg *> &prePreparesForNew
32103201

32113202
clientsManager->clearAllPendingRequests();
32123203

3213-
// Once we move to higher view we would prefer tp avoid retransmitting clients request from previous view
3214-
for (auto &[_, msg] : requestsOfNonPrimary) {
3215-
(void)_;
3216-
delete std::get<1>(msg);
3217-
}
32183204
requestsOfNonPrimary.clear();
32193205

32203206
// clear requestsQueueOfPrimary
@@ -3313,9 +3299,6 @@ void ReplicaImp::onTransferringCompleteImp(uint64_t newStateCheckpoint) {
33133299
TimeRecorder scoped_timer(*histograms_.onTransferringCompleteImp);
33143300
time_in_state_transfer_.end();
33153301
LOG_INFO(GL, KVLOG(newStateCheckpoint));
3316-
for (auto &req : requestsOfNonPrimary) {
3317-
delete std::get<1>(req.second);
3318-
}
33193302
requestsOfNonPrimary.clear();
33203303
if (ps_) {
33213304
ps_->beginWriteTran();
@@ -4612,7 +4595,7 @@ void ReplicaImp::addTimers() {
46124595
if (timeout > (3 * config_.clientRequestRetransmissionTimerMilli)) {
46134596
LOG_INFO(GL, "retransmitting client request in non primary due to timeout" << KVLOG(sn, timeout));
46144597
requestsOfNonPrimary[sn].first = getMonotonicTime();
4615-
send(std::get<1>(msg), currentPrimary());
4598+
send(std::get<1>(msg).get(), currentPrimary());
46164599
}
46174600
}
46184601
});

bftengine/src/bftengine/ReplicaImp.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {
117117
std::queue<std::unique_ptr<ClientRequestMsg>> requestsQueueOfPrimary; // only used by the primary
118118
size_t primaryCombinedReqSize = 0; // only used by the primary
119119

120-
std::map<uint64_t, std::pair<Time, ClientRequestMsg*>>
120+
std::map<uint64_t, std::pair<Time, std::unique_ptr<ClientRequestMsg>>>
121121
requestsOfNonPrimary; // used to retransmit client requests by a non primary replica
122122
size_t NonPrimaryCombinedReqSize = 1000;
123123
//

bftengine/src/bftengine/messages/MessageBase.cpp

+103-11
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
// these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE
1010
// file.
1111

12-
#include <cstring>
13-
1412
#include "MessageBase.hpp"
15-
1613
#include "log/logger.hpp"
1714
#include "util/assertUtils.hpp"
1815
#include "ReplicaConfig.hpp"
1916

17+
#include <cstring>
18+
#include <sstream>
19+
2020
#ifdef DEBUG_MEMORY_MSG
2121
#include <set>
2222

@@ -46,11 +46,66 @@ void MessageBase::printLiveMessages() {
4646
namespace bftEngine {
4747
namespace impl {
4848

49+
// static class members for diagnostics server
50+
std::array<std::atomic<size_t>, MsgCode::LastMsgCodeVal> MessageBase::Statistics::AliveIncomingExtrnMsgsBufs{};
51+
static_assert(MsgCode::LastMsgCodeVal < 2000,
52+
"MessageBase AliveIncomingExtrnMsgsBufs array too big (above 2000), check MsgCode enum definition");
53+
Bitmap MessageBase::Statistics::IncomingExtrnMsgReceivedAtLeastOnceFlags{MsgCode::LastMsgCodeVal};
54+
std::mutex MessageBase::Statistics::messagesStatsMonitoringMutex_{};
55+
std::atomic<size_t> MessageBase::Statistics::numIncomingExtrnMsgsBufAllocs{0};
56+
std::atomic<size_t> MessageBase::Statistics::numIncomingExtrnMsgsBufFrees{0};
57+
// End of static class members for diagnostics server
58+
4959
MessageBase::~MessageBase() {
5060
#ifdef DEBUG_MEMORY_MSG
5161
liveMessagesDebug.erase(this);
5262
#endif
53-
if (owner_) std::free((char *)msgBody_);
63+
64+
// if this obj is not the owner of the buf (owner_ is false), the buf may have already been freed - hence access to it
65+
// is prohibited
66+
if (owner_) {
67+
if (isIncomingMsg_) {
68+
MsgCode::Type msg_code = static_cast<MsgCode::Type>(msgBody_->msgType);
69+
MessageBase::Statistics::updateDiagnosticsCountersOnBufRelease(msg_code);
70+
}
71+
std::free((char *)msgBody_);
72+
}
73+
}
74+
75+
void MessageBase::Statistics::updateDiagnosticsCountersOnBufAlloc(MsgCode::Type msg_code) {
76+
if (!MessageBase::Statistics::IncomingExtrnMsgReceivedAtLeastOnceFlags.get(msg_code)) {
77+
// here is a very rare event - first time ever receiving msg of type msg_code, hence
78+
// using a lock here shouldn't affect performance. In other places we avoid locks where possible.
79+
// It is possible that two threads will call the "set" function but it's ok since it's
80+
// just a flag telling us if the message was received at least once. setting the bit twice causes
81+
// no damage.
82+
std::lock_guard<std::mutex> lock(MessageBase::Statistics::messagesStatsMonitoringMutex_);
83+
MessageBase::Statistics::IncomingExtrnMsgReceivedAtLeastOnceFlags.set(msg_code);
84+
}
85+
86+
// use "++" operator to ensure atomicity
87+
MessageBase::Statistics::numIncomingExtrnMsgsBufAllocs++;
88+
MessageBase::Statistics::AliveIncomingExtrnMsgsBufs[msg_code]++;
89+
}
90+
91+
void MessageBase::Statistics::updateDiagnosticsCountersOnBufRelease(MsgCode::Type msg_code) {
92+
if (MessageBase::Statistics::AliveIncomingExtrnMsgsBufs[msg_code] == 0) {
93+
LOG_ERROR(GL, "Trying to dec a counter of a msg that hasn't been inserted yet, msg code: " << (msg_code));
94+
return;
95+
} else {
96+
// use "--" operator to ensure atomicity
97+
MessageBase::Statistics::AliveIncomingExtrnMsgsBufs[msg_code]--;
98+
}
99+
// use "++" operator to ensure atomicity
100+
MessageBase::Statistics::numIncomingExtrnMsgsBufFrees++;
101+
}
102+
103+
void MessageBase::releaseOwnership() {
104+
if (!owner_) {
105+
LOG_ERROR(GL, "Trying to release ownership of a MessageBase obj that is already not the buffer owner");
106+
} else {
107+
owner_ = false;
108+
}
54109
}
55110

56111
void MessageBase::shrinkToFit() {
@@ -102,13 +157,17 @@ MessageBase::MessageBase(NodeIdType sender, MsgType type, SpanContextSize spanCo
102157
#endif
103158
}
104159

105-
MessageBase::MessageBase(NodeIdType sender, MessageBase::Header *body, MsgSize size, bool ownerOfStorage) {
106-
msgBody_ = body;
107-
msgSize_ = size;
108-
storageSize_ = size;
109-
sender_ = sender;
110-
owner_ = ownerOfStorage;
111-
160+
MessageBase::MessageBase(NodeIdType sender, MessageBase::Header *body, MsgSize size, bool ownerOfStorage)
161+
: MessageBase(sender, body, size, ownerOfStorage, false) {}
162+
163+
MessageBase::MessageBase(
164+
NodeIdType sender, MessageBase::Header *body, MsgSize size, bool ownerOfStorage, bool isIncoming)
165+
: msgBody_(body),
166+
msgSize_(size),
167+
storageSize_(size),
168+
sender_(sender),
169+
owner_(ownerOfStorage),
170+
isIncomingMsg_(isIncoming) {
112171
#ifdef DEBUG_MEMORY_MSG
113172
liveMessagesDebug.insert(this);
114173
#endif
@@ -235,5 +294,38 @@ MessageBase *MessageBase::deserializeMsg(char *&buf, size_t bufLen, size_t &actu
235294
return msg;
236295
}
237296

297+
// Start methods for diagnostics server:
298+
std::string MessageBase::Statistics::getNumBuffsAllocatedForExtrnIncomingMsgs() {
299+
return (std::string(" Num buffer allocations for external incoming messages: " +
300+
std::to_string(MessageBase::Statistics::numIncomingExtrnMsgsBufAllocs)));
301+
}
302+
std::string MessageBase::Statistics::getNumBuffsFreedForExtrnIncomingMsgs() {
303+
return (std::string(" Num buffer frees for external incoming messages: " +
304+
std::to_string(MessageBase::Statistics::numIncomingExtrnMsgsBufFrees)));
305+
}
306+
std::string MessageBase::Statistics::getNumAliveExtrnIncomingMsgsObjsPerType() {
307+
std::ostringstream oss;
308+
oss << " Alive extrnal incoming message objects per type: " << std::endl;
309+
oss << " --------------------------------------- " << std::endl;
310+
311+
for (int i = 0; i < MsgCode::LastMsgCodeVal; ++i) {
312+
bool wasMsgEverRecieved;
313+
314+
{
315+
std::lock_guard<std::mutex> lock(messagesStatsMonitoringMutex_);
316+
wasMsgEverRecieved = MessageBase::Statistics::IncomingExtrnMsgReceivedAtLeastOnceFlags.get(i);
317+
}
318+
319+
if (wasMsgEverRecieved) {
320+
oss << " " << static_cast<MsgCode::Type>(i) << ": " << MessageBase::Statistics::AliveIncomingExtrnMsgsBufs[i]
321+
<< std::endl;
322+
}
323+
}
324+
325+
oss << " ** All messages not on the list were never received by replica ** ";
326+
return oss.str();
327+
}
328+
// End methods for diagnostics server ^
329+
238330
} // namespace impl
239331
} // namespace bftEngine

0 commit comments

Comments
 (0)