Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add stat to Remove action #633

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ pub struct Add {
/// in the added file must be contained in one or more remove actions in the same version.
pub data_change: bool,

/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file.
/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file encoded as a JSON string.
///
/// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
Expand Down Expand Up @@ -442,6 +442,12 @@ struct Remove {
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) size: Option<i64>,

/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file encoded as a JSON string.
///
/// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) stats: Option<String>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) tags: Option<HashMap<String, String>>,
Expand Down Expand Up @@ -633,6 +639,7 @@ mod tests {
StructField::new("extendedFileMetadata", DataType::BOOLEAN, true),
partition_values_field(),
StructField::new("size", DataType::LONG, true),
StructField::new("stats", DataType::STRING, true),
tags_field(),
deletion_vector_field(),
StructField::new("baseRowId", DataType::LONG, true),
Expand Down
101 changes: 87 additions & 14 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl RemoveVisitor {
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Remove> {
require!(
getters.len() == 14,
getters.len() == 15,
Error::InternalError(format!(
"Wrong number of RemoveVisitor getters: {}",
getters.len()
Expand All @@ -267,25 +267,29 @@ impl RemoveVisitor {
let extended_file_metadata: Option<bool> =
getters[3].get_opt(row_index, "remove.extendedFileMetadata")?;

// TODO(nick) handle partition values in getters[4]
let partition_values: Option<HashMap<_, _>> =
getters[4].get_opt(row_index, "remove.partitionValues")?;

let size: Option<i64> = getters[5].get_opt(row_index, "remove.size")?;

// TODO(nick) tags are skipped in getters[6]
let stats: Option<String> = getters[6].get_opt(row_index, "remove.stats")?;

let deletion_vector = visit_deletion_vector_at(row_index, &getters[7..])?;
// TODO(nick) tags are skipped in getters[7]

let deletion_vector = visit_deletion_vector_at(row_index, &getters[8..])?;

let base_row_id: Option<i64> = getters[12].get_opt(row_index, "remove.baseRowId")?;
let base_row_id: Option<i64> = getters[13].get_opt(row_index, "remove.baseRowId")?;
let default_row_commit_version: Option<i64> =
getters[13].get_opt(row_index, "remove.defaultRowCommitVersion")?;
getters[14].get_opt(row_index, "remove.defaultRowCommitVersion")?;

Ok(Remove {
path,
data_change,
deletion_timestamp,
extended_file_metadata,
partition_values: None,
partition_values,
size,
stats,
tags: None,
deletion_vector,
base_row_id,
Expand All @@ -305,10 +309,9 @@ impl RowVisitor for RemoveVisitor {
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
for i in 0..row_count {
// Since path column is required, use it to detect presence of an Remove action
// Since path column is required, use it to detect presence of a Remove action
if let Some(path) = getters[0].get_opt(i, "remove.path")? {
self.removes.push(Self::visit_remove(i, path, getters)?);
break;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woah I didn't realize the break was there from the beginning

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, this has been dead code for a while, but what in the world? 🤦

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good find lol!

}
}
Ok(())
Expand Down Expand Up @@ -603,11 +606,7 @@ mod tests {
modification_time: 1670892998135,
data_change: true,
stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()),
tags: None,
deletion_vector: None,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
..Default::default()
};
let add2 = Add {
path: "c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet".into(),
Expand All @@ -630,11 +629,85 @@ mod tests {
..add1.clone()
};
let expected = vec![add1, add2, add3];
assert_eq!(add_visitor.adds.len(), expected.len());
for (add, expected) in add_visitor.adds.into_iter().zip(expected.into_iter()) {
assert_eq!(add, expected);
}
}

/// Parses a two-line commit (a `commitInfo` action followed by a `remove` action that carries
/// `stats`) against the log schema and checks that `RemoveVisitor` yields exactly one `Remove`
/// whose fields, including the JSON-encoded statistics string, match the input.
#[test]
fn test_parse_remove() {
    // The single action we expect the visitor to extract. Everything not present in the JSON
    // `remove` object is left at its `Default` value.
    let expected = Remove {
        path: "part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(),
        deletion_timestamp: Some(1670892998135),
        data_change: true,
        size: Some(452),
        stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}".into()),
        ..Default::default()
    };

    // Raw commit contents: only the second line holds a `remove` action.
    let commit_lines: StringArray = vec![
        r#"{"commitInfo":{"timestamp":1670892998177,"operation":"DELETE","operationParameters":{"mode":"Append"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
        r#"{"remove":{"path":"part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"size":452,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
    ]
    .into();

    // Parse the JSON lines into engine data using the full log schema.
    let engine = SyncEngine::new();
    let schema = get_log_schema().clone();
    let parsed = engine
        .get_json_handler()
        .parse_json(string_array_to_engine_data(commit_lines), schema)
        .unwrap();

    // Walk every parsed row and collect the remove actions.
    let mut visitor = RemoveVisitor::default();
    visitor.visit_rows_of(parsed.as_ref()).unwrap();

    let removes = &visitor.removes;
    assert_eq!(removes.len(), 1, "Unexpected number of removal actions");
    assert_eq!(removes[0], expected, "Unexpected removal action");
}

// Parses a commit holding `protocol`, `metaData` and one `remove` action that carries
// `partitionValues` (and no `stats`), then checks that `RemoveVisitor` extracts exactly one
// `Remove` whose partition values round-trip into a `HashMap`.
#[test]
fn test_parse_remove_partitioned() {
sebastiantia marked this conversation as resolved.
Show resolved Hide resolved
let engine = SyncEngine::new();
let json_handler = engine.get_json_handler();
// Raw commit contents: only the third line is a `remove` action; the first two rows are
// other action types the visitor is expected to pass over.
let json_strings: StringArray = vec![
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452}}"#,
]
.into();
// Parse the JSON lines into engine data using the full log schema.
let output_schema = get_log_schema().clone();
let batch = json_handler
.parse_json(string_array_to_engine_data(json_strings), output_schema)
.unwrap();
// Visit every parsed row and collect the remove actions.
let mut remove_visitor = RemoveVisitor::default();
remove_visitor.visit_rows_of(batch.as_ref()).unwrap();
// Expected action: fields absent from the JSON `remove` object are taken from
// `Default::default()`.
let expected_remove = Remove {
path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet"
.into(),
deletion_timestamp: Some(1670892998135),
data_change: true,
partition_values: Some(HashMap::from([
("c1".to_string(), "4".to_string()),
("c2".to_string(), "c".to_string()),
])),
size: Some(452),
..Default::default()
};
assert_eq!(
remove_visitor.removes.len(),
1,
"Unexpected number of removal actions"
);
assert_eq!(
remove_visitor.removes[0], expected_remove,
"Unexpected removal action"
);
}

#[test]
fn test_parse_txn() {
let engine = SyncEngine::new();
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub struct StructType {
pub type_name: String,
/// The type of element stored in this array
// We use indexmap to preserve the order of fields as they are defined in the schema
// while also allowing for fast lookup by name. The atlerative to do a liner search
// while also allowing for fast lookup by name. The alternative to do a linear search
// for each field by name would be potentially quite expensive for large schemas.
pub fields: IndexMap<String, StructField>,
}
Expand Down
1 change: 1 addition & 0 deletions kernel/src/table_changes/log_replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ async fn dv() {
assert_eq!(sv, &[false, true, true]);
}

// Note: Data skipping does not work on Remove actions.
#[tokio::test]
async fn data_skipping_filter() {
let engine = Arc::new(SyncEngine::new());
Expand Down
Loading