-
Notifications
You must be signed in to change notification settings - Fork 5.5k
feat: Impl sort key for LocalShuffleWriter #26547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer's GuideThis PR enhances LocalShuffleWriter to support sorting by custom sort keys before writing partitioned shuffle files, consolidates related utilities into LocalShuffleUtils, and updates tests to validate sorted shuffle output. Sequence diagram for sorted shuffle write in LocalShuffleWritersequenceDiagram
participant W as LocalShuffleWriter
participant F as FileSystem
participant U as LocalShuffleUtils
W->>W: collect(partition, key, data)
W->>W: ensurePartitionCapacity(partition, entrySize)
W->>W: writeShuffleEntity(writePos, key, data)
W->>W: inProgressRowOffsets_[partition].emplace_back(RowOffset)
alt Buffer full or flush triggered
W->>W: storePartitionBlock(partition)
alt isSorted_[partition] == true
W->>W: sortRowoffsetsByKey(offsets, buffer)
W->>U: rewriteSortedRows(offsets, buffer, pool)
U-->>W: sortedBuffer
W->>F: writeBufferToFile(file, sortedBuffer)
else not sorted
W->>F: writeBufferToFile(file, buffer)
end
end
ER diagram for new ParsedShuffleData structure in LocalShuffleUtilserDiagram
PARSED_SHUFFLE_DATA {
string_view keys
string_view data
}
PARSED_SHUFFLE_DATA ||--o| KEY : contains
PARSED_SHUFFLE_DATA ||--o| DATA : contains
KEY {
string_view key
}
DATA {
string_view data
}
Class diagram for updated LocalShuffleWriter and new LocalShuffleUtilsclassDiagram
class LocalShuffleWriter {
+LocalShuffleWriter(std::string rootPath, std::string queryId, uint32_t shuffleId, uint32_t numPartitions, uint64_t maxBytesPerPartition, velox::memory::MemoryPool* pool)
+void collect(int32_t partition, std::string_view key, std::string_view data)
+void noMoreData(bool success)
+void ensurePartitionCapacity(int32_t partition, size_t requiredSize)
+void sortRowoffsetsByKey(std::vector<RowOffset>& offsets, const char* buffer)
+static velox::BufferPtr rewriteSortedRows(const std::vector<RowOffset>& offsets, const char* sourceBuffer, velox::memory::MemoryPool* pool)
-std::vector<velox::BufferPtr> inProgressPartitions_
-std::vector<size_t> inProgressSizes_
-std::vector<std::vector<RowOffset>> inProgressRowOffsets_
-std::vector<bool> isSorted_
-std::shared_ptr<velox::filesystems::FileSystem> fileSystem_
}
class RowOffset {
+size_t keyOffset
+uint32_t keySize
+size_t dataOffset
+uint32_t dataSize
+std::string_view getKey(const char* buffer) const
+std::string_view getData(const char* buffer) const
}
class detail {
+bool compareKeys(std::string_view key1, std::string_view key2)
+ParsedShuffleData parseShuffleRows(const char* buffer, size_t totalSize)
}
class ParsedShuffleData {
+std::vector<std::string_view> keys
+std::vector<std::string_view> data
}
LocalShuffleWriter --> detail : uses
LocalShuffleWriter o-- RowOffset
detail o-- ParsedShuffleData
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- The localShuffleWriterSortedOutput test in ShuffleTest relies on hardcoded substring positions to extract partition IDs from filenames—consider using a more robust API or regex to parse the file names instead of brittle offsets.
- The new sorted‐output integration test exposes internal buffer layout and runs unconditionally; you might want to guard it behind a debug‐only compile flag or move it into a dedicated unit test for LocalShuffleUtils.
- rewriteSortedRows allocates and copies a full new buffer before writing—consider streaming sorted entries directly into the output file in storePartitionBlock to reduce peak memory usage and extra copies.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The localShuffleWriterSortedOutput test in ShuffleTest relies on hardcoded substring positions to extract partition IDs from filenames—consider using a more robust API or regex to parse the file names instead of brittle offsets.
- The new sorted‐output integration test exposes internal buffer layout and runs unconditionally; you might want to guard it behind a debug‐only compile flag or move it into a dedicated unit test for LocalShuffleUtils.
- rewriteSortedRows allocates and copies a full new buffer before writing—consider streaming sorted entries directly into the output file in storePartitionBlock to reduce peak memory usage and extra copies.
## Individual Comments
### Comment 1
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:191` </location>
<code_context>
- const auto size = sizeof(TRowSize) + rowSize;
+ VELOX_CHECK_LT(
+ partition,
+ numPartitions_,
+ "Inalid parition {}, must less than {}",
+ partition,
</code_context>
<issue_to_address>
**nitpick (typo):** Typo in error message: 'Inalid parition' should be 'Invalid partition'.
Please update the error message to 'Invalid partition' for clarity.
```suggestion
"Invalid partition {}, must less than {}",
```
</issue_to_address>
### Comment 2
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:248` </location>
<code_context>
+ std::string_view key = offset.getKey(sourceBuffer);
+ std::string_view data = offset.getData(sourceBuffer);
+
+ // Wirte memroy layout: [keySize][dataSize][key][data]
+ *(TRowSize*)(writePos) =
+ folly::Endian::big(static_cast<TRowSize>(key.size()));
</code_context>
<issue_to_address>
**nitpick (typo):** Typo in comment: 'Wirte memroy' should be 'Write memory'.
In rewriteSortedRows, update the comment to 'Write memory layout: [keySize][dataSize][key][data]'.
```suggestion
// Write memory layout: [keySize][dataSize][key][data]
```
</issue_to_address>
### Comment 3
<location> `presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp:143` </location>
<code_context>
+ writeBufferToFile(file.get(), buffer->as<char>(), bufferSize);
+ }
+
+ inProgressPartitions_[partition].reset();
+ inProgressSizes_[partition] = 0;
+ isSorted_[partition] = false;
</code_context>
<issue_to_address>
**issue (bug_risk):** Resetting buffer without clearing row offsets for unsorted partitions may lead to stale data.
Offsets are only cleared for sorted partitions, so unsorted partitions may retain stale offsets if collect is called again. Clearing offsets for both cases would prevent this issue.
</issue_to_address>
### Comment 4
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1449-1450` </location>
<code_context>
+
+ // Create test data with keys and values for sorting
+ const size_t numRows = 6;
+ std::vector<std::string> keys = {
+ "key3", "key1", "key5", "key2", "key4", "key6"};
+ std::vector<std::string> values = {
+ "data3", "data1", "data5", "data2", "data4", "data6"};
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding edge cases for empty keys and duplicate keys in sorted shuffle test.
Adding tests for empty and duplicate keys will verify that the sorting logic correctly handles these cases.
Suggested implementation:
```cpp
// Create test data with keys and values for sorting, including edge cases
const size_t numRows = 9;
std::vector<std::string> keys = {
"key3", "key1", "key5", "key2", "key4", "key6", "", "key1", ""};
std::vector<std::string> values = {
"data3", "data1", "data5", "data2", "data4", "data6", "empty1", "dup1", "empty2"};
```
If the test later checks the sorted output, you may need to update the expected results to account for the new empty and duplicate keys. Make sure the rest of the test logic (e.g., assertions) is updated to reflect the new test data.
</issue_to_address>
### Comment 5
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1470` </location>
<code_context>
+ // Partition 1: key1, key2, key6 -> sorted: key1, key2, key6
+ for (size_t i = 0; i < numRows; ++i) {
+ int32_t partition = i % numPartitions;
+ writer->collect(partition, keys[i], values[i]);
+ }
+ writer->noMoreData(true);
</code_context>
<issue_to_address>
**suggestion (testing):** Please add a test case for error handling when partition index is out of bounds.
Please add a test that calls collect() with an invalid partition index and asserts the correct exception is thrown.
Suggested implementation:
```cpp
for (size_t i = 0; i < numRows; ++i) {
int32_t partition = i % numPartitions;
writer->collect(partition, keys[i], values[i]);
}
writer->noMoreData(true);
// Test error handling for invalid partition index
{
// Partition index out of bounds (too high)
EXPECT_THROW(
writer->collect(numPartitions, keys[0], values[0]),
std::exception
);
// Partition index out of bounds (negative)
EXPECT_THROW(
writer->collect(-1, keys[0], values[0]),
std::exception
);
}
```
- If your codebase uses a specific exception type for out-of-bounds partition indices (e.g., `std::out_of_range`), replace `std::exception` with the correct type in `EXPECT_THROW`.
- If the test file uses a test fixture or a specific test macro (e.g., `TEST_F`), consider moving this block into its own test function for clarity and isolation.
</issue_to_address>
### Comment 6
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1532-1538` </location>
<code_context>
+ << "Partition " << partition << " row " << i << " has wrong data";
+ }
+
+ // Verify keys are in sorted order
+ for (size_t i = 1; i < actualData.size(); ++i) {
+ EXPECT_TRUE(
+ detail::compareKeys(actualData[i - 1].first, actualData[i].first))
+ << "Keys not sorted in partition " << partition << ": key[" << (i - 1)
+ << "]=" << actualData[i - 1].first << " >= key[" << i
+ << "]=" << actualData[i].first;
+ }
+ }
</code_context>
<issue_to_address>
**suggestion (testing):** The sorted order assertion could be improved for clarity and robustness.
Additionally, compare the sequence of keys to a sorted copy of the input to ensure the entire order matches expectations and catch subtle sorting issues.
</issue_to_address>
### Comment 7
<location> `presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp:1288` </location>
<code_context>
cleanupDirectory(rootPath);
}
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for the case where maxBytesPerPartition is exceeded and multiple files are written per partition.
Please add a test with a smaller maxBytesPerPartition value to trigger buffer overflow, ensuring multiple files are written per partition and verifying correct data reading and sorting across all files.
Suggested implementation:
```cpp
TEST_F(ShuffleTest, MaxBytesPerPartition_Overflow_WritesMultipleFiles) {
// Setup: small maxBytesPerPartition to force multiple files per partition.
uint32_t numPartitions = 2;
uint32_t numMapDrivers = 1;
size_t maxBytesPerPartition = 128; // Small value to trigger overflow.
// Prepare test data: enough rows to exceed maxBytesPerPartition several times.
std::vector<std::string> testRows;
for (int i = 0; i < 100; ++i) {
testRows.push_back(fmt::format("row_{}", i));
}
// Shuffle write: simulate writing rows to shuffle with small buffer.
std::string rootPath = "/tmp/shuffle_test_overflow";
cleanupDirectory(rootPath);
// Create shuffle writer with small buffer.
ShuffleWriterOptions options;
options.numPartitions = numPartitions;
options.maxBytesPerPartition = maxBytesPerPartition;
options.rootPath = rootPath;
auto shuffleWriter = std::make_unique<ShuffleWriter>(options);
for (const auto& row : testRows) {
// Partition by even/odd for demonstration.
int partition = (row.back() % 2 == 0) ? 0 : 1;
shuffleWriter->writeRow(partition, row);
}
shuffleWriter->flush();
// Verify: multiple files per partition exist.
for (uint32_t p = 0; p < numPartitions; ++p) {
auto files = listPartitionFiles(rootPath, p);
ASSERT_GT(files.size(), 1) << "Expected multiple files for partition " << p;
}
// Read all data back and verify sorting.
std::vector<std::string> allRows;
for (uint32_t p = 0; p < numPartitions; ++p) {
auto files = listPartitionFiles(rootPath, p);
for (const auto& file : files) {
auto rows = readRowsFromFile(file);
allRows.insert(allRows.end(), rows.begin(), rows.end());
}
}
std::sort(allRows.begin(), allRows.end());
std::vector<std::string> expectedRows = testRows;
std::sort(expectedRows.begin(), expectedRows.end());
ASSERT_EQ(allRows, expectedRows);
cleanupDirectory(rootPath);
}
```
You may need to implement or adjust the following helper functions/types if they do not already exist:
- `ShuffleWriterOptions` struct/class and `ShuffleWriter` class to support `maxBytesPerPartition`.
- `listPartitionFiles(rootPath, partition)` to list all files for a given partition.
- `readRowsFromFile(file)` to read all rows from a file.
- `cleanupDirectory(rootPath)` to clean up test files.
Adjust partitioning logic and row writing as needed to match your actual shuffle writer API.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/tests/ShuffleTest.cpp
Outdated
Show resolved
Hide resolved
tanjialiang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we have a better description on what this PR is trying to do?
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
Summary: Impl sort key for LocalShuffleWriter Differential Revision: D86322593
01694af to
4ca1b2c
Compare
Summary: Impl sort key for LocalShuffleWriter Differential Revision: D86322593
4ca1b2c to
ad3f1da
Compare
Summary: Impl sort key for LocalShuffleWriter Differential Revision: D86322593
ad3f1da to
b75bf65
Compare
Summary: Impl sort key for LocalShuffleWriter Differential Revision: D86322593
b75bf65 to
fda2b29
Compare
Summary: Impl sort key for LocalShuffleWriter Differential Revision: D86322593
fda2b29 to
f0d270a
Compare
Summary: Adds sorting capability to LocalShuffleWriter to enable sorted shuffle operations in Presto native execution. When sort keys are provided, shuffle data is sorted before writing to disk. Differential Revision: D86322593
f0d270a to
0aa42c0
Compare
Summary: Adds sorting capability to LocalShuffleWriter to enable sorted shuffle operations in Presto native execution. When sort keys are provided, shuffle data is sorted before writing to disk. Differential Revision: D86322593
0aa42c0 to
8e94401
Compare
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
Summary: Adds sorting capability to LocalShuffleWriter to enable sorted shuffle operations in Presto native execution. When sort keys are provided, shuffle data is sorted before writing to disk. Differential Revision: D86322593
8e94401 to
23cb6a8
Compare
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffleUtils.h
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffleUtils.h
Outdated
Show resolved
Hide resolved
Summary: Adds sorting capability to LocalShuffleWriter to enable sorted shuffle operations in Presto native execution. When sort keys are provided, shuffle data is sorted before writing to disk. Differential Revision: D86322593
23cb6a8 to
ab1e9d2
Compare
Summary: Adds sorting capability to LocalShuffleWriter to enable sorted shuffle operations in Presto native execution. When sort keys are provided, shuffle data is sorted before writing to disk. Differential Revision: D86322593
ab1e9d2 to
2debb04
Compare
tanjialiang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % minors
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.h
Outdated
Show resolved
Hide resolved
presto-native-execution/presto_cpp/main/operators/LocalShuffle.cpp
Outdated
Show resolved
Hide resolved
Summary: Adds sorting capability to LocalShuffleWriter to enable sorted shuffle operations in Presto native execution. When sort keys are provided, shuffle data is sorted before writing to disk. Reviewed By: tanjialiang Differential Revision: D86322593
2debb04 to
98ba27a
Compare
Summary: Adds sorting capability to LocalShuffleWriter to enable sorted shuffle operations in Presto native execution. When sort keys are provided, shuffle data is sorted before writing to disk. Reviewed By: tanjialiang Differential Revision: D86322593
98ba27a to
2668484
Compare
Summary: Impl sort key for LocalShuffleWriter
Differential Revision: D86322593