Skip to content

Commit

Permalink
support fill missing tumble windows for aggr by setting
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen committed Dec 31, 2023
1 parent 8ac7bbb commit b25b04e
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Float, replay_speed, 0., "Control the replay speed..0 < replay_speed < 1, means replay slower.replay_speed == 1, means replay by actual ingest interval.1 < replay_speed < <max_limit>, means replay faster", 0) \
M(UInt64, max_events, 0, "Total events to generate for random stream", 0) \
M(Int64, eps, -1, "control the random stream eps in query time, defalut value is -1, if it is 0 means no limit.", 0) \
M(Bool, fill_missing_window_for_aggr, false, "fill missing window if not exist for aggr query", 0) \
// End of GLOBAL_SETTINGS

#define CONFIGURABLE_GLOBAL_SETTINGS(M) \
Expand Down
16 changes: 14 additions & 2 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3268,10 +3268,22 @@ void InterpreterSelectQuery::executeStreamingAggregation(
/// 2) `shuffle by`: calculating light substream without substream ID (The data have been shuffled by `LightShufflingTransform`)
if (query_info.hasPartitionByKeys() || light_shuffled)
query_plan.addStep(std::make_unique<Streaming::AggregatingStepWithSubstream>(
query_plan.getCurrentDataStream(), std::move(params), final, emit_version, data_stream_semantic_pair.isChangelogOutput()));
query_plan.getCurrentDataStream(),
std::move(params),
final,
emit_version,
data_stream_semantic_pair.isChangelogOutput(),
settings.fill_missing_window_for_aggr));
else
query_plan.addStep(std::make_unique<Streaming::AggregatingStep>(
query_plan.getCurrentDataStream(), std::move(params), final, merge_threads, temporary_data_merge_threads, emit_version, data_stream_semantic_pair.isChangelogOutput()));
query_plan.getCurrentDataStream(),
std::move(params),
final,
merge_threads,
temporary_data_merge_threads,
emit_version,
data_stream_semantic_pair.isChangelogOutput(),
settings.fill_missing_window_for_aggr));
}

/// Resolve input / output data stream semantic.
Expand Down
34 changes: 34 additions & 0 deletions src/Interpreters/Streaming/WindowCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -873,5 +873,39 @@ void reassignWindow(Chunk & chunk, const Window & window, bool time_col_is_datet
chunk.setColumns(std::move(columns), rows);
}

void addMissingWindow(
Chunk & chunk, const Window & window, bool time_col_is_datetime64, std::optional<size_t> start_pos, std::optional<size_t> end_pos)
{
auto add_window_time = [&](ColumnPtr & column, Int64 ts) {
auto col = IColumn::mutate(std::move(column));
if (time_col_is_datetime64)
col->insert(static_cast<DateTime64>(ts));
else
col->insert(static_cast<UInt32>(ts));
column = std::move(col);
};

auto add_default = [&](ColumnPtr & column) {
auto col = IColumn::mutate(std::move(column));
col->insertDefault();
column = std::move(col);
};

auto rows = chunk.rows();
auto columns = chunk.detachColumns();
for (size_t pos = 0; auto & column : columns)
{
if (start_pos.has_value() && pos == *start_pos)
add_window_time(column, window.start);
else if (end_pos.has_value() && pos == *end_pos)
add_window_time(column, window.end);
else
add_default(column);

++pos;
}

chunk.setColumns(std::move(columns), rows + 1);
}
}
}
2 changes: 2 additions & 0 deletions src/Interpreters/Streaming/WindowCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,7 @@ void assignWindow(
Columns & columns, const WindowInterval & interval, size_t time_col_pos, bool time_col_is_datetime64, const DateLUTImpl & time_zone);
void reassignWindow(
Chunk & chunk, const Window & window, bool time_col_is_datetime64, std::optional<size_t> start_pos, std::optional<size_t> end_pos);
void addMissingWindow(
Chunk & chunk, const Window & window, bool time_col_is_datetime64, std::optional<size_t> start_pos, std::optional<size_t> end_pos);
}
}
10 changes: 7 additions & 3 deletions src/Processors/QueryPlan/Streaming/AggregatingStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ AggregatingStep::AggregatingStep(
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool emit_version_,
bool emit_changelog_)
: ITransformingStep(input_stream_, AggregatingTransformParams::getHeader(params_, final_, emit_version_, emit_changelog_), getTraits(), false)
bool emit_changelog_,
bool fill_missing_window_)
: ITransformingStep(
input_stream_, AggregatingTransformParams::getHeader(params_, final_, emit_version_, emit_changelog_), getTraits(), false)
, params(std::move(params_))
, final(std::move(final_))
, merge_threads(merge_threads_)
, temporary_data_merge_threads(temporary_data_merge_threads_)
, emit_version(emit_version_)
, emit_changelog(emit_changelog_)
, fill_missing_window(fill_missing_window_)
{
}

Expand All @@ -69,7 +72,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final, emit_version, emit_changelog);
auto transform_params
= std::make_shared<AggregatingTransformParams>(std::move(params), final, emit_version, emit_changelog, fill_missing_window);

/// If there are several sources, then we perform parallel aggregation
if (pipeline.getNumStreams() > 1)
Expand Down
4 changes: 3 additions & 1 deletion src/Processors/QueryPlan/Streaming/AggregatingStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class AggregatingStep : public ITransformingStep
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool emit_version_,
bool emit_changelog_);
bool emit_changelog_,
bool fill_missing_window_);

String getName() const override { return "StreamingAggregating"; }

Expand All @@ -43,6 +44,7 @@ class AggregatingStep : public ITransformingStep

bool emit_version;
bool emit_changelog;
bool fill_missing_window;

Processors aggregating;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@ ITransformingStep::Traits getTraits()
}

AggregatingStepWithSubstream::AggregatingStepWithSubstream(
const DataStream & input_stream_, Aggregator::Params params_, bool final_, bool emit_version_, bool emit_changelog_)
const DataStream & input_stream_,
Aggregator::Params params_,
bool final_,
bool emit_version_,
bool emit_changelog_,
bool fill_missing_window_)
: ITransformingStep(
input_stream_, AggregatingTransformParams::getHeader(params_, final_, emit_version_, emit_changelog_), getTraits(), false)
, params(std::move(params_))
, final(std::move(final_))
, emit_version(emit_version_)
, emit_changelog(emit_changelog_)
, fill_missing_window(fill_missing_window_)
{
}

Expand All @@ -64,7 +70,7 @@ void AggregatingStepWithSubstream::transformPipeline(QueryPipelineBuilder & pipe
params.group_by_two_level_threshold_bytes = 0;
}

auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final, emit_version, emit_changelog);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final, emit_version, emit_changelog, fill_missing_window);

/// If there are several sources, we perform aggregation separately (Assume it's shuffled data by substream keys)
pipeline.addSimpleTransform([&](const Block & header) -> std::shared_ptr<IProcessor> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class AggregatingStepWithSubstream final : public ITransformingStep
Aggregator::Params params_,
bool final_,
bool emit_version_,
bool emit_changelog_);
bool emit_changelog_,
bool fill_missing_window_);

String getName() const override { return "StreamingAggregatingWithSubstream"; }

Expand All @@ -37,6 +38,7 @@ class AggregatingStepWithSubstream final : public ITransformingStep
bool final;
bool emit_version;
bool emit_changelog;
bool fill_missing_window;

Processors aggregating;
};
Expand Down
4 changes: 3 additions & 1 deletion src/Processors/Transforms/Streaming/AggregatingTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ struct AggregatingTransformParams
bool only_merge = false;
bool emit_version = false;
bool emit_changelog = false;
bool fill_missing_window = false;
DataTypePtr version_type;

AggregatingTransformParams(const Aggregator::Params & params_, bool final_, bool emit_version_, bool emit_changelog_)
AggregatingTransformParams(const Aggregator::Params & params_, bool final_, bool emit_version_, bool emit_changelog_, bool fill_missing_window_)
: aggregator(params_)
, params(aggregator.getParams())
, final(final_)
, emit_version(emit_version_)
, emit_changelog(emit_changelog_)
, fill_missing_window(fill_missing_window_)
{
if (emit_version)
version_type = DataTypeFactory::instance().get("int64");
Expand Down
37 changes: 37 additions & 0 deletions src/Processors/Transforms/Streaming/TumbleAggregatingTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,43 @@ WindowsWithBuckets TumbleAggregatingTransform::getLocalFinalizedWindowsWithBucke
{time_bucket}});
}

if (params->fill_missing_window && many_data->finalized_window_end.load(std::memory_order_relaxed) != INVALID_WATERMARK)
{
auto next_window_end = addTime(
many_data->finalized_window_end.load(std::memory_order_relaxed),
window_params.interval_kind,
window_params.window_interval,
*window_params.time_zone,
window_params.time_scale);

auto it = windows_with_buckets.begin();
while (next_window_end <= watermark_)
{
/// Add missing window if not exist
if (it == windows_with_buckets.end() || next_window_end != it->window.end) [[unlikely]]
{
Window missing_window
= {addTime(
next_window_end,
window_params.interval_kind,
-window_params.window_interval,
*window_params.time_zone,
window_params.time_scale),
next_window_end};
it = windows_with_buckets.insert(it, WindowWithBuckets{.window = std::move(missing_window), .buckets = {}});
}

next_window_end = addTime(
next_window_end,
window_params.interval_kind,
window_params.window_interval,
*window_params.time_zone,
window_params.time_scale);

++it;
}
}

return windows_with_buckets;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,50 @@ TumbleAggregatingTransformWithSubstream::getFinalizedWindowsWithBuckets(Int64 wa
{time_bucket}});
}

if (params->fill_missing_window && substream_ctx->finalized_watermark != INVALID_WATERMARK)
{
auto finalized_window_start = toStartTime(
substream_ctx->finalized_watermark,
window_params.interval_kind,
window_params.window_interval,
*window_params.time_zone,
window_params.time_scale);

auto next_window_end = addTime(
finalized_window_start,
window_params.interval_kind,
2 * window_params.window_interval,
*window_params.time_zone,
window_params.time_scale);

auto it = windows_with_buckets.begin();
while (next_window_end <= watermark)
{
/// Add missing window if not exist
if (it == windows_with_buckets.end() || next_window_end != it->window.end) [[unlikely]]
{
Window missing_window
= {addTime(
next_window_end,
window_params.interval_kind,
-window_params.window_interval,
*window_params.time_zone,
window_params.time_scale),
next_window_end};
it = windows_with_buckets.insert(it, WindowWithBuckets{.window = std::move(missing_window), .buckets = {}});
}

next_window_end = addTime(
next_window_end,
window_params.interval_kind,
window_params.window_interval,
*window_params.time_zone,
window_params.time_scale);

++it;
}
}

return windows_with_buckets;
}

Expand Down
18 changes: 15 additions & 3 deletions src/Processors/Transforms/Streaming/WindowAggregatingTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,22 @@ void WindowAggregatingTransform::finalize(const ChunkContextPtr & chunk_ctx)
assert(!prepared_windows_with_buckets.empty());
for (const auto & window_with_buckets : prepared_windows_with_buckets)
{
chunk = AggregatingHelper::mergeAndSpliceAndConvertBucketsToChunk(many_data->variants, *params, window_with_buckets.buckets);
/// No buckets means it's empty window. We should fill it with default values, it's only possible for enable fill missing window
if (window_with_buckets.buckets.empty())
{
assert(params->fill_missing_window);
if (!chunk.hasColumns())
chunk.setColumns(params->aggregator.getHeader(params->final).cloneEmptyColumns(), 0);

addMissingWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
}
else
{
chunk = AggregatingHelper::mergeAndSpliceAndConvertBucketsToChunk(many_data->variants, *params, window_with_buckets.buckets);

if (needReassignWindow())
reassignWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
if (needReassignWindow())
reassignWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
}

if (params->emit_version && params->final)
emitVersion(chunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,22 @@ void WindowAggregatingTransformWithSubstream::doFinalize(
&& window_with_buckets.window.end <= last_finalized_windows_with_buckets.back().window.end)
continue;

chunk = AggregatingHelper::spliceAndConvertBucketsToChunk(data_variant, *params, window_with_buckets.buckets);

if (needReassignWindow())
reassignWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
/// No buckets means it's empty window. We should fill it with default values, it's only possible for enable fill missing window
if (window_with_buckets.buckets.empty())
{
assert(params->fill_missing_window);
if (!chunk.hasColumns())
chunk.setColumns(params->aggregator.getHeader(params->final).cloneEmptyColumns(), 0);

addMissingWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
}
else
{
chunk = AggregatingHelper::spliceAndConvertBucketsToChunk(data_variant, *params, window_with_buckets.buckets);

if (needReassignWindow())
reassignWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
}

if (params->emit_version && params->final)
emitVersion(chunk, substream_ctx);
Expand Down

0 comments on commit b25b04e

Please sign in to comment.