Commit: 89ab1c9 ("logs")
Author: JackTan25, committed Oct 13, 2023
Parent: 4e98a38

Showing 8 changed files with 68 additions and 4 deletions.
4 changes: 2 additions & 2 deletions benchmark/clickbench/merge_into/create_local.sql
@@ -1,7 +1,7 @@
drop table if exists target_table;
drop table if exists source_table;

-CREATE TRANSIENT TABLE IF NOT EXISTS source_table (
+CREATE TABLE IF NOT EXISTS source_table (
l_orderkey BIGINT not null,
l_partkey BIGINT not null,
l_suppkey BIGINT not null,
@@ -18,7 +18,7 @@ CREATE TRANSIENT TABLE IF NOT EXISTS source_table (
l_shipinstruct STRING not null,
l_shipmode STRING not null,
l_comment STRING not null
-) CLUSTER BY(l_shipdate, l_orderkey);
+) ENGINE = RANDOM;

CREATE TRANSIENT TABLE IF NOT EXISTS target_table (
l_orderkey BIGINT not null,
2 changes: 1 addition & 1 deletion benchmark/clickbench/merge_into/load.sql
@@ -7,7 +7,7 @@ ANALYZE TABLE source_table;

-- load data into target_table; the source has almost 6,000,000 rows
COPY INTO target_table
-FROM 'fs:///tmp/data/lineitem2.tbl' PATTERN = 'lineitem.tbl2.*' FILE_FORMAT =(
+FROM 'fs:///tmp/data/lineitem.tbl' PATTERN = 'lineitem.tbl.*' FILE_FORMAT =(
type = 'CSV' field_delimiter = '|' record_delimiter = '\n' skip_header = 0
);

@@ -324,6 +324,7 @@ impl MergeIntoInterpreter {
.into_iter()
.enumerate()
.collect(),
+            snapshot_loc: fuse_table.snapshot_loc().await?,
});

// build mutation_aggregate
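Context for the hunk above: the interpreter now captures the target Fuse table's snapshot location into the MergeInto physical plan so the downstream aggregator can log it. A minimal sketch of that plumbing; everything except the `snapshot_loc` field name is a hypothetical stand-in for Databend's FuseTable and physical plan:

```rust
// Minimal sketch of the snapshot_loc plumbing added in this commit.
// `SketchFuseTable` and `SketchPlan` are hypothetical stand-ins.
struct SketchFuseTable {
    snapshot_location: Option<String>,
}

impl SketchFuseTable {
    // Stands in for FuseTable::snapshot_loc(), which reads the current
    // snapshot location from table metadata.
    async fn snapshot_loc(&self) -> Result<Option<String>, String> {
        Ok(self.snapshot_location.clone())
    }
}

struct SketchPlan {
    snapshot_loc: Option<String>,
}

async fn build_merge_into_plan(table: &SketchFuseTable) -> Result<SketchPlan, String> {
    Ok(SketchPlan {
        // Captured at plan-build time; later cloned into the
        // MatchedAggregator so apply() can log which snapshot it saw.
        snapshot_loc: table.snapshot_loc().await?,
    })
}
```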
@@ -57,6 +57,8 @@ use storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;
use storages_common_table_meta::table::OPT_KEY_COMMENT;
use storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use storages_common_table_meta::table::OPT_KEY_ENGINE;
use storages_common_table_meta::table::OPT_KEY_EXTERNAL_LOCATION;
use storages_common_table_meta::table::OPT_KEY_LEGACY_SNAPSHOT_LOC;
use storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
use storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
@@ -366,7 +368,8 @@ pub static CREATE_TABLE_OPTIONS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
r.insert(FUSE_OPT_KEY_ROW_PER_BLOCK);
r.insert(FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD);
r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD);

+    r.insert(OPT_KEY_SNAPSHOT_LOCATION);
+    r.insert(OPT_KEY_LEGACY_SNAPSHOT_LOC);
r.insert(OPT_KEY_BLOOM_INDEX_COLUMNS);
r.insert(OPT_KEY_TABLE_COMPRESSION);
r.insert(OPT_KEY_STORAGE_FORMAT);
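The hunk above registers the snapshot-location keys (current and legacy spellings) in the CREATE TABLE option allow-list, which is what lets a snapshot location be supplied as a table option. A minimal sketch of the Lazy allow-list pattern itself; the literal key strings below are illustrative, not Databend's exact constants:

```rust
use std::collections::HashSet;

use once_cell::sync::Lazy;

// Sketch of the allow-list pattern from the hunk: a lazily built,
// process-wide set of accepted CREATE TABLE option keys.
static CREATE_TABLE_OPTIONS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
    let mut r = HashSet::new();
    r.insert("snapshot_location");
    r.insert("snapshot_loc"); // legacy key kept for compatibility
    r.insert("bloom_index_columns");
    r
});

fn is_valid_create_table_option(key: &str) -> bool {
    CREATE_TABLE_OPTIONS.contains(key.to_lowercase().as_str())
}

fn main() {
    assert!(is_valid_create_table_option("SNAPSHOT_LOCATION"));
    assert!(!is_valid_create_table_option("no_such_option"));
}
```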
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -434,6 +434,7 @@ impl PipelineBuilder {
field_index_of_input_schema,
row_id_idx,
segments,
+            snapshot_loc,
} = merge_into;

self.build_pipeline(input)?;
@@ -618,6 +619,7 @@
block_builder,
io_request_semaphore,
segments.clone(),
+            snapshot_loc.clone(),
)?);

for _ in 0..self.main_pipeline.output_len() - 1 {
@@ -46,4 +46,5 @@ pub struct MergeInto {
pub field_index_of_input_schema: HashMap<FieldIndex, usize>,
pub row_id_idx: usize,
pub segments: Vec<(usize, Location)>,
+    pub snapshot_loc: Option<String>,
}
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/operations/merge.rs
@@ -77,6 +77,7 @@ impl FuseTable {
block_builder: BlockBuilder,
io_request_semaphore: Arc<Semaphore>,
segment_locations: Vec<(SegmentIndex, Location)>,
+        snapshot_loc: Option<String>,
) -> Result<PipeItem> {
let read_settings = ReadSettings::from_ctx(&ctx)?;
let aggregator = MatchedAggregator::create(
@@ -88,6 +89,7 @@
block_builder,
io_request_semaphore,
segment_locations,
+            snapshot_loc,
)?;
Ok(aggregator.into_pipe_item())
}
@@ -81,6 +81,7 @@ pub struct MatchedAggregator {
segment_locations: AHashMap<SegmentIndex, Location>,
block_mutation_row_offset: HashMap<u64, (HashSet<usize>, HashSet<usize>)>,
aggregation_ctx: Arc<AggregationContext>,
+    snapshot_loc: Option<String>,
}

impl MatchedAggregator {
@@ -94,6 +95,7 @@
block_builder: BlockBuilder,
io_request_semaphore: Arc<Semaphore>,
segment_locations: Vec<(SegmentIndex, Location)>,
+        snapshot_loc: Option<String>,
) -> Result<Self> {
let segment_reader =
MetaReaders::segment_info_reader(data_accessor.clone(), target_table_schema.clone());
@@ -122,6 +124,7 @@
segment_reader,
block_mutation_row_offset: HashMap::new(),
segment_locations: AHashMap::from_iter(segment_locations.into_iter()),
+            snapshot_loc,
})
}

@@ -169,6 +172,42 @@
Ok(())
}

+    fn check_segments(
+        snapshot_loc: Option<String>,
+        segment_infos: &HashMap<SegmentIndex, SegmentInfo>,
+        block_mutation_row_offset: &HashMap<u64, (HashSet<usize>, HashSet<usize>)>,
+    ) {
+        println!("SNAPSHOT_LOCATION:({:?})", snapshot_loc);
+        for prefix in segment_infos.keys().sorted() {
+            println!("=========== segment_info: ============");
+            println!(
+                "prefix: {:?}, segment_info: {:?},\n blocks_len: {:?}",
+                prefix,
+                segment_infos.get(prefix).unwrap().blocks,
+                segment_infos.get(prefix).unwrap().blocks.len()
+            );
+            for (blk_idx, block_meta) in
+                segment_infos.get(prefix).unwrap().blocks.iter().enumerate()
+            {
+                println!(
+                    "prefix: {:?}, blk_idx: {:?}, rows: {:?}",
+                    prefix, blk_idx, block_meta.row_count
+                );
+            }
+            println!("=========== segment_info: ============");
+        }
+        println!("\n\n\n");
+        for prefix in block_mutation_row_offset.keys().sorted() {
+            println!("\n\n\n=========== blocks_mutation: ============");
+            let (segment_idx, block_idx) = split_prefix(*prefix);
+            println!(
+                "from prefix ==> segment_idx:{:?},block_idx:{:?}",
+                segment_idx, block_idx
+            );
+            println!("=========== blocks_mutation: ============");
+        }
+    }
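check_segments dumps the snapshot location, every segment's block list and block count, and the (segment_idx, block_idx) pairs recovered from each row-id prefix via split_prefix. The code below is a hedged sketch of such a prefix encoding; the real bit layout lives where Databend builds the row_id internal column (see fill_internal_column_meta()), and the 48/16 split here is purely illustrative:

```rust
// Hedged sketch of a segment/block prefix encoding, assuming the
// prefix packs a segment index in the high bits and a block id in
// the low bits. The 48/16 split is illustrative only.
const BLOCK_ID_BITS: u64 = 16;
const BLOCK_ID_MASK: u64 = (1 << BLOCK_ID_BITS) - 1;

fn compute_prefix(segment_idx: u64, block_id: u64) -> u64 {
    (segment_idx << BLOCK_ID_BITS) | (block_id & BLOCK_ID_MASK)
}

fn split_prefix(prefix: u64) -> (u64, u64) {
    (prefix >> BLOCK_ID_BITS, prefix & BLOCK_ID_MASK)
}

fn main() {
    let p = compute_prefix(3, 7);
    assert_eq!(split_prefix(p), (3, 7));
    println!("prefix {p:#x} -> segment 3, block 7");
}
```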

#[async_backtrace::framed]
pub async fn apply(&mut self) -> Result<Option<MutationLogs>> {
let start = Instant::now();
@@ -215,7 +254,23 @@
"merge into apply: segment_idx:{},blk_idx:{}",
segment_idx, block_idx
);

+            let old_block_idx = block_idx;
+            let block_idx = segment_info.blocks.len() - block_idx as usize - 1;
+            if block_idx >= segment_info.blocks.len() {
+                println!(
+                    "blocks.len():{:?}, old_block_idx:{:?}, new_block_idx:{:?}, segment_infos_len:{:?}\n\n",
+                    segment_info.blocks.len(),
+                    old_block_idx,
+                    block_idx,
+                    segment_infos.len()
+                );
+                Self::check_segments(
+                    self.snapshot_loc.clone(),
+                    &segment_infos,
+                    &self.block_mutation_row_offset,
+                );
+            }
assert!(block_idx < segment_info.blocks.len());
// the row_id is generated from block_id, not block_idx; see fill_internal_column_meta()
let block_meta = segment_info.blocks[block_idx].clone();
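The guarded reversal in the last hunk maps a block id recovered from the row-id prefix onto an index into segment_info.blocks with len - block_id - 1, on the assumption that blocks are stored in reverse of the order in which their ids were generated; when the mapping falls out of range, the new println/check_segments path dumps diagnostics before the assert fires. A minimal sketch of that mapping under the same assumption, with `Block` as a stand-in for Databend's BlockMeta:

```rust
// Minimal sketch of the reversed-index lookup, assuming blocks are
// stored in reverse of the order in which their block ids were
// generated.
#[derive(Debug)]
struct Block {
    row_count: u64,
}

fn block_by_id(blocks: &[Block], block_id: usize) -> Option<&Block> {
    // Reverse the id into a storage index, then bounds-check instead
    // of asserting so callers can log diagnostics first.
    let idx = blocks.len().checked_sub(block_id + 1)?;
    blocks.get(idx)
}

fn main() {
    let blocks = vec![Block { row_count: 10 }, Block { row_count: 20 }];
    // block id 0 maps to the last stored block.
    println!("{:?}", block_by_id(&blocks, 0)); // Some(Block { row_count: 20 })
    println!("{:?}", block_by_id(&blocks, 5)); // None: out of range
}
```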
