Skip to content

Commit 14b0586

Browse files
authored
fix(transfer_engine): replace deprecated Json::Reader (#938)
* fix(transfer_engine): replace deprecated Json::Reader with thread-safe CharReaderBuilder The deprecated Json::Reader class uses static global variables internally, causing race conditions and crashes in multi-threaded scenarios. This fix replaces all Json::Reader usage with the thread-safe Json::CharReaderBuilder API across the transfer engine and common modules. Changes: - mooncake-transfer-engine: Updated JSON parsing in transfer_metadata_plugin, transfer_metadata, and topology modules - mooncake-common: Fixed JSON parsing in default_config - All JSON parsing now uses CharReaderBuilder for thread safety This resolves intermittent CI crashes in concurrent tests like ConcurrentPutGetWithLeaseTimeOut. * refactor(topology): simplify toJson() to avoid unnecessary serialization Instead of serializing to string and parsing back, directly construct Json::Value from matrix_. This is more efficient and eliminates the need for JSON parsing entirely in this method.
1 parent 6027426 commit 14b0586

File tree

5 files changed

+84
-37
lines changed

5 files changed

+84
-37
lines changed

mooncake-common/src/default_config.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,31 @@ void DefaultConfig::loadFromYAML() {
4545

4646
void DefaultConfig::loadFromJSON() {
4747
Json::Value root;
48-
Json::Reader reader;
4948
std::ifstream file;
5049
file.open(path_);
5150

52-
if (!reader.parse(file, root, false)) {
53-
file.close();
54-
throw std::runtime_error("Failed to parse JSON file: " +
55-
reader.getFormattedErrorMessages());
51+
// Read entire file into string
52+
std::string json_content((std::istreambuf_iterator<char>(file)),
53+
std::istreambuf_iterator<char>());
54+
file.close();
55+
56+
// Use thread-safe CharReaderBuilder instead of deprecated Reader
57+
Json::CharReaderBuilder builder;
58+
builder["collectComments"] = false;
59+
std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
60+
std::string errs;
61+
62+
if (!reader->parse(json_content.data(),
63+
json_content.data() + json_content.size(), &root,
64+
&errs)) {
65+
throw std::runtime_error("Failed to parse JSON file: " + errs);
5666
}
67+
5768
try {
5869
processNode(root, "");
5970
} catch (const std::exception& e) {
60-
file.close();
6171
throw e;
6272
}
63-
file.close();
6473
}
6574

6675
void DefaultConfig::processNode(const YAML::Node& node, std::string key) {

mooncake-store/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ add_store_test(pybind_client_test pybind_client_test.cpp)
3030
add_store_test(client_metrics_test client_metrics_test.cpp)
3131
add_store_test(serializer_test serializer_test.cpp)
3232
add_store_test(non_ha_reconnect_test non_ha_reconnect_test.cpp)
33+
3334
add_subdirectory(e2e)
3435

3536
add_executable(high_availability_test high_availability_test.cpp)

mooncake-transfer-engine/src/topology.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,20 @@ int Topology::discover(const std::vector<std::string> &filter) {
264264
int Topology::parse(const std::string &topology_json) {
265265
std::set<std::string> rnic_set;
266266
Json::Value root;
267-
Json::Reader reader;
268267

269-
if (topology_json.empty() || !reader.parse(topology_json, root)) {
268+
if (topology_json.empty()) {
269+
return ERR_MALFORMED_JSON;
270+
}
271+
272+
// Use thread-safe CharReaderBuilder instead of deprecated Reader
273+
Json::CharReaderBuilder builder;
274+
std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
275+
std::string errs;
276+
277+
if (!reader->parse(topology_json.data(),
278+
topology_json.data() + topology_json.size(), &root,
279+
&errs)) {
280+
LOG(ERROR) << "Topology::parse: JSON parse error: " << errs;
270281
return ERR_MALFORMED_JSON;
271282
}
272283

@@ -303,8 +314,9 @@ std::string Topology::toString() const {
303314

304315
Json::Value Topology::toJson() const {
305316
Json::Value root;
306-
Json::Reader reader;
307-
reader.parse(toString(), root);
317+
for (const auto &pair : matrix_) {
318+
root[pair.first] = pair.second.toJson();
319+
}
308320
return root;
309321
}
310322

mooncake-transfer-engine/src/transfer_metadata.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ struct TransferNotifyUtil {
4747
}
4848

4949
static int decode(Json::Value root, TransferMetadata::NotifyDesc &desc) {
50-
Json::Reader reader;
5150
desc.name = root["name"].asString();
5251
desc.notify_msg = root["notify_msg"].asString();
5352
return 0;
@@ -67,7 +66,6 @@ struct TransferHandshakeUtil {
6766
}
6867

6968
static int decode(Json::Value root, TransferMetadata::HandShakeDesc &desc) {
70-
Json::Reader reader;
7169
desc.local_nic_path = root["local_nic_path"].asString();
7270
desc.peer_nic_path = root["peer_nic_path"].asString();
7371
for (const auto &qp : root["qp_num"])

mooncake-transfer-engine/src/transfer_metadata_plugin.cpp

Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,21 @@
4949
#include "config.h"
5050
#include "error.h"
5151

52+
// Helper function to parse JSON string using thread-safe CharReaderBuilder
53+
static bool parseJsonString(const std::string &json_str, Json::Value &value,
54+
std::string *error_msg = nullptr) {
55+
Json::CharReaderBuilder builder;
56+
std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
57+
std::string errs;
58+
59+
bool success = reader->parse(
60+
json_str.data(), json_str.data() + json_str.size(), &value, &errs);
61+
if (!success && error_msg) {
62+
*error_msg = errs;
63+
}
64+
return success;
65+
}
66+
5267
namespace mooncake {
5368
#ifdef USE_REDIS
5469
struct RedisStoragePlugin : public MetadataStoragePlugin {
@@ -118,7 +133,6 @@ struct RedisStoragePlugin : public MetadataStoragePlugin {
118133
std::lock_guard<std::mutex> lock(access_client_mutex_);
119134
if (!client_) return false;
120135

121-
Json::Reader reader;
122136
redisReply *resp =
123137
(redisReply *)redisCommand(client_, "GET %s", key.c_str());
124138
if (!resp) {
@@ -135,7 +149,12 @@ struct RedisStoragePlugin : public MetadataStoragePlugin {
135149

136150
auto json_file = std::string(resp->str);
137151
freeReplyObject(resp);
138-
if (!reader.parse(json_file, value)) return false;
152+
153+
std::string errs;
154+
if (!parseJsonString(json_file, value, &errs)) {
155+
LOG(ERROR) << "RedisStoragePlugin: JSON parse error: " << errs;
156+
return false;
157+
}
139158
return true;
140159
}
141160

@@ -374,15 +393,19 @@ struct EtcdStoragePlugin : public MetadataStoragePlugin {
374393
virtual ~EtcdStoragePlugin() {}
375394

376395
virtual bool get(const std::string &key, Json::Value &value) {
377-
Json::Reader reader;
378396
auto resp = client_.get(key);
379397
if (!resp.is_ok()) {
380398
LOG(ERROR) << "EtcdStoragePlugin: unable to get " << key << " from "
381399
<< metadata_uri_ << ": " << resp.error_message();
382400
return false;
383401
}
384402
auto json_file = resp.value().as_string();
385-
if (!reader.parse(json_file, value)) return false;
403+
404+
std::string errs;
405+
if (!parseJsonString(json_file, value, &errs)) {
406+
LOG(ERROR) << "EtcdStoragePlugin: JSON parse error: " << errs;
407+
return false;
408+
}
386409
return true;
387410
}
388411

@@ -429,7 +452,6 @@ struct EtcdStoragePlugin : public MetadataStoragePlugin {
429452
virtual ~EtcdStoragePlugin() { EtcdCloseWrapper(); }
430453

431454
virtual bool get(const std::string &key, Json::Value &value) {
432-
Json::Reader reader;
433455
char *json_data = nullptr;
434456
auto ret = EtcdGetWrapper((char *)key.c_str(), &json_data, &err_msg_);
435457
if (ret) {
@@ -446,7 +468,12 @@ struct EtcdStoragePlugin : public MetadataStoragePlugin {
446468
auto json_file = std::string(json_data);
447469
// free the memory allocated by EtcdGetWrapper
448470
free(json_data);
449-
if (!reader.parse(json_file, value)) return false;
471+
472+
std::string errs;
473+
if (!parseJsonString(json_file, value, &errs)) {
474+
LOG(ERROR) << "EtcdStoragePlugin: JSON parse error: " << errs;
475+
return false;
476+
}
450477
return true;
451478
}
452479

@@ -716,16 +743,16 @@ struct SocketHandShakePlugin : public HandShakePlugin {
716743
getNetworkAddress((struct sockaddr *)&addr);
717744

718745
Json::Value local, peer;
719-
Json::Reader reader;
720746

721747
auto [type, json_str] = readString(conn_fd);
722-
if (!reader.parse(json_str, peer)) {
723-
LOG(ERROR) << "SocketHandShakePlugin: failed to receive "
724-
"handshake message, "
725-
"malformed json format:"
726-
<< reader.getFormattedErrorMessages()
727-
<< ", json string length: " << json_str.size()
728-
<< ", json string content: " << json_str;
748+
std::string errs;
749+
if (!parseJsonString(json_str, peer, &errs)) {
750+
LOG(ERROR)
751+
<< "SocketHandShakePlugin: failed to receive "
752+
"handshake message, "
753+
"malformed json format: "
754+
<< errs << ", json string length: " << json_str.size()
755+
<< ", json string content: " << json_str;
729756
close(conn_fd);
730757
continue;
731758
}
@@ -906,7 +933,6 @@ struct SocketHandShakePlugin : public HandShakePlugin {
906933
return ret;
907934
}
908935

909-
Json::Reader reader;
910936
auto [type, json_str] = readString(conn_fd);
911937
if (type != HandShakeRequestType::Connection) {
912938
LOG(ERROR)
@@ -915,10 +941,11 @@ struct SocketHandShakePlugin : public HandShakePlugin {
915941
return ERR_SOCKET;
916942
}
917943

918-
if (!reader.parse(json_str, peer)) {
944+
std::string errs;
945+
if (!parseJsonString(json_str, peer, &errs)) {
919946
LOG(ERROR) << "SocketHandShakePlugin: failed to receive handshake "
920-
"message: "
921-
"malformed json format, check tcp connection";
947+
"message: malformed json format: "
948+
<< errs;
922949
close(conn_fd);
923950
return ERR_MALFORMED_JSON;
924951
}
@@ -981,7 +1008,6 @@ struct SocketHandShakePlugin : public HandShakePlugin {
9811008
return ret;
9821009
}
9831010

984-
Json::Reader reader;
9851011
auto [type, json_str] = readString(conn_fd);
9861012
if (type != HandShakeRequestType::Notify) {
9871013
LOG(ERROR)
@@ -993,10 +1019,11 @@ struct SocketHandShakePlugin : public HandShakePlugin {
9931019
// LOG(INFO) << "SocketHandShakePlugin: received metadata message: "
9941020
// << json_str;
9951021

996-
if (!reader.parse(json_str, peer_notify)) {
1022+
std::string errs;
1023+
if (!parseJsonString(json_str, peer_notify, &errs)) {
9971024
LOG(ERROR) << "SocketHandShakePlugin: failed to receive metadata "
9981025
"message, malformed json format: "
999-
<< reader.getFormattedErrorMessages();
1026+
<< errs;
10001027
close(conn_fd);
10011028
return ERR_MALFORMED_JSON;
10021029
}
@@ -1023,7 +1050,6 @@ struct SocketHandShakePlugin : public HandShakePlugin {
10231050
return ret;
10241051
}
10251052

1026-
Json::Reader reader;
10271053
auto [type, json_str] = readString(conn_fd);
10281054
if (type != HandShakeRequestType::Metadata) {
10291055
LOG(ERROR)
@@ -1035,10 +1061,11 @@ struct SocketHandShakePlugin : public HandShakePlugin {
10351061
// LOG(INFO) << "SocketHandShakePlugin: received metadata message: "
10361062
// << json_str;
10371063

1038-
if (!reader.parse(json_str, peer_metadata)) {
1064+
std::string errs;
1065+
if (!parseJsonString(json_str, peer_metadata, &errs)) {
10391066
LOG(ERROR) << "SocketHandShakePlugin: failed to receive metadata "
10401067
"message, malformed json format: "
1041-
<< reader.getFormattedErrorMessages();
1068+
<< errs;
10421069
close(conn_fd);
10431070
return ERR_MALFORMED_JSON;
10441071
}

0 commit comments

Comments
 (0)