Skip to content

Commit 8e73566

Browse files
zxqfd555-pw and Manul from Pathway
authored and
Manul from Pathway
committed
support simple deletions in deltalake connector (#7876)
GitOrigin-RevId: f24d1b605adcd9faecc00a985516c46cc13052e5
1 parent baa7600 commit 8e73566

File tree

4 files changed

+100
-24
lines changed

4 files changed

+100
-24
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
99
- `pw.xpacks.llm.prompts.RAGPromptTemplate`, set of prompt utilities that enable verifying templates and creating UDFs from prompt strings or callables.
1010
- `pw.xpacks.llm.question_answering.BaseContextProcessor` streamlines development and tuning of representing retrieved context documents to the LLM.
1111
- `pw.io.kafka.read` now supports `with_metadata` flag, which makes it possible to attach the metadata of the Kafka messages to the table entries.
12+
- `pw.io.deltalake.read` can now stream the tables with deletions, if no deletion vectors were used.
1213

1314
### Changed
1415
- `pw.io.sharepoint.read` now explicitly terminates with an error if it fails to read the data the specified number of times per row (the default is `8`).

python/pathway/io/deltalake/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ def read(
4848
debug_data: Any = None,
4949
) -> Table:
5050
"""
51-
Reads an **append-only** table from Delta Lake. Currently, local and S3 lakes are
52-
supported.
51+
Reads a table from Delta Lake. Currently, local and S3 lakes are supported. The table
52+
doesn't have to be append-only; however, deletion vectors are not yet supported.
5353
5454
Args:
5555
uri: URI of the Delta Lake source that must be read.

python/pathway/tests/test_io.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3203,7 +3203,7 @@ class InputSchema(pw.Schema):
32033203
k: int = pw.column_definition(primary_key=True)
32043204
v: str
32053205

3206-
def run_pathway_program(expected_key_set):
3206+
def run_pathway_program(expected_key_set, expected_diff=1):
32073207
table = pw.io.deltalake.read(lake_path, schema=InputSchema, mode="static")
32083208
pw.io.csv.write(table, output_path)
32093209

@@ -3215,6 +3215,7 @@ def run_pathway_program(expected_key_set):
32153215
try:
32163216
result = pd.read_csv(output_path)
32173217
assert set(result["k"]) == expected_key_set
3218+
assert set(result["diff"]) == {expected_diff}
32183219
except pd.errors.EmptyDataError:
32193220
assert expected_key_set == {}
32203221
G.clear()
@@ -3254,6 +3255,42 @@ def run_pathway_program(expected_key_set):
32543255
write_deltalake(lake_path, df, mode="append")
32553256
run_pathway_program({10})
32563257

3258+
# The seventh run: remove some data from the table
3259+
condition = "k > 4 and k < 8"
3260+
table = DeltaTable(lake_path)
3261+
table.delete(condition)
3262+
run_pathway_program({5, 6, 7}, -1)
3263+
3264+
3265+
def test_deltalake_read_after_modification(tmp_path):
3266+
data = [
3267+
{"k": 1, "v": "one"},
3268+
{"k": 2, "v": "two"},
3269+
{"k": 3, "v": "three"},
3270+
{"k": 4, "v": "four"},
3271+
{"k": 5, "v": "five"},
3272+
{"k": 6, "v": "six"},
3273+
]
3274+
df = pd.DataFrame(data).set_index("k")
3275+
lake_path = str(tmp_path / "lake")
3276+
output_path = str(tmp_path / "output.csv")
3277+
write_deltalake(lake_path, df)
3278+
3279+
condition = "k > 1 and k < 5"
3280+
table = DeltaTable(lake_path)
3281+
table.delete(condition)
3282+
3283+
class InputSchema(pw.Schema):
3284+
k: int = pw.column_definition(primary_key=True)
3285+
v: str
3286+
3287+
table = pw.io.deltalake.read(lake_path, schema=InputSchema, mode="static")
3288+
pw.io.csv.write(table, output_path)
3289+
pw.run()
3290+
3291+
result = pd.read_csv(output_path)
3292+
assert set(result["k"]) == {1, 5, 6}
3293+
32573294

32583295
@needs_multiprocessing_fork
32593296
def test_streaming_from_deltalake(tmp_path):

src/connectors/data_storage.rs

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ pub enum ReadError {
273273
#[error("parquet value type mismatch: got {0:?} expected {1:?}")]
274274
WrongParquetType(ParquetValue, Type),
275275

276-
#[error("only append-only delta tables are supported")]
277-
DeltaLakeForbiddenRemoval,
276+
#[error("deletion vectors in delta tables are not supported")]
277+
DeltaDeletionVectorsNotSupported,
278278
}
279279

280280
#[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)]
@@ -1909,6 +1909,18 @@ impl ObjectDownloader {
19091909
}
19101910
}
19111911

1912+
#[derive(Debug)]
1913+
pub struct DeltaReaderAction {
1914+
action_type: DataEventType,
1915+
path: String,
1916+
}
1917+
1918+
impl DeltaReaderAction {
1919+
pub fn new(action_type: DataEventType, path: String) -> Self {
1920+
Self { action_type, path }
1921+
}
1922+
}
1923+
19121924
pub struct DeltaTableReader {
19131925
table: DeltaTable,
19141926
streaming_mode: ConnectorMode,
@@ -1921,7 +1933,8 @@ pub struct DeltaTableReader {
19211933
current_version: i64,
19221934
last_fully_read_version: Option<i64>,
19231935
rows_read_within_version: i64,
1924-
parquet_files_queue: VecDeque<String>,
1936+
parquet_files_queue: VecDeque<DeltaReaderAction>,
1937+
current_event_type: DataEventType,
19251938
}
19261939

19271940
const DELTA_LAKE_INITIAL_POLL_DURATION: Duration = Duration::from_millis(5);
@@ -1940,7 +1953,7 @@ impl DeltaTableReader {
19401953
let runtime = create_async_tokio_runtime()?;
19411954
let table = runtime.block_on(async { open_delta_table(path, storage_options).await })?;
19421955
let current_version = table.version();
1943-
let parquet_files_queue = Self::get_file_uris(&table)?;
1956+
let parquet_files_queue = Self::get_reader_actions(&table, path)?;
19441957

19451958
Ok(Self {
19461959
table,
@@ -1955,21 +1968,39 @@ impl DeltaTableReader {
19551968
reader: None,
19561969
parquet_files_queue,
19571970
rows_read_within_version: 0,
1971+
current_event_type: DataEventType::Insert,
19581972
})
19591973
}
19601974

1961-
fn get_file_uris(table: &DeltaTable) -> Result<VecDeque<String>, ReadError> {
1962-
Ok(table.get_file_uris()?.collect())
1975+
fn get_reader_actions(
1976+
table: &DeltaTable,
1977+
base_path: &str,
1978+
) -> Result<VecDeque<DeltaReaderAction>, ReadError> {
1979+
Ok(table
1980+
.snapshot()?
1981+
.file_actions()?
1982+
.into_iter()
1983+
.map(|action| {
1984+
DeltaReaderAction::new(
1985+
DataEventType::Insert,
1986+
Self::ensure_absolute_path_with_base(&action.path, base_path),
1987+
)
1988+
})
1989+
.collect())
19631990
}
19641991

19651992
fn ensure_absolute_path(&self, path: &str) -> String {
1966-
if path.starts_with(&self.base_path) {
1993+
Self::ensure_absolute_path_with_base(path, &self.base_path)
1994+
}
1995+
1996+
fn ensure_absolute_path_with_base(path: &str, base_path: &str) -> String {
1997+
if path.starts_with(base_path) {
19671998
return path.to_string();
19681999
}
1969-
if self.base_path.ends_with('/') {
1970-
format!("{}{path}", self.base_path)
2000+
if base_path.ends_with('/') {
2001+
format!("{base_path}{path}")
19712002
} else {
1972-
format!("{}/{path}", self.base_path)
2003+
format!("{base_path}/{path}")
19732004
}
19742005
}
19752006

@@ -1998,18 +2029,23 @@ impl DeltaTableReader {
19982029
for action in txn_actions {
19992030
// Protocol description for Delta Lake actions:
20002031
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#actions
2001-
match action {
2032+
let action = match action {
20022033
DeltaLakeAction::Remove(action) => {
2003-
if action.data_change {
2004-
return Err(ReadError::DeltaLakeForbiddenRemoval);
2034+
if action.deletion_vector.is_some() {
2035+
return Err(ReadError::DeltaDeletionVectorsNotSupported);
20052036
}
2037+
data_changed |= action.data_change;
2038+
let action_path = self.ensure_absolute_path(&action.path);
2039+
DeltaReaderAction::new(DataEventType::Delete, action_path)
20062040
}
20072041
DeltaLakeAction::Add(action) => {
20082042
data_changed |= action.data_change;
2009-
added_blocks.push_back(self.ensure_absolute_path(&action.path));
2043+
let action_path = self.ensure_absolute_path(&action.path);
2044+
DeltaReaderAction::new(DataEventType::Insert, action_path)
20102045
}
20112046
_ => continue,
20122047
};
2048+
added_blocks.push_back(action);
20132049
}
20142050

20152051
self.last_fully_read_version = Some(self.current_version);
@@ -2040,9 +2076,9 @@ impl DeltaTableReader {
20402076
return Err(ReadError::NoObjectsToRead);
20412077
}
20422078
}
2043-
let next_parquet_file = self.parquet_files_queue.pop_front().unwrap();
2044-
let local_object =
2045-
self.object_downloader.download_object(&next_parquet_file)?;
2079+
let next_action = self.parquet_files_queue.pop_front().unwrap();
2080+
let local_object = self.object_downloader.download_object(&next_action.path)?;
2081+
self.current_event_type = next_action.action_type;
20462082
self.reader = Some(DeltaLakeParquetReader::try_from(local_object)?.into_iter());
20472083
}
20482084
}
@@ -2116,7 +2152,7 @@ impl Reader for DeltaTableReader {
21162152

21172153
self.rows_read_within_version += 1;
21182154
Ok(ReadResult::Data(
2119-
ReaderContext::from_diff(DataEventType::Insert, None, row_map.into()),
2155+
ReaderContext::from_diff(self.current_event_type, None, row_map.into()),
21202156
(
21212157
OffsetKey::Empty,
21222158
OffsetValue::DeltaTablePosition {
@@ -2156,15 +2192,17 @@ impl Reader for DeltaTableReader {
21562192
// The offset is based on the full set of files present for `version`
21572193
self.current_version = *version;
21582194
runtime.block_on(async { self.table.load_version(self.current_version).await })?;
2159-
self.parquet_files_queue = Self::get_file_uris(&self.table)?;
2195+
self.parquet_files_queue = Self::get_reader_actions(&self.table, &self.base_path)?;
21602196
}
21612197

21622198
self.rows_read_within_version = 0;
21632199
while !self.parquet_files_queue.is_empty() {
21642200
let next_block = self.parquet_files_queue.front().unwrap();
2165-
let block_size = Self::rows_in_file_count(next_block)?;
2201+
let block_size = Self::rows_in_file_count(&next_block.path)?;
21662202
if self.rows_read_within_version + block_size <= *n_rows_to_rewind {
2167-
info!("Skipping parquet block with the size of {block_size} entries: {next_block}");
2203+
info!(
2204+
"Skipping parquet block with the size of {block_size} entries: {next_block:?}"
2205+
);
21682206
self.rows_read_within_version += block_size;
21692207
self.parquet_files_queue.pop_front();
21702208
} else {

0 commit comments

Comments
 (0)