
Commit c2ba373

Port addition of _file column (#10)
* Add REE file column helpers
* Add helper tests
* Add constants
* Add support for _file constant
* Update tests
* Fix clippy warning
* Fix doc test
* Track in field ids
* Add test
* Allow repeated virtual file column selection
* Refactor into own transformer step
* Revert "Refactor into own transformer step"
  This reverts commit adf0da0.
* Avoid special casing in batch creation
* .
* Modify record batch transformer to support reserved fields
* Add metadata column helper functions
* Store fields instead of constants
* Add comment
* Adapt comment
* .
* Adapt error message
* Consider field_id range
* Use REE encoding in record batch transformer
* Fix clippy errors
* Format
* Add `with_file_path_column` helper
* Port _file path column changes
* Rename field
* Adapt metadata column
* Rename method
* Undo some changes
* .
* Re-refactor tests
* Undo reader test changes
* .
* Move import
* PR comments
* PR comments
* .
* .
* Clippy fix
1 parent 21244af commit c2ba373
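
The commit message leans on run-end encoding (REE) for the `_file` column. The idea: a column holding one constant value for every row in a batch can be stored as a single run, so the file path string is kept once per batch instead of once per row. A minimal arrow-rs sketch of such a column (the helper name and its placement are assumptions, not this crate's API):

use std::sync::Arc;

use arrow_array::types::Int32Type;
use arrow_array::{ArrayRef, Int32Array, RunArray, StringArray};

// Illustrative only: one run covering all `num_rows` rows, backed by a
// single copy of the path string. Assumes num_rows > 0, since run ends
// must be positive.
fn constant_file_column(path: &str, num_rows: usize) -> ArrayRef {
    let run_ends = Int32Array::from(vec![num_rows as i32]);
    let values = StringArray::from(vec![path]);
    Arc::new(RunArray::<Int32Type>::try_new(&run_ends, &values).expect("one valid run"))
}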

File tree

10 files changed: +1195 −542 lines changed


crates/iceberg/src/arrow/incremental.rs

Lines changed: 18 additions & 41 deletions
@@ -15,22 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;

 use arrow_array::{RecordBatch, UInt64Array};
-use arrow_schema::{DataType, Field, Schema as ArrowSchema};
+use arrow_schema::Schema as ArrowSchema;
 use futures::channel::mpsc::channel;
 use futures::stream::select;
 use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
-use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
-use crate::arrow::{
-    ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_FILE_PATH,
-    RESERVED_FIELD_ID_POS, StreamsInto,
-};
+use crate::arrow::{ArrowReader, StreamsInto};
 use crate::delete_vector::DeleteVector;
 use crate::io::FileIO;
 use crate::runtime::spawn;
@@ -43,19 +38,6 @@ use crate::{Error, ErrorKind, Result};
 /// Default batch size for incremental delete operations.
 const DEFAULT_BATCH_SIZE: usize = 1024;

-/// Creates the schema for positional delete records containing the "pos" column.
-/// The pos field includes the reserved field ID as metadata.
-fn create_pos_delete_schema() -> Arc<ArrowSchema> {
-    let pos_field =
-        Field::new(RESERVED_COL_NAME_POS, DataType::UInt64, false).with_metadata(HashMap::from([
-            (
-                PARQUET_FIELD_ID_META_KEY.to_string(),
-                RESERVED_FIELD_ID_POS.to_string(),
-            ),
-        ]));
-    Arc::new(ArrowSchema::new(vec![pos_field]))
-}
-
 /// The type of incremental batch: appended data or deleted records.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum IncrementalBatchType {
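
The deleted `create_pos_delete_schema` rebuilt the "pos" field inline on every call; the call sites below switch to a shared `crate::metadata_columns::pos_field()`. One plausible shape for that helper, reconstructed from the removed code (the exact reserved name and ID are illustrative; Iceberg's convention places metadata-column IDs at the top of the i32 range):

use std::collections::HashMap;
use std::sync::{Arc, LazyLock};

use arrow_schema::{DataType, Field};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

// Assumed values, following the removed helper and Iceberg's
// reserved-field convention.
const RESERVED_COL_NAME_POS: &str = "pos";
const RESERVED_FIELD_ID_POS: i32 = i32::MAX - 2;

// Built once and shared; callers get &Arc<Field> and Arc::clone it,
// which matches the new call sites in this diff.
static POS_FIELD: LazyLock<Arc<Field>> = LazyLock::new(|| {
    Arc::new(
        Field::new(RESERVED_COL_NAME_POS, DataType::UInt64, false).with_metadata(
            HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                RESERVED_FIELD_ID_POS.to_string(),
            )]),
        ),
    )
});

pub fn pos_field() -> &'static Arc<Field> {
    &POS_FIELD
}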
@@ -254,10 +236,15 @@ async fn process_incremental_append_task(
     record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

     // RecordBatchTransformer performs any transformations required on the RecordBatches
-    // that come back from the file, such as type promotion, default column insertion
-    // and column re-ordering
+    // that come back from the file, such as type promotion, default column insertion,
+    // column re-ordering, and virtual field addition (like _file)
     let mut record_batch_transformer =
-        RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids).build();
+        RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids)
+            .with_constant(
+                crate::metadata_columns::RESERVED_FIELD_ID_FILE,
+                crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()),
+            )?
+            .build();

     if let Some(batch_size) = batch_size {
         record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
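
With the constant registered on the transformer, `_file` behaves like any other projected column instead of a reader special case. A hedged sketch of what that enables on the consumer side (scan-builder method names assumed from iceberg-rust's usual style, and `table` assumed to be an already-loaded `Table`):

use futures::TryStreamExt;

// Hypothetical consumer code: select the virtual `_file` column next to
// ordinary data columns; each batch then carries its source file path.
let batches: Vec<_> = table
    .scan()
    .select(["id", "_file"])
    .build()?
    .to_arrow()
    .await?
    .try_collect()
    .await?;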
@@ -295,7 +282,9 @@ fn process_incremental_delete_task(
     delete_vector: DeleteVector,
     batch_size: Option<usize>,
 ) -> Result<ArrowRecordBatchStream> {
-    let schema = create_pos_delete_schema();
+    let schema = Arc::new(ArrowSchema::new(vec![Arc::clone(
+        crate::metadata_columns::pos_field(),
+    )]));

     let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

@@ -315,14 +304,7 @@
                     "Failed to create RecordBatch for DeleteVector",
                 )
             })
-            .and_then(|batch| {
-                ArrowReader::add_file_path_column(
-                    batch,
-                    &file_path,
-                    RESERVED_COL_NAME_FILE_PATH,
-                    RESERVED_FIELD_ID_FILE_PATH,
-                )
-            })
+            .and_then(|batch| ArrowReader::add_file_path_column(batch, &file_path))
     });

     Ok(Box::pin(stream) as ArrowRecordBatchStream)
@@ -333,7 +315,9 @@ fn process_incremental_deleted_file_task(
     total_records: u64,
     batch_size: Option<usize>,
 ) -> Result<ArrowRecordBatchStream> {
-    let schema = create_pos_delete_schema();
+    let schema = Arc::new(ArrowSchema::new(vec![Arc::clone(
+        crate::metadata_columns::pos_field(),
+    )]));

     let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

@@ -352,14 +336,7 @@
                     "Failed to create RecordBatch for deleted file",
                 )
             })
-            .and_then(|batch| {
-                ArrowReader::add_file_path_column(
-                    batch,
-                    &file_path,
-                    RESERVED_COL_NAME_FILE_PATH,
-                    RESERVED_FIELD_ID_FILE_PATH,
-                )
-            })
+            .and_then(|batch| ArrowReader::add_file_path_column(batch, &file_path))
     });

     Ok(Box::pin(stream) as ArrowRecordBatchStream)
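
Both delete-task call sites shrink the same way: the reserved column name and field ID no longer travel through the call, since `add_file_path_column` now owns them. A hedged usage sketch (assuming the method is reachable from the caller's scope, with `batch: RecordBatch` and `file_path: String` in scope as in the surrounding task):

let n = batch.num_columns();
let with_file = ArrowReader::add_file_path_column(batch, &file_path)?;

// Exactly one column was appended: the run-end encoded file path.
assert_eq!(with_file.num_columns(), n + 1);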

crates/iceberg/src/arrow/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -37,6 +37,8 @@ mod incremental;
 pub use incremental::*;
 pub use reader::*;
 pub use value::*;
+
+// Re-export delete file constants for convenience
 /// Partition value calculator for computing partition values
 pub mod partition_value_calculator;
 pub use partition_value_calculator::*;
