diff --git a/benchmark/clickbench/merge_into/create_local.sql b/benchmark/clickbench/merge_into/create_local.sql
index 7a9d7d2f80044..263a08da354f0 100644
--- a/benchmark/clickbench/merge_into/create_local.sql
+++ b/benchmark/clickbench/merge_into/create_local.sql
@@ -1,7 +1,7 @@
 drop table if exists target_table;
 drop table if exists source_table;

-CREATE TRANSIENT TABLE IF NOT EXISTS source_table (
+CREATE TABLE IF NOT EXISTS source_table (
     l_orderkey BIGINT not null,
     l_partkey BIGINT not null,
     l_suppkey BIGINT not null,
@@ -18,7 +18,7 @@ CREATE TRANSIENT TABLE IF NOT EXISTS source_table (
     l_shipinstruct STRING not null,
     l_shipmode STRING not null,
     l_comment STRING not null
-) CLUSTER BY(l_shipdate, l_orderkey);
+) ENGINE = RANDOM;

 CREATE TRANSIENT TABLE IF NOT EXISTS target_table (
     l_orderkey BIGINT not null,
diff --git a/benchmark/clickbench/merge_into/load.sql b/benchmark/clickbench/merge_into/load.sql
index 4773d8f13d04d..2413a105b0861 100644
--- a/benchmark/clickbench/merge_into/load.sql
+++ b/benchmark/clickbench/merge_into/load.sql
@@ -7,7 +7,7 @@ ANALYZE TABLE source_table;

 -- load data to target_table, it's almost 6000000 rows in source
 COPY INTO target_table
-FROM 'fs:///tmp/data/lineitem2.tbl' PATTERN = 'lineitem.tbl2.*' FILE_FORMAT =(
+FROM 'fs:///tmp/data/lineitem.tbl' PATTERN = 'lineitem.tbl.*' FILE_FORMAT =(
     type = 'CSV' field_delimiter = '|' record_delimiter = '\n' skip_header = 0
 );

diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs
index ea7d5ba70e90e..8bab296fd4230 100644
--- a/src/query/service/src/interpreters/interpreter_merge_into.rs
+++ b/src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -324,6 +324,7 @@ impl MergeIntoInterpreter {
                 .into_iter()
                 .enumerate()
                 .collect(),
+            snapshot_loc: fuse_table.snapshot_loc().await?,
         });

         // build mutation_aggregate
diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs
index f574ef453e3fe..f3139672e4e6b 100644
--- a/src/query/service/src/interpreters/interpreter_table_create.rs
+++ b/src/query/service/src/interpreters/interpreter_table_create.rs
@@ -57,6 +57,8 @@ use storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS;
 use storages_common_table_meta::table::OPT_KEY_COMMENT;
 use storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
 use storages_common_table_meta::table::OPT_KEY_ENGINE;
+use storages_common_table_meta::table::OPT_KEY_EXTERNAL_LOCATION;
+use storages_common_table_meta::table::OPT_KEY_LEGACY_SNAPSHOT_LOC;
 use storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
 use storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
 use storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
@@ -366,7 +368,8 @@ pub static CREATE_TABLE_OPTIONS: Lazy<HashSet<&'static str>> = Lazy::new(|| {
     r.insert(FUSE_OPT_KEY_ROW_PER_BLOCK);
     r.insert(FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD);
     r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD);
-
+    r.insert(OPT_KEY_SNAPSHOT_LOCATION);
+    r.insert(OPT_KEY_LEGACY_SNAPSHOT_LOC);
     r.insert(OPT_KEY_BLOOM_INDEX_COLUMNS);
     r.insert(OPT_KEY_TABLE_COMPRESSION);
     r.insert(OPT_KEY_STORAGE_FORMAT);
diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs
index 5ff201c0e78d8..f8bf647b14a91 100644
--- a/src/query/service/src/pipelines/pipeline_builder.rs
+++ b/src/query/service/src/pipelines/pipeline_builder.rs
@@ -434,6 +434,7 @@ impl PipelineBuilder {
             field_index_of_input_schema,
             row_id_idx,
             segments,
+            snapshot_loc,
         } = merge_into;

         self.build_pipeline(input)?;
@@ -618,6 +619,7 @@ impl PipelineBuilder {
                 block_builder,
                 io_request_semaphore,
                 segments.clone(),
+                snapshot_loc.clone(),
             )?);

             for _ in 0..self.main_pipeline.output_len() - 1 {
diff --git a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs
index 8330c44cee07d..afe429ca9761e 100644
--- a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs
@@ -46,4 +46,5 @@ pub struct MergeInto {
     pub field_index_of_input_schema: HashMap<FieldIndex, usize>,
     pub row_id_idx: usize,
     pub segments: Vec<(usize, Location)>,
+    pub snapshot_loc: Option<String>,
 }
diff --git a/src/query/storages/fuse/src/operations/merge.rs b/src/query/storages/fuse/src/operations/merge.rs
index 0724a322df604..07bed1165dd43 100644
--- a/src/query/storages/fuse/src/operations/merge.rs
+++ b/src/query/storages/fuse/src/operations/merge.rs
@@ -77,6 +77,7 @@ impl FuseTable {
         block_builder: BlockBuilder,
         io_request_semaphore: Arc<Semaphore>,
         segment_locations: Vec<(SegmentIndex, Location)>,
+        snapshot_loc: Option<String>,
     ) -> Result<PipeItem> {
         let read_settings = ReadSettings::from_ctx(&ctx)?;
         let aggregator = MatchedAggregator::create(
@@ -88,6 +89,7 @@ impl FuseTable {
             block_builder,
             io_request_semaphore,
             segment_locations,
+            snapshot_loc,
         )?;
         Ok(aggregator.into_pipe_item())
     }
diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
index bbdb8fbec0b3a..13a81016b2ff8 100644
--- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
@@ -81,6 +81,7 @@ pub struct MatchedAggregator {
     segment_locations: AHashMap<SegmentIndex, Location>,
     block_mutation_row_offset: HashMap<u64, (HashSet<usize>, HashSet<usize>)>,
     aggregation_ctx: Arc<AggregationContext>,
+    snapshot_loc: Option<String>,
 }

 impl MatchedAggregator {
@@ -94,6 +95,7 @@ impl MatchedAggregator {
         block_builder: BlockBuilder,
         io_request_semaphore: Arc<Semaphore>,
         segment_locations: Vec<(SegmentIndex, Location)>,
+        snapshot_loc: Option<String>,
     ) -> Result<Self> {
         let segment_reader =
             MetaReaders::segment_info_reader(data_accessor.clone(), target_table_schema.clone());
@@ -122,6 +124,7 @@ impl MatchedAggregator {
             segment_reader,
             block_mutation_row_offset: HashMap::new(),
             segment_locations: AHashMap::from_iter(segment_locations.into_iter()),
+            snapshot_loc,
         })
     }

@@ -169,6 +172,42 @@ impl MatchedAggregator {
         Ok(())
     }

+    fn check_segments(
+        snapshot_loc: Option<String>,
+        segment_infos: &HashMap<SegmentIndex, SegmentInfo>,
+        block_mutation_row_offset: &HashMap<u64, (HashSet<usize>, HashSet<usize>)>,
+    ) {
+        println!("SNAPSHOT_LOCATION:({:?})", snapshot_loc);
+        for prefix in segment_infos.keys().sorted() {
+            println!("=========== segment_info: ============");
+            println!(
+                "prefix: {:?} , segment_info:{:?},\n blocks_len:{:?}",
+                prefix,
+                segment_infos.get(prefix).unwrap().blocks,
+                segment_infos.get(prefix).unwrap().blocks.len()
+            );
+            for (blk_idx, block_meta) in
+                segment_infos.get(prefix).unwrap().blocks.iter().enumerate()
+            {
+                println!(
+                    "prefix: rows {:?},blk_idx:{:?},rows:{:?}",
+                    prefix, blk_idx, block_meta.row_count
+                );
+            }
+            println!("=========== segment_info: ============");
+        }
+        println!("\n\n\n");
+        for prefix in block_mutation_row_offset.keys().sorted() {
+            println!("\n\n\n=========== blocks_mutation: ============");
+            let (segment_idx, block_idx) = split_prefix(*prefix);
+            println!(
+                "from prefix ==> segment_idx:{:?},block_idx:{:?}",
+                segment_idx, block_idx
+            );
+            println!("=========== blocks_mutation: ============");
+        }
+    }
+
     #[async_backtrace::framed]
     pub async fn apply(&mut self) -> Result<Option<DataBlock>> {
         let start = Instant::now();
@@ -215,7 +254,23 @@ impl MatchedAggregator {
                     "merge into apply: segment_idx:{},blk_idx:{}",
                     segment_idx, block_idx
                 );
+
+                let old_block_idx = block_idx;
                 let block_idx = segment_info.blocks.len() - block_idx as usize - 1;
+                if block_idx >= segment_info.blocks.len() {
+                    println!(
+                        "blocks.len():{:?}, old_block_idx:{:?}, new_block_idx:{:?},segments_info_len:{:?}\n\n",
+                        segment_info.blocks.len(),
+                        old_block_idx,
+                        block_idx,
+                        segment_infos.len()
+                    );
+                    Self::check_segments(
+                        self.snapshot_loc.clone(),
+                        &segment_infos,
+                        &self.block_mutation_row_offset,
+                    );
+                }
                 assert!(block_idx < segment_info.blocks.len());
                 // the row_id is generated by block_id, not block_idx,reference to fill_internal_column_meta()
                 let block_meta = segment_info.blocks[block_idx].clone();