Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tile-based API #145

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,18 @@ class Connection {
virtual void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) = 0;

/// Write data from a source @ref RegisteredMemory to a destination @ref RegisteredMemory in a 2D fashion.
///
/// @param dst The destination @ref RegisteredMemory.
/// @param dstOffset The offset in bytes from the start of the destination @ref RegisteredMemory.
/// @param dstPitch The pitch of the destination @ref RegisteredMemory in bytes.
/// @param src The source @ref RegisteredMemory.
/// @param srcOffset The offset in bytes from the start of the source @ref RegisteredMemory.
/// @param srcPitch The pitch of the source @ref RegisteredMemory in bytes.
/// @param width The width of the 2D region to write in bytes.
/// @param height The height of the 2D region.
virtual void write2D(RegisteredMemory dst, uint64_t dstOffset, uint64_t dstPitch, RegisteredMemory src,
uint64_t srcOffset, uint64_t srcPitch, uint64_t width, uint64_t height) = 0;
/// Update a 8-byte value in a destination @ref RegisteredMemory and synchronize the change with the remote process.
///
/// @param dst The destination @ref RegisteredMemory.
Expand Down
9 changes: 9 additions & 0 deletions include/mscclpp/proxy_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,18 @@ class ProxyService : public BaseProxyService {
ProxyService();

/// Build and add a semaphore to the proxy service.
/// @param communicator The communicator for bootstrapping.
/// @param connection The connection associated with the semaphore.
/// @return The ID of the semaphore.
SemaphoreId buildAndAddSemaphore(Communicator& communicator, std::shared_ptr<Connection> connection);

/// Build and add a semaphore with pitch to the proxy service. This is used for 2D transfers.
/// @param communicator The communicator for bootstrapping.
/// @param connection The connection associated with the channel.
/// @param pitch The pitch pair.
SemaphoreId buildAndAddSemaphore(Communicator& communicator, std::shared_ptr<Connection> connection,
std::pair<uint64_t, uint64_t> pitch);

/// Add a semaphore to the proxy service.
/// @param semaphore The semaphore to be added
/// @return The ID of the semaphore.
Expand Down Expand Up @@ -62,6 +70,7 @@ class ProxyService : public BaseProxyService {
private:
std::vector<std::shared_ptr<Host2DeviceSemaphore>> semaphores_;
std::vector<RegisteredMemory> memories_;
std::vector<std::pair<uint64_t, uint64_t>> pitches_;
Proxy proxy_;
int deviceNumaNode;

Expand Down
117 changes: 117 additions & 0 deletions include/mscclpp/proxy_channel_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const TriggerType TriggerSync = 0x4; // Trigger a flush.
#define MSCCLPP_BITS_CONNID 10
#define MSCCLPP_BITS_FIFO_RESERVED 1

#define MSCCLPP_BITS_WIDTH_SIZE 16
#define MSCCLPP_BITS_HEIGHT_SIZE 16
#define MSCCLPP_2D_FLAG 1

/// Basic structure of each work element in the FIFO.
union ChannelTrigger {
ProxyTrigger value;
Expand All @@ -47,6 +51,25 @@ union ChannelTrigger {
uint64_t reserved : MSCCLPP_BITS_FIFO_RESERVED;
} fields;

struct {
// First 64 bits: value[0]
uint64_t width : MSCCLPP_BITS_WIDTH_SIZE;
uint64_t height : MSCCLPP_BITS_HEIGHT_SIZE;
uint64_t srcOffset : MSCCLPP_BITS_OFFSET;
uint64_t
: (64 - MSCCLPP_BITS_WIDTH_SIZE - MSCCLPP_BITS_HEIGHT_SIZE - MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment
// Second 64 bits: value[1]
uint64_t dstOffset : MSCCLPP_BITS_OFFSET;
uint64_t srcMemoryId : MSCCLPP_BITS_REGMEM_HANDLE;
uint64_t dstMemoryId : MSCCLPP_BITS_REGMEM_HANDLE;
uint64_t type : MSCCLPP_BITS_TYPE;
uint64_t chanId : MSCCLPP_BITS_CONNID;
uint64_t multiDimensionFlag : MSCCLPP_2D_FLAG;
uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_TYPE -
MSCCLPP_BITS_CONNID - MSCCLPP_2D_FLAG - MSCCLPP_BITS_FIFO_RESERVED); // ensure 64-bit alignment
uint64_t reserved : MSCCLPP_BITS_FIFO_RESERVED;
} fields2D;

#ifdef __CUDACC__
/// Default constructor.
__forceinline__ __device__ ChannelTrigger() {}
Expand All @@ -71,6 +94,27 @@ union ChannelTrigger {
<< MSCCLPP_BITS_OFFSET) +
dstOffset);
}

/// Constructor.
/// @param type The type of the trigger.
/// @param dst The destination memory region.
/// @param dstOffset The offset into the destination memory region.
/// @param src The source memory region.
/// @param srcOffset The offset into the source memory region.
/// @param width The width of the 2D region.
/// @param height The height of the 2D region.
/// @param semaphoreId The ID of the semaphore.
__device__ ChannelTrigger(TriggerType type, MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset,
uint64_t width, uint64_t height, int semaphoreId) {
value.fst = (((srcOffset << MSCCLPP_BITS_HEIGHT_SIZE) + height) << MSCCLPP_BITS_WIDTH_SIZE) + width;
value.snd = ((((((((((1ULL << MSCCLPP_BITS_CONNID) + semaphoreId) << MSCCLPP_BITS_TYPE) + type)
<< MSCCLPP_BITS_REGMEM_HANDLE) +
dst)
<< MSCCLPP_BITS_REGMEM_HANDLE) +
src)
<< MSCCLPP_BITS_OFFSET) +
dstOffset);
}
#endif // __CUDACC__
};

Expand Down Expand Up @@ -104,6 +148,28 @@ struct ProxyChannelDeviceHandle {
put(dst, offset, src, offset, size);
}

/// @brief Push a @ref TriggerData to the FIFO.
/// @param dst The destination memory region.
/// @param dstOffset The offset into the destination memory region.
/// @param src The source memory region.
/// @param srcOffset The offset into the source memory region.
/// @param width The width of the 2D region.
/// @param height The height of the 2D region.
__forceinline__ __device__ void put2D(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset,
uint32_t width, uint32_t height) {
fifo_.push(ChannelTrigger(TriggerData, dst, dstOffset, src, srcOffset, width, height, semaphoreId_).value);
}

/// @brief Push a @ref TriggerData to the FIFO.
/// @param dst The destination memory region.
/// @param src The source memory region.
/// @param offset The common offset into the destination and source memory regions.
/// @param width The width of the 2D region.
/// @param height The height of the 2D region.
__forceinline__ __device__ void put2D(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width, uint32_t height) {
put2D(dst, offset, src, offset, width, height);
}

/// Push a @ref TriggerFlag to the FIFO.
__forceinline__ __device__ void signal() {
fifo_.push(ChannelTrigger(TriggerFlag, 0, 0, 0, 0, 1, semaphoreId_).value);
Expand All @@ -120,6 +186,19 @@ struct ProxyChannelDeviceHandle {
fifo_.push(ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, size, semaphoreId_).value);
}

/// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO.
/// @param dst The destination memory region.
/// @param dstOffset The offset into the destination memory region.
/// @param src The source memory region.
/// @param srcOffset The offset into the source memory region.
/// @param width The width of the 2D region.
/// @param height The height of the 2D region.
__forceinline__ __device__ void put2DWithSignal(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset,
uint32_t width, uint32_t height) {
fifo_.push(
ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, width, height, semaphoreId_).value);
}

/// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO.
/// @param dst The destination memory region.
/// @param src The source memory region.
Expand All @@ -129,6 +208,17 @@ struct ProxyChannelDeviceHandle {
putWithSignal(dst, offset, src, offset, size);
}

/// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO.
/// @param dst The destination memory region.
/// @param src The source memory region.
/// @param offset The common offset into the destination and source memory regions.
/// @param width The width of the 2D region.
/// @param height The height of the 2D region.
__forceinline__ __device__ void put2DWithSignal(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width,
uint32_t height) {
put2DWithSignal(dst, offset, src, offset, width, height);
}

/// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO.
/// @param dst The destination memory region.
/// @param dstOffset The offset into the destination memory region.
Expand Down Expand Up @@ -178,6 +268,15 @@ struct SimpleProxyChannelDeviceHandle {
proxyChan_.put(dst_, dstOffset, src_, srcOffset, size);
}

/// Push a @ref TriggerData to the FIFO.
/// @param dstOffset The offset into the destination memory region.
/// @param srcOffset The offset into the source memory region.
/// @param width The width of the 2D region.
/// @param height The height of the 2D region.
__forceinline__ __device__ void put2D(uint64_t dstOffset, uint64_t srcOffset, uint32_t width, uint32_t height) {
proxyChan_.put2D(dst_, dstOffset, src_, srcOffset, width, height);
}

/// Push a @ref TriggerData to the FIFO.
/// @param offset The common offset into the destination and source memory regions.
/// @param size The size of the transfer.
Expand All @@ -194,11 +293,29 @@ struct SimpleProxyChannelDeviceHandle {
proxyChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size);
}

/// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO.
/// @param dstOffset The offset into the destination memory region.
/// @param srcOffset The offset into the source memory region.
/// @param width The width of the 2D region.
/// @param height The height of the 2D region.
__forceinline__ __device__ void put2DWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint32_t width,
uint32_t height) {
proxyChan_.put2DWithSignal(dst_, dstOffset, src_, srcOffset, width, height);
}

/// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO.
/// @param offset The common offset into the destination and source memory regions.
/// @param size The size of the transfer.
__forceinline__ __device__ void putWithSignal(uint64_t offset, uint64_t size) { putWithSignal(offset, offset, size); }

/// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO.
/// @param offset The common offset into the destination and source memory regions.
/// @param width The width of the 2D region.
/// @param height The height of the 2D region.
__forceinline__ __device__ void put2DWithSignal(uint64_t offset, uint32_t width, uint32_t height) {
put2DWithSignal(offset, offset, width, height);
}

/// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO.
/// @param dstOffset The offset into the destination memory region.
/// @param srcOffset The offset into the source memory region.
Expand Down
9 changes: 8 additions & 1 deletion python/proxy_channel_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

#include <nanobind/nanobind.h>
#include <nanobind/stl/pair.h>
#include <nanobind/stl/shared_ptr.h>
#include <nanobind/stl/string.h>

Expand All @@ -19,7 +20,13 @@ void register_proxy_channel(nb::module_& m) {
.def(nb::init<>())
.def("start_proxy", &ProxyService::startProxy)
.def("stop_proxy", &ProxyService::stopProxy)
.def("build_and_add_semaphore", &ProxyService::buildAndAddSemaphore, nb::arg("comm"), nb::arg("connection"))
.def("build_and_add_semaphore",
nb::overload_cast<Communicator&, std::shared_ptr<Connection>>(&ProxyService::buildAndAddSemaphore),
nb::arg("comm"), nb::arg("connection"))
.def("build_and_add_semaphore",
nb::overload_cast<Communicator&, std::shared_ptr<Connection>, std::pair<uint64_t, uint64_t>>(
&ProxyService::buildAndAddSemaphore),
nb::arg("comm"), nb::arg("connection"), nb::arg("pitch"))
.def("add_semaphore", &ProxyService::addSemaphore, nb::arg("semaphore"))
.def("add_memory", &ProxyService::addMemory, nb::arg("memory"))
.def("semaphore", &ProxyService::semaphore, nb::arg("id"))
Expand Down
19 changes: 19 additions & 0 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register
// npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size);
}

void CudaIpcConnection::write2D(RegisteredMemory dst, uint64_t dstOffset, uint64_t dstPitch, RegisteredMemory src,
uint64_t srcOffset, uint64_t srcPitch, uint64_t width, uint64_t height) {
validateTransport(dst, remoteTransport());
validateTransport(src, transport());

char* dstPtr = (char*)dst.data();
char* srcPtr = (char*)src.data();

MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, dstPitch, srcPtr + srcOffset, srcPitch, width, height,
cudaMemcpyDeviceToDevice, stream_));
INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu",
srcPtr + srcOffset, dstPtr + dstOffset, width, height, dstPitch, srcPitch);
}

void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
validateTransport(dst, remoteTransport());
uint64_t oldValue = *src;
Expand Down Expand Up @@ -131,6 +145,11 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem
// npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size);
}

void IBConnection::write2D(RegisteredMemory, uint64_t, uint64_t, RegisteredMemory, uint64_t, uint64_t, uint64_t,
uint64_t) {
throw Error("write2D is not supported", ErrorCode::InvalidUsage);
}

void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) {
validateTransport(dst, remoteTransport());
auto dstTransportInfo = getRegisteredMemoryImpl(dst)->getTransportInfo(remoteTransport());
Expand Down
4 changes: 4 additions & 0 deletions src/include/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class CudaIpcConnection : public ConnectionBase {

void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) override;
void write2D(RegisteredMemory dst, uint64_t dstOffset, uint64_t dstPitch, RegisteredMemory src, uint64_t srcOffset,
uint64_t srcPitch, uint64_t width, uint64_t height) override;
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;

void flush(int64_t timeoutUsec) override;
Expand All @@ -65,6 +67,8 @@ class IBConnection : public ConnectionBase {

void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) override;
void write2D(RegisteredMemory dst, uint64_t dstOffset, uint64_t dstPitch, RegisteredMemory src, uint64_t srcOffset,
uint64_t srcPitch, uint64_t width, uint64_t height) override;
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;

void flush(int64_t timeoutUsec) override;
Expand Down
20 changes: 18 additions & 2 deletions src/proxy_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& com
return semaphores_.size() - 1;
}

MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& communicator,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't make much sense to me, why do we need an extra an way of building a semaphore? We only need to provide a 2D write over 1D arrays. So, just a 2D write is enough. Right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we need to set pitch/stride for the channel (the name semaphore is not accurate). The reason we don't set stride in the put2D API is our trigger is only 128bit. We don't have extra bits for the it.

std::shared_ptr<Connection> connection,
std::pair<uint64_t, uint64_t> pitch) {
semaphores_.push_back(std::make_shared<Host2DeviceSemaphore>(communicator, connection));
SemaphoreId id = semaphores_.size() - 1;
if (id >= pitches_.size()) pitches_.resize(id + 1, std::pair<uint64_t, uint64_t>(0, 0));
pitches_[id] = pitch;
return id;
}

MSCCLPP_API_CPP SemaphoreId ProxyService::addSemaphore(std::shared_ptr<Host2DeviceSemaphore> semaphore) {
semaphores_.push_back(semaphore);
return semaphores_.size() - 1;
Expand Down Expand Up @@ -67,8 +77,14 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) {
if (trigger->fields.type & TriggerData) {
RegisteredMemory& dst = memories_[trigger->fields.dstMemoryId];
RegisteredMemory& src = memories_[trigger->fields.srcMemoryId];
semaphore->connection()->write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset,
trigger->fields.size);
if (trigger->fields2D.multiDimensionFlag) {
std::pair<uint64_t, uint64_t>& pitch = pitches_.at(trigger->fields.chanId);
semaphore->connection()->write2D(dst, trigger->fields.dstOffset, pitch.first, src, trigger->fields.srcOffset,
pitch.second, trigger->fields2D.width, trigger->fields2D.height);
} else {
semaphore->connection()->write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset,
trigger->fields.size);
}
}

if (trigger->fields.type & TriggerFlag) {
Expand Down
Loading