Merged
38 changes: 12 additions & 26 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -26,7 +26,6 @@ use databend_common_exception::Result;
 use databend_common_pipeline::core::always_callback;
 use databend_common_pipeline::core::ExecutionInfo;
 use databend_common_pipeline::core::Pipeline;
-use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::optimizer::ir::SExpr;
 use databend_common_sql::plans::OptimizeCompactBlock;
 use databend_common_sql::plans::ReclusterPlan;
@@ -50,7 +49,6 @@ pub struct CompactTargetTableDescription {
     pub catalog: String,
     pub database: String,
     pub table: String,
-    pub mutation_kind: MutationKind,
 }
 
 pub struct CompactHookTraceCtx {
@@ -95,30 +93,18 @@ async fn do_hook_compact(
         info!("Operation {op_name} completed successfully, starting table optimization job.");
 
         let compact_start_at = Instant::now();
-        let compaction_limits = match compact_target.mutation_kind {
-            MutationKind::Insert => {
-                let compaction_num_block_hint =
-                    ctx.get_compaction_num_block_hint(&compact_target.table);
-                info!(
-                    "Table {} requires compaction of {} blocks",
-                    compact_target.table, compaction_num_block_hint
-                );
-                if compaction_num_block_hint == 0 {
-                    return Ok(());
-                }
-                CompactionLimits {
-                    segment_limit: None,
-                    block_limit: Some(compaction_num_block_hint as usize),
-                }
-            }
-            _ => {
-                let auto_compaction_segments_limit =
-                    ctx.get_settings().get_auto_compaction_segments_limit()?;
-                CompactionLimits {
-                    segment_limit: Some(auto_compaction_segments_limit as usize),
-                    block_limit: None,
-                }
-            }
+        let compaction_num_block_hint =
+            ctx.get_compaction_num_block_hint(&compact_target.table);
+        info!(
+            "Table {} requires compaction of {} blocks",
+            compact_target.table, compaction_num_block_hint
+        );
+        if compaction_num_block_hint == 0 {
+            return Ok(());
+        }
+        let compaction_limits = CompactionLimits {
+            segment_limit: None,
+            block_limit: Some(compaction_num_block_hint as usize),
         };
 
         // keep the original progress value
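With the MutationKind branch gone, auto compaction after any write is driven solely by the per-table block hint: a zero hint skips the job entirely, and a non-zero hint becomes the block limit. A minimal standalone sketch of the new control flow (CompactionLimits mirrors the two-field struct used above; the rest is illustrative, not the hook's real signature):

#[derive(Debug, PartialEq)]
struct CompactionLimits {
    segment_limit: Option<usize>,
    block_limit: Option<usize>,
}

// Returns None when the hint is zero, i.e. the hook should return early
// instead of scheduling a no-op compaction.
fn limits_from_hint(hint: u64) -> Option<CompactionLimits> {
    if hint == 0 {
        return None;
    }
    Some(CompactionLimits {
        segment_limit: None,
        block_limit: Some(hint as usize),
    })
}

fn main() {
    assert_eq!(limits_from_hint(0), None);
    assert_eq!(
        limits_from_hint(30),
        Some(CompactionLimits {
            segment_limit: None,
            block_limit: Some(30),
        })
    );
}

Note that the segment-count path no longer feeds compaction at all, which matches the reworded auto_compaction_segments_limit description further down.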
1 change: 0 additions & 1 deletion src/query/service/src/interpreters/hook/hook.rs
@@ -102,7 +102,6 @@ impl HookOperator {
             catalog: self.catalog.to_owned(),
             database: self.database.to_owned(),
             table: self.table.to_owned(),
-            mutation_kind: self.mutation_kind,
         };
 
         let trace_ctx = CompactHookTraceCtx {
(file header not captured)
@@ -40,7 +40,7 @@ async fn test_deletion_mutator_multiple_empty_segments() -> Result<()> {
         "+----------+----------+",
         "| Column 0 | Column 1 |",
         "+----------+----------+",
-        "| 7        | 7        |",
+        "| 9        | 9        |",
         "+----------+----------+",
     ];
     let qry = format!(
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
@@ -892,7 +892,7 @@ impl DefaultSettings {
         }),
         ("auto_compaction_segments_limit", DefaultSettingValue {
             value: UserSettingValue::UInt64(3),
-            desc: "The maximum number of segments that can be compacted automatically triggered after write(replace-into/merge-into).",
+            desc: "The maximum number of segments that can be reclustered automatically triggered after write.",
             mode: SettingMode::Both,
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(2..=u64::MAX)),
(file header not captured)
@@ -30,7 +30,6 @@ use databend_storages_common_table_meta::meta::ColumnStatistics;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use databend_storages_common_table_meta::meta::TableSnapshot;
-use log::info;
 use log::warn;
 
 use crate::operations::common::ConflictResolveContext;
@@ -41,7 +40,8 @@ use crate::statistics::TableStatsGenerator;
 
 #[derive(Clone)]
 pub struct AppendGenerator {
-    ctx: Arc<dyn TableContext>,
+    pub(crate) ctx: Arc<dyn TableContext>,
+
     leaf_default_values: HashMap<ColumnId, Scalar>,
     overwrite: bool,
     conflict_resolve_ctx: ConflictResolveContext,
@@ -194,28 +194,6 @@ impl SnapshotGenerator for AppendGenerator {
             }
         }
 
-        // check if need to auto compact
-        // the algorithm is: if the number of imperfect blocks is greater than the threshold, then auto compact.
-        // the threshold is set by the setting `auto_compaction_imperfect_blocks_threshold`, default is 25.
-        let imperfect_count = new_summary.block_count - new_summary.perfect_block_count;
-        let auto_compaction_imperfect_blocks_threshold = self
-            .ctx
-            .get_settings()
-            .get_auto_compaction_imperfect_blocks_threshold()?;
-
-        if imperfect_count >= auto_compaction_imperfect_blocks_threshold {
-            // If imperfect_count is larger, SLIGHTLY increase the number of blocks
-            // eligible for auto-compaction, this adjustment is intended to help reduce
-            // fragmentation over time.
-            let compact_num_block_hint = std::cmp::min(
-                imperfect_count,
-                (auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64,
-            );
-            info!("set compact_num_block_hint to {compact_num_block_hint}");
-            self.ctx
-                .set_compaction_num_block_hint(table_info.name.as_str(), compact_num_block_hint);
-        }
-
         // merge statistics will set the additional_stats_meta to none,
         // so reset additional_stats_meta here.
         let table_statistics_location = table_stats_gen.table_statistics_location();
(file header not captured)
@@ -24,5 +24,6 @@ pub use conflict_resolve_context::SnapshotChanges;
 pub use conflict_resolve_context::SnapshotMerged;
 pub use mutation_generator::MutationGenerator;
 pub use snapshot_generator::decorate_snapshot;
+pub(crate) use snapshot_generator::set_compaction_num_block_hint;
 pub use snapshot_generator::SnapshotGenerator;
 pub use truncate_generator::TruncateGenerator;
(file header not captured)
@@ -15,12 +15,15 @@
 use std::any::Any;
 use std::sync::Arc;
 
+use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_expression::TableSchema;
 use databend_common_meta_app::schema::TableInfo;
 use databend_storages_common_session::TxnManagerRef;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use databend_storages_common_table_meta::meta::TableSnapshot;
+use log::info;
 
 use crate::operations::common::ConflictResolveContext;
 use crate::statistics::TableStatsGenerator;
@@ -92,3 +95,39 @@ pub fn decorate_snapshot(
     }
     Ok(())
 }
+
+pub(crate) fn set_compaction_num_block_hint(
+    ctx: &dyn TableContext,
+    table_name: &str,
+    summary: &Statistics,
+) {
+    if let Err(e) = try_set_compaction_num_block_hint(ctx, table_name, summary) {
+        log::warn!("set_compaction_num_block_hint failed: {}", e);
+    }
+}
+
+pub(crate) fn try_set_compaction_num_block_hint(
+    ctx: &dyn TableContext,
+    table_name: &str,
+    summary: &Statistics,
+) -> Result<()> {
+    // check if need to auto compact
+    // the algorithm is: if the number of imperfect blocks is greater than the threshold, then auto compact.
+    // the threshold is set by the setting `auto_compaction_imperfect_blocks_threshold`, default is 25.
+    let imperfect_count = summary.block_count - summary.perfect_block_count;
+    let auto_compaction_imperfect_blocks_threshold = ctx
+        .get_settings()
+        .get_auto_compaction_imperfect_blocks_threshold()?;
+    if imperfect_count >= auto_compaction_imperfect_blocks_threshold {
+        // If imperfect_count is larger, SLIGHTLY increase the number of blocks
+        // eligible for auto-compaction, this adjustment is intended to help reduce
+        // fragmentation over time.
+        let compact_num_block_hint = std::cmp::min(
+            imperfect_count,
+            (auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64,
+        );
+        info!("set compact_num_block_hint to {compact_num_block_hint}");
+        ctx.set_compaction_num_block_hint(table_name, compact_num_block_hint);
+    }
+    Ok(())
+}
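To make the capping arithmetic concrete: with the default threshold of 25, a table with 30 imperfect blocks gets a hint of 30, while a table with 100 imperfect blocks is capped at ceil(25 * 1.5) = 38. A self-contained sketch of just the heuristic (the real helper above additionally reads the threshold from settings and publishes the hint through the TableContext):

// Mirrors the heuristic in try_set_compaction_num_block_hint above.
fn compact_num_block_hint(imperfect_count: u64, threshold: u64) -> Option<u64> {
    if imperfect_count < threshold {
        return None; // not fragmented enough; no hint is recorded
    }
    // Cap at ceil(threshold * 1.5) so a single auto-compaction stays bounded.
    Some(std::cmp::min(
        imperfect_count,
        (threshold as f64 * 1.5).ceil() as u64,
    ))
}

fn main() {
    assert_eq!(compact_num_block_hint(10, 25), None); // below the threshold
    assert_eq!(compact_num_block_hint(30, 25), Some(30)); // under the cap
    assert_eq!(compact_num_block_hint(100, 25), Some(38)); // ceil(25 * 1.5) = 38
}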
(file header not captured)
@@ -42,6 +42,7 @@ use log::error;
 use log::info;
 
 use crate::operations::set_backoff;
+use crate::operations::set_compaction_num_block_hint;
 use crate::operations::AppendGenerator;
 use crate::operations::CommitMeta;
 use crate::operations::SnapshotGenerator;
@@ -272,15 +273,21 @@ async fn build_update_table_meta_req(
     let table_stats_gen = fuse_table
         .generate_table_stats(&previous, insert_hll, insert_rows)
         .await?;
+    let table_info = table.get_table_info();
     let snapshot = snapshot_generator.generate_new_snapshot(
-        table.get_table_info(),
+        table_info,
         fuse_table.cluster_key_id(),
         previous,
         txn_mgr,
         table_meta_timestamps,
         table_stats_gen,
     )?;
     snapshot.ensure_segments_unique()?;
+    set_compaction_num_block_hint(
+        snapshot_generator.ctx.as_ref(),
+        table_info.name.as_str(),
+        &snapshot.summary,
+    );
 
     // write snapshot
     let dal = fuse_table.get_operator();
(file header not captured)
@@ -58,6 +58,7 @@ use opendal::Operator;
 
 use crate::io::TableMetaLocationGenerator;
 use crate::operations::set_backoff;
+use crate::operations::set_compaction_num_block_hint;
 use crate::operations::vacuum::vacuum_table;
 use crate::operations::AppendGenerator;
 use crate::operations::CommitMeta;
@@ -442,6 +443,11 @@ where F: SnapshotGenerator + Send + Sync + 'static
                     table_stats_gen,
                 ) {
                     Ok(snapshot) => {
+                        set_compaction_num_block_hint(
+                            self.ctx.as_ref(),
+                            table_info.name.as_str(),
+                            &snapshot.summary,
+                        );
                         self.state = State::TryCommit {
                             data: snapshot.to_bytes()?,
                             snapshot,
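Both commit paths now publish the hint in the same place: immediately after generate_new_snapshot succeeds, from the new snapshot's summary, so the hint describes the table state readers will actually see once the commit lands. A stub-typed sketch of that ordering (these types are placeholders, not the real databend-fuse APIs):

// Stub stand-ins for databend's TableSnapshot / Statistics.
struct Statistics {
    block_count: u64,
    perfect_block_count: u64,
}
struct TableSnapshot {
    summary: Statistics,
}

fn publish_hint(table_name: &str, summary: &Statistics) {
    let imperfect = summary.block_count - summary.perfect_block_count;
    println!("{table_name}: {imperfect} imperfect blocks in the committed snapshot");
}

fn main() {
    // 1. Generate (and validate) the snapshot first.
    let snapshot = TableSnapshot {
        summary: Statistics { block_count: 40, perfect_block_count: 10 },
    };
    // 2. Then derive the hint from the summary readers will see after commit.
    publish_hint("t1", &snapshot.summary);
}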
(file header not captured)
@@ -111,6 +111,9 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
 statement ok
 set enable_compact_after_write = 1;
 
+statement ok
+set auto_compaction_imperfect_blocks_threshold = 2;
+
 statement ok
 create table t1(a int) change_tracking = true
 
(file header not captured)
@@ -200,7 +200,7 @@ query T
 show statistics from table db_09_0020.t2;
 ----
 db_09_0020 t2 a 3 3 2 0 2 4 4 (empty)
-db_09_0020 t2 b 3 3 2 0 2 4 4 (empty)
+db_09_0020 t2 b 3 3 3 0 2 4 4 (empty)
 
 statement ok
 analyze table t2;
@@ -209,12 +209,12 @@ query T
 show statistics from table db_09_0020.t2;
 ----
 db_09_0020 t2 a 3 3 2 0 2 4 4 [bucket id: 0, min: "2", max: "2", ndv: 1.0, count: 1.0], [bucket id: 1, min: "2", max: "2", ndv: 1.0, count: 1.0], [bucket id: 2, min: "4", max: "4", ndv: 1.0, count: 1.0]
-db_09_0020 t2 b 3 3 2 0 2 4 4 [bucket id: 0, min: "2", max: "2", ndv: 1.0, count: 1.0], [bucket id: 1, min: "2", max: "2", ndv: 1.0, count: 1.0], [bucket id: 2, min: "4", max: "4", ndv: 1.0, count: 1.0]
+db_09_0020 t2 b 3 3 3 0 2 4 4 [bucket id: 0, min: "2", max: "2", ndv: 1.0, count: 1.0], [bucket id: 1, min: "2", max: "2", ndv: 1.0, count: 1.0], [bucket id: 2, min: "4", max: "4", ndv: 1.0, count: 1.0]
 
 query I
 select count() from fuse_snapshot('db_09_0020','t2');
 ----
-6
+5
 
 statement ok
 DROP TABLE t2;
(file header not captured)
@@ -580,12 +580,8 @@ merge into cluster_target as t1 using (select * from cluster_source) as t2 on t1
 ----
 5
 
-# By default setting, all rows merged from `cluster_source` will be resident in a single block of `cluster_target`,
-# as table `cluster_target` is clustered by `(a,b)`, the rows inside the one block are assumed to be sorted
-# by `(a, b)`, consequently, the result of the following query should be ordered by `(a,b)` without an explicit
-# `order by` clause.
 query TTT
-select * from cluster_target;
+select * from cluster_target order by a, b;
 ----
 1 a 2
 2 a 4
(file header not captured)
@@ -7,6 +7,9 @@ CREATE DATABASE issue_13610
 statement ok
 USE issue_13610
 
+statement ok
+set auto_compaction_imperfect_blocks_threshold = 2;
+
 statement ok
 create table t(a int);
 
(file header not captured)
@@ -115,7 +115,7 @@ merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t
 
 ## without order by
 query TTT
-select * from t1_separate;
+select * from t1_separate order by a;
 ----
 1 a5 b5
 3 a6 b6
@@ -129,7 +129,7 @@ merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t
 4
 
 query TTT
-select * from t1_separate;
+select * from t1_separate order by a;
 ----
 1 a5 b5
 3 a6 b6
@@ -145,14 +145,8 @@ merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t
 ----
 2 4
 
-query T
-select count(*) from fuse_block('db','t1_separate');
-----
-1
-
-## we will do compact
 query TTT
-select * from t1_separate;
+select * from t1_separate order by a;
 ----
 1 a5 b5
 3 a6 b6
(file header not captured)
@@ -577,12 +577,8 @@ merge into cluster_target as t1 using (select * from cluster_source) as t2 on t1
 ----
 5
 
-# By default setting, all rows merged from `cluster_source` will be resident in a single block of `cluster_target`,
-# as table `cluster_target` is clustered by `(a,b)`, the rows inside the one block are assumed to be sorted
-# by `(a, b)`, consequently, the result of the following query should be ordered by `(a,b)` without an explicit
-# `order by` clause.
 query TTT
-select * from cluster_target;
+select * from cluster_target order by a,b;
 ----
 1 a 2
 2 a 4
(file header not captured)
@@ -49,11 +49,6 @@ insert into t values(1);
 statement ok
 insert into t values(1);
 
-
-# fourth block(after compaction)
-statement ok
-set auto_compaction_segments_limit = 2;
-
 statement ok
 insert into t values(1);
 
@@ -63,7 +58,6 @@ replace into t on(c) values(2);
 query III
 select segment_count , block_count , row_count from fuse_snapshot('i15760', 't') limit 20;
 ----
-2 4 11
 3 5 11
 2 4 10
 1 3 9
(file header not captured)
@@ -10,7 +10,7 @@ merge into test_merge as tba using (select * from (values('1','add','11'),('4','
 1
 
 query ITI
-select * from test_merge;
+select * from test_merge order by col1;
 ----
 2 abc 2
 3 abc 3