diff --git a/doc/en/transfer-engine.md b/doc/en/transfer-engine.md index d3696afb..4cff7265 100644 --- a/doc/en/transfer-engine.md +++ b/doc/en/transfer-engine.md @@ -283,7 +283,7 @@ Registers a space starting at address `addr` with a length of `size` on the loca - `addr`: The starting address of the registration space; - `size`: The length of the registration space; -- `location`: The `device` corresponding to this memory segment, such as `cuda:0` indicating the GPU device, `cpu:0` indicating the CPU socket, by matching with the network card priority order table (see `installTransport`), the preferred network card is identified. +- `location`: The `device` corresponding to this memory segment, such as `cuda:0` for a GPU device or `cpu:0` for a CPU socket. It is matched against the network card priority order table (see `installTransport`) to identify the preferred network card. You can also pass `*`, in which case Transfer Engine tries to detect the `device` corresponding to `addr` automatically; if detection fails, it prints a `WARNING`-level log and uses all network cards with no preference among them. - `remote_accessible`: Indicates whether this memory can be accessed by remote nodes. - Return value: If successful, returns 0; otherwise, returns a negative value. 
diff --git a/doc/zh/transfer-engine.md b/doc/zh/transfer-engine.md index 54edc749..dbd37f19 100644 --- a/doc/zh/transfer-engine.md +++ b/doc/zh/transfer-engine.md @@ -253,7 +253,7 @@ int registerLocalMemory(void *addr, size_t size, string location, bool remote_ac - `addr`: 注册空间起始地址; - `size`:注册空间长度; -- `location`: 这一段内存对应的 `device`,比如 `cuda:0` 表示对应 GPU 设备,`cpu:0` 表示对应 CPU socket,通过和网卡优先级顺序表(见`installTransport`) 匹配,识别优选的网卡。 +- `location`: 这一段内存对应的 `device`,比如 `cuda:0` 表示对应 GPU 设备,`cpu:0` 表示对应 CPU socket,通过和网卡优先级顺序表(见`installTransport`) 匹配,识别优选的网卡。 也可以使用 `*`,Transfer Engine 会尽量自动识别 `addr` 对应的 `device`,如果识别失败,将打印 `WARNING` 级别日志,并且使用全部的网卡,不再区分优选网卡。 - `remote_accessible`: 标识这一块内存能否被远端节点访问。 - 返回值:若成功,返回 0;否则返回负数值。 diff --git a/mooncake-transfer-engine/include/common.h b/mooncake-transfer-engine/include/common.h index 471229d9..71e4f0aa 100644 --- a/mooncake-transfer-engine/include/common.h +++ b/mooncake-transfer-engine/include/common.h @@ -61,8 +61,7 @@ static inline int bindToSocket(int socket_id) { } } numa_free_cpumask(cpu_list); - if (nr_cpus == 0) - return 0; + if (nr_cpus == 0) return 0; if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_set)) { LOG(ERROR) << "Failed to set socket affinity"; return ERR_NUMA; diff --git a/mooncake-transfer-engine/include/memory_location.h b/mooncake-transfer-engine/include/memory_location.h new file mode 100644 index 00000000..9b6aef51 --- /dev/null +++ b/mooncake-transfer-engine/include/memory_location.h @@ -0,0 +1,40 @@ +// Copyright 2024 KVCache.AI +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MEMORY_LOCATION_H
+#define MEMORY_LOCATION_H
+
+#include <string>  // NOTE(review): both <...> include targets were stripped from this copy of the patch; restored from usage — confirm
+
+#include <vector>
+
+#include "common.h"
+
+const int pagesize = 4096;
+
+namespace mooncake {
+struct MemoryLocationEntry {
+    uint64_t start;
+    size_t len;
+    std::string location;
+};
+
+// Get CPU numa node id
+// TODO: support getting cuda device id from unified address.
+const std::vector<MemoryLocationEntry> getMemoryLocation(void *start,
+                                                         size_t len);
+
+}  // namespace mooncake
+
+#endif  // MEMORY_LOCATION_H
diff --git a/mooncake-transfer-engine/src/memory_location.cpp b/mooncake-transfer-engine/src/memory_location.cpp
new file mode 100644
index 00000000..1c66038f
--- /dev/null
+++ b/mooncake-transfer-engine/src/memory_location.cpp
@@ -0,0 +1,88 @@
+// Copyright 2024 KVCache.AI
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "memory_location.h"
+
+namespace mooncake {
+
+// Rounds `address` down to a multiple of `pagesize`.
+uintptr_t alignPage(uintptr_t address) {
+    return address & ~(static_cast<uintptr_t>(pagesize) - 1);
+}
+
+// Maps a NUMA node id to a location name: "cpu:<node>" when node >= 0.
+std::string genCpuNodeName(int node) {
+    if (node >= 0) return "cpu:" + std::to_string(node);
+
+    // use "*" when failed to get the numa node.
+    return "*";
+}
+
+// Splits [start, start + len) into consecutive entries whose pages were
+// reported on the same NUMA node by numa_move_pages(). Each entry is named
+// "cpu:<node>", or "*" when the per-page status is negative. If the query
+// itself fails, or len is 0, a single "*" entry covering the whole range is
+// returned.
+const std::vector<MemoryLocationEntry> getMemoryLocation(void *start,
+                                                         size_t len) {
+    std::vector<MemoryLocationEntry> entries;
+    const uintptr_t range_start = reinterpret_cast<uintptr_t>(start);
+
+    // An empty range touches no page, so there is no status to read.
+    if (len == 0) {
+        entries.push_back({(uint64_t)range_start, len, "*"});
+        return entries;
+    }
+
+    // start and end address may not be page aligned.
+    const uintptr_t aligned_start = alignPage(range_start);
+    const size_t page_count =
+        (range_start - aligned_start + len + pagesize - 1) / pagesize;
+
+    // Vectors release the scratch arrays on every return path, and size_t
+    // arithmetic keeps page offsets valid for ranges of 2 GiB and larger.
+    std::vector<void *> pages(page_count);
+    std::vector<int> status(page_count);
+    for (size_t i = 0; i < page_count; i++) {
+        pages[i] = reinterpret_cast<void *>(aligned_start + i * pagesize);
+    }
+
+    // nodes == nullptr: only query the node of each page, move nothing.
+    int rc = numa_move_pages(0, page_count, pages.data(), nullptr,
+                             status.data(), 0);
+    if (rc != 0) {
+        PLOG(WARNING) << "Failed to get NUMA node, addr: " << start
+                      << ", len: " << len;
+        entries.push_back({(uint64_t)range_start, len, "*"});
+        return entries;
+    }
+
+    // Emit one entry each time the node changes between adjacent pages.
+    int node = status[0];
+    uint64_t entry_start = range_start;
+    for (size_t i = 1; i < page_count; i++) {
+        if (status[i] == node) continue;
+        const uint64_t boundary = aligned_start + i * pagesize;
+        entries.push_back({entry_start, size_t(boundary - entry_start),
+                           genCpuNodeName(node)});
+        entry_start = boundary;
+        node = status[i];
+    }
+    entries.push_back({entry_start,
+                       size_t(range_start + len - entry_start),
+                       genCpuNodeName(node)});
+    return entries;
+}
+
+}  // namespace mooncake
diff --git a/mooncake-transfer-engine/src/topology.cpp b/mooncake-transfer-engine/src/topology.cpp index 34cf065b..14745031 100644 --- a/mooncake-transfer-engine/src/topology.cpp +++ b/mooncake-transfer-engine/src/topology.cpp @@ -264,7 +264,8 @@ Json::Value Topology::toJson() const { } int Topology::selectDevice(const std::string storage_type, int retry_count) { - if (!resolved_matrix_.count(storage_type)) return ERR_DEVICE_NOT_FOUND; + if (resolved_matrix_.count(storage_type) == 0) return ERR_DEVICE_NOT_FOUND; + auto &entry = resolved_matrix_[storage_type]; if (retry_count == 0) { int rand_value = SimpleRandom::Get().next(); @@ -294,6 +295,9 @@ int Topology::resolve() { hca_list_.push_back(hca); hca_id_map[hca] = next_hca_map_index; next_hca_map_index++; + + // "*" means any device + resolved_matrix_["*"].preferred_hca.push_back(hca_id_map[hca]); } 
resolved_matrix_[entry.first].preferred_hca.push_back( hca_id_map[hca]); @@ -303,6 +307,9 @@ int Topology::resolve() { hca_list_.push_back(hca); hca_id_map[hca] = next_hca_map_index; next_hca_map_index++; + + // "*" means any device + resolved_matrix_["*"].preferred_hca.push_back(hca_id_map[hca]); } resolved_matrix_[entry.first].avail_hca.push_back(hca_id_map[hca]); } diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp index bedd9a92..d43ce438 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp @@ -25,6 +25,7 @@ #include "common.h" #include "config.h" +#include "memory_location.h" #include "topology.h" #include "transport/rdma_transport/rdma_context.h" #include "transport/rdma_transport/rdma_endpoint.h" @@ -88,9 +89,6 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length, bool update_metadata) { (void)remote_accessible; BufferDesc buffer_desc; - buffer_desc.name = name; - buffer_desc.addr = (uint64_t)addr; - buffer_desc.length = length; const static int access_rights = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; @@ -100,9 +98,28 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length, buffer_desc.lkey.push_back(context->lkey(addr)); buffer_desc.rkey.push_back(context->rkey(addr)); } - int rc = metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata); - if (rc) return rc; + // Get the memory location automatically after registered MR(pinned), + // when the name is "*". 
+ if (name == "*") { + const std::vector entries = + getMemoryLocation(addr, length); + for (auto &entry : entries) { + buffer_desc.name = entry.location; + buffer_desc.addr = entry.start; + buffer_desc.length = entry.len; + int rc = + metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata); + if (rc) return rc; + } + } else { + buffer_desc.name = name; + buffer_desc.addr = (uint64_t)addr; + buffer_desc.length = length; + int rc = metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata); + + if (rc) return rc; + } return 0; } diff --git a/mooncake-transfer-engine/tests/CMakeLists.txt b/mooncake-transfer-engine/tests/CMakeLists.txt index 3611ba58..504221df 100644 --- a/mooncake-transfer-engine/tests/CMakeLists.txt +++ b/mooncake-transfer-engine/tests/CMakeLists.txt @@ -21,3 +21,7 @@ add_test(NAME transfer_metadata_test COMMAND transfer_metadata_test) add_executable(topology_test topology_test.cpp) target_link_libraries(topology_test PUBLIC transfer_engine gtest gtest_main) add_test(NAME topology_test COMMAND topology_test) + +add_executable(memory_location_test memory_location_test.cpp) +target_link_libraries(memory_location_test PUBLIC transfer_engine gtest gtest_main) +add_test(NAME memory_location_test COMMAND memory_location_test) 
diff --git a/mooncake-transfer-engine/tests/memory_location_test.cpp b/mooncake-transfer-engine/tests/memory_location_test.cpp
new file mode 100644
index 00000000..a49accd3
--- /dev/null
+++ b/mooncake-transfer-engine/tests/memory_location_test.cpp
@@ -0,0 +1,144 @@
+#include "memory_location.h"
+
+// NOTE(review): the <...> include targets were stripped from this copy of the
+// patch; the five below are restored from what the tests use — confirm.
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <numa.h>
+#include <numaif.h>
+#include <string.h>
+
+namespace {
+
+constexpr size_t kPageSize = 4096;
+constexpr size_t kBufferSize = kPageSize * 10;
+
+}  // namespace
+
+// libnuma requires numa_available() to succeed before any other libnuma call,
+// so every test is skipped (not failed) on hosts where it reports -1.
+class MemoryLocationTest : public ::testing::Test {
+   protected:
+    void SetUp() override {
+        if (numa_available() < 0) {
+            GTEST_SKIP() << "NUMA is not available on this host";
+        }
+    }
+};
+
+TEST_F(MemoryLocationTest, MallocSimpleNode0) {
+    const size_t size = kBufferSize;
+    void *addr = numa_alloc_onnode(size, 0);
+    ASSERT_NE(addr, nullptr);
+
+    auto entries = mooncake::getMemoryLocation(addr, size);
+    ASSERT_EQ(entries.size(), 1u);
+
+    // check the memory location, no node before page fault
+    EXPECT_EQ(entries[0].start, (uint64_t)addr);
+    EXPECT_EQ(entries[0].location, "*");
+    EXPECT_EQ(entries[0].len, size);
+
+    // trigger page fault
+    memset(addr, 0, size);
+
+    entries = mooncake::getMemoryLocation(addr, size);
+    ASSERT_EQ(entries.size(), 1u);
+
+    // check the memory location, node 0 after page fault
+    EXPECT_EQ(entries[0].start, (uint64_t)addr);
+    EXPECT_EQ(entries[0].location, "cpu:0");
+    EXPECT_EQ(entries[0].len, size);
+
+    numa_free(addr, size);
+}
+
+TEST_F(MemoryLocationTest, MallocSimpleNodeLargest) {
+    int node = numa_max_node();
+    LOG(INFO) << "node: " << node;
+
+    std::string location = "cpu:" + std::to_string(node);
+
+    const size_t size = kBufferSize;
+    void *addr = numa_alloc_onnode(size, node);
+    ASSERT_NE(addr, nullptr);
+
+    // trigger page fault
+    memset(addr, 0, size);
+
+    auto entries = mooncake::getMemoryLocation(addr, size);
+    ASSERT_EQ(entries.size(), 1u);
+
+    // check the memory location
+    EXPECT_EQ(entries[0].start, (uint64_t)addr);
+    EXPECT_EQ(entries[0].location, location);
+    EXPECT_EQ(entries[0].len, size);
+
+    numa_free(addr, size);
+}
+
+TEST_F(MemoryLocationTest, MallocMultipleNodes) {
+    int nodea = 0;
+    int nodeb = numa_max_node();
+    LOG(INFO) << "node a: " << nodea << " node b: " << nodeb;
+
+    std::string locationa = "cpu:" + std::to_string(nodea);
+    std::string locationb = "cpu:" + std::to_string(nodeb);
+
+    const size_t size = kBufferSize;
+    void *addr = numa_alloc_onnode(size, nodea);
+    ASSERT_NE(addr, nullptr);
+    ASSERT_EQ((uint64_t)addr % kPageSize, 0u);  // page aligned
+
+    // trigger page fault
+    memset(addr, 0, size);
+
+    int rc;
+
+    // move first two pages & last one page to nodeb
+    void *pages[3] = {addr, (void *)((uint64_t)addr + kPageSize),
+                      (void *)((uint64_t)addr + kPageSize * 9)};
+    int nodes[3] = {nodeb, nodeb, nodeb};
+    int status[3];
+    rc = numa_move_pages(0, 3, pages, nodes, status, MPOL_MF_MOVE);
+    if (rc != 0) {
+        PLOG(ERROR) << "numa_move_pages failed, rc: " << rc;
+    }
+    ASSERT_EQ(rc, 0);
+
+    // not page aligned
+    void *start = (void *)((uint64_t)addr + 1024 * 2);
+    const size_t len = size - 1024 * 4;
+
+    auto entries = mooncake::getMemoryLocation(start, len);
+
+    if (nodea == nodeb) {
+        // only one numa node
+        ASSERT_EQ(entries.size(), 1u);
+
+        // check the first memory location
+        EXPECT_EQ(entries[0].start, (uint64_t)start);
+        EXPECT_EQ(entries[0].location, locationa);
+        EXPECT_EQ(entries[0].len, len);
+
+    } else {
+        ASSERT_EQ(entries.size(), 3u);
+
+        // check the first memory location
+        EXPECT_EQ(entries[0].start, (uint64_t)start);
+        EXPECT_EQ(entries[0].location, locationb);
+        EXPECT_EQ(entries[0].len, kPageSize * 2 - 1024 * 2);
+
+        // check the second memory location
+        EXPECT_EQ(entries[1].start, (uint64_t)addr + kPageSize * 2);
+        EXPECT_EQ(entries[1].location, locationa);
+        EXPECT_EQ(entries[1].len, kPageSize * 7);
+
+        // check the third memory location
+        EXPECT_EQ(entries[2].start, (uint64_t)addr + kPageSize * 9);
+        EXPECT_EQ(entries[2].location, locationb);
+        EXPECT_EQ(entries[2].len, kPageSize - 1024 * 2);
+    }
+
+    numa_free(addr, size);
+}
diff --git a/mooncake-transfer-engine/tests/rdma_transport_test.cpp b/mooncake-transfer-engine/tests/rdma_transport_test.cpp index 7af0662c..b8e525ff 100644 --- a/mooncake-transfer-engine/tests/rdma_transport_test.cpp +++ b/mooncake-transfer-engine/tests/rdma_transport_test.cpp @@ -272,7 +272,7 @@ int initiator() { LOG_ASSERT(!rc); #else addr = allocateMemoryPool(ram_buffer_size, 0, false); - int rc = engine->registerLocalMemory(addr, ram_buffer_size, "cpu:0"); + int rc = engine->registerLocalMemory(addr, ram_buffer_size, "*"); LOG_ASSERT(!rc); #endif diff --git a/mooncake-transfer-engine/tests/topology_test.cpp b/mooncake-transfer-engine/tests/topology_test.cpp index 6caf93be..d61580ce 100644 --- a/mooncake-transfer-engine/tests/topology_test.cpp +++ b/mooncake-transfer-engine/tests/topology_test.cpp @@ -5,8 +5,7 @@ #include "transfer_metadata.h" -TEST(ToplogyTest, GetTopologyMatrix) -{ +TEST(ToplogyTest, GetTopologyMatrix) { mooncake::Topology topology; topology.discover(); std::string json_str = topology.toString(); @@ -17,77 +16,96 @@ TEST(ToplogyTest, GetTopologyMatrix) } 
TEST(ToplogyTest, TestEmpty) { - mooncake::Topology topology; - std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]],\"cpu:1\" " - ": [[\"erdma_1\"],[\"erdma_0\"]]}"; - topology.clear(); - topology.parse(json_str); - ASSERT_TRUE(!topology.empty()); + mooncake::Topology topology; + std::string json_str = + "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]],\"cpu:1\" " + ": [[\"erdma_1\"],[\"erdma_0\"]]}"; + topology.clear(); + topology.parse(json_str); + ASSERT_TRUE(!topology.empty()); } TEST(ToplogyTest, TestHcaList) { - mooncake::Topology topology; - std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_0\"]],\"cpu:1\" " - ": [[\"erdma_0\"],[\"erdma_0\"]]}"; - topology.clear(); - topology.parse(json_str); - ASSERT_EQ(topology.getHcaList().size(), 1); - std::set HcaList = {"erdma_0"}; - for (auto &hca : topology.getHcaList()) { - ASSERT_TRUE(HcaList.count(hca)); - } + mooncake::Topology topology; + std::string json_str = + "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_0\"]],\"cpu:1\" " + ": [[\"erdma_0\"],[\"erdma_0\"]]}"; + topology.clear(); + topology.parse(json_str); + ASSERT_EQ(topology.getHcaList().size(), 1); + std::set HcaList = {"erdma_0"}; + for (auto &hca : topology.getHcaList()) { + ASSERT_TRUE(HcaList.count(hca)); + } } TEST(ToplogyTest, TestHcaListSize) { - mooncake::Topology topology; - std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]],\"cpu:1\" " - ": [[\"erdma_2\"],[\"erdma_3\"]]}"; - topology.clear(); - topology.parse(json_str); - ASSERT_EQ(topology.getHcaList().size(), 4); + mooncake::Topology topology; + std::string json_str = + "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]],\"cpu:1\" " + ": [[\"erdma_2\"],[\"erdma_3\"]]}"; + topology.clear(); + topology.parse(json_str); + ASSERT_EQ(topology.getHcaList().size(), 4); } TEST(ToplogyTest, TestHcaList2) { - mooncake::Topology topology; - std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]],\"cpu:1\" " - ": [[\"erdma_1\"],[\"erdma_0\"]]}"; - topology.clear(); - 
topology.parse(json_str); - ASSERT_EQ(topology.getHcaList().size(), 2); - std::set HcaList = {"erdma_0", "erdma_1"}; - for (auto &hca : topology.getHcaList()) { - ASSERT_TRUE(HcaList.count(hca)); - } + mooncake::Topology topology; + std::string json_str = + "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]],\"cpu:1\" " + ": [[\"erdma_1\"],[\"erdma_0\"]]}"; + topology.clear(); + topology.parse(json_str); + ASSERT_EQ(topology.getHcaList().size(), 2); + std::set HcaList = {"erdma_0", "erdma_1"}; + for (auto &hca : topology.getHcaList()) { + ASSERT_TRUE(HcaList.count(hca)); + } } TEST(ToplogyTest, TestMatrix) { - mooncake::Topology topology; - std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]]}"; - topology.clear(); - topology.parse(json_str); - auto matrix = topology.getMatrix(); - ASSERT_TRUE(matrix.size() == 1); - ASSERT_TRUE(matrix.count("cpu:0")); + mooncake::Topology topology; + std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]]}"; + topology.clear(); + topology.parse(json_str); + auto matrix = topology.getMatrix(); + ASSERT_TRUE(matrix.size() == 1); + ASSERT_TRUE(matrix.count("cpu:0")); } TEST(ToplogyTest, TestSelectDevice) { - mooncake::Topology topology; - std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]]}"; - topology.clear(); - topology.parse(json_str); - std::set items = {0, 1}; - int device; - device = topology.selectDevice("cpu:0", 2); - ASSERT_TRUE(items.count(device)); - items.erase(device); - device = topology.selectDevice("cpu:0", 1); - ASSERT_TRUE(items.count(device)); - items.erase(device); - ASSERT_TRUE(items.empty()); + mooncake::Topology topology; + std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]]}"; + topology.clear(); + topology.parse(json_str); + std::set items = {0, 1}; + int device; + device = topology.selectDevice("cpu:0", 2); + ASSERT_TRUE(items.count(device)); + items.erase(device); + device = topology.selectDevice("cpu:0", 1); + ASSERT_TRUE(items.count(device)); + 
items.erase(device); + ASSERT_TRUE(items.empty()); +} + +TEST(ToplogyTest, TestSelectDeviceAny) { + mooncake::Topology topology; + std::string json_str = "{\"cpu:0\" : [[\"erdma_0\"],[\"erdma_1\"]]}"; + topology.clear(); + topology.parse(json_str); + std::set items = {0, 1}; + int device; + device = topology.selectDevice("*", 2); + ASSERT_TRUE(items.count(device)); + items.erase(device); + device = topology.selectDevice("*", 1); + ASSERT_TRUE(items.count(device)); + items.erase(device); + ASSERT_TRUE(items.empty()); } -int main(int argc, char **argv) -{ +int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }