From e959ba74c4cb9808437b27174f7222bc000e1315 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Sun, 31 Dec 2023 22:09:08 +0800
Subject: [PATCH] support fill missing tumble windows for aggr by setting
---
src/Core/Settings.h | 1 +
src/Interpreters/InterpreterSelectQuery.cpp | 16 +++++-
src/Interpreters/Streaming/WindowCommon.cpp | 34 +++++++++++
src/Interpreters/Streaming/WindowCommon.h | 2 +
.../QueryPlan/Streaming/AggregatingStep.cpp | 10 +++-
.../QueryPlan/Streaming/AggregatingStep.h | 4 +-
.../AggregatingStepWithSubstream.cpp | 10 +++-
.../Streaming/AggregatingStepWithSubstream.h | 4 +-
.../Streaming/AggregatingTransform.h | 4 +-
.../Streaming/TumbleAggregatingTransform.cpp | 48 ++++++++++++++++
...umbleAggregatingTransformWithSubstream.cpp | 56 +++++++++++++++++++
.../Streaming/WindowAggregatingTransform.cpp | 16 +++++-
...indowAggregatingTransformWithSubstream.cpp | 18 ++++--
13 files changed, 206 insertions(+), 17 deletions(-)
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index be9597c0168..367908cc967 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -808,6 +808,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Float, replay_speed, 0., "Control the replay speed..0 < replay_speed < 1, means replay slower.replay_speed == 1, means replay by actual ingest interval.1 < replay_speed < , means replay faster", 0) \
M(UInt64, max_events, 0, "Total events to generate for random stream", 0) \
M(Int64, eps, -1, "control the random stream eps in query time, defalut value is -1, if it is 0 means no limit.", 0) \
+ M(Bool, fill_missing_window_for_aggr, false, "fill missing window if not exist for aggr query", 0) \
// End of GLOBAL_SETTINGS
#define CONFIGURABLE_GLOBAL_SETTINGS(M) \
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 02fb339d581..daa31a6ca77 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -3268,10 +3268,22 @@ void InterpreterSelectQuery::executeStreamingAggregation(
/// 2) `shuffle by`: calculating light substream without substream ID (The data have been shuffled by `LightShufflingTransform`)
if (query_info.hasPartitionByKeys() || light_shuffled)
query_plan.addStep(std::make_unique(
- query_plan.getCurrentDataStream(), std::move(params), final, emit_version, data_stream_semantic_pair.isChangelogOutput()));
+ query_plan.getCurrentDataStream(),
+ std::move(params),
+ final,
+ emit_version,
+ data_stream_semantic_pair.isChangelogOutput(),
+ settings.fill_missing_window_for_aggr));
else
query_plan.addStep(std::make_unique(
- query_plan.getCurrentDataStream(), std::move(params), final, merge_threads, temporary_data_merge_threads, emit_version, data_stream_semantic_pair.isChangelogOutput()));
+ query_plan.getCurrentDataStream(),
+ std::move(params),
+ final,
+ merge_threads,
+ temporary_data_merge_threads,
+ emit_version,
+ data_stream_semantic_pair.isChangelogOutput(),
+ settings.fill_missing_window_for_aggr));
}
/// Resolve input / output data stream semantic.
diff --git a/src/Interpreters/Streaming/WindowCommon.cpp b/src/Interpreters/Streaming/WindowCommon.cpp
index 06f84b55415..cdf69cd2a05 100644
--- a/src/Interpreters/Streaming/WindowCommon.cpp
+++ b/src/Interpreters/Streaming/WindowCommon.cpp
@@ -873,5 +873,39 @@ void reassignWindow(Chunk & chunk, const Window & window, bool time_col_is_datet
chunk.setColumns(std::move(columns), rows);
}
+void addMissingWindow(
+ Chunk & chunk, const Window & window, bool time_col_is_datetime64, std::optional start_pos, std::optional end_pos)
+{
+ auto add_window_time = [&](ColumnPtr & column, Int64 ts) {
+ auto col = IColumn::mutate(std::move(column));
+ if (time_col_is_datetime64)
+ col->insert(static_cast(ts));
+ else
+ col->insert(static_cast(ts));
+ column = std::move(col);
+ };
+
+ auto add_default = [&](ColumnPtr & column) {
+ auto col = IColumn::mutate(std::move(column));
+ col->insertDefault();
+ column = std::move(col);
+ };
+
+ auto rows = chunk.rows();
+ auto columns = chunk.detachColumns();
+ for (size_t pos = 0; auto & column : columns)
+ {
+ if (start_pos.has_value() && pos == *start_pos)
+ add_window_time(column, window.start);
+ else if (end_pos.has_value() && pos == *end_pos)
+ add_window_time(column, window.end);
+ else
+ add_default(column);
+
+ ++pos;
+ }
+
+ chunk.setColumns(std::move(columns), rows + 1);
+}
}
}
diff --git a/src/Interpreters/Streaming/WindowCommon.h b/src/Interpreters/Streaming/WindowCommon.h
index 33c9ee56a66..c6b2eb46461 100644
--- a/src/Interpreters/Streaming/WindowCommon.h
+++ b/src/Interpreters/Streaming/WindowCommon.h
@@ -298,5 +298,7 @@ void assignWindow(
Columns & columns, const WindowInterval & interval, size_t time_col_pos, bool time_col_is_datetime64, const DateLUTImpl & time_zone);
void reassignWindow(
Chunk & chunk, const Window & window, bool time_col_is_datetime64, std::optional start_pos, std::optional end_pos);
+void addMissingWindow(
+ Chunk & chunk, const Window & window, bool time_col_is_datetime64, std::optional start_pos, std::optional end_pos);
}
}
diff --git a/src/Processors/QueryPlan/Streaming/AggregatingStep.cpp b/src/Processors/QueryPlan/Streaming/AggregatingStep.cpp
index 577dfc50d2e..1ce123718a4 100644
--- a/src/Processors/QueryPlan/Streaming/AggregatingStep.cpp
+++ b/src/Processors/QueryPlan/Streaming/AggregatingStep.cpp
@@ -37,14 +37,17 @@ AggregatingStep::AggregatingStep(
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool emit_version_,
- bool emit_changelog_)
- : ITransformingStep(input_stream_, AggregatingTransformParams::getHeader(params_, final_, emit_version_, emit_changelog_), getTraits(), false)
+ bool emit_changelog_,
+ bool fill_missing_window_)
+ : ITransformingStep(
+ input_stream_, AggregatingTransformParams::getHeader(params_, final_, emit_version_, emit_changelog_), getTraits(), false)
, params(std::move(params_))
, final(std::move(final_))
, merge_threads(merge_threads_)
, temporary_data_merge_threads(temporary_data_merge_threads_)
, emit_version(emit_version_)
, emit_changelog(emit_changelog_)
+ , fill_missing_window(fill_missing_window_)
{
}
@@ -69,7 +72,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
- auto transform_params = std::make_shared(std::move(params), final, emit_version, emit_changelog);
+ auto transform_params
+ = std::make_shared(std::move(params), final, emit_version, emit_changelog, fill_missing_window);
/// If there are several sources, then we perform parallel aggregation
if (pipeline.getNumStreams() > 1)
diff --git a/src/Processors/QueryPlan/Streaming/AggregatingStep.h b/src/Processors/QueryPlan/Streaming/AggregatingStep.h
index 092cc38fc71..57f07923e9a 100644
--- a/src/Processors/QueryPlan/Streaming/AggregatingStep.h
+++ b/src/Processors/QueryPlan/Streaming/AggregatingStep.h
@@ -22,7 +22,8 @@ class AggregatingStep : public ITransformingStep
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool emit_version_,
- bool emit_changelog_);
+ bool emit_changelog_,
+ bool fill_missing_window_);
String getName() const override { return "StreamingAggregating"; }
@@ -43,6 +44,7 @@ class AggregatingStep : public ITransformingStep
bool emit_version;
bool emit_changelog;
+ bool fill_missing_window;
Processors aggregating;
};
diff --git a/src/Processors/QueryPlan/Streaming/AggregatingStepWithSubstream.cpp b/src/Processors/QueryPlan/Streaming/AggregatingStepWithSubstream.cpp
index 112a234e0f2..dc1249ddbc6 100644
--- a/src/Processors/QueryPlan/Streaming/AggregatingStepWithSubstream.cpp
+++ b/src/Processors/QueryPlan/Streaming/AggregatingStepWithSubstream.cpp
@@ -37,13 +37,19 @@ ITransformingStep::Traits getTraits()
}
AggregatingStepWithSubstream::AggregatingStepWithSubstream(
- const DataStream & input_stream_, Aggregator::Params params_, bool final_, bool emit_version_, bool emit_changelog_)
+ const DataStream & input_stream_,
+ Aggregator::Params params_,
+ bool final_,
+ bool emit_version_,
+ bool emit_changelog_,
+ bool fill_missing_window_)
: ITransformingStep(
input_stream_, AggregatingTransformParams::getHeader(params_, final_, emit_version_, emit_changelog_), getTraits(), false)
, params(std::move(params_))
, final(std::move(final_))
, emit_version(emit_version_)
, emit_changelog(emit_changelog_)
+ , fill_missing_window(fill_missing_window_)
{
}
@@ -64,7 +70,7 @@ void AggregatingStepWithSubstream::transformPipeline(QueryPipelineBuilder & pipe
params.group_by_two_level_threshold_bytes = 0;
}
- auto transform_params = std::make_shared(std::move(params), final, emit_version, emit_changelog);
+ auto transform_params = std::make_shared(std::move(params), final, emit_version, emit_changelog, fill_missing_window);
/// If there are several sources, we perform aggregation separately (Assume it's shuffled data by substream keys)
pipeline.addSimpleTransform([&](const Block & header) -> std::shared_ptr {
diff --git a/src/Processors/QueryPlan/Streaming/AggregatingStepWithSubstream.h b/src/Processors/QueryPlan/Streaming/AggregatingStepWithSubstream.h
index ad3721be625..fd42151a754 100644
--- a/src/Processors/QueryPlan/Streaming/AggregatingStepWithSubstream.h
+++ b/src/Processors/QueryPlan/Streaming/AggregatingStepWithSubstream.h
@@ -19,7 +19,8 @@ class AggregatingStepWithSubstream final : public ITransformingStep
Aggregator::Params params_,
bool final_,
bool emit_version_,
- bool emit_changelog_);
+ bool emit_changelog_,
+ bool fill_missing_window_);
String getName() const override { return "StreamingAggregatingWithSubstream"; }
@@ -37,6 +38,7 @@ class AggregatingStepWithSubstream final : public ITransformingStep
bool final;
bool emit_version;
bool emit_changelog;
+ bool fill_missing_window;
Processors aggregating;
};
diff --git a/src/Processors/Transforms/Streaming/AggregatingTransform.h b/src/Processors/Transforms/Streaming/AggregatingTransform.h
index a189fd76c72..95724cb5c48 100644
--- a/src/Processors/Transforms/Streaming/AggregatingTransform.h
+++ b/src/Processors/Transforms/Streaming/AggregatingTransform.h
@@ -21,14 +21,16 @@ struct AggregatingTransformParams
bool only_merge = false;
bool emit_version = false;
bool emit_changelog = false;
+ bool fill_missing_window = false;
DataTypePtr version_type;
- AggregatingTransformParams(const Aggregator::Params & params_, bool final_, bool emit_version_, bool emit_changelog_)
+ AggregatingTransformParams(const Aggregator::Params & params_, bool final_, bool emit_version_, bool emit_changelog_, bool fill_missing_window_)
: aggregator(params_)
, params(aggregator.getParams())
, final(final_)
, emit_version(emit_version_)
, emit_changelog(emit_changelog_)
+ , fill_missing_window(fill_missing_window_)
{
if (emit_version)
version_type = DataTypeFactory::instance().get("int64");
diff --git a/src/Processors/Transforms/Streaming/TumbleAggregatingTransform.cpp b/src/Processors/Transforms/Streaming/TumbleAggregatingTransform.cpp
index 6e8809ef56b..e5e0da7e29d 100644
--- a/src/Processors/Transforms/Streaming/TumbleAggregatingTransform.cpp
+++ b/src/Processors/Transforms/Streaming/TumbleAggregatingTransform.cpp
@@ -79,6 +79,54 @@ WindowsWithBuckets TumbleAggregatingTransform::getLocalFinalizedWindowsWithBucke
{time_bucket}});
}
+ if (params->fill_missing_window)
+ {
+ /// If no finalized window, we should start from the current first window
+ Int64 next_window_end;
+ auto finalized_window_end = many_data->finalized_window_end.load(std::memory_order_relaxed);
+ if (finalized_window_end == INVALID_WATERMARK)
+ {
+ if (windows_with_buckets.empty())
+ return {};
+
+ next_window_end = windows_with_buckets.front().window.end;
+ }
+ else
+ next_window_end = addTime(
+ finalized_window_end,
+ window_params.interval_kind,
+ window_params.window_interval,
+ *window_params.time_zone,
+ window_params.time_scale);
+
+ auto it = windows_with_buckets.begin();
+ while (next_window_end <= watermark_)
+ {
+ /// Add missing window if not exist
+ if (it == windows_with_buckets.end() || next_window_end != it->window.end) [[unlikely]]
+ {
+ Window missing_window
+ = {addTime(
+ next_window_end,
+ window_params.interval_kind,
+ -window_params.window_interval,
+ *window_params.time_zone,
+ window_params.time_scale),
+ next_window_end};
+ it = windows_with_buckets.insert(it, WindowWithBuckets{.window = std::move(missing_window), .buckets = {}});
+ }
+
+ next_window_end = addTime(
+ next_window_end,
+ window_params.interval_kind,
+ window_params.window_interval,
+ *window_params.time_zone,
+ window_params.time_scale);
+
+ ++it;
+ }
+ }
+
return windows_with_buckets;
}
diff --git a/src/Processors/Transforms/Streaming/TumbleAggregatingTransformWithSubstream.cpp b/src/Processors/Transforms/Streaming/TumbleAggregatingTransformWithSubstream.cpp
index ea27e9036cd..a541699dc77 100644
--- a/src/Processors/Transforms/Streaming/TumbleAggregatingTransformWithSubstream.cpp
+++ b/src/Processors/Transforms/Streaming/TumbleAggregatingTransformWithSubstream.cpp
@@ -65,6 +65,62 @@ TumbleAggregatingTransformWithSubstream::getFinalizedWindowsWithBuckets(Int64 wa
{time_bucket}});
}
+ if (params->fill_missing_window)
+ {
+ /// If no finalized window, we should start from the current first window
+ Int64 next_window_end;
+ if (substream_ctx->finalized_watermark == INVALID_WATERMARK)
+ {
+ if (windows_with_buckets.empty())
+ return {};
+
+ next_window_end = windows_with_buckets.front().window.end;
+ }
+ else
+ {
+ auto finalized_window_start = toStartTime(
+ substream_ctx->finalized_watermark,
+ window_params.interval_kind,
+ window_params.window_interval,
+ *window_params.time_zone,
+ window_params.time_scale);
+
+ next_window_end = addTime(
+ finalized_window_start,
+ window_params.interval_kind,
+ 2 * window_params.window_interval,
+ *window_params.time_zone,
+ window_params.time_scale);
+ }
+
+ auto it = windows_with_buckets.begin();
+ while (next_window_end <= watermark)
+ {
+ /// Add missing window if not exist
+ if (it == windows_with_buckets.end() || next_window_end != it->window.end) [[unlikely]]
+ {
+ Window missing_window
+ = {addTime(
+ next_window_end,
+ window_params.interval_kind,
+ -window_params.window_interval,
+ *window_params.time_zone,
+ window_params.time_scale),
+ next_window_end};
+ it = windows_with_buckets.insert(it, WindowWithBuckets{.window = std::move(missing_window), .buckets = {}});
+ }
+
+ next_window_end = addTime(
+ next_window_end,
+ window_params.interval_kind,
+ window_params.window_interval,
+ *window_params.time_zone,
+ window_params.time_scale);
+
+ ++it;
+ }
+ }
+
return windows_with_buckets;
}
diff --git a/src/Processors/Transforms/Streaming/WindowAggregatingTransform.cpp b/src/Processors/Transforms/Streaming/WindowAggregatingTransform.cpp
index e1ab84507bd..1e20bb1d6d8 100644
--- a/src/Processors/Transforms/Streaming/WindowAggregatingTransform.cpp
+++ b/src/Processors/Transforms/Streaming/WindowAggregatingTransform.cpp
@@ -131,10 +131,20 @@ void WindowAggregatingTransform::finalize(const ChunkContextPtr & chunk_ctx)
assert(!prepared_windows_with_buckets.empty());
for (const auto & window_with_buckets : prepared_windows_with_buckets)
{
- chunk = AggregatingHelper::mergeAndSpliceAndConvertBucketsToChunk(many_data->variants, *params, window_with_buckets.buckets);
+ /// No buckets means it's empty window. We should fill it with default values, it's only possible for enable fill missing window
+ if (window_with_buckets.buckets.empty())
+ {
+ assert(params->fill_missing_window);
+ chunk.setColumns(params->aggregator.getHeader(params->final).cloneEmptyColumns(), 0);
+ addMissingWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
+ }
+ else
+ {
+ chunk = AggregatingHelper::mergeAndSpliceAndConvertBucketsToChunk(many_data->variants, *params, window_with_buckets.buckets);
- if (needReassignWindow())
- reassignWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
+ if (needReassignWindow())
+ reassignWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
+ }
if (params->emit_version && params->final)
emitVersion(chunk);
diff --git a/src/Processors/Transforms/Streaming/WindowAggregatingTransformWithSubstream.cpp b/src/Processors/Transforms/Streaming/WindowAggregatingTransformWithSubstream.cpp
index a1d0fa74813..e85cf867068 100644
--- a/src/Processors/Transforms/Streaming/WindowAggregatingTransformWithSubstream.cpp
+++ b/src/Processors/Transforms/Streaming/WindowAggregatingTransformWithSubstream.cpp
@@ -73,10 +73,20 @@ void WindowAggregatingTransformWithSubstream::doFinalize(
&& window_with_buckets.window.end <= last_finalized_windows_with_buckets.back().window.end)
continue;
- chunk = AggregatingHelper::spliceAndConvertBucketsToChunk(data_variant, *params, window_with_buckets.buckets);
-
- if (needReassignWindow())
- reassignWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
+ /// No buckets means it's empty window. We should fill it with default values, it's only possible for enable fill missing window
+ if (window_with_buckets.buckets.empty())
+ {
+ assert(params->fill_missing_window);
+ chunk.setColumns(params->aggregator.getHeader(params->final).cloneEmptyColumns(), 0);
+ addMissingWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
+ }
+ else
+ {
+ chunk = AggregatingHelper::spliceAndConvertBucketsToChunk(data_variant, *params, window_with_buckets.buckets);
+
+ if (needReassignWindow())
+ reassignWindow(chunk, window_with_buckets.window, params->params.window_params->time_col_is_datetime64, window_start_col_pos, window_end_col_pos);
+ }
if (params->emit_version && params->final)
emitVersion(chunk, substream_ctx);