1515#include " presto_cpp/external/json/nlohmann/json.hpp"
1616#include " presto_cpp/main/common/Configs.h"
1717
18+ #include < boost/range/algorithm/sort.hpp>
19+
1820using namespace facebook ::velox::exec;
1921using namespace facebook ::velox;
2022
@@ -39,15 +41,41 @@ inline std::string createShuffleFileName(
3941 id);
4042}
4143
42- // This file is used to indicate that the shuffle system is ready to be used for
43- // reading (acts as a sync point between readers if needed). Mostly used for
44- // test purposes.
45- const static std::string kReadyForReadFilename = " readyForRead" ;
44+ inline size_t rowSize (size_t keySize, size_t dataSize) noexcept {
45+ return (kUint32Size * 2 ) + keySize + dataSize;
46+ }
47+
48+ inline void appendShuffleRow (
49+ char * FOLLY_NONNULL buffer,
50+ std::string_view sortKey,
51+ std::string_view data) {
52+ // Write memory layout: [keySize][dataSize][key][data]
53+ *(TRowSize*)(buffer) =
54+ folly::Endian::big (static_cast <TRowSize>(sortKey.size ()));
55+ buffer += kUint32Size ;
56+
57+ *(TRowSize*)(buffer) = folly::Endian::big (static_cast <TRowSize>(data.size ()));
58+ buffer += kUint32Size ;
59+
60+ if (!sortKey.empty ()) {
61+ memcpy (buffer, sortKey.data (), sortKey.size ());
62+ buffer += sortKey.size ();
63+ }
64+
65+ memcpy (buffer, data.data (), data.size ());
66+ }
67+
68+ void writeBufferToFile (velox::WriteFile* file, const char * data, size_t size) {
69+ VELOX_CHECK_NOT_NULL (file);
70+ file->append (std::string_view (data, size));
71+ file->close ();
72+ }
73+
4674} // namespace
4775
4876LocalShuffleWriter::LocalShuffleWriter (
49- const std::string& rootPath,
50- const std::string& queryId,
77+ std::string rootPath,
78+ std::string queryId,
5179 uint32_t shuffleId,
5280 uint32_t numPartitions,
5381 uint64_t maxBytesPerPartition,
@@ -59,14 +87,47 @@ LocalShuffleWriter::LocalShuffleWriter(
5987 rootPath_(std::move(rootPath)),
6088 queryId_(std::move(queryId)),
6189 shuffleId_(shuffleId) {
62- // Use resize/assign instead of resize(size, val).
63- inProgressPartitions_.resize (numPartitions_);
6490 inProgressPartitions_.assign (numPartitions_, nullptr );
65- inProgressSizes_.resize (numPartitions_);
6691 inProgressSizes_.assign (numPartitions_, 0 );
92+ inProgressRowOffsets_.assign (numPartitions_, {});
6793 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
6894}
6995
96+ void LocalShuffleWriter::writeBlock (int32_t partition) {
97+ auto & buffer = inProgressPartitions_[partition];
98+ const auto bufferSize = inProgressSizes_[partition];
99+ if (!buffer || bufferSize == 0 ) {
100+ return ;
101+ }
102+ auto file = getNextOutputFile (partition);
103+ auto & offsets = inProgressRowOffsets_[partition];
104+ const bool needsSorting = !offsets.empty () && offsets[0 ].keySize > 0 ;
105+
106+ if (needsSorting) {
107+ const char * sourceData = buffer->as <char >();
108+ sortRows (offsets, sourceData);
109+ auto sortedBuffer = serializeSortedRows (offsets, sourceData, bufferSize);
110+ writeBufferToFile (
111+ file.get (), sortedBuffer->as <char >(), sortedBuffer->size ());
112+ } else {
113+ writeBufferToFile (file.get (), buffer->as <char >(), bufferSize);
114+ }
115+
116+ // Reset the buffer size to 0 to reuse the allocated buffer.
117+ // This avoids repeated allocation/deallocation cycles.
118+ inProgressSizes_[partition] = 0 ;
119+ }
120+
121+ void LocalShuffleWriter::sortRows (
122+ std::vector<RowOffset>& offsets,
123+ const char * bufferData) {
124+ boost::range::sort (
125+ offsets, [bufferData](const RowOffset& lhs, const RowOffset& rhs) {
126+ return detail::compareKeys (
127+ lhs.getKey (bufferData), rhs.getKey (bufferData));
128+ });
129+ }
130+
70131std::unique_ptr<velox::WriteFile> LocalShuffleWriter::getNextOutputFile (
71132 int32_t partition) {
72133 auto filename = nextAvailablePartitionFileName (rootPath_, partition);
@@ -92,47 +153,36 @@ std::string LocalShuffleWriter::nextAvailablePartitionFileName(
92153 return filename;
93154}
94155
95- void LocalShuffleWriter::storePartitionBlock (int32_t partition) {
96- auto & buffer = inProgressPartitions_[partition];
97- auto file = getNextOutputFile (partition);
98- file->append (
99- std::string_view (buffer->as <char >(), inProgressSizes_[partition]));
100- file->close ();
101- inProgressPartitions_[partition].reset ();
102- inProgressSizes_[partition] = 0 ;
103- }
104-
105156void LocalShuffleWriter::collect (
106157 int32_t partition,
107- std::string_view /* key */ ,
158+ std::string_view key,
108159 std::string_view data) {
109- using TRowSize = uint32_t ;
160+ VELOX_CHECK_LT (partition, numPartitions_) ;
110161
111- auto & buffer = inProgressPartitions_[partition];
112- const TRowSize rowSize = data.size ();
113- const auto size = sizeof (TRowSize) + rowSize;
114-
115- // Check if there is enough space in the buffer.
116- if ((buffer != nullptr ) &&
117- (inProgressSizes_[partition] + size >= buffer->capacity ())) {
118- storePartitionBlock (partition);
119- // NOTE: the referenced 'buffer' will be reset in storePartitionBlock.
120- }
162+ const auto size = rowSize (key.size (), data.size ());
121163
122- // Allocate buffer if needed.
123- if (buffer == nullptr ) {
124- buffer = AlignedBuffer::allocate<char >(
125- std::max (( uint64_t )size, maxBytesPerPartition_), pool_);
126- inProgressSizes_[partition] = 0 ;
127- inProgressPartitions_[ partition] = buffer ;
164+ if (inProgressPartitions_[partition] == nullptr ) {
165+ inProgressPartitions_[partition] =
166+ AlignedBuffer::allocate<char >(maxBytesPerPartition_, pool_, 0 );
167+ }
168+ if ( inProgressSizes_[partition] + size >= maxBytesPerPartition_) {
169+ writeBlock ( partition) ;
128170 }
129171
130- // Copy data.
131- auto offset = inProgressSizes_[partition];
132- auto rawBuffer = buffer->asMutable <char >() + offset;
172+ auto * rawBuffer = inProgressPartitions_[partition]->asMutable <char >();
173+ auto * writePos = rawBuffer + inProgressSizes_[partition];
133174
134- *(TRowSize*)(rawBuffer) = folly::Endian::big (rowSize);
135- ::memcpy (rawBuffer + sizeof (TRowSize), data.data(), rowSize);
175+ appendShuffleRow (writePos, key, data);
176+ const auto currentOffset = inProgressSizes_[partition];
177+ const auto keyOffset = currentOffset + (kUint32Size * 2 );
178+ const auto dataOffset = keyOffset + key.size ();
179+
180+ inProgressRowOffsets_[partition].emplace_back (
181+ RowOffset{
182+ .keyOffset = keyOffset,
183+ .dataOffset = dataOffset,
184+ .keySize = static_cast <uint32_t >(key.size ()),
185+ .dataSize = static_cast <uint32_t >(data.size ())});
136186
137187 inProgressSizes_[partition] += size;
138188}
@@ -144,11 +194,29 @@ void LocalShuffleWriter::noMoreData(bool success) {
144194 }
145195 for (auto i = 0 ; i < numPartitions_; ++i) {
146196 if (inProgressSizes_[i] > 0 ) {
147- storePartitionBlock (i);
197+ writeBlock (i);
148198 }
149199 }
150200}
151201
202+ velox::BufferPtr LocalShuffleWriter::serializeSortedRows (
203+ const std::vector<RowOffset>& offsets,
204+ const char * sourceBuffer,
205+ const size_t totalSize) {
206+ auto sortedBuffer = AlignedBuffer::allocate<char >(totalSize, pool_, 0 );
207+ auto * writePos = sortedBuffer->asMutable <char >();
208+
209+ for (const auto & offset : offsets) {
210+ const char * rowStart = sourceBuffer + offset.keyOffset - (kUint32Size * 2 );
211+ const size_t completeRowSize = rowSize (offset.keySize , offset.dataSize );
212+
213+ memcpy (writePos, rowStart, completeRowSize);
214+ writePos += completeRowSize;
215+ }
216+
217+ return sortedBuffer;
218+ }
219+
152220LocalShuffleReader::LocalShuffleReader (
153221 const std::string& rootPath,
154222 const std::string& queryId,
0 commit comments