Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 134 additions & 73 deletions ucm/integration/vllm/uc_connector.py

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion ucm/store/dramstore/cpy/dramstore.py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ class DRAMStorePy : public DRAMStore {
auto length = lengths.begin();
while ((blockId != blockIds.end()) && (offset != offsets.end()) &&
(address != addresses.end()) && (length != lengths.end())) {
std::vector<uintptr_t> addr_vec;
for (auto addr_item : address->cast<py::list>()) {
addr_vec.push_back(addr_item.cast<uintptr_t>());
}
task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
std::move(addr_vec), length->cast<size_t>());
blockId++;
offset++;
address++;
Expand Down
6 changes: 5 additions & 1 deletion ucm/store/localstore/cpy/localstore.py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ class LocalStorePy : public LocalStore {
auto length = lengths.begin();
while ((blockId != blockIds.end()) && (offset != offsets.end()) &&
(address != addresses.end()) && (length != lengths.end())) {
std::vector<uintptr_t> addr_vec;
for (auto addr_item : address->cast<py::list>()) {
addr_vec.push_back(addr_item.cast<uintptr_t>());
}
task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
std::move(addr_vec), length->cast<size_t>());
blockId++;
offset++;
address++;
Expand Down
24 changes: 18 additions & 6 deletions ucm/store/nfsstore/cc/domain/trans/posix_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,13 @@ Status PosixQueue::D2S(Task::Shard& shard, const Device& device)
return Status::OutOfMemory();
}
auto hub = shard.buffer.get();
auto status = device->D2HSync((std::byte*)hub, (std::byte*)shard.address, shard.length);
std::vector<std::byte*> dAddr(shard.address.size());
std::vector<std::byte*> hAddr(shard.address.size());
for (size_t i = 0; i < shard.address.size(); i++) {
hAddr[i] = (std::byte*)hub + i * shard.length / shard.address.size();
dAddr[i] = (std::byte*)shard.address[i];
}
auto status = device->D2HBatchSync(hAddr.data(), const_cast<const std::byte**>(dAddr.data()), shard.address.size(), shard.length / shard.address.size());
if (status.Failure()) { return status; }
auto path = this->layout_->DataFilePath(shard.block, true);
return File::Write(path, shard.offset, shard.length, (uintptr_t)hub);
Expand All @@ -120,21 +126,27 @@ Status PosixQueue::S2D(Task::Shard& shard, const Device& device)
auto path = this->layout_->DataFilePath(shard.block, false);
auto status = File::Read(path, shard.offset, shard.length, (uintptr_t)hub);
if (status.Failure()) { return status; }
return device->H2DAsync((std::byte*)shard.address, (std::byte*)hub, shard.length);
std::vector<std::byte*> dAddr(shard.address.size());
std::vector<std::byte*> hAddr(shard.address.size());
for (size_t i = 0; i < shard.address.size(); i++) {
hAddr[i] = (std::byte*)hub + i * shard.length / shard.address.size();
dAddr[i] = (std::byte*)shard.address[i];
}
return device->H2DBatchSync(dAddr.data(), const_cast<const std::byte**>(hAddr.data()), shard.address.size(), shard.length / shard.address.size());
}

/**
 * @brief Write one shard from a host buffer into the block's data file.
 *
 * `shard.address` is a vector of addresses; this path uses only its first
 * element as the source buffer for the whole `shard.length` bytes.
 *
 * @param shard Shard describing the block, file offset, length and source address.
 * @return Status returned by File::Write.
 */
Status PosixQueue::H2S(Task::Shard& shard)
{
    auto path = this->layout_->DataFilePath(shard.block, true);
    // NOTE(review): assumes shard.address is non-empty; operator[] on an empty
    // vector is undefined behavior -- confirm callers always supply one address.
    const auto hostAddr = shard.address[0];
    // The flag is true only when offset, length and buffer address all pass IsAligned.
    auto aligned = IsAligned(shard.offset) && IsAligned(shard.length) && IsAligned(hostAddr);
    return File::Write(path, shard.offset, shard.length, hostAddr, aligned);
}

/**
 * @brief Read one shard from the block's data file into a host buffer.
 *
 * `shard.address` is a vector of addresses; this path uses only its first
 * element as the destination buffer for the whole `shard.length` bytes.
 *
 * @param shard Shard describing the block, file offset, length and destination address.
 * @return Status returned by File::Read.
 */
Status PosixQueue::S2H(Task::Shard& shard)
{
    auto path = this->layout_->DataFilePath(shard.block, false);
    // NOTE(review): assumes shard.address is non-empty; operator[] on an empty
    // vector is undefined behavior -- confirm callers always supply one address.
    const auto hostAddr = shard.address[0];
    // The flag is true only when offset, length and buffer address all pass IsAligned.
    auto aligned = IsAligned(shard.offset) && IsAligned(shard.length) && IsAligned(hostAddr);
    return File::Read(path, shard.offset, shard.length, hostAddr, aligned);
}

} // namespace UC
8 changes: 5 additions & 3 deletions ucm/store/nfsstore/cpy/nfsstore.py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ class NFSStorePy : public NFSStore {
auto length = lengths.begin();
while ((blockId != blockIds.end()) && (offset != offsets.end()) &&
(address != addresses.end()) && (length != lengths.end())) {
std::vector<uintptr_t> addr_vec;
for (auto addr_item : address->cast<py::list>()) {
addr_vec.push_back(addr_item.cast<uintptr_t>());
}
task.Append(blockId->cast<std::string>(), offset->cast<size_t>(),
address->cast<uintptr_t>(), length->cast<size_t>());
std::move(addr_vec), length->cast<size_t>());
blockId++;
offset++;
address++;
Expand Down Expand Up @@ -123,8 +127,6 @@ PYBIND11_MODULE(ucmnfsstore, module)
config.def_readwrite("transferBufferNumber", &UC::NFSStorePy::Config::transferBufferNumber);
config.def_readwrite("transferTimeoutMs", &UC::NFSStorePy::Config::transferTimeoutMs);
config.def_readwrite("tempDumpDirEnable", &UC::NFSStorePy::Config::tempDumpDirEnable);
config.def_readwrite("hotnessEnable", &UC::NFSStorePy::Config::hotnessEnable);
config.def_readwrite("hotnessInterval", &UC::NFSStorePy::Config::hotnessInterval);
store.def(py::init<>());
store.def("CCStoreImpl", &UC::NFSStorePy::CCStoreImpl);
store.def("Setup", &UC::NFSStorePy::Setup);
Expand Down
5 changes: 3 additions & 2 deletions ucm/store/nfsstore/nfsstore_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, config: Dict):
if transfer_enable:
param.transferDeviceId = config["device"]
param.transferIoSize = config["io_size"]
param.transferStreamNumber = config.get("transfer_stream_number", 128)
ret = self.store.Setup(param)
if ret != 0:
msg = f"Failed to initialize ucmnfsstore, errcode: {ret}."
Expand Down Expand Up @@ -92,7 +93,7 @@ def fetch_data(
self,
block_ids: List[str],
offset: List[int],
dst_addr: List[int],
dst_addr: List[List[int]],
size: List[int],
) -> Task:
task_id = self.store.LoadToDevice(block_ids, offset, dst_addr, size)
Expand All @@ -102,7 +103,7 @@ def dump_data(
self,
block_ids: List[str],
offset: List[int],
src_addr: List[int],
src_addr: List[List[int]],
size: List[int],
) -> Task:
task_id = self.store.DumpFromDevice(block_ids, offset, src_addr, size)
Expand Down
6 changes: 3 additions & 3 deletions ucm/store/task/task_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ class Task {
Location location;
std::string block;
size_t offset;
uintptr_t address;
std::vector<uintptr_t> address;
size_t length;
size_t owner;
std::shared_ptr<void> buffer;
std::function<void(void)> done;
Shard(const Type type, const Location location, const std::string& block,
const size_t offset, const uintptr_t address, const size_t length, const size_t owner)
const size_t offset, const std::vector<uintptr_t> address, const size_t length, const size_t owner)
: type{type}, location{location}, block{block}, offset{offset}, address{address},
length{length}, owner{owner}, buffer{nullptr}, done{nullptr}
{
Expand Down Expand Up @@ -86,7 +86,7 @@ class Task {
auto Id() const noexcept { return id_; }
auto StartTp() const noexcept { return startTp_; }
auto Str() const noexcept { return fmt::format("{},{},{},{}", id_, brief_, number_, size_); }
void Append(const std::string& block, const size_t offset, const uintptr_t address,
void Append(const std::string& block, const size_t offset, const std::vector<uintptr_t> address,
const size_t length)
{
shards_.emplace_back(type_, location_, block, offset, address, length, id_);
Expand Down
4 changes: 2 additions & 2 deletions ucm/store/ucmstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def fetch_data(
self,
block_ids: List[str],
offset: List[int],
dst_addr: List[int],
dst_addr: List[List[int]],
size: List[int],
) -> Task:
"""
Expand All @@ -150,7 +150,7 @@ def dump_data(
self,
block_ids: List[str],
offset: List[int],
src_addr: List[int],
src_addr: List[List[int]],
size: List[int],
) -> Task:
"""
Expand Down