From 795ca342363a0df6b18c771e04e997d9b4b18846 Mon Sep 17 00:00:00 2001
From: Feng Ren
Date: Tue, 14 Oct 2025 03:30:07 +0000
Subject: [PATCH] Use dynamic loader to support tcp over cuda

---
Review notes (ignored by git am): GpuRuntime::Attr now covers the whole
cudaPointerAttributes layout, failed staging copies fail the transfer, and
pointers are logged as addresses. Whitespace was re-derived from the code
style and the index hashes below predate these changes.

 .../transport/tcp_transport/tcp_transport.cpp | 200 +++++++++++++++---
 1 file changed, 176 insertions(+), 24 deletions(-)

diff --git a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp
index c6544b9f8..9db7e85c3 100644
--- a/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp
+++ b/mooncake-transfer-engine/src/transport/tcp_transport/tcp_transport.cpp
@@ -49,6 +49,107 @@ struct SessionHeader {
     uint8_t opcode;
 };
 
+// Loads a GPU runtime (CUDA or MUSA) with dlopen() so builds without
+// USE_CUDA/USE_MUSA can still detect device pointers and stage them through
+// host memory for TCP transfers.
+class GpuRuntime {
+   public:
+    // Numeric value of cudaMemcpyDefault (the runtime infers the direction
+    // from the pointers). NOTE(review): assumes musaMemcpyDefault has the
+    // same value -- confirm against the MUSA headers.
+    static constexpr int kMemcpyDefault = 4;
+
+    static GpuRuntime &instance() {
+        static GpuRuntime inst;
+        return inst;
+    }
+
+    // True once a library providing both required symbols has been loaded.
+    bool isAvailable() const { return handle_ != nullptr; }
+
+    // Returns true only if the runtime reports addr as device memory; false
+    // when no runtime is loaded or the attribute query fails.
+    bool isDevicePtr(const void *addr) const {
+        if (!isAvailable() || !pGetAttr_) return false;
+        Attr attr{};
+        int status = pGetAttr_(&attr, addr);
+        if (status != 0) return false;
+        return attr.type == kMemoryTypeDevice;
+    }
+
+    // Forwards to the runtime's memcpy. Returns false when no runtime is
+    // loaded or the runtime reports a non-zero status.
+    bool copy(void *dst, const void *src, size_t bytes, int kind) const {
+        if (!isAvailable() || !pMemcpy_) return false;
+        int status = pMemcpy_(dst, src, bytes, kind);
+        return (status == 0);
+    }
+
+   private:
+    GpuRuntime() { init(); }
+    ~GpuRuntime() {
+        if (handle_) dlclose(handle_);
+    }
+    GpuRuntime(const GpuRuntime &) = delete;
+    GpuRuntime &operator=(const GpuRuntime &) = delete;
+
+    // Tries each candidate library in order and keeps the first one that
+    // exports both the pointer-attribute query and memcpy.
+    void init() {
+        const char *libs[] = {
+            "libcudart.so",       // CUDA
+            "libmusa_runtime.so"  // MUSA
+        };
+
+        for (auto lib : libs) {
+            handle_ = dlopen(lib, RTLD_LAZY);
+            if (!handle_) continue;
+
+            pGetAttr_ = reinterpret_cast<GetAttrFn>(
+                dlsym(handle_, "cudaPointerGetAttributes"));
+            if (!pGetAttr_)
+                pGetAttr_ = reinterpret_cast<GetAttrFn>(
+                    dlsym(handle_, "musaPointerGetAttributes"));
+
+            pMemcpy_ = reinterpret_cast<MemcpyFn>(dlsym(handle_, "cudaMemcpy"));
+            if (!pMemcpy_)
+                pMemcpy_ =
+                    reinterpret_cast<MemcpyFn>(dlsym(handle_, "musaMemcpy"));
+
+            if (pGetAttr_ && pMemcpy_) {
+                LOG(INFO) << "[GpuRuntime] Loaded GPU runtime: " << lib;
+                return;
+            }
+
+            // Incomplete runtime: clear the symbols before unloading so no
+            // pointer into the closed library is left behind.
+            pGetAttr_ = nullptr;
+            pMemcpy_ = nullptr;
+            dlclose(handle_);
+            handle_ = nullptr;
+        }
+    }
+
+   private:
+    void *handle_ = nullptr;
+    // Same layout as cudaPointerAttributes in CUDA 11+. The runtime writes
+    // the whole vendor struct through the pointer it is given, so every
+    // field must be present here; `reserved` is slack in case a runtime's
+    // struct is larger. NOTE(review): MUSA layout assumed identical -- confirm.
+    struct Attr {
+        int type;
+        int device;
+        void *device_pointer;
+        void *host_pointer;
+        unsigned char reserved[64];
+    };
+    using GetAttrFn = int (*)(Attr *, const void *);
+    using MemcpyFn = int (*)(void *, const void *, size_t, int);
+    GetAttrFn pGetAttr_ = nullptr;
+    MemcpyFn pMemcpy_ = nullptr;
+    static constexpr int kMemoryTypeDevice = 2;
+};
+
 #if defined(USE_CUDA) || defined(USE_MUSA)
 static bool isCudaMemory(void *addr) {
     cudaPointerAttributes attributes;
@@ -57,6 +158,12 @@ static bool isCudaMemory(void *addr) {
     if (attributes.type == cudaMemoryTypeDevice) return true;
     return false;
 }
+#else
+// Without compile-time GPU support, ask the dynamically loaded runtime (if
+// any) whether addr is device memory.
+static bool isCudaMemory(void *addr) {
+    return GpuRuntime::instance().isDevicePtr(addr);
+}
 #endif
 
 struct Session : public std::enable_shared_from_this<Session> {
@@ -156,26 +263,50 @@ struct Session : public std::enable_shared_from_this<Session> {
             cudaMemcpy(dram_buffer, addr + total_transferred_bytes_,
                        buffer_size, cudaMemcpyDefault);
         }
+#else
+        if (isCudaMemory(addr)) {
+            dram_buffer = new char[buffer_size];
+            auto &gpu = GpuRuntime::instance();
+            if (!gpu.copy(dram_buffer, addr + total_transferred_bytes_,
+                          buffer_size, GpuRuntime::kMemcpyDefault)) {
+                // Do not send an uninitialized staging buffer to the peer.
+                LOG(ERROR) << "Session::writeBody failed to copy GPU memory "
+                           << static_cast<const void *>(addr)
+                           << " into a host staging buffer";
+                delete[] dram_buffer;
+                if (on_finalize_) on_finalize_(TransferStatusEnum::FAILED);
+                session_mutex_.unlock();
+                return;
+            }
+        }
 #endif
 
         asio::async_write(
             socket_, asio::buffer(dram_buffer, buffer_size),
             [this, addr, dram_buffer, self](const asio::error_code &ec,
                                             std::size_t transferred_bytes) {
-#if defined(USE_CUDA) || defined(USE_MUSA)
                 if (isCudaMemory(addr)) {
                     delete[] dram_buffer;
                 }
-#endif
                 if (ec) {
-                    LOG(ERROR)
-                        << "Session::writeBody failed. "
-                        << "Attempt to write data " << addr << " using buffer "
-                        << dram_buffer << ". Error: " << ec.message()
-                        << " (value: " << ec.value() << ")"
-                        << ", total_transferred_bytes_: "
-                        << total_transferred_bytes_
-                        << ", current transferred_bytes: " << transferred_bytes;
+                    if (ec.value() == 14 /* Bad address */) {
+                        LOG(FATAL) << "Unable to transfer GPU memory via TCP "
+                                      "transport without CUDA support. "
+                                      "Please rebuild the Python wheel with "
+                                      "-DUSE_CUDA=ON";
+                    } else {
+                        LOG(ERROR) << "Session::writeBody failed. "
+                                   << "Attempt to write data "
+                                   << static_cast<const void *>(addr)
+                                   << " using buffer "
+                                   << static_cast<const void *>(dram_buffer)
+                                   << ". Error: " << ec.message()
+                                   << " (value: " << ec.value() << ")"
+                                   << ", total_transferred_bytes_: "
+                                   << total_transferred_bytes_
+                                   << ", current transferred_bytes: "
+                                   << transferred_bytes;
+                    }
                     if (on_finalize_) on_finalize_(TransferStatusEnum::FAILED);
                     session_mutex_.unlock();
                     return;
@@ -201,32 +332,36 @@ struct Session : public std::enable_shared_from_this<Session> {
 
         char *dram_buffer = addr + total_transferred_bytes_;
 
-#if defined(USE_CUDA) || defined(USE_MUSA)
         bool is_cuda_memory = isCudaMemory(addr);
         if (is_cuda_memory) {
             dram_buffer = new char[buffer_size];
         }
-#else
-        bool is_cuda_memory = false;
-#endif
 
         asio::async_read(
             socket_, asio::buffer(dram_buffer, buffer_size),
             [this, addr, dram_buffer, is_cuda_memory, self](
                 const asio::error_code &ec, std::size_t transferred_bytes) {
                 if (ec) {
-                    LOG(ERROR)
-                        << "Session::readBody failed. "
-                        << "Attempt to read data " << addr << " using buffer "
-                        << dram_buffer << ". Error: " << ec.message()
-                        << " (value: " << ec.value() << ")"
-                        << ", total_transferred_bytes_: "
-                        << total_transferred_bytes_
-                        << ", current transferred_bytes: " << transferred_bytes;
+                    if (ec.value() == 14 /* Bad address */) {
+                        LOG(FATAL) << "Unable to transfer GPU memory via TCP "
+                                      "transport without CUDA support. "
+                                      "Please rebuild the Python wheel with "
+                                      "-DUSE_CUDA=ON";
+                    } else {
+                        LOG(ERROR) << "Session::readBody failed. "
+                                   << "Attempt to read data "
+                                   << static_cast<const void *>(addr)
+                                   << " using buffer "
+                                   << static_cast<const void *>(dram_buffer)
+                                   << ". Error: " << ec.message()
+                                   << " (value: " << ec.value() << ")"
+                                   << ", total_transferred_bytes_: "
+                                   << total_transferred_bytes_
+                                   << ", current transferred_bytes: "
+                                   << transferred_bytes;
+                    }
                     if (on_finalize_) on_finalize_(TransferStatusEnum::FAILED);
-#if defined(USE_CUDA) || defined(USE_MUSA)
                     if (is_cuda_memory) delete[] dram_buffer;
-#endif
                     session_mutex_.unlock();
                     return;
                 }
@@ -234,6 +369,23 @@ struct Session : public std::enable_shared_from_this<Session> {
                 cudaMemcpy(addr + total_transferred_bytes_, dram_buffer,
                            transferred_bytes, cudaMemcpyDefault);
                 if (is_cuda_memory) delete[] dram_buffer;
+#else
+                if (is_cuda_memory) {
+                    auto &gpu = GpuRuntime::instance();
+                    const bool copied = gpu.copy(
+                        addr + total_transferred_bytes_, dram_buffer,
+                        transferred_bytes, GpuRuntime::kMemcpyDefault);
+                    delete[] dram_buffer;
+                    if (!copied) {
+                        LOG(ERROR) << "Session::readBody failed to copy "
+                                   << "received data into GPU memory "
+                                   << static_cast<const void *>(addr);
+                        if (on_finalize_)
+                            on_finalize_(TransferStatusEnum::FAILED);
+                        session_mutex_.unlock();
+                        return;
+                    }
+                }
 #endif
                 total_transferred_bytes_ += transferred_bytes;
                 readBody();