refactor(iceberg): support eq delete merge on read with hash join #19126

Open · wants to merge 7 commits into base: main
Changes from 5 commits
5 changes: 5 additions & 0 deletions proto/batch_plan.proto
@@ -67,10 +67,15 @@ message SourceNode {
}

message IcebergScanNode {
enum IcebergScanType {
DATA_SCAN = 0;
EQUALITY_DELETE_SCAN = 1;
}
repeated plan_common.ColumnCatalog columns = 1;
map<string, string> with_properties = 2;
repeated bytes split = 3;
map<string, secret.SecretRef> secret_refs = 4;
IcebergScanType iceberg_scan_type = 5;
}

message FileScanNode {
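The new `iceberg_scan_type` field is what lets a single logical Iceberg scan be split into two physical scans, one over data files and one over equality delete files, which the planner can then stitch together with a hash join (per the PR title, this is how equality-delete merge-on-read is now evaluated). Below is a toy sketch of that plan shape; `PlanNode`, `scan`, and `anti_hash_join` are illustrative placeholders, not RisingWave's actual planner API:

```rust
// Toy sketch of the plan shape the new field enables; all names here are
// placeholders, not RisingWave's actual planner API.
#[derive(Clone, Copy)]
enum IcebergScanType {
    DataScan,
    EqualityDeleteScan,
}

struct PlanNode(String);

fn scan(ty: IcebergScanType) -> PlanNode {
    PlanNode(
        match ty {
            IcebergScanType::DataScan => "IcebergScan(DATA_SCAN)",
            IcebergScanType::EqualityDeleteScan => "IcebergScan(EQUALITY_DELETE_SCAN)",
        }
        .to_string(),
    )
}

// Semantically an anti join: a data row survives only if no delete row with
// the same equality key and a greater sequence number exists.
fn anti_hash_join(probe: PlanNode, build: PlanNode) -> PlanNode {
    PlanNode(format!("HashJoin(LeftAnti, {}, {})", probe.0, build.0))
}

fn main() {
    let plan = anti_hash_join(
        scan(IcebergScanType::DataScan),
        scan(IcebergScanType::EqualityDeleteScan),
    );
    println!("{}", plan.0);
}
```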
279 changes: 83 additions & 196 deletions src/batch/src/executor/iceberg_scan.rs
@@ -14,7 +14,7 @@

use core::ops::BitAnd;
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
@@ -23,13 +23,15 @@ use iceberg::spec::TableMetadata;
use iceberg::table::Table;
use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, I64Array};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::iceberg::{
IcebergFileScanTaskJsonStrEnum, IcebergProperties, IcebergSplit,
};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_pb::batch_plan::plan_node::NodeBody;
@@ -42,14 +44,48 @@ use crate::task::BatchTaskContext;

static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0;
static POSITION_DELETE_FILE_POS: usize = 1;

pub enum IcebergFileScanTaskEnum {
DataAndPositionDelete(Vec<FileScanTask>, Vec<FileScanTask>),
EqualityDelete(Vec<FileScanTask>),
}

impl IcebergFileScanTaskEnum {
fn from_iceberg_file_scan_task_json_str_enum(
iceberg_file_scan_task_json_str_enum: IcebergFileScanTaskJsonStrEnum,
) -> Self {
match iceberg_file_scan_task_json_str_enum {
IcebergFileScanTaskJsonStrEnum::DataAndPositionDelete(
data_file_scan_tasks,
position_delete_file_scan_tasks,
) => IcebergFileScanTaskEnum::DataAndPositionDelete(
data_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
position_delete_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
),
IcebergFileScanTaskJsonStrEnum::EqualityDelete(equality_delete_file_scan_tasks) => {
IcebergFileScanTaskEnum::EqualityDelete(
equality_delete_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
)
}
}
}
}

pub struct IcebergScanExecutor {
iceberg_config: IcebergProperties,
#[allow(dead_code)]
snapshot_id: Option<i64>,
table_meta: TableMetadata,
data_file_scan_tasks: Vec<FileScanTask>,
equality_delete_file_scan_tasks: Vec<FileScanTask>,
position_delete_file_scan_tasks: Vec<FileScanTask>,
file_scan_tasks: Option<IcebergFileScanTaskEnum>,
batch_size: usize,
schema: Schema,
identity: String,
@@ -75,9 +111,7 @@ impl IcebergScanExecutor {
iceberg_config: IcebergProperties,
snapshot_id: Option<i64>,
table_meta: TableMetadata,
data_file_scan_tasks: Vec<FileScanTask>,
equality_delete_file_scan_tasks: Vec<FileScanTask>,
position_delete_file_scan_tasks: Vec<FileScanTask>,
file_scan_tasks: IcebergFileScanTaskEnum,
batch_size: usize,
schema: Schema,
identity: String,
@@ -87,11 +121,9 @@ impl IcebergScanExecutor {
iceberg_config,
snapshot_id,
table_meta,
data_file_scan_tasks,
equality_delete_file_scan_tasks,
position_delete_file_scan_tasks,
batch_size,
schema,
file_scan_tasks: Some(file_scan_tasks),
identity,
metrics,
}
@@ -104,27 +136,33 @@ impl IcebergScanExecutor {
.load_table_v2_with_metadata(self.table_meta)
.await?;
let data_types = self.schema.data_types();
let executor_schema_names = self.schema.names();
let table_name = table.identifier().name().to_string();

let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);

let mut position_delete_filter = PositionDeleteFilter::new(
mem::take(&mut self.position_delete_file_scan_tasks),
&data_file_scan_tasks,
&table,
self.batch_size,
)
.await?;
let mut equality_delete_filter = EqualityDeleteFilter::new(
mem::take(&mut self.equality_delete_file_scan_tasks),
&table,
self.batch_size,
executor_schema_names,
)
.await?;

// Depending on the scan type, either scan the data files (applying position deletes) or scan the equality delete files directly.
let (mut position_delete_filter, data_file_scan_tasks) =
match Option::take(&mut self.file_scan_tasks) {
Some(IcebergFileScanTaskEnum::DataAndPositionDelete(
data_file_scan_tasks,
position_delete_file_scan_tasks,
)) => (
Some(
PositionDeleteFilter::new(
position_delete_file_scan_tasks,
&data_file_scan_tasks,
&table,
self.batch_size,
)
.await?,
),
data_file_scan_tasks,
),
Some(IcebergFileScanTaskEnum::EqualityDelete(equality_delete_file_scan_tasks)) => {
(None, equality_delete_file_scan_tasks)
}
None => {
bail!("file_scan_tasks must be Some")
}
};

let mut read_bytes = 0;
let _metrics_report_guard = scopeguard::guard(
(read_bytes, table_name, self.metrics.clone()),
@@ -138,13 +176,10 @@ impl IcebergScanExecutor {
}
},
);

for data_file_scan_task in data_file_scan_tasks {
let data_file_path = data_file_scan_task.data_file_path.clone();
let data_sequence_number = data_file_scan_task.sequence_number;

equality_delete_filter.apply_data_file_scan_task(&data_file_scan_task);

let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
@@ -157,15 +192,23 @@
let record_batch = record_batch?;

let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
let (mut columns, visibility) = chunk.into_parts();
columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
vec![data_sequence_number; visibility.len()],
))));
let mut chunk = DataChunk::from_parts(columns.into(), visibility);

// position delete
let chunk = position_delete_filter.filter(&data_file_path, chunk, index);
// equality delete
let chunk = equality_delete_filter.filter(chunk, data_sequence_number)?;
if let Some(position_delete_filter) = &mut position_delete_filter {
chunk = position_delete_filter.filter(&data_file_path, chunk, index);
}
assert_eq!(chunk.data_types(), data_types);
read_bytes += chunk.estimated_heap_size() as u64;
yield chunk;
}
position_delete_filter.remove_file_path(&data_file_path);
if let Some(position_delete_filter) = &mut position_delete_filter {
position_delete_filter.remove_file_path(&data_file_path);
}
}
}
}
@@ -223,17 +266,7 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
iceberg_properties,
Some(split.snapshot_id),
split.table_meta.deserialize(),
split.files.into_iter().map(|x| x.deserialize()).collect(),
split
.equality_delete_files
.into_iter()
.map(|x| x.deserialize())
.collect(),
split
.position_delete_files
.into_iter()
.map(|x| x.deserialize())
.collect(),
IcebergFileScanTaskEnum::from_iceberg_file_scan_task_json_str_enum(split.files),
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
@@ -334,149 +367,3 @@ impl PositionDeleteFilter {
self.position_delete_file_path_pos_map.remove(file_path);
}
}

struct EqualityDeleteFilter {
Contributor: Has the following logic been moved elsewhere, or is it invalidated for some reason?

Contributor (Author): After this PR, this logic is no longer valid; we have implemented the same functionality using the hash join executor.
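
For readers following along, here is a minimal standalone sketch of the semantics the hash join now provides in place of the filter below. All names are illustrative, not the executor's actual API: a data row is dropped exactly when an equality-delete row with the same key has a strictly greater sequence number.

```rust
use std::collections::HashMap;

// Hypothetical key type standing in for the projected equality-delete columns.
type Key = Vec<String>;

/// A data row survives unless a strictly newer equality delete matches it.
fn merge_on_read(data: Vec<(Key, i64)>, deletes: Vec<(Key, i64)>) -> Vec<(Key, i64)> {
    // Build side of the join: max delete sequence number per key.
    let mut build: HashMap<Key, i64> = HashMap::new();
    for (key, seq) in deletes {
        let entry = build.entry(key).or_insert(i64::MIN);
        *entry = (*entry).max(seq);
    }
    // Probe side: keep rows with no matching newer delete (an anti join).
    data.into_iter()
        .filter(|(key, data_seq)| {
            build
                .get(key)
                .map_or(true, |delete_seq| delete_seq <= data_seq)
        })
        .collect()
}
```

Planning this as a regular hash join, rather than the bespoke in-executor map below, presumably lets the batch engine reuse its existing join distribution and memory management.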

// The `seq_num` corresponding to each row in the equality delete file
equality_delete_rows_seq_num_map: HashMap<OwnedRow, i64>,
// The field ids of the equality delete columns
equality_delete_ids: Option<Vec<i32>>,
// The indexes of the equality delete columns within the chunk
equality_delete_column_idxes: Option<Vec<usize>>,
// The schema of the data file, which is the intersection of the output schema and the equality delete columns
data_chunk_column_names: Option<Vec<String>>,
// Column names for the output schema so that columns can be trimmed after filter
executor_schema_names: Vec<String>,
}

impl EqualityDeleteFilter {
async fn new(
equality_delete_file_scan_tasks: Vec<FileScanTask>,
table: &Table,
batch_size: usize,
executor_schema_names: Vec<String>,
) -> crate::error::Result<Self> {
let mut equality_delete_rows_seq_num_map: HashMap<OwnedRow, i64> = HashMap::default();

// Build hash map for equality delete files
// Currently, all equality delete files have the same schema, which is guaranteed by `IcebergSplitEnumerator`.
let mut equality_delete_ids: Option<Vec<_>> = None;
for equality_delete_file_scan_task in equality_delete_file_scan_tasks {
let mut sequence_number = equality_delete_file_scan_task.sequence_number;

if equality_delete_ids.is_none() {
equality_delete_ids =
Some(equality_delete_file_scan_task.project_field_ids.clone());
} else {
debug_assert_eq!(
equality_delete_ids.as_ref().unwrap(),
&equality_delete_file_scan_task.project_field_ids
);
}

let reader = table.reader_builder().with_batch_size(batch_size).build();
let delete_file_scan_stream = tokio_stream::once(Ok(equality_delete_file_scan_task));

let mut delete_record_batch_stream = reader.read(Box::pin(delete_file_scan_stream))?;

while let Some(record_batch) = delete_record_batch_stream.next().await {
let record_batch = record_batch?;

let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
for row in chunk.rows() {
let entry = equality_delete_rows_seq_num_map
.entry(row.to_owned_row())
.or_default();
*entry = *entry.max(&mut sequence_number);
}
}
}
Ok(Self {
equality_delete_rows_seq_num_map,
equality_delete_ids,
equality_delete_column_idxes: None,
data_chunk_column_names: None,
executor_schema_names,
})
}

fn apply_data_file_scan_task(&mut self, data_file_scan_task: &FileScanTask) {
if let Some(equality_delete_ids) = &self.equality_delete_ids {
self.data_chunk_column_names = Some(
data_file_scan_task
.project_field_ids
.iter()
.filter_map(|id| {
data_file_scan_task
.schema
.name_by_field_id(*id)
.map(|name| name.to_string())
})
.collect(),
);
// eq_delete_column_idxes are used to fetch equality delete columns from data files.
self.equality_delete_column_idxes = Some(
equality_delete_ids
.iter()
.map(|equality_delete_id| {
data_file_scan_task
.project_field_ids
.iter()
.position(|project_field_id| equality_delete_id == project_field_id)
.expect("equality_delete_id not found in delete_equality_ids")
})
.collect_vec(),
);
}
}

fn filter(
&self,
mut chunk: DataChunk,
data_sequence_number: i64,
) -> crate::error::Result<DataChunk> {
chunk = chunk.compact();
match self.equality_delete_column_idxes.as_ref() {
Some(delete_column_ids) => {
let new_visibility = Bitmap::from_iter(
// Project with the schema of the delete file
chunk.project(delete_column_ids).rows().map(|row_ref| {
let row = row_ref.to_owned_row();
if let Some(delete_sequence_number) =
self.equality_delete_rows_seq_num_map.get(&row)
&& delete_sequence_number > &data_sequence_number
{
// delete_sequence_number > data_sequence_number means the delete file was written later than the data file,
// so the row needs to be deleted
false
} else {
true
}
}),
)
.clone();
let Some(ref data_chunk_column_names) = self.data_chunk_column_names else {
bail!("data_chunk_column_names is not set")
};

// Keep the schema consistent(chunk and executor)
// Filter out (equality delete) columns that are not in the executor schema
let (data, old_visibility) = chunk.into_parts_v2();
let data = data
.iter()
.zip_eq_fast(data_chunk_column_names)
.filter_map(|(array, columns)| {
if self.executor_schema_names.contains(columns) {
Some(array.clone())
} else {
None
}
})
.collect_vec();
let chunk = DataChunk::new(data, old_visibility.bitand(new_visibility));
Ok(chunk)
}
// If there is no delete file, the data file is directly output
None => Ok(chunk),
}
}
}