From 503afb97fa55a686e0fa2e16d97623695c1b4bf7 Mon Sep 17 00:00:00 2001 From: doujiang24 Date: Sun, 5 Jan 2025 23:37:46 +0800 Subject: [PATCH 1/2] [TransferEngine] test: cmake enable testing. --- .github/workflows/build-linux.yml | 15 ++++- CMakeLists.txt | 2 + dependencies.sh | 4 ++ .../include/multi_transport.h | 2 +- .../src/multi_transport.cpp | 11 ++-- .../src/transfer_metadata_plugin.cpp | 38 +++++++---- mooncake-transfer-engine/tests/CMakeLists.txt | 12 +++- .../tests/tcp_transport_test.cpp | 49 +++++++++----- .../tests/transfer_metadata_test.cpp | 66 ++++++++++++------- 9 files changed, 133 insertions(+), 66 deletions(-) diff --git a/.github/workflows/build-linux.yml b/.github/workflows/build-linux.yml index dd009ff..82487cd 100644 --- a/.github/workflows/build-linux.yml +++ b/.github/workflows/build-linux.yml @@ -15,7 +15,7 @@ jobs: run: | sudo apt update -y sudo bash -x dependencies.sh - mkdir build + mkdir build cd build cmake .. shell: bash @@ -23,4 +23,15 @@ jobs: run: | cd build make -j - shell: bash \ No newline at end of file + shell: bash + - name: start-metadata-server + run: | + cd mooncake-transfer-engine/example/http-metadata-server + export PATH=$PATH:/usr/local/go/bin + go run . --addr=:8090 & + shell: bash + - name: test + run: | + cd build + MC_METADATA_SERVER=http://127.0.0.1:8090/metadata make test -j + shell: bash diff --git a/CMakeLists.txt b/CMakeLists.txt index df9ba55..53cd267 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,8 @@ cmake_minimum_required(VERSION 3.16) project(mooncake CXX C) +enable_testing() + set(CMAKE_C_STANDARD 99) set(CMAKE_CXX_STANDARD 17) diff --git a/dependencies.sh b/dependencies.sh index cb20474..46b9048 100755 --- a/dependencies.sh +++ b/dependencies.sh @@ -54,4 +54,8 @@ cd build cmake .. 
make -j$(nproc) && sudo make install +echo "*** Download and installing [golang-1.22] ***" +wget https://go.dev/dl/go1.22.0.linux-amd64.tar.gz +sudo tar -C /usr/local -xzf go1.22.0.linux-amd64.tar.gz + echo "*** Dependencies Installed! ***" diff --git a/mooncake-transfer-engine/include/multi_transport.h b/mooncake-transfer-engine/include/multi_transport.h index 42266da..aef3b69 100644 --- a/mooncake-transfer-engine/include/multi_transport.h +++ b/mooncake-transfer-engine/include/multi_transport.h @@ -56,7 +56,7 @@ class MultiTransport { private: std::shared_ptr metadata_; std::string local_server_name_; - std::map transport_map_; + std::map> transport_map_; RWSpinlock batch_desc_lock_; std::unordered_map> batch_desc_set_; }; diff --git a/mooncake-transfer-engine/src/multi_transport.cpp b/mooncake-transfer-engine/src/multi_transport.cpp index 524db1a..f2c1cc3 100644 --- a/mooncake-transfer-engine/src/multi_transport.cpp +++ b/mooncake-transfer-engine/src/multi_transport.cpp @@ -147,13 +147,13 @@ Transport *MultiTransport::installTransport(const std::string &proto, return nullptr; } - transport_map_[proto] = transport; + transport_map_[proto] = std::shared_ptr(transport); return transport; } Transport *MultiTransport::selectTransport(const TransferRequest &entry) { if (entry.target_id == LOCAL_SEGMENT_ID && transport_map_.count("local")) - return transport_map_["local"]; + return transport_map_["local"].get(); auto target_segment_desc = metadata_->getSegmentDescByID(entry.target_id); if (!target_segment_desc) { LOG(ERROR) << "MultiTransport: Incorrect target segment id " @@ -165,17 +165,18 @@ Transport *MultiTransport::selectTransport(const TransferRequest &entry) { LOG(ERROR) << "MultiTransport: Transport " << proto << " not installed"; return nullptr; } - return transport_map_[proto]; + return transport_map_[proto].get(); } Transport *MultiTransport::getTransport(const std::string &proto) { if (!transport_map_.count(proto)) return nullptr; - return 
transport_map_[proto]; + return transport_map_[proto].get(); } std::vector MultiTransport::listTransports() { std::vector transport_list; - for (auto &entry : transport_map_) transport_list.push_back(entry.second); + for (auto &entry : transport_map_) + transport_list.push_back(entry.second.get()); return transport_list; } diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index df677df..e38c149 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -371,9 +371,18 @@ static inline const std::string getNetworkAddress(struct sockaddr *addr) { } struct SocketHandShakePlugin : public HandShakePlugin { - SocketHandShakePlugin() : listener_running_(false) {} + SocketHandShakePlugin() : listener_running_(false), listen_fd_(-1) {} + + void closeListen() { + if (listen_fd_ >= 0) { + LOG(INFO) << "SocketHandShakePlugin: closing listen socket"; + close(listen_fd_); + listen_fd_ = -1; + } + } virtual ~SocketHandShakePlugin() { + closeListen(); if (listener_running_) { listener_running_ = false; listener_.join(); @@ -383,14 +392,14 @@ struct SocketHandShakePlugin : public HandShakePlugin { virtual int startDaemon(OnReceiveCallBack on_recv_callback, uint16_t listen_port) { sockaddr_in bind_address; - int on = 1, listen_fd = -1; + int on = 1; memset(&bind_address, 0, sizeof(sockaddr_in)); bind_address.sin_family = AF_INET; bind_address.sin_port = htons(listen_port); bind_address.sin_addr.s_addr = INADDR_ANY; - listen_fd = socket(AF_INET, SOCK_STREAM, 0); - if (listen_fd < 0) { + listen_fd_ = socket(AF_INET, SOCK_STREAM, 0); + if (listen_fd_ < 0) { PLOG(ERROR) << "SocketHandShakePlugin: socket()"; return ERR_SOCKET; } @@ -398,39 +407,39 @@ struct SocketHandShakePlugin : public HandShakePlugin { struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; - if (setsockopt(listen_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, 
+ if (setsockopt(listen_fd_, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout))) { PLOG(ERROR) << "SocketHandShakePlugin: setsockopt(SO_RCVTIMEO)"; - close(listen_fd); + closeListen(); return ERR_SOCKET; } - if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { + if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { PLOG(ERROR) << "SocketHandShakePlugin: setsockopt(SO_REUSEADDR)"; - close(listen_fd); + closeListen(); return ERR_SOCKET; } - if (bind(listen_fd, (sockaddr *)&bind_address, sizeof(sockaddr_in)) < + if (bind(listen_fd_, (sockaddr *)&bind_address, sizeof(sockaddr_in)) < 0) { PLOG(ERROR) << "SocketHandShakePlugin: bind (port " << listen_port << ")"; - close(listen_fd); + closeListen(); return ERR_SOCKET; } - if (listen(listen_fd, 5)) { + if (listen(listen_fd_, 5)) { PLOG(ERROR) << "SocketHandShakePlugin: listen()"; - close(listen_fd); + closeListen(); return ERR_SOCKET; } listener_running_ = true; - listener_ = std::thread([this, listen_fd, on_recv_callback]() { + listener_ = std::thread([this, on_recv_callback]() { while (listener_running_) { sockaddr_in addr; socklen_t addr_len = sizeof(sockaddr_in); - int conn_fd = accept(listen_fd, (sockaddr *)&addr, &addr_len); + int conn_fd = accept(listen_fd_, (sockaddr *)&addr, &addr_len); if (conn_fd < 0) { if (errno != EWOULDBLOCK) PLOG(ERROR) << "SocketHandShakePlugin: accept()"; @@ -584,6 +593,7 @@ struct SocketHandShakePlugin : public HandShakePlugin { std::atomic listener_running_; std::thread listener_; + int listen_fd_; }; std::shared_ptr HandShakePlugin::Create( diff --git a/mooncake-transfer-engine/tests/CMakeLists.txt b/mooncake-transfer-engine/tests/CMakeLists.txt index 497f2f1..3611ba5 100644 --- a/mooncake-transfer-engine/tests/CMakeLists.txt +++ b/mooncake-transfer-engine/tests/CMakeLists.txt @@ -1,17 +1,23 @@ add_executable(rdma_transport_test rdma_transport_test.cpp) target_link_libraries(rdma_transport_test PUBLIC transfer_engine) +# add_test(NAME 
rdma_transport_test COMMAND rdma_transport_test) add_executable(transport_uint_test transport_uint_test.cpp) -target_link_libraries(transport_uint_test PUBLIC transfer_engine gtest gtest_main ) +target_link_libraries(transport_uint_test PUBLIC transfer_engine gtest gtest_main ) +add_test(NAME transport_uint_test COMMAND transport_uint_test) add_executable(rdma_transport_test2 rdma_transport_test2.cpp) -target_link_libraries(rdma_transport_test2 PUBLIC transfer_engine gtest gtest_main ) +target_link_libraries(rdma_transport_test2 PUBLIC transfer_engine gtest gtest_main ) +# add_test(NAME rdma_transport_test2 COMMAND rdma_transport_test2) add_executable(tcp_transport_test tcp_transport_test.cpp) -target_link_libraries(tcp_transport_test PUBLIC transfer_engine gtest gtest_main ) +target_link_libraries(tcp_transport_test PUBLIC transfer_engine gtest gtest_main ) +add_test(NAME tcp_transport_test COMMAND tcp_transport_test) add_executable(transfer_metadata_test transfer_metadata_test.cpp) target_link_libraries(transfer_metadata_test PUBLIC transfer_engine gtest gtest_main) +add_test(NAME transfer_metadata_test COMMAND transfer_metadata_test) add_executable(topology_test topology_test.cpp) target_link_libraries(topology_test PUBLIC transfer_engine gtest gtest_main) +add_test(NAME topology_test COMMAND topology_test) diff --git a/mooncake-transfer-engine/tests/tcp_transport_test.cpp b/mooncake-transfer-engine/tests/tcp_transport_test.cpp index 71c827c..862d88c 100644 --- a/mooncake-transfer-engine/tests/tcp_transport_test.cpp +++ b/mooncake-transfer-engine/tests/tcp_transport_test.cpp @@ -48,8 +48,6 @@ DEFINE_int32(gpu_id, 0, "GPU ID to use"); using namespace mooncake; -//// etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls -/// http://10.0.0.1:2379 / ./tcp_transport_test namespace mooncake { class TCPTransportTest : public ::testing::Test { @@ -58,12 +56,29 @@ class TCPTransportTest : public ::testing::Test { void SetUp() override { 
google::InitGoogleLogging("TCPTransportTest"); FLAGS_logtostderr = 1; + + const char *env = std::getenv("MC_METADATA_SERVER"); + if (env) + metadata_server = env; + else + metadata_server = "127.0.0.1:2379"; + LOG(INFO) << "metadata_server: " << metadata_server; + + env = std::getenv("MC_LOCAL_SERVER_NAME"); + if (env) + local_server_name = env; + else + local_server_name = "127.0.0.2:12345"; + LOG(INFO) << "local_server_name: " << local_server_name; } void TearDown() override { // 清理 glog google::ShutdownGoogleLogging(); } + + std::string metadata_server; + std::string local_server_name; }; static void *allocateMemoryPool(size_t size, int socket_id, @@ -73,9 +88,10 @@ static void *allocateMemoryPool(size_t size, int socket_id, TEST_F(TCPTransportTest, GetTcpTest) { auto engine = std::make_unique(); - auto hostname_port = parseHostNameWithPort("127.0.0.2:12345"); - engine->init("127.0.0.1:2379", "127.0.0.2:12345", - hostname_port.first.c_str(), hostname_port.second); + auto hostname_port = parseHostNameWithPort(local_server_name); + auto rc = engine->init(metadata_server, local_server_name, + hostname_port.first.c_str(), hostname_port.second); + LOG_ASSERT(rc == 0); Transport *xport = nullptr; xport = engine->installTransport("tcp", nullptr); LOG_ASSERT(xport != nullptr); @@ -86,22 +102,23 @@ TEST_F(TCPTransportTest, Writetest) { void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; auto engine = std::make_unique(); - auto hostname_port = parseHostNameWithPort("127.0.0.2:12345"); - engine->init("127.0.0.1:2379", "127.0.0.2:12345", - hostname_port.first.c_str(), hostname_port.second); + auto hostname_port = parseHostNameWithPort(local_server_name); + auto rc = engine->init(metadata_server, local_server_name, + hostname_port.first.c_str(), hostname_port.second); + LOG_ASSERT(rc == 0); Transport *xport = nullptr; xport = engine->installTransport("tcp", nullptr); LOG_ASSERT(xport != nullptr); addr = allocateMemoryPool(ram_buffer_size, 0, false); - int rc = 
engine->registerLocalMemory(addr, ram_buffer_size, "cpu:0"); + rc = engine->registerLocalMemory(addr, ram_buffer_size, "cpu:0"); LOG_ASSERT(!rc); for (size_t offset = 0; offset < kDataLength; ++offset) *((char *)(addr) + offset) = 'a' + lrand48() % 26; auto batch_id = engine->allocateBatchID(1); int ret = 0; - auto segment_id = engine->openSegment("127.0.0.2:12345"); + auto segment_id = engine->openSegment(local_server_name); TransferRequest entry; auto segment_desc = engine->getMetadata()->getSegmentDescByID(segment_id); uint64_t remote_base = (uint64_t)segment_desc->buffers[0].addr; @@ -129,8 +146,8 @@ TEST_F(TCPTransportTest, WriteAndReadtest) { void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; auto engine = std::make_unique(); - auto hostname_port = parseHostNameWithPort("127.0.0.2:12345"); - engine->init("127.0.0.1:2379", "127.0.0.2:12345", + auto hostname_port = parseHostNameWithPort(local_server_name); + engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), hostname_port.second); Transport *xport = nullptr; xport = engine->installTransport("tcp", nullptr); @@ -142,7 +159,7 @@ TEST_F(TCPTransportTest, WriteAndReadtest) { for (size_t offset = 0; offset < kDataLength; ++offset) *((char *)(addr) + offset) = 'a' + lrand48() % 26; - auto segment_id = engine->openSegment("127.0.0.2:12345"); + auto segment_id = engine->openSegment(local_server_name); auto segment_desc = engine->getMetadata()->getSegmentDescByID(segment_id); uint64_t remote_base = (uint64_t)segment_desc->buffers[0].addr; { @@ -200,8 +217,8 @@ TEST_F(TCPTransportTest, WriteAndRead2test) { void *addr = nullptr; const size_t ram_buffer_size = 1ull << 30; auto engine = std::make_unique(); - auto hostname_port = parseHostNameWithPort("127.0.0.2:12345"); - engine->init("127.0.0.1:2379", "127.0.0.2:12345", + auto hostname_port = parseHostNameWithPort(local_server_name); + engine->init(metadata_server, local_server_name, hostname_port.first.c_str(), 
hostname_port.second); Transport *xport = nullptr; xport = engine->installTransport("tcp", nullptr); @@ -213,7 +230,7 @@ TEST_F(TCPTransportTest, WriteAndRead2test) { for (size_t offset = 0; offset < kDataLength; ++offset) *((char *)(addr) + offset) = 'a' + lrand48() % 26; - auto segment_id = engine->openSegment("127.0.0.2:12345"); + auto segment_id = engine->openSegment(local_server_name); auto segment_desc = engine->getMetadata()->getSegmentDescByID(segment_id); uint64_t remote_base = (uint64_t)segment_desc->buffers[0].addr; diff --git a/mooncake-transfer-engine/tests/transfer_metadata_test.cpp b/mooncake-transfer-engine/tests/transfer_metadata_test.cpp index 2d984d5..85a6489 100644 --- a/mooncake-transfer-engine/tests/transfer_metadata_test.cpp +++ b/mooncake-transfer-engine/tests/transfer_metadata_test.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "transfer_metadata.h" + #include #include #include @@ -20,14 +22,9 @@ #include #include "transport/transport.h" -#include "transfer_metadata.h" using namespace mooncake; -//default value is used for local test -DEFINE_string(metadata_server, "127.0.0.1:2379", "etcd server host address"); -DEFINE_string(local_server, "127.0.0.2:12345", "Local server name for segment discovery"); - namespace mooncake { class TransferMetadataTest : public ::testing::Test { @@ -36,28 +33,46 @@ class TransferMetadataTest : public ::testing::Test { // initialize glog google::InitGoogleLogging("TransferMetadataTest"); FLAGS_logtostderr = 1; // output to stdout - metadata_client = std::make_unique(FLAGS_metadata_server); + + const char* env = std::getenv("MC_METADATA_SERVER"); + if (env) + metadata_server = env; + else + metadata_server = "127.0.0.1:2379"; + LOG(INFO) << "metadata_server: " << metadata_server; + + env = std::getenv("MC_LOCAL_SERVER_NAME"); + if (env) + local_server_name = env; + else + local_server_name = "127.0.0.2:12345"; + LOG(INFO) 
<< "local_server_name: " << local_server_name; + + metadata_client = std::make_unique(metadata_server); } void TearDown() override { // clean up glog google::ShutdownGoogleLogging(); } std::unique_ptr metadata_client; + std::string metadata_server; + std::string local_server_name; }; -//add and search LocalSegmentMeta +// add and search LocalSegmentMeta TEST_F(TransferMetadataTest, LocalSegmentTest) { auto segment_des = std::make_shared(); - segment_des->name="test_server"; - segment_des->protocol="rdma"; - TransferMetadata::SegmentID segment_id=1111111; - std::string segment_name="test_segment"; - int re = metadata_client->addLocalSegment(segment_id, segment_name, std::move(segment_des)); + segment_des->name = "test_server"; + segment_des->protocol = "rdma"; + TransferMetadata::SegmentID segment_id = 1111111; + std::string segment_name = "test_segment"; + int re = metadata_client->addLocalSegment(segment_id, segment_name, + std::move(segment_des)); ASSERT_EQ(re, 0); auto des = metadata_client->getSegmentDescByName(segment_name); - ASSERT_EQ(des,segment_des); + ASSERT_EQ(des, segment_des); des = metadata_client->getSegmentDescByID(segment_id, false); - ASSERT_EQ(des,segment_des); + ASSERT_EQ(des, segment_des); auto id = metadata_client->getSegmentID(segment_name); ASSERT_EQ(id, segment_id); } @@ -65,31 +80,32 @@ TEST_F(TransferMetadataTest, LocalSegmentTest) { // add and remove LocalMemoryBufferMeta TEST_F(TransferMetadataTest, LocalMemoryBufferTest) { auto segment_des = std::make_shared(); - segment_des->name="test_localMemery"; - segment_des->protocol="rdma"; - int re = metadata_client->addLocalSegment(LOCAL_SEGMENT_ID, "test_local_segment", std::move(segment_des)); + segment_des->name = "test_localMemery"; + segment_des->protocol = "rdma"; + int re = metadata_client->addLocalSegment( + LOCAL_SEGMENT_ID, "test_local_segment", std::move(segment_des)); ASSERT_EQ(re, 0); - uint64_t addr=0; + uint64_t addr = 0; for (int i = 0; i < 10; ++i) { 
TransferMetadata::BufferDesc buffer_des; - buffer_des.addr= addr + i*2048; - buffer_des.length=1024; + buffer_des.addr = addr + i * 2048; + buffer_des.length = 1024; re = metadata_client->addLocalMemoryBuffer(buffer_des, false); ASSERT_EQ(re, 0); } - addr=1000; + addr = 1000; re = metadata_client->removeLocalMemoryBuffer((void*)addr, false); ASSERT_EQ(re, ERR_ADDRESS_NOT_REGISTERED); for (int i = 9; i > 0; --i) { - addr = i*2048; + addr = i * 2048; re = metadata_client->removeLocalMemoryBuffer((void*)addr, false); ASSERT_EQ(re, 0); } } -//add, get and remove RPCMetaEntryMeta +// add, get and remove RPCMetaEntryMeta TEST_F(TransferMetadataTest, RpcMetaEntryTest) { - auto hostname_port = parseHostNameWithPort(FLAGS_local_server); + auto hostname_port = parseHostNameWithPort(local_server_name); TransferMetadata::RpcMetaDesc desc; desc.ip_or_host_name = hostname_port.first.c_str(); desc.rpc_port = hostname_port.second; @@ -103,7 +119,7 @@ TEST_F(TransferMetadataTest, RpcMetaEntryTest) { ASSERT_EQ(re, 0); } -}// namespace mooncake +} // namespace mooncake int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); From 607fd4f7c9610e770c5c59fdee2eaba519b17590 Mon Sep 17 00:00:00 2001 From: doujiang24 Date: Mon, 6 Jan 2025 17:06:50 +0800 Subject: [PATCH 2/2] fix workflow script for testing --- .github/workflows/build-linux.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build-linux.yml b/.github/workflows/build-linux.yml index 82487cd..d61c2dc 100644 --- a/.github/workflows/build-linux.yml +++ b/.github/workflows/build-linux.yml @@ -17,7 +17,7 @@ jobs: sudo bash -x dependencies.sh mkdir build cd build - cmake .. + cmake .. -DUSE_HTTP=ON shell: bash - name: make run: | @@ -28,10 +28,13 @@ jobs: run: | cd mooncake-transfer-engine/example/http-metadata-server export PATH=$PATH:/usr/local/go/bin - go run . --addr=:8090 & + go mod tidy && go build -o http-metadata-server . 
+ ./http-metadata-server --addr=:8090 & shell: bash - name: test run: | cd build - MC_METADATA_SERVER=http://127.0.0.1:8090/metadata make test -j + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib + ldconfig -v || echo "always continue" + MC_METADATA_SERVER=http://127.0.0.1:8090/metadata make test -j ARGS="-V" shell: bash