Skip to content

Commit

Permalink
[TransferEngine] feature: registerLocalMemory support the "*" locatio…
Browse files Browse the repository at this point in the history
…n. (#86)

* [TransferEngine] feature: registerLocalMemory support the "*" location.

1. try best to recognize the cpu numa node for now,
2. use all nic when failed to get the numa node,
3. will support cuda memory in the feature.

Signed-off-by: doujiang24 <doujiang24@gmail.com>

* fix test when there is only one numa node

Signed-off-by: doujiang24 <doujiang24@gmail.com>

---------

Signed-off-by: doujiang24 <doujiang24@gmail.com>
  • Loading branch information
doujiang24 authored Jan 23, 2025
1 parent 19aafbe commit 86a8649
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 67 deletions.
2 changes: 1 addition & 1 deletion doc/en/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ Registers a space starting at address `addr` with a length of `size` on the loca
- `addr`: The starting address of the registration space;
- `size`: The length of the registration space;
- `location`: The `device` corresponding to this memory segment, such as `cuda:0` indicating the GPU device, `cpu:0` indicating the CPU socket, by matching with the network card priority order table (see `installTransport`), the preferred network card is identified.
- `location`: The `device` corresponding to this memory segment, such as `cuda:0` indicating the GPU device, `cpu:0` indicating the CPU socket, by matching with the network card priority order table (see `installTransport`), the preferred network card is identified. You can also use `*`, Transfer Engine will try to automatically recognize the `device` corresponding to `addr`, if it fails to recognize the device, it will print a `WARNING` level log and use all network cards, no preferred network cards.
- `remote_accessible`: Indicates whether this memory can be accessed by remote nodes.
- Return value: If successful, returns 0; otherwise, returns a negative value.
Expand Down
2 changes: 1 addition & 1 deletion doc/zh/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ int registerLocalMemory(void *addr, size_t size, string location, bool remote_ac
- `addr`: 注册空间起始地址;
- `size`:注册空间长度;
- `location`: 这一段内存对应的 `device`,比如 `cuda:0` 表示对应 GPU 设备,`cpu:0` 表示对应 CPU socket,通过和网卡优先级顺序表(见`installTransport`) 匹配,识别优选的网卡。
- `location`: 这一段内存对应的 `device`,比如 `cuda:0` 表示对应 GPU 设备,`cpu:0` 表示对应 CPU socket,通过和网卡优先级顺序表(见`installTransport`) 匹配,识别优选的网卡。 也可以使用 `*`,Transfer Engine 会尽量自动识别 `addr` 对应的 `device`,如果识别失败,将打印 `WARNING` 级别日志,并且使用全部的网卡,不再区分优选网卡。
- `remote_accessible`: 标识这一块内存能否被远端节点访问。
- 返回值:若成功,返回 0;否则返回负数值。
Expand Down
3 changes: 1 addition & 2 deletions mooncake-transfer-engine/include/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ static inline int bindToSocket(int socket_id) {
}
}
numa_free_cpumask(cpu_list);
if (nr_cpus == 0)
return 0;
if (nr_cpus == 0) return 0;
if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_set)) {
LOG(ERROR) << "Failed to set socket affinity";
return ERR_NUMA;
Expand Down
40 changes: 40 additions & 0 deletions mooncake-transfer-engine/include/memory_location.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 KVCache.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef MEMORY_LOCATION_H
#define MEMORY_LOCATION_H

#include <glog/logging.h>

#include <memory>

#include "common.h"

const int pagesize = 4096;

namespace mooncake {
struct MemoryLocationEntry {
uint64_t start;
size_t len;
std::string location;
};

// Get CPU numa node id
// TODO: support getting cuda device id from unified address.
const std::vector<MemoryLocationEntry> getMemoryLocation(void *start,
size_t len);

} // namespace mooncake

#endif // MEMORY_LOCATION_H
67 changes: 67 additions & 0 deletions mooncake-transfer-engine/src/memory_location.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 KVCache.AI
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "memory_location.h"

namespace mooncake {

uintptr_t alignPage(uintptr_t address) { return address & ~(pagesize - 1); }

std::string genCpuNodeName(int node) {
if (node >= 0) return "cpu:" + std::to_string(node);

// use "*" when failed to get the numa node.
return "*";
}

const std::vector<MemoryLocationEntry> getMemoryLocation(void *start,
size_t len) {
std::vector<MemoryLocationEntry> entries;

// start and end address may not be page aligned.
uintptr_t aligned_start = alignPage((uintptr_t)start);
int n = (uintptr_t(start) - aligned_start + len + pagesize - 1) / pagesize;
void **pages = (void **)malloc(sizeof(void *) * n);
int *status = (int *)malloc(sizeof(int) * n);

for (int i = 0; i < n; i++) {
pages[i] = (void *)((char *)aligned_start + i * pagesize);
}

int rc = numa_move_pages(0, n, pages, nullptr, status, 0);
if (rc != 0) {
PLOG(WARNING) << "Failed to get NUMA node, addr: " << start
<< ", len: " << len;
entries.push_back({(uint64_t)start, len, "*"});
return entries;
}

int node = status[0];
uint64_t start_addr = (uint64_t)start;
uint64_t new_start_addr;
for (int i = 1; i < n; i++) {
if (status[i] != node) {
new_start_addr = alignPage((uint64_t)start) + i * pagesize;
entries.push_back({start_addr, size_t(new_start_addr - start_addr),
genCpuNodeName(node)});
start_addr = new_start_addr;
node = status[i];
}
}
entries.push_back(
{start_addr, (uint64_t)start + len - start_addr, genCpuNodeName(node)});
return entries;
}

} // namespace mooncake
9 changes: 8 additions & 1 deletion mooncake-transfer-engine/src/topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ Json::Value Topology::toJson() const {
}

int Topology::selectDevice(const std::string storage_type, int retry_count) {
if (!resolved_matrix_.count(storage_type)) return ERR_DEVICE_NOT_FOUND;
if (resolved_matrix_.count(storage_type) == 0) return ERR_DEVICE_NOT_FOUND;

auto &entry = resolved_matrix_[storage_type];
if (retry_count == 0) {
int rand_value = SimpleRandom::Get().next();
Expand Down Expand Up @@ -294,6 +295,9 @@ int Topology::resolve() {
hca_list_.push_back(hca);
hca_id_map[hca] = next_hca_map_index;
next_hca_map_index++;

// "*" means any device
resolved_matrix_["*"].preferred_hca.push_back(hca_id_map[hca]);
}
resolved_matrix_[entry.first].preferred_hca.push_back(
hca_id_map[hca]);
Expand All @@ -303,6 +307,9 @@ int Topology::resolve() {
hca_list_.push_back(hca);
hca_id_map[hca] = next_hca_map_index;
next_hca_map_index++;

// "*" means any device
resolved_matrix_["*"].preferred_hca.push_back(hca_id_map[hca]);
}
resolved_matrix_[entry.first].avail_hca.push_back(hca_id_map[hca]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "common.h"
#include "config.h"
#include "memory_location.h"
#include "topology.h"
#include "transport/rdma_transport/rdma_context.h"
#include "transport/rdma_transport/rdma_endpoint.h"
Expand Down Expand Up @@ -88,9 +89,6 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length,
bool update_metadata) {
(void)remote_accessible;
BufferDesc buffer_desc;
buffer_desc.name = name;
buffer_desc.addr = (uint64_t)addr;
buffer_desc.length = length;
const static int access_rights = IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE |
IBV_ACCESS_REMOTE_READ;
Expand All @@ -100,9 +98,28 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length,
buffer_desc.lkey.push_back(context->lkey(addr));
buffer_desc.rkey.push_back(context->rkey(addr));
}
int rc = metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata);

if (rc) return rc;
// Get the memory location automatically after registered MR(pinned),
// when the name is "*".
if (name == "*") {
const std::vector<MemoryLocationEntry> entries =
getMemoryLocation(addr, length);
for (auto &entry : entries) {
buffer_desc.name = entry.location;
buffer_desc.addr = entry.start;
buffer_desc.length = entry.len;
int rc =
metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata);
if (rc) return rc;
}
} else {
buffer_desc.name = name;
buffer_desc.addr = (uint64_t)addr;
buffer_desc.length = length;
int rc = metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata);

if (rc) return rc;
}

return 0;
}
Expand Down
4 changes: 4 additions & 0 deletions mooncake-transfer-engine/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ add_test(NAME transfer_metadata_test COMMAND transfer_metadata_test)
add_executable(topology_test topology_test.cpp)
target_link_libraries(topology_test PUBLIC transfer_engine gtest gtest_main)
add_test(NAME topology_test COMMAND topology_test)

add_executable(memory_location_test memory_location_test.cpp)
target_link_libraries(memory_location_test PUBLIC transfer_engine gtest gtest_main)
add_test(NAME memory_location_test COMMAND memory_location_test)
123 changes: 123 additions & 0 deletions mooncake-transfer-engine/tests/memory_location_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#include "memory_location.h"

#include <glog/logging.h>
#include <gtest/gtest.h>
#include <numa.h>
#include <numaif.h>
#include <sys/mman.h>

TEST(MemoryLocationTest, MallocSimpleNode0) {
int size = 4096 * 10;
void *addr = numa_alloc_onnode(size, 0);
ASSERT_NE(addr, nullptr);

auto entries = mooncake::getMemoryLocation(addr, size);
ASSERT_EQ(entries.size(), 1);

// check the memory location, no node before page fault
EXPECT_EQ(entries[0].start, (uint64_t)addr);
EXPECT_EQ(entries[0].location, "*");
EXPECT_EQ(entries[0].len, size);

// trigger page fault
memset(addr, 0, size);

entries = mooncake::getMemoryLocation(addr, size);
ASSERT_EQ(entries.size(), 1);

// check the memory location, node 0 after page fault
EXPECT_EQ(entries[0].start, (uint64_t)addr);
EXPECT_EQ(entries[0].location, "cpu:0");
EXPECT_EQ(entries[0].len, size);

numa_free(addr, size);
}

TEST(MemoryLocationTest, MallocSimpleNodeLargest) {
int node = numa_max_node();
LOG(INFO) << "node: " << node;

std::string location = "cpu:" + std::to_string(node);

int size = 4096 * 10;
void *addr = numa_alloc_onnode(size, node);
ASSERT_NE(addr, nullptr);

// trigger page fault
memset(addr, 0, size);

auto entries = mooncake::getMemoryLocation(addr, size);
ASSERT_EQ(entries.size(), 1);

// check the memory location
EXPECT_EQ(entries[0].start, (uint64_t)addr);
EXPECT_EQ(entries[0].location, location);
EXPECT_EQ(entries[0].len, size);

numa_free(addr, size);
}

TEST(MemoryLocationTest, MallocMultipleNodes) {
int nodea = 0;
int nodeb = numa_max_node();
LOG(INFO) << "node a: " << nodea << " node b: " << nodeb;

std::string locationa = "cpu:" + std::to_string(nodea);
std::string locationb = "cpu:" + std::to_string(nodeb);

int size = 4096 * 10;
void *addr = numa_alloc_onnode(size, nodea);
ASSERT_NE(addr, nullptr);
ASSERT_EQ((uint64_t)addr % 4096, 0); // page aligned

// trigger page fault
memset(addr, 0, size);

int rc;

// move first two pages & last one page to nodeb
void *pages[3] = {addr, (void *)((uint64_t)addr + 4096),
(void *)((uint64_t)addr + 4096 * 9)};
int nodes[3] = {nodeb, nodeb, nodeb};
int status[3];
rc = numa_move_pages(0, 3, pages, nodes, status, MPOL_MF_MOVE);
if (rc != 0) {
PLOG(ERROR) << "numa_move_pages failed, rc: " << rc;
}
ASSERT_EQ(rc, 0);

// not page aligned
void *start = (void *)((uint64_t)addr + 1024 * 2);

auto entries = mooncake::getMemoryLocation(start, size - 1024 * 4);

if (nodea == nodeb) {
// only one numa node
ASSERT_EQ(entries.size(), 1);

// check the first memory location
EXPECT_EQ(entries[0].start, (uint64_t)start);
EXPECT_EQ(entries[0].location, locationa);
EXPECT_EQ(entries[0].len, size - 1024 * 4);

} else {
ASSERT_EQ(entries.size(), 3);

// check the first memory location
EXPECT_EQ(entries[0].start, (uint64_t)start);
EXPECT_EQ(entries[0].location, locationb);
EXPECT_EQ(entries[0].len, 4096 * 2 - 1024 * 2);

// check the second memory location
EXPECT_EQ(entries[1].start, (uint64_t)addr + 4096 * 2);
EXPECT_EQ(entries[1].location, locationa);
EXPECT_EQ(entries[1].len, 4096 * 7);

// check the third memory location
EXPECT_EQ(entries[2].start, (uint64_t)addr + 4096 * 9);
EXPECT_EQ(entries[2].location, locationb);
EXPECT_EQ(entries[2].len, 4096 - 1024 * 2);
}

numa_free(addr, size);
}
2 changes: 1 addition & 1 deletion mooncake-transfer-engine/tests/rdma_transport_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ int initiator() {
LOG_ASSERT(!rc);
#else
addr = allocateMemoryPool(ram_buffer_size, 0, false);
int rc = engine->registerLocalMemory(addr, ram_buffer_size, "cpu:0");
int rc = engine->registerLocalMemory(addr, ram_buffer_size, "*");
LOG_ASSERT(!rc);
#endif

Expand Down
Loading

0 comments on commit 86a8649

Please sign in to comment.