1515#include " presto_cpp/external/json/nlohmann/json.hpp"
1616#include " presto_cpp/main/common/Configs.h"
1717
18+ #include < boost/range/algorithm/sort.hpp>
19+
1820using namespace facebook ::velox::exec;
1921using namespace facebook ::velox;
2022
@@ -39,15 +41,32 @@ inline std::string createShuffleFileName(
3941 id);
4042}
4143
42- // This file is used to indicate that the shuffle system is ready to be used for
43- // reading (acts as a sync point between readers if needed). Mostly used for
44- // test purposes.
45- const static std::string kReadyForReadFilename = " readyForRead" ;
44+ inline void appendShuffleRow (
45+ char * FOLLY_NONNULL buffer,
46+ std::string_view sortKey,
47+ std::string_view data) {
48+ // Write memory layout: [keySize][dataSize][key][data]
49+ *reinterpret_cast <TRowSize*>(buffer) =
50+ folly::Endian::big (static_cast <TRowSize>(sortKey.size ()));
51+ buffer += kUint32Size ;
52+
53+ *reinterpret_cast <TRowSize*>(buffer) =
54+ folly::Endian::big (static_cast <TRowSize>(data.size ()));
55+ buffer += kUint32Size ;
56+
57+ if (!sortKey.empty ()) {
58+ memcpy (buffer, sortKey.data (), sortKey.size ());
59+ buffer += sortKey.size ();
60+ }
61+
62+ memcpy (buffer, data.data (), data.size ());
63+ }
4664} // namespace
4765
66+
4867LocalShuffleWriter::LocalShuffleWriter (
49- const std::string& rootPath,
50- const std::string& queryId,
68+ std::string rootPath,
69+ std::string queryId,
5170 uint32_t shuffleId,
5271 uint32_t numPartitions,
5372 uint64_t maxBytesPerPartition,
@@ -59,14 +78,54 @@ LocalShuffleWriter::LocalShuffleWriter(
5978 rootPath_(std::move(rootPath)),
6079 queryId_(std::move(queryId)),
6180 shuffleId_(shuffleId) {
62- // Use resize/assign instead of resize(size, val).
63- inProgressPartitions_.resize (numPartitions_);
6481 inProgressPartitions_.assign (numPartitions_, nullptr );
65- inProgressSizes_.resize (numPartitions_);
6682 inProgressSizes_.assign (numPartitions_, 0 );
83+ inProgressRowOffsets_.assign (numPartitions_, {});
6784 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
6885}
6986
87+ void LocalShuffleWriter::writeBlock (int32_t partition) {
88+ auto & buffer = inProgressPartitions_[partition];
89+ const auto bufferSize = inProgressSizes_[partition];
90+ if (!buffer || bufferSize == 0 ) {
91+ return ;
92+ }
93+
94+ auto file = getNextOutputFile (partition);
95+ auto & offsets = inProgressRowOffsets_[partition];
96+
97+ const bool needsSorting = !offsets.empty () && offsets[0 ].keySize > 0 ;
98+
99+ if (needsSorting) {
100+ const char * sourceData = buffer->as <char >();
101+ sortRows (offsets, sourceData);
102+
103+ for (const auto & offset : offsets) {
104+ const char * rowStart = sourceData + offset.keyOffset - (kUint32Size * 2 );
105+ const size_t rowLen =
106+ (kUint32Size * 2 ) + offset.keySize + offset.dataSize ;
107+ file->append (std::string_view (rowStart, rowLen));
108+ }
109+ } else {
110+ file->append (std::string_view (buffer->as <char >(), bufferSize));
111+ }
112+ file->close ();
113+
114+ // Reset for buffer reuse
115+ inProgressSizes_[partition] = 0 ;
116+ inProgressRowOffsets_[partition].clear ();
117+ }
118+
119+ void LocalShuffleWriter::sortRows (
120+ std::vector<RowOffset>& offsets,
121+ const char * bufferData) {
122+ boost::range::sort (
123+ offsets, [bufferData](const RowOffset& lhs, const RowOffset& rhs) {
124+ return detail::compareKeys (
125+ lhs.getKey (bufferData), rhs.getKey (bufferData));
126+ });
127+ }
128+
70129std::unique_ptr<velox::WriteFile> LocalShuffleWriter::getNextOutputFile (
71130 int32_t partition) {
72131 auto filename = nextAvailablePartitionFileName (rootPath_, partition);
@@ -92,49 +151,42 @@ std::string LocalShuffleWriter::nextAvailablePartitionFileName(
92151 return filename;
93152}
94153
95- void LocalShuffleWriter::storePartitionBlock (int32_t partition) {
96- auto & buffer = inProgressPartitions_[partition];
97- auto file = getNextOutputFile (partition);
98- file->append (
99- std::string_view (buffer->as <char >(), inProgressSizes_[partition]));
100- file->close ();
101- inProgressPartitions_[partition].reset ();
102- inProgressSizes_[partition] = 0 ;
103- }
104-
105154void LocalShuffleWriter::collect (
106155 int32_t partition,
107- std::string_view /* key */ ,
156+ std::string_view key,
108157 std::string_view data) {
109- using TRowSize = uint32_t ;
158+ VELOX_CHECK_LT (partition, numPartitions_);
159+
160+ const auto rowSize = (kUint32Size * 2 ) + key.size () + data.size ();
110161
111162 auto & buffer = inProgressPartitions_[partition];
112- const TRowSize rowSize = data.size ();
113- const auto size = sizeof (TRowSize) + rowSize;
114-
115- // Check if there is enough space in the buffer.
116- if ((buffer != nullptr ) &&
117- (inProgressSizes_[partition] + size >= buffer->capacity ())) {
118- storePartitionBlock (partition);
119- // NOTE: the referenced 'buffer' will be reset in storePartitionBlock.
120- }
121163
122- // Allocate buffer if needed.
123164 if (buffer == nullptr ) {
124165 buffer = AlignedBuffer::allocate<char >(
125- std::max ((uint64_t )size, maxBytesPerPartition_), pool_);
166+ std::max (static_cast <uint64_t >(rowSize), maxBytesPerPartition_),
167+ pool_,
168+ 0 );
126169 inProgressSizes_[partition] = 0 ;
127- inProgressPartitions_[partition] = buffer;
170+ } else if (inProgressSizes_[partition] + rowSize >= buffer->capacity ()) {
171+ writeBlock (partition);
128172 }
129173
130- // Copy data.
131- auto offset = inProgressSizes_[partition];
132- auto rawBuffer = buffer->asMutable <char >() + offset;
174+ auto * rawBuffer = buffer->asMutable <char >();
175+ auto * writePos = rawBuffer + inProgressSizes_[partition];
133176
134- *(TRowSize*)(rawBuffer) = folly::Endian::big (rowSize);
135- ::memcpy (rawBuffer + sizeof (TRowSize), data.data(), rowSize);
177+ appendShuffleRow (writePos, key, data);
178+ const auto currentOffset = inProgressSizes_[partition];
179+ const auto keyOffset = currentOffset + (kUint32Size * 2 );
180+ const auto dataOffset = keyOffset + key.size ();
136181
137- inProgressSizes_[partition] += size;
182+ inProgressRowOffsets_[partition].emplace_back (
183+ RowOffset{
184+ .keyOffset = keyOffset,
185+ .dataOffset = dataOffset,
186+ .keySize = static_cast <uint32_t >(key.size ()),
187+ .dataSize = static_cast <uint32_t >(data.size ())});
188+
189+ inProgressSizes_[partition] += rowSize;
138190}
139191
140192void LocalShuffleWriter::noMoreData (bool success) {
@@ -144,27 +196,24 @@ void LocalShuffleWriter::noMoreData(bool success) {
144196 }
145197 for (auto i = 0 ; i < numPartitions_; ++i) {
146198 if (inProgressSizes_[i] > 0 ) {
147- storePartitionBlock (i);
199+ writeBlock (i);
148200 }
149201 }
150202}
151-
152203LocalShuffleReader::LocalShuffleReader (
153- const std::string& rootPath,
154- const std::string& queryId,
204+ std::string rootPath,
205+ std::string queryId,
155206 std::vector<std::string> partitionIds,
156207 velox::memory::MemoryPool* pool)
157- : rootPath_(rootPath),
158- queryId_(queryId),
208+ : rootPath_(std::move( rootPath) ),
209+ queryId_(std::move( queryId) ),
159210 partitionIds_(std::move(partitionIds)),
160211 pool_(pool) {
161212 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
162213}
163214
164215folly::SemiFuture<std::vector<std::unique_ptr<ReadBatch>>>
165216LocalShuffleReader::next (uint64_t maxBytes) {
166- using TRowSize = uint32_t ;
167-
168217 if (readPartitionFiles_.empty ()) {
169218 readPartitionFiles_ = getReadPartitionFiles ();
170219 }
@@ -188,21 +237,34 @@ LocalShuffleReader::next(uint64_t maxBytes) {
188237 ++readPartitionFileIndex_;
189238
190239 // Parse the buffer to extract individual rows.
191- // Each row is stored as: | row-size (4 bytes) | row- data (row-size bytes) |
240+ // Each row is stored as: [keySize][dataSize][key][ data]
192241 std::vector<std::string_view> rows;
193242 const char * data = buffer->as <char >();
194243 size_t offset = 0 ;
195244 const size_t totalSize = buffer->size ();
196245
197- while (offset + sizeof (TRowSize ) <= totalSize) {
246+ while (offset + ( kUint32Size * 2 ) <= totalSize) {
198247 // Read row size (stored in big endian).
199- const TRowSize rowSize = folly::Endian::big (*(TRowSize*)(data + offset));
200- offset += sizeof (TRowSize);
248+ const TRowSize keySize = folly::Endian::big (*reinterpret_cast <const TRowSize*>(data + offset));
249+
250+ offset += kUint32Size ;
251+ const TRowSize dataSize = folly::Endian::big (*reinterpret_cast <const TRowSize*>(data + offset));
252+ offset += kUint32Size ;
253+
254+ VELOX_CHECK_LE (
255+ offset + keySize + dataSize,
256+ totalSize,
257+ " Corrupted shuffle data: expected {} bytes for row (offset={}, keySize={}, dataSize={}) but only {} bytes available in buffer" ,
258+ offset + keySize + dataSize,
259+ offset,
260+ keySize,
261+ dataSize,
262+ totalSize);
201263
202- VELOX_CHECK_LE (offset + rowSize, totalSize, " Invalid row data: row size" );
203264 // Create a Row with empty key and the row data as value.
204- rows.emplace_back (std::string_view{data + offset, rowSize});
205- offset += rowSize;
265+ offset += keySize; // Skip sort key
266+ rows.emplace_back (data + offset, dataSize);
267+ offset += dataSize;
206268 }
207269
208270 totalBytes += fileSize;
@@ -234,7 +296,7 @@ std::vector<std::string> LocalShuffleReader::getReadPartitionFiles() const {
234296 fmt::format (" {}/{}_{}_" , trimmedRootPath, queryId_, partitionId);
235297 auto files = fileSystem_->list (fmt::format (" {}/" , rootPath_));
236298 for (const auto & file : files) {
237- if (file.find (prefix) == 0 ) {
299+ if (file.starts_with (prefix)) {
238300 partitionFiles.push_back (file);
239301 }
240302 }
0 commit comments