1515#include " presto_cpp/external/json/nlohmann/json.hpp"
1616#include " presto_cpp/main/common/Configs.h"
1717
18+ #include < boost/range/algorithm/sort.hpp>
19+
1820using namespace facebook ::velox::exec;
1921using namespace facebook ::velox;
2022
@@ -39,15 +41,41 @@ inline std::string createShuffleFileName(
3941 id);
4042}
4143
42- // This file is used to indicate that the shuffle system is ready to be used for
43- // reading (acts as a sync point between readers if needed). Mostly used for
44- // test purposes.
45- const static std::string kReadyForReadFilename = " readyForRead" ;
44+ inline size_t rowSize (size_t keySize, size_t dataSize) noexcept {
45+ return (kUint32Size * 2 ) + keySize + dataSize;
46+ }
47+
48+ inline void appendShuffleRow (
49+ char * FOLLY_NONNULL buffer,
50+ std::string_view sortKey,
51+ std::string_view data) {
52+ // Write memory layout: [keySize][dataSize][key][data]
53+ *(TRowSize*)(buffer) =
54+ folly::Endian::big (static_cast <TRowSize>(sortKey.size ()));
55+ buffer += kUint32Size ;
56+
57+ *(TRowSize*)(buffer) = folly::Endian::big (static_cast <TRowSize>(data.size ()));
58+ buffer += kUint32Size ;
59+
60+ if (!sortKey.empty ()) {
61+ memcpy (buffer, sortKey.data (), sortKey.size ());
62+ buffer += sortKey.size ();
63+ }
64+
65+ memcpy (buffer, data.data (), data.size ());
66+ }
67+
68+ void writeBufferToFile (velox::WriteFile* file, const char * data, size_t size) {
69+ VELOX_CHECK_NOT_NULL (file);
70+ file->append (std::string_view (data, size));
71+ file->close ();
72+ }
73+
4674} // namespace
4775
4876LocalShuffleWriter::LocalShuffleWriter (
49- const std::string& rootPath,
50- const std::string& queryId,
77+ std::string rootPath,
78+ std::string queryId,
5179 uint32_t shuffleId,
5280 uint32_t numPartitions,
5381 uint64_t maxBytesPerPartition,
@@ -59,14 +87,53 @@ LocalShuffleWriter::LocalShuffleWriter(
5987 rootPath_(std::move(rootPath)),
6088 queryId_(std::move(queryId)),
6189 shuffleId_(shuffleId) {
62- // Use resize/assign instead of resize(size, val).
63- inProgressPartitions_.resize (numPartitions_);
6490 inProgressPartitions_.assign (numPartitions_, nullptr );
65- inProgressSizes_.resize (numPartitions_);
6691 inProgressSizes_.assign (numPartitions_, 0 );
92+ inProgressRowOffsets_.assign (numPartitions_, {});
6793 fileSystem_ = velox::filesystems::getFileSystem (rootPath_, nullptr );
6894}
6995
96+ void LocalShuffleWriter::writeBlock (int32_t partition) {
97+ auto & buffer = inProgressPartitions_[partition];
98+ const auto bufferSize = inProgressSizes_[partition];
99+ if (!buffer || bufferSize == 0 ) {
100+ return ;
101+ }
102+
103+ auto file = getNextOutputFile (partition);
104+ auto & offsets = inProgressRowOffsets_[partition];
105+
106+ const bool needsSorting = !offsets.empty () && offsets[0 ].keySize > 0 ;
107+
108+ if (needsSorting) {
109+ const char * sourceData = buffer->as <char >();
110+ sortRows (offsets, sourceData);
111+
112+ for (const auto & offset : offsets) {
113+ const char * rowStart = sourceData + offset.keyOffset - (kUint32Size * 2 );
114+ const size_t rowLen = (kUint32Size * 2 ) + offset.keySize + offset.dataSize ;
115+ file->append (std::string_view (rowStart, rowLen));
116+ }
117+ file->close ();
118+ } else {
119+ writeBufferToFile (file.get (), buffer->as <char >(), bufferSize);
120+ }
121+
122+ // Reset for buffer reuse
123+ inProgressSizes_[partition] = 0 ;
124+ inProgressRowOffsets_[partition].clear ();
125+ }
126+
127+ void LocalShuffleWriter::sortRows (
128+ std::vector<RowOffset>& offsets,
129+ const char * bufferData) {
130+ boost::range::sort (
131+ offsets, [bufferData](const RowOffset& lhs, const RowOffset& rhs) {
132+ return detail::compareKeys (
133+ lhs.getKey (bufferData), rhs.getKey (bufferData));
134+ });
135+ }
136+
70137std::unique_ptr<velox::WriteFile> LocalShuffleWriter::getNextOutputFile (
71138 int32_t partition) {
72139 auto filename = nextAvailablePartitionFileName (rootPath_, partition);
@@ -92,47 +159,36 @@ std::string LocalShuffleWriter::nextAvailablePartitionFileName(
92159 return filename;
93160}
94161
95- void LocalShuffleWriter::storePartitionBlock (int32_t partition) {
96- auto & buffer = inProgressPartitions_[partition];
97- auto file = getNextOutputFile (partition);
98- file->append (
99- std::string_view (buffer->as <char >(), inProgressSizes_[partition]));
100- file->close ();
101- inProgressPartitions_[partition].reset ();
102- inProgressSizes_[partition] = 0 ;
103- }
104-
105162void LocalShuffleWriter::collect (
106163 int32_t partition,
107- std::string_view /* key */ ,
164+ std::string_view key,
108165 std::string_view data) {
109- using TRowSize = uint32_t ;
166+ VELOX_CHECK_LT (partition, numPartitions_) ;
110167
111- auto & buffer = inProgressPartitions_[partition];
112- const TRowSize rowSize = data.size ();
113- const auto size = sizeof (TRowSize) + rowSize;
114-
115- // Check if there is enough space in the buffer.
116- if ((buffer != nullptr ) &&
117- (inProgressSizes_[partition] + size >= buffer->capacity ())) {
118- storePartitionBlock (partition);
119- // NOTE: the referenced 'buffer' will be reset in storePartitionBlock.
120- }
168+ const auto size = rowSize (key.size (), data.size ());
121169
122- // Allocate buffer if needed.
123- if (buffer == nullptr ) {
124- buffer = AlignedBuffer::allocate<char >(
125- std::max (( uint64_t )size, maxBytesPerPartition_), pool_);
126- inProgressSizes_[partition] = 0 ;
127- inProgressPartitions_[ partition] = buffer ;
170+ if (inProgressPartitions_[partition] == nullptr ) {
171+ inProgressPartitions_[partition] =
172+ AlignedBuffer::allocate<char >(maxBytesPerPartition_, pool_, 0 );
173+ }
174+ if ( inProgressSizes_[partition] + size >= maxBytesPerPartition_) {
175+ writeBlock ( partition) ;
128176 }
129177
130- // Copy data.
131- auto offset = inProgressSizes_[partition];
132- auto rawBuffer = buffer->asMutable <char >() + offset;
178+ auto * rawBuffer = inProgressPartitions_[partition]->asMutable <char >();
179+ auto * writePos = rawBuffer + inProgressSizes_[partition];
133180
134- *(TRowSize*)(rawBuffer) = folly::Endian::big (rowSize);
135- ::memcpy (rawBuffer + sizeof (TRowSize), data.data(), rowSize);
181+ appendShuffleRow (writePos, key, data);
182+ const auto currentOffset = inProgressSizes_[partition];
183+ const auto keyOffset = currentOffset + (kUint32Size * 2 );
184+ const auto dataOffset = keyOffset + key.size ();
185+
186+ inProgressRowOffsets_[partition].emplace_back (
187+ RowOffset{
188+ .keyOffset = keyOffset,
189+ .dataOffset = dataOffset,
190+ .keySize = static_cast <uint32_t >(key.size ()),
191+ .dataSize = static_cast <uint32_t >(data.size ())});
136192
137193 inProgressSizes_[partition] += size;
138194}
@@ -144,11 +200,10 @@ void LocalShuffleWriter::noMoreData(bool success) {
144200 }
145201 for (auto i = 0 ; i < numPartitions_; ++i) {
146202 if (inProgressSizes_[i] > 0 ) {
147- storePartitionBlock (i);
203+ writeBlock (i);
148204 }
149205 }
150206}
151-
152207LocalShuffleReader::LocalShuffleReader (
153208 const std::string& rootPath,
154209 const std::string& queryId,
0 commit comments