Skip to content

Commit

Permalink
Add RowsStreamingWindowBuild to avoid OOM in Window operator (9025)
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Jun 17, 2024
1 parent b44766b commit cf6ae3a
Show file tree
Hide file tree
Showing 24 changed files with 596 additions and 58 deletions.
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ class QueryConfig {
static constexpr const char* kTopNRowNumberSpillEnabled =
"topn_row_number_spill_enabled";

/// Flag for RowsStreamingWindowBuild. Only used to test
/// RowsStreamingWindowBuild in window tests.
static constexpr const char* kRowsStreamingWindowEnabled =
"rows_streaming_window_enabled";

/// The max row numbers to fill and spill for each spill run. This is used to
/// cap the memory used for spilling. If it is zero, then there is no limit
/// and spilling might run out of memory.
Expand Down Expand Up @@ -471,6 +476,10 @@ class QueryConfig {
return get<bool>(kAdaptiveFilterReorderingEnabled, true);
}

/// Returns the kRowsStreamingWindowEnabled ("rows_streaming_window_enabled")
/// setting, looked up via get<bool>() with false passed as the fallback
/// argument.
bool rowsStreamingWindowEnabled() const {
return get<bool>(kRowsStreamingWindowEnabled, false);
}

/// Returns the kLegacyCast setting, looked up via get<bool>() with false
/// passed as the fallback argument.
bool isLegacyCast() const {
return get<bool>(kLegacyCast, false);
}
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ void registerAggregateWindowFunction(const std::string& name) {
pool,
stringAllocator,
config);
});
},
{exec::Scope::kRows, true});
}
}
} // namespace facebook::velox::exec
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ add_library(
PlanNodeStats.cpp
PrefixSort.cpp
ProbeOperatorState.cpp
RowsStreamingWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
SortBuffer.cpp
Expand Down
87 changes: 87 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/RowsStreamingWindowBuild.h"

namespace facebook::velox::exec {

// Forwards every argument unchanged to the WindowBuild base constructor. The
// constructor body is empty; the members declared by this class rely on their
// in-class default initializers.
RowsStreamingWindowBuild::RowsStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}

// Moves the rows buffered in 'inputRows_' into the partition currently being
// built, creating that partition first if it does not exist yet.
//
// @param isFinished When true, the partition is marked complete and
// 'inputPartition_' advances so the next call starts a new partition.
//
// NOTE(review): when called with no buffered rows and no open partition (for
// example noMoreInput() without any prior addInput()), this still creates an
// empty partition and marks it complete. Confirm callers tolerate that.
void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) {
  const bool partitionMissing = inputPartition_ >= windowPartitions_.size();
  if (partitionMissing) {
    windowPartitions_.push_back(std::make_shared<WindowPartition>(
        data_.get(), inversedInputChannels_, sortKeyInfo_));
  }

  auto& currentPartition = windowPartitions_[inputPartition_];
  currentPartition->addRows(inputRows_);

  if (isFinished) {
    currentPartition->setComplete();
    ++inputPartition_;
  }

  // The buffered rows now belong to the partition; start a fresh batch.
  inputRows_.clear();
}

// Copies every row of 'input' into 'data_' and buffers the stored row
// pointers in 'inputRows_'. Buffered rows are flushed to the current window
// partition in two situations, both checked before the new row is buffered:
//  - compareRowsWithKeys() on the partition keys returns true for the
//    previous row and the new row, which is treated as the end of the
//    current partition (flush with isFinished = true);
//  - at least 'numRowsPerOutput_' rows are buffered (flush with
//    isFinished = false, so the partition stays open).
void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
  // Decode each input column, in 'inputChannels_' order, once per batch.
  for (auto channel = 0; channel < inputChannels_.size(); ++channel) {
    decodedInputVectors_[channel].decode(
        *input->childAt(inputChannels_[channel]));
  }

  const auto numInputRows = input->size();
  const auto numInputColumns = input->childrenSize();
  for (auto rowIndex = 0; rowIndex < numInputRows; ++rowIndex) {
    char* const storedRow = data_->newRow();
    for (auto column = 0; column < numInputColumns; ++column) {
      data_->store(decodedInputVectors_[column], rowIndex, storedRow, column);
    }

    // Nothing can be flushed before the very first row has been seen.
    if (previousRow_ != nullptr) {
      if (compareRowsWithKeys(previousRow_, storedRow, partitionKeyInfo_)) {
        buildNextInputOrPartition(true);
      }
      if (inputRows_.size() >= numRowsPerOutput_) {
        buildNextInputOrPartition(false);
      }
    }

    inputRows_.push_back(storedRow);
    previousRow_ = storedRow;
  }
}

// Flushes the rows still buffered in 'inputRows_' into the partition being
// built and marks that partition complete.
void RowsStreamingWindowBuild::noMoreInput() {
buildNextInputOrPartition(/*isFinished=*/true);
}

// Advances 'outputPartition_' and returns the partition at the new index.
//
// Before advancing, this build's reference to the partition handed out by the
// previous call is dropped. 'outputPartition_' starts at -1 (nothing handed
// out yet), so the guard must be '>= 0'. With '> 0' the partition at index 0
// was never released and stayed referenced by 'windowPartitions_' for the
// lifetime of the build.
//
// Precondition: hasNextPartition() is true; otherwise the index is out of
// range.
std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() {
  if (outputPartition_ >= 0) {
    windowPartitions_[outputPartition_].reset();
  }

  return windowPartitions_[++outputPartition_];
}

// Returns true when a partition exists at index 'outputPartition_ + 1', i.e.
// there is a partition after the one most recently handed out.
//
// 'outputPartition_' is never below -1, so 'outputPartition_ + 1' is
// non-negative and converts to size_t without wrapping. This replaces the
// previous 'int(size() - 2)' form, which relied on unsigned wraparound
// followed by a narrowing cast. An empty 'windowPartitions_' yields false, as
// before.
bool RowsStreamingWindowBuild::hasNextPartition() {
  return static_cast<size_t>(outputPartition_ + 1) < windowPartitions_.size();
}

} // namespace facebook::velox::exec
80 changes: 80 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {

/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of
/// processing window functions as rows arrive within a single partition,
/// without the need to wait for the entire partition to be ready. This approach
/// can significantly reduce memory usage, especially when a single partition
/// contains a large amount of data. It is particularly suited for optimizing
/// rank and row_number functions, as well as aggregate window functions with a
/// default frame.
class RowsStreamingWindowBuild : public WindowBuild {
public:
RowsStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection);

void addInput(RowVectorPtr input) override;

void spill() override {
VELOX_UNREACHABLE();
}

std::optional<common::SpillStats> spilledStats() const override {
return std::nullopt;
}

void noMoreInput() override;

bool hasNextPartition() override;

std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
// one, so can consume input rows.
return windowPartitions_.size() == 0 ||
outputPartition_ == windowPartitions_.size() - 1;
}

private:
void buildNextInputOrPartition(bool isFinished);

// Holds input rows within the current partition.
std::vector<char*> inputRows_;

// Used to compare rows based on partitionKeys.
char* previousRow_ = nullptr;

// Current partition being output. Used to return the WidnowPartitions.
vector_size_t outputPartition_ = -1;

// Current partition when adding input. Used to construct WindowPartitions.
vector_size_t inputPartition_ = 0;

// Holds all the WindowPartitions.
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;
};

} // namespace facebook::velox::exec
6 changes: 3 additions & 3 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() {
}
}

std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
if (merge_ != nullptr) {
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand All @@ -316,7 +316,7 @@ std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
auto partition = folly::Range(
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

private:
void ensureInputFits(const RowVectorPtr& input);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/StreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() {
partitionStartRows_.push_back(sortedRows_.size());
}

std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
VELOX_CHECK_GT(
partitionStartRows_.size(), 0, "No window partitions available")

Expand Down Expand Up @@ -89,7 +89,7 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);

return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/StreamingWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
Expand Down
Loading

0 comments on commit cf6ae3a

Please sign in to comment.