Skip to content

Conversation

@gbrgr
Copy link
Collaborator

@gbrgr gbrgr commented Oct 23, 2025

Closes RAI-43289.
Closes RAI-43292
Closes RAI-43688

Incremental Scan Implementation

Summary

This PR introduces Incremental Scan functionality to the Iceberg Rust implementation, enabling efficient querying of changes between table snapshots. Incremental scans return the net changes (appends and deletes) between two snapshots, which is essential for incremental data processing workflows, change data capture (CDC), and efficient data pipeline operations.

Key Features

Incremental Scan API

  • New IncrementalScan builder with fluent API for configuring scans between snapshots
  • Support for scanning changes from a starting snapshot to an ending snapshot
  • Returns separate streams for appended data and positional deletes
  • Implements column projection via .select() for efficient data retrieval
  • Configurable batch size via .with_batch_size() for memory optimization

File Path Tracking

  • Automatically adds a reserved _file column to all delete record batches containing the source parquet file path
  • Uses RunEndEncoded arrays for memory-efficient file path storage in non-empty batches
  • Includes proper field metadata with reserved field ID (-2048)

Net Change Computation

  • Computes net effects between snapshots: rows added then deleted within the range don't appear in results
  • Efficiently processes only the data files and delete files that changed between snapshots
  • Supports complex scenarios including partial deletes, cross-file operations, and multiple snapshots

Implementation Details

Core Components

  1. IncrementalScan Builder (scan/incremental/mod.rs)

    • Validates snapshot ranges and table state
    • Generates file scan tasks for appends and deletes
    • Integrates with existing ArrowReader infrastructure
  2. Streaming Implementation (arrow/incremental.rs)

    • StreamsInto trait with .stream() method for converting scan tasks to Arrow record streams
    • Separate processing for append tasks (data files) and delete tasks (positional deletes)
    • Optimized delete task processing with schema reuse (O(1) allocations instead of O(n_batches))
  3. File Path Column Addition (arrow/reader.rs)

    • add_file_path_column() function adds _file column to record batches
    • Handles both empty and non-empty batches correctly
    • Maintains proper Arrow schema metadata for Iceberg field IDs

Restrictions

  • Does not support yet deletion of entire parquet files, or overwriting of parquet files.

Testing

Comprehensive test suite added in scan/incremental/tests.rs:

Test Fixture

  • IncrementalTestFixture - Helper for creating test tables with controlled snapshots
  • Supports Add operations with custom file names and data
  • Supports Delete operations with position and file tracking
  • Verification helper verify_incremental_scan() for asserting expected results

Test Coverage

  1. test_incremental_fixture_simple - Basic append and delete operations
  2. test_incremental_fixture_complex - Multiple snapshots with overlapping operations
    • Tests 6 different snapshot range combinations
    • Verifies net change computation (e.g., data added then deleted doesn't appear)
  3. test_incremental_scan_edge_cases - Edge cases across 7 snapshots and 3 data files
    • Partial deletes from multiple files
    • Cross-file operations
    • Empty result sets
  4. test_incremental_scan_builder_options - Builder API functionality
    • Column projection (.select())
    • Batch size configuration
    • Multiple batch size scenarios
  5. test_add_file_path_column - Unit tests for file path column addition
    • Normal case with RunEndEncoded arrays
    • Empty batch handling with StringArray
    • Special characters in file paths

All tests passing: ✅ 4 incremental scan tests, ✅ 3 file path column tests

API Example

// Scan changes between snapshot 2 and snapshot 5
let incremental_scan = table
    .scan()
    .incremental()
    .from_snapshot_id(2)
    .to_snapshot_id(5)
    .select(vec!["id", "name"])  // Column projection
    .with_batch_size(Some(1024)) // Configure batch size
    .build()?;

let (appends_stream, deletes_stream) = incremental_scan.to_unzipped_arrow()?;

// Process appended rows
while let Some(batch) = appends_stream.next().await {
    let batch = batch?;
    // batch contains appended data
}

// Process deleted positions
while let Some(batch) = deletes_stream.next().await {
    let batch = batch?;
    // batch contains (pos, _file) tuples for deleted positions
}

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 363 files.

Valid Invalid Ignored Fixed
296 2 65 0
Click to see the invalid file list
  • crates/playground/Cargo.toml
  • crates/playground/src/main.rs
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 366 files.

Valid Invalid Ignored Fixed
299 2 65 0
Click to see the invalid file list
  • crates/playground/Cargo.toml
  • crates/playground/src/main.rs
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 366 files.

Valid Invalid Ignored Fixed
299 2 65 0
Click to see the invalid file list
  • crates/playground/Cargo.toml
  • crates/playground/src/main.rs
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 366 files.

Valid Invalid Ignored Fixed
299 2 65 0
Click to see the invalid file list
  • crates/playground/Cargo.toml
  • crates/playground/src/main.rs
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 366 files.

Valid Invalid Ignored Fixed
299 2 65 0
Click to see the invalid file list
  • crates/playground/Cargo.toml
  • crates/playground/src/main.rs
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 366 files.

Valid Invalid Ignored Fixed
299 2 65 0
Click to see the invalid file list
  • crates/playground/Cargo.toml
  • crates/playground/src/main.rs
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 366 files.

Valid Invalid Ignored Fixed
299 2 65 0
Click to see the invalid file list
  • crates/playground/Cargo.toml
  • crates/playground/src/main.rs
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license-eye has checked 367 files.

Valid Invalid Ignored Fixed
299 3 65 0
Click to see the invalid file list
  • crates/iceberg/src/scan/incremental/tests.rs
  • crates/playground/Cargo.toml
  • crates/playground/src/main.rs
Use this command to fix any missing license headers
```bash

docker run -it --rm -v $(pwd):/github/workspace apache/skywalking-eyes header fix

</details>

@gbrgr gbrgr marked this pull request as ready for review October 28, 2025 08:13
@gbrgr gbrgr requested a review from vustef October 28, 2025 08:13
@gbrgr gbrgr changed the title Add changelog scan for appends and positional deletes Add incremental scan for appends and positional deletes Oct 28, 2025
Copy link

@vustef vustef left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Gerald, it's a big change. I went through everything, maybe the second pass I'll be able to digest the whole picture from different perspective. In the meanwhile, let me know what you think about the comments.

.and_then(|st| st.delete_vectors.get(delete_file_path).cloned())
}

pub(crate) fn with_read<F, G>(&self, f: F) -> Result<G>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the benefit of having this function (that has a callback) vs just doing everything inline? Seems like unnecessary optimization, and it's not a well-known pattern what with_read might mean.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allows public read access without changing the properties.

/// Adds a `_file` column to the RecordBatch containing the file path.
/// Uses Run-End Encoding (RLE) for maximum memory efficiency when the same
/// file path is repeated across all rows.
#[allow(dead_code)]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to allow dead_code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, otherwise clippy complains

batch_size: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"pos",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what names we should give here. spec defines pos for positional deletes, but in this stream we're going to include rows from file deletes.
And if we're returning this, should we also add field ID according to the spec?

On the other hand, below when you add_file_path_column, it will be named _file, which is defined for data files, while positional deletes should have file_path for a name. Personally, I don't like the distinction, as it makes us handle these two streams differently (which is kinda already the case, but this could lead to off-by-one errors)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field IDs are also different for positional deletes vs for metadata column on data file.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestion what to do here, or leave it for the time being?

"Failed to create RecordBatch for DeleteVector",
)
})
.and_then(|batch| ArrowReader::add_file_path_column(batch, &file_path))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh so this is the only invocation? I don't think then that this PR handles Jira ticket for adding file path. That ticket should be closed only when we add the ability to add file paths to appends stream too, as well as to the regular (non-incremental) table scan.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah thought about that, that's why I did not put it into the ticket description. We still need builder methods for this one, and I haven't implemented them here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants