Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions mooncake-integration/store/store_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,16 @@ class MooncakeStorePyWrapper {
try {
// Section with GIL released
py::gil_scoped_release release_gil;
auto buffer_handle = store_.get_buffer(key);
if (!buffer_handle) {
py::gil_scoped_acquire acquire_gil;
return pybind11::none();
}
// Create contiguous buffer and copy data
auto total_length = buffer_handle->size();
char *exported_data = new char[total_length];
if (!exported_data) {
uint64_t total_length = 0;
auto get_result = store_.get_allocated_internal(key, total_length);
if (!get_result) {
py::gil_scoped_acquire acquire_gil;
LOG(ERROR) << "Invalid data format: insufficient data for "
"metadata";
return pybind11::none();
}
auto exported_data = *get_result;

// Copy metadata from buffer
TensorMetadata metadata;
// Copy data from buffer to contiguous memory
memcpy(exported_data, buffer_handle->ptr(), total_length);
memcpy(&metadata, exported_data, sizeof(TensorMetadata));

if (metadata.ndim < 0 || metadata.ndim > 4) {
Expand Down
11 changes: 11 additions & 0 deletions mooncake-store/include/client_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ std::vector<Slice> split_into_slices(BufferHandle& handle);
*/
uint64_t calculate_total_size(const Replica::Descriptor& replica);

/**
* @brief Allocate slices from a buffer pointer based on replica descriptor
* @param slices Output vector to store the allocated slices
* @param replica The replica descriptor defining the slice structure
* @param buffer The buffer pointer to allocate slices from
* @return 0 on success, non-zero on error
*/
int allocateSlices(std::vector<Slice>& slices,
const Replica::Descriptor& replica,
char* buffer);

/**
* @brief Allocate slices from a buffer handle based on replica descriptor
* @param slices Output vector to store the allocated slices
Expand Down
3 changes: 3 additions & 0 deletions mooncake-store/include/pybind_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ class PyClient {
const std::vector<std::span<const char>> &values,
const ReplicateConfig &config = ReplicateConfig{});

tl::expected<char *, ErrorCode> get_allocated_internal(
const std::string &key, uint64_t &data_length);

tl::expected<void, ErrorCode> remove_internal(const std::string &key);

tl::expected<long, ErrorCode> removeByRegex_internal(
Expand Down
14 changes: 10 additions & 4 deletions mooncake-store/src/client_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,14 @@ uint64_t calculate_total_size(const Replica::Descriptor& replica) {
}

int allocateSlices(std::vector<Slice>& slices,
const Replica::Descriptor& replica,
BufferHandle& buffer_handle) {
const Replica::Descriptor& replica, char* buffer) {
uint64_t offset = 0;
if (replica.is_memory_replica() == false) {
// For disk-based replica, split into slices based on file size
uint64_t total_length = replica.get_disk_descriptor().object_size;
while (offset < total_length) {
auto chunk_size = std::min(total_length - offset, kMaxSliceSize);
void* chunk_ptr = static_cast<char*>(buffer_handle.ptr()) + offset;
void* chunk_ptr = buffer + offset;
slices.emplace_back(Slice{chunk_ptr, chunk_size});
offset += chunk_size;
}
Expand All @@ -95,12 +94,19 @@ int allocateSlices(std::vector<Slice>& slices,
// descriptors
for (auto& handle :
replica.get_memory_descriptor().buffer_descriptors) {
void* chunk_ptr = static_cast<char*>(buffer_handle.ptr()) + offset;
void* chunk_ptr = buffer + offset;
slices.emplace_back(Slice{chunk_ptr, handle.size_});
offset += handle.size_;
}
}
return 0;
}

int allocateSlices(std::vector<Slice>& slices,
const Replica::Descriptor& replica,
BufferHandle& buffer_handle) {
return allocateSlices(slices, replica,
static_cast<char*>(buffer_handle.ptr()));
}

} // namespace mooncake
63 changes: 63 additions & 0 deletions mooncake-store/src/pybind_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1091,4 +1091,67 @@ int PyClient::put_from_with_metadata(const std::string &key, void *buffer,
return 0;
}

tl::expected<char *, ErrorCode> PyClient::get_allocated_internal(
const std::string &key, uint64_t &data_length) {
// Query object info first
auto query_result = client_->Query(key);
if (!query_result) {
LOG(ERROR) << "Query failed: " << query_result.error();
return tl::unexpected(query_result.error());
}

auto replica_list = query_result.value();
if (replica_list.empty()) {
LOG(INFO) << "No replicas found for key: " << key;
return tl::unexpected(ErrorCode::INVALID_KEY);
}

const auto &replica = replica_list[0];
uint64_t total_length = calculate_total_size(replica);
if (total_length == 0) {
LOG(ERROR) << "Zero length value for key: " << key;
return tl::unexpected(ErrorCode::INVALID_KEY);
}

// Create contiguous buffer to read data
char *data_ptr = new char[total_length];
if (!data_ptr) {
LOG(ERROR) << "Failed to allocate memory for length: " << total_length;
return tl::unexpected(ErrorCode::INTERNAL_ERROR);
}

// register the buffer
auto register_result = register_buffer_internal(
reinterpret_cast<void *>(data_ptr), total_length);
if (!register_result) {
LOG(ERROR) << "Failed to register buffer";
delete[] data_ptr;
return tl::unexpected(register_result.error());
}

// Create slices for the allocated buffer
std::vector<Slice> slices;
allocateSlices(slices, replica, data_ptr);

// Get the object data
auto get_result = client_->Get(key, replica_list, slices);

// unregister the buffer for whatever cases
auto unregister_result =
unregister_buffer_internal(reinterpret_cast<void *>(data_ptr));
if (!unregister_result) {
LOG(WARNING) << "Failed to unregister buffer";
}

if (!get_result) {
delete[] data_ptr;
LOG(ERROR) << "Get failed for key: " << key;
return tl::unexpected(get_result.error());
}

// return the data ptr transferring the ownership to the caller
data_length = total_length;
return data_ptr;
}

} // namespace mooncake
Loading