feat(compaction): default new compaction group for new table (#19080)
Co-authored-by: zwang28 <84491488@qq.com>
Li0k and zwang28 authored Nov 6, 2024
1 parent a945f52 commit 9a32e75
Showing 5 changed files with 92 additions and 131 deletions.
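In short: `NewTableFragmentInfo` loses its two variants and becomes a plain struct, and the commit path now creates a dedicated compaction group for every new table fragment instead of routing tables into the static `StateDefault` / `MaterializedView` groups. Call sites that hard-coded those static ids (the meta tests, and the simulation test's `split_compaction_group(2, table_id)`) switch to looking the group up by table id. A before/after sketch of the type change, assembled from the hunks below (`TableId` aliased to `u32` here to keep the sketch self-contained):

```rust
use std::collections::HashSet;

// Stand-in for risingwave_common::catalog::TableId in this sketch.
type TableId = u32;

// Before: two cases, with "normal" tables routed to static compaction groups.
pub enum NewTableFragmentInfoOld {
    Normal {
        mv_table_id: Option<TableId>,
        internal_table_ids: Vec<TableId>,
    },
    NewCompactionGroup { table_ids: HashSet<TableId> },
}

// After: one case; a fresh compaction group is always created.
pub struct NewTableFragmentInfo {
    pub table_ids: HashSet<TableId>,
}
```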
20 changes: 11 additions & 9 deletions src/meta/src/barrier/mod.rs
@@ -1213,7 +1213,7 @@ impl GlobalBarrierWorkerContextImpl {
         if is_first_time {
             commit_info
                 .new_table_fragment_infos
-                .push(NewTableFragmentInfo::NewCompactionGroup {
+                .push(NewTableFragmentInfo {
                     table_ids: tables_to_commit,
                 });
         };
@@ -1747,14 +1747,16 @@ fn collect_commit_epoch_info(
         && !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
     {
         let table_fragments = &info.table_fragments;
-        vec![NewTableFragmentInfo::Normal {
-            mv_table_id: table_fragments.mv_table_id().map(TableId::new),
-            internal_table_ids: table_fragments
-                .internal_table_ids()
-                .into_iter()
-                .map(TableId::new)
-                .collect(),
-        }]
+        let mut table_ids: HashSet<_> = table_fragments
+            .internal_table_ids()
+            .into_iter()
+            .map(TableId::new)
+            .collect();
+        if let Some(mv_table_id) = table_fragments.mv_table_id() {
+            table_ids.insert(TableId::new(mv_table_id));
+        }
+
+        vec![NewTableFragmentInfo { table_ids }]
     } else {
         vec![]
     };
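`collect_commit_epoch_info` now folds the optional mv table id and the internal table ids into the single `table_ids` set. A minimal standalone illustration of that merge, with plain `u32` ids standing in for `TableId`:

```rust
use std::collections::HashSet;

// Merge internal table ids with an optional materialized-view table id,
// mirroring the shape of the hunk above (u32 stands in for TableId).
fn collect_table_ids(internal_table_ids: Vec<u32>, mv_table_id: Option<u32>) -> HashSet<u32> {
    let mut table_ids: HashSet<u32> = internal_table_ids.into_iter().collect();
    if let Some(mv_table_id) = mv_table_id {
        table_ids.insert(mv_table_id);
    }
    table_ids
}

fn main() {
    let ids = collect_table_ids(vec![11, 12], Some(10));
    assert_eq!(ids, HashSet::from([10, 11, 12]));
}
```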
120 changes: 44 additions & 76 deletions src/meta/src/hummock/manager/commit_epoch.rs
@@ -19,7 +19,6 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_hummock_sdk::change_log::ChangeLogDelta;
 use risingwave_hummock_sdk::compaction_group::group_split::split_sst_with_table_ids;
-use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
 use risingwave_hummock_sdk::sstable_info::SstableInfo;
 use risingwave_hummock_sdk::table_stats::{
     add_prost_table_stats_map, purge_prost_table_stats, to_prost_table_stats_map, PbTableStatsMap,
@@ -49,14 +48,8 @@ use crate::hummock::{
     commit_multi_var_with_provided_txn, start_measure_real_process_timer, HummockManager,
 };
 
-pub enum NewTableFragmentInfo {
-    Normal {
-        mv_table_id: Option<TableId>,
-        internal_table_ids: Vec<TableId>,
-    },
-    NewCompactionGroup {
-        table_ids: HashSet<TableId>,
-    },
+pub struct NewTableFragmentInfo {
+    pub table_ids: HashSet<TableId>,
 }
 
 #[derive(Default)]
@@ -124,73 +117,48 @@ impl HummockManager {
         let mut compaction_group_config: Option<Arc<CompactionConfig>> = None;
 
         // Add new table
-        for new_table_fragment_info in new_table_fragment_infos {
-            match new_table_fragment_info {
-                NewTableFragmentInfo::Normal {
-                    mv_table_id,
-                    internal_table_ids,
-                } => {
-                    on_handle_add_new_table(
-                        state_table_info,
-                        &internal_table_ids,
-                        StaticCompactionGroupId::StateDefault as u64,
-                        &mut table_compaction_group_mapping,
-                        &mut new_table_ids,
-                    )?;
-
-                    on_handle_add_new_table(
-                        state_table_info,
-                        &mv_table_id,
-                        StaticCompactionGroupId::MaterializedView as u64,
-                        &mut table_compaction_group_mapping,
-                        &mut new_table_ids,
-                    )?;
-                }
-                NewTableFragmentInfo::NewCompactionGroup { table_ids } => {
-                    let (compaction_group_manager, compaction_group_config) =
-                        if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
-                            (
-                                compaction_group_manager,
-                                (*compaction_group_config
-                                    .as_ref()
-                                    .expect("must be set with compaction_group_manager_txn"))
-                                .clone(),
-                            )
-                        } else {
-                            let compaction_group_manager_guard =
-                                self.compaction_group_manager.write().await;
-                            let new_compaction_group_config =
-                                compaction_group_manager_guard.default_compaction_config();
-                            compaction_group_config = Some(new_compaction_group_config.clone());
-                            (
-                                compaction_group_manager_txn.insert(
-                                    CompactionGroupManager::start_owned_compaction_groups_txn(
-                                        compaction_group_manager_guard,
-                                    ),
-                                ),
-                                new_compaction_group_config,
-                            )
-                        };
-                    let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
-                    new_compaction_groups
-                        .insert(new_compaction_group_id, compaction_group_config.clone());
-                    compaction_group_manager.insert(
-                        new_compaction_group_id,
-                        CompactionGroup {
-                            group_id: new_compaction_group_id,
-                            compaction_config: compaction_group_config,
-                        },
-                    );
-
-                    on_handle_add_new_table(
-                        state_table_info,
-                        &table_ids,
-                        new_compaction_group_id,
-                        &mut table_compaction_group_mapping,
-                        &mut new_table_ids,
-                    )?;
-                }
-            }
+        for NewTableFragmentInfo { table_ids } in new_table_fragment_infos {
+            let (compaction_group_manager, compaction_group_config) =
+                if let Some(compaction_group_manager) = &mut compaction_group_manager_txn {
+                    (
+                        compaction_group_manager,
+                        (*compaction_group_config
+                            .as_ref()
+                            .expect("must be set with compaction_group_manager_txn"))
+                        .clone(),
+                    )
+                } else {
+                    let compaction_group_manager_guard =
+                        self.compaction_group_manager.write().await;
+                    let new_compaction_group_config =
+                        compaction_group_manager_guard.default_compaction_config();
+                    compaction_group_config = Some(new_compaction_group_config.clone());
+                    (
+                        compaction_group_manager_txn.insert(
+                            CompactionGroupManager::start_owned_compaction_groups_txn(
+                                compaction_group_manager_guard,
+                            ),
+                        ),
+                        new_compaction_group_config,
+                    )
+                };
+            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
+            new_compaction_groups.insert(new_compaction_group_id, compaction_group_config.clone());
+            compaction_group_manager.insert(
+                new_compaction_group_id,
+                CompactionGroup {
+                    group_id: new_compaction_group_id,
+                    compaction_config: compaction_group_config,
+                },
+            );
+
+            on_handle_add_new_table(
+                state_table_info,
+                &table_ids,
+                new_compaction_group_id,
+                &mut table_compaction_group_mapping,
+                &mut new_table_ids,
+            )?;
         }
 
         let commit_sstables = self
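The rewritten loop above starts the compaction-group transaction lazily: the first fragment takes the manager write lock, stashes an owned txn and the default config in the surrounding `Option`s, and later fragments reuse them. A reduced sketch of that get-or-start pattern, with hypothetical `Txn`/`Config` stand-ins for the manager internals:

```rust
// Hypothetical stand-ins for CompactionGroupManager's txn and Arc<CompactionConfig>.
struct Txn;
#[derive(Clone)]
struct Config;

fn start_txn() -> Txn {
    Txn
}

fn main() {
    let mut txn_slot: Option<Txn> = None;
    let mut config_slot: Option<Config> = None;
    for _fragment in 0..3 {
        // Same shape as the hunk above: reuse the txn started on an earlier
        // iteration, or start one (and remember its config) on first use.
        let (_txn, _config) = if let Some(txn) = &mut txn_slot {
            (txn, config_slot.as_ref().expect("set with txn").clone())
        } else {
            let config = Config;
            config_slot = Some(config.clone());
            (txn_slot.insert(start_txn()), config)
        };
        // ... allocate a new compaction group id and register the table ids ...
    }
}
```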
7 changes: 3 additions & 4 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
@@ -177,13 +177,12 @@ impl HummockMetaClient for MockHummockMetaClient {
         {
             vec![]
         } else {
-            vec![NewTableFragmentInfo::Normal {
-                mv_table_id: None,
-                internal_table_ids: commit_table_ids
+            vec![NewTableFragmentInfo {
+                table_ids: commit_table_ids
                     .iter()
                     .cloned()
                     .map(TableId::from)
-                    .collect_vec(),
+                    .collect(),
             }]
         };
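Besides the struct rename, the substantive change here is dropping itertools' `collect_vec()`: the `table_ids` field is a `HashSet`, so a bare `collect()` suffices and the target type drives the inference. A tiny illustration (toy ids):

```rust
use std::collections::HashSet;

fn main() {
    let ids = [1u32, 2, 2, 3];
    // The same iterator collects into whatever the annotated type asks for.
    let as_vec: Vec<u32> = ids.iter().cloned().collect();
    let as_set: HashSet<u32> = ids.iter().cloned().collect();
    assert_eq!(as_vec.len(), 4);
    assert_eq!(as_set.len(), 3); // duplicates collapse in the set
}
```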
51 changes: 18 additions & 33 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
@@ -26,7 +26,6 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::hash::VirtualNode;
 use risingwave_common::range::RangeBoundsExt;
 use risingwave_common::util::epoch::{test_epoch, EpochExt, INVALID_EPOCH};
-use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
 use risingwave_hummock_sdk::key::{
     gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN,
 };
@@ -36,6 +35,7 @@ use risingwave_hummock_sdk::table_watermark::{
     TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
 };
 use risingwave_hummock_sdk::{EpochWithGap, LocalSstableInfo};
+use risingwave_meta::hummock::test_utils::get_compaction_group_id_by_table_id;
 use risingwave_meta::hummock::{CommitEpochInfo, NewTableFragmentInfo};
 use risingwave_rpc_client::HummockMetaClient;
 use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
@@ -2635,20 +2635,20 @@ async fn test_commit_multi_epoch() {
     commit_epoch(
         epoch1,
         sst1_epoch1.clone(),
-        vec![NewTableFragmentInfo::Normal {
-            mv_table_id: None,
-            internal_table_ids: vec![existing_table_id],
+        vec![NewTableFragmentInfo {
+            table_ids: HashSet::from_iter([existing_table_id]),
         }],
         &[existing_table_id],
     )
     .await;
 
-    let old_cg_id_set: HashSet<_> = {
+    let cg_id =
+        get_compaction_group_id_by_table_id(test_env.manager.clone(), existing_table_id.table_id())
+            .await;
+
+    {
         let version = test_env.manager.get_current_version().await;
-        let cg = version
-            .levels
-            .get(&(StaticCompactionGroupId::StateDefault as _))
-            .unwrap();
+        let cg = version.levels.get(&(cg_id)).unwrap();
         let sub_levels = &cg.l0.sub_levels;
         assert_eq!(sub_levels.len(), 1);
         let sub_level = &sub_levels[0];
@@ -2661,13 +2661,8 @@
             .get(&existing_table_id)
             .unwrap();
         assert_eq!(epoch1, info.committed_epoch);
-        assert_eq!(
-            StaticCompactionGroupId::StateDefault as u64,
-            info.compaction_group_id
-        );
-
-        version.levels.keys().cloned().collect()
-    };
+        assert_eq!(cg_id, info.compaction_group_id);
+    }
 
     let sst1_epoch2 = SstableInfo {
         sst_id: 22,
@@ -2684,10 +2679,7 @@
 
     {
         let version = test_env.manager.get_current_version().await;
-        let cg = version
-            .levels
-            .get(&(StaticCompactionGroupId::StateDefault as _))
-            .unwrap();
+        let cg = version.levels.get(&(cg_id)).unwrap();
         let sub_levels = &cg.l0.sub_levels;
         assert_eq!(sub_levels.len(), 2);
         let sub_level = &sub_levels[0];
@@ -2703,10 +2695,7 @@
             .get(&existing_table_id)
             .unwrap();
         assert_eq!(epoch2, info.committed_epoch);
-        assert_eq!(
-            StaticCompactionGroupId::StateDefault as u64,
-            info.compaction_group_id
-        );
+        assert_eq!(cg_id, info.compaction_group_id);
     };
 
     let new_table_id = TableId::new(2);
@@ -2723,7 +2712,7 @@
     commit_epoch(
         epoch1,
         sst2_epoch1.clone(),
-        vec![NewTableFragmentInfo::NewCompactionGroup {
+        vec![NewTableFragmentInfo {
            table_ids: HashSet::from_iter([new_table_id]),
         }],
         &[new_table_id],
@@ -2732,10 +2721,9 @@
 
     let new_cg_id = {
        let version = test_env.manager.get_current_version().await;
-        let new_cg_id_set: HashSet<_> = version.levels.keys().cloned().collect();
-        let added_cg_id_set = &new_cg_id_set - &old_cg_id_set;
-        assert_eq!(added_cg_id_set.len(), 1);
-        let new_cg_id = added_cg_id_set.into_iter().next().unwrap();
+        let new_cg_id =
+            get_compaction_group_id_by_table_id(test_env.manager.clone(), new_table_id.table_id())
+                .await;
 
         let new_cg = version.levels.get(&new_cg_id).unwrap();
         let sub_levels = &new_cg.l0.sub_levels;
@@ -2801,10 +2789,7 @@
 
     {
         let version = test_env.manager.get_current_version().await;
-        let old_cg = version
-            .levels
-            .get(&(StaticCompactionGroupId::StateDefault as _))
-            .unwrap();
+        let old_cg = version.levels.get(&cg_id).unwrap();
         let sub_levels = &old_cg.l0.sub_levels;
         assert_eq!(sub_levels.len(), 3);
         let sub_level1 = &sub_levels[0];
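With no static group to assert against, the test now resolves a table's group via `get_compaction_group_id_by_table_id` instead of set-differencing the group ids before and after the commit. A toy contrast of the two lookups (plain `u64` ids, illustrative data only):

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    // Before: infer the created group as "whichever id appeared".
    let old_groups: HashSet<u64> = HashSet::from([2, 3]);
    let new_groups: HashSet<u64> = HashSet::from([2, 3, 4]);
    let added: Vec<u64> = (&new_groups - &old_groups).into_iter().collect();
    assert_eq!(added, vec![4]);

    // After: ask directly which group owns the table.
    let table_to_group: HashMap<u64, u64> = HashMap::from([(1, 2), (7, 4)]);
    assert_eq!(table_to_group[&7], 4);
}
```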
25 changes: 16 additions & 9 deletions src/tests/simulation/tests/integration_tests/compaction/mod.rs
@@ -122,18 +122,25 @@ async fn test_vnode_watermark_reclaim_impl(
         .parse::<u64>()
         .unwrap();
 
-    // Move the table to a dedicated group to prevent its vnode watermark from being reclaimed during the compaction of other tables.
-    cluster.split_compaction_group(2, table_id).await.unwrap();
-    tokio::time::sleep(Duration::from_secs(5)).await;
-    let compaction_group_id = session
-        .run(format!(
-            "SELECT id FROM rw_hummock_compaction_group_configs where member_tables @> '[{}]'::jsonb;",
-            table_id
-        ))
-        .await
-        .unwrap()
-        .parse::<u64>()
-        .unwrap();
+    async fn compaction_group_id_by_table_id(session: &mut Session, table_id: u64) -> u64 {
+        session
+            .run(format!(
+                "SELECT id FROM rw_hummock_compaction_group_configs where member_tables @> '[{}]'::jsonb;",
+                table_id
+            ))
+            .await
+            .unwrap()
+            .parse::<u64>()
+            .unwrap()
+    }
+    let original_compaction_group_id = compaction_group_id_by_table_id(session, table_id).await;
+    // Move the table to a dedicated group to prevent its vnode watermark from being reclaimed during the compaction of other tables.
+    cluster
+        .split_compaction_group(original_compaction_group_id, table_id)
+        .await
+        .unwrap();
+    tokio::time::sleep(Duration::from_secs(5)).await;
+    let compaction_group_id = compaction_group_id_by_table_id(session, table_id).await;
 
     session
         .run("INSERT INTO t2 VALUES (now(), 1);")
