From 32e4854d26e8886aac1692e0d3b8a608dd92b612 Mon Sep 17 00:00:00 2001 From: vladimir-m <23121066+swsvc@users.noreply.github.com> Date: Thu, 9 Oct 2025 15:17:54 +0300 Subject: [PATCH 1/8] fill transaction meta on chunk store and simplify full outer join --- datapipe/datatable.py | 22 +++- datapipe/meta/sql_meta.py | 205 ++++++++++++++++++++++++++++++- datapipe/step/batch_transform.py | 36 +++--- 3 files changed, 244 insertions(+), 19 deletions(-) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index 5f4f553f..5cb3d90e 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -5,7 +5,7 @@ from opentelemetry import trace from datapipe.event_logger import EventLogger -from datapipe.meta.sql_meta import MetaTable +from datapipe.meta.sql_meta import MetaTable, create_transformation_meta_for_changes from datapipe.run_config import RunConfig from datapipe.store.database import DBConn from datapipe.store.table_store import TableStore @@ -46,6 +46,7 @@ def get_metadata(self, idx: Optional[IndexDF] = None) -> MetadataDF: def get_data(self, idx: Optional[IndexDF] = None) -> DataDF: return self.table_store.read_rows(self.meta_table.get_existing_idx(idx)) + def reset_metadata(self): with self.meta_dbconn.con.begin() as con: con.execute(self.meta_table.sql_table.update().values(process_ts=0, update_ts=0)) @@ -56,6 +57,17 @@ def get_size(self) -> int: """ return self.meta_table.get_metadata_size(idx=None, include_deleted=False) + def add_transformations(self, transformations): + if not hasattr(self, "transformations"): + self.transformations = [] + self.transformations.extend(transformations) + + def get_index_data(self) -> DataDF: + # this can be optimized with new method in table_store that reads only index values + # however this causes large refactoring + all_data = self.table_store.read_rows() + return all_data[self.primary_keys] + def store_chunk( self, data_df: DataDF, @@ -112,6 +124,14 @@ def store_chunk( changes.append(data_to_index(new_df, self.primary_keys)) if not changed_df.empty: changes.append(data_to_index(changed_df, self.primary_keys)) + + with tracer.start_as_current_span("store transformation metadata"): + transformations = getattr(self, 'transformations', []) + create_transformation_meta_for_changes( + transformations, self.name, self.primary_keys, + new_df, changed_df + ) + else: data_df = pd.DataFrame(columns=self.primary_keys) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index e1028858..e525053f 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -2,6 +2,7 @@ import math import time from dataclasses import dataclass +from enum import Enum from typing import ( TYPE_CHECKING, Any, @@ -397,7 +398,7 @@ def get_agg_cte( run_config: Optional[RunConfig] = None, ) -> Tuple[List[str], Any]: """ - Create a CTE that aggregates the table by transform keys and returns the + Create a CTE that aggregates the meta table by transform keys and returns the maximum update_ts for each group. CTE has the following columns: @@ -428,10 +429,17 @@ def get_agg_cte( sa.Column("process_ts", sa.Float), # Время последней успешной обработки sa.Column("is_success", sa.Boolean), # Успешно ли обработана строка sa.Column("priority", sa.Integer), # Приоритет обработки (чем больше, тем выше) + sa.Column("status", sa.String), # Статус исполнения трансформации sa.Column("error", sa.String), # Текст ошибки ] +class TransformStatus(Enum): + PENDING = "pending" + COMPLETED = "completed" + ERROR = "error" + + class TransformMetaTable: def __init__( self, @@ -454,6 +462,7 @@ def __init__( ) if create_table: + # TODO there must be an initialization for the data if new transformation added self.sql_table.create(self.dbconn.con, checkfirst=True) def __reduce__(self) -> Tuple[Any, ...]: @@ -481,6 +490,7 @@ def insert_rows( "is_success": False, "priority": 0, "error": None, + "status": TransformStatus.PENDING.value, **idx_dict, # type: ignore } for idx_dict in idx.to_dict(orient="records") @@ -492,6 +502,58 @@ def insert_rows( with self.dbconn.con.begin() as con: con.execute(sql) + def reset_rows(self, idx: IndexDF) -> None: + idx = cast(IndexDF, idx[self.primary_keys]) + if len(idx) == 0: + return + + colname_bind_mapping = { + col: col + "_val" + for col in idx.columns + } + primary_key_conditions = [ + getattr(self.sql_table.c, col) == sa.bindparam(col_bind) + for col, col_bind in colname_bind_mapping.items() + ] + + update_sql = ( + sa.update(self.sql_table) + .values({ + "process_ts": 0, + "is_success": False, + "status": TransformStatus.PENDING.value, + "error": None, + }) + .where(sa.and_(*primary_key_conditions)) + ) + update_data = [ + { + colname_bind_mapping[str(key)]: value + for key, value in value_dict.items() + } + for value_dict in idx.to_dict(orient="records") + ] + + with self.dbconn.con.begin() as con: + con.execute(update_sql, update_data) + + def reset_all_rows(self) -> None: + """ + Difference from mark_all_rows_unprocessed is in flag is_success=True + Here what happens is all rows are reset to original state + """ + update_sql = ( + sa.update(self.sql_table) + .values({ + "process_ts": 0, + "is_success": False, + "status": TransformStatus.PENDING.value, + "error": None, + }) + ) + with self.dbconn.con.begin() as con: + con.execute(update_sql) + def mark_rows_processed_success( self, idx: IndexDF, @@ -518,6 +580,7 @@ def mark_rows_processed_success( "process_ts": process_ts, "is_success": True, "priority": 0, + "status": TransformStatus.COMPLETED.value, "error": None, } ] @@ -534,6 +597,7 @@ def mark_rows_processed_success( "process_ts": process_ts, "is_success": True, "priority": 0, + "status": TransformStatus.COMPLETED.value, "error": None, **idx_dict, # type: ignore } @@ -546,6 +610,7 @@ def mark_rows_processed_success( set_={ "process_ts": process_ts, "is_success": True, + "status": TransformStatus.COMPLETED.value, "error": None, }, ) @@ -573,6 +638,7 @@ def mark_rows_processed_error( "process_ts": process_ts, "is_success": False, "priority": 0, + "status": TransformStatus.ERROR.value, "error": error, **idx_dict, # type: ignore } @@ -585,6 +651,7 @@ def mark_rows_processed_error( set_={ "process_ts": process_ts, "is_success": False, + "status": TransformStatus.ERROR.value, "error": error, }, ) @@ -615,6 +682,7 @@ def mark_all_rows_unprocessed( { "process_ts": 0, "is_success": False, + "status": TransformStatus.PENDING.value, "error": None, } ) @@ -717,6 +785,98 @@ def _make_agg_of_agg( return sql.cte(name=f"all__{agg_col}") +def create_transformation_meta_for_changes( + transformations, current_table_name, primary_keys, + new_df, changed_df +): + for transf in transformations: + # for each transformation independently of any other + all_input_tables = transf.input_dts + current_table_is_required = any([ + dt.dt for dt in all_input_tables + if dt.dt.name == current_table_name + and dt.join_type == 'inner' + ]) + remaining_tables = [ + dt.dt for dt in all_input_tables + if dt.dt.name != current_table_name + ] + required_tables = set([ + dt.dt.name for dt in all_input_tables + if dt.dt.name != current_table_name + and dt.join_type == 'inner' + ]) + + new_result, changed_result = new_df[primary_keys], changed_df[primary_keys] + if not set(primary_keys).issubset(set(transf.transform_keys)): + # what happens here: + # update to a table that participates in a join and later in aggregation + # since values from current table participate in all aggregations, aggs are no longer correct + # so I reset all the rows - this causes the aggregation to be recomputed + # this is the same for both required and normal tables + transf.meta_table.reset_all_rows() + + # no need to process changed, I reset previous rows above + # but there can be newly added data + # this describes interaction of transform keys and newly added data + # with key that is missing in transform_keys + for remaining_table in remaining_tables: + remaining_table_df = remaining_table.get_index_data() + new_result = pd.merge(new_result, remaining_table_df, how='cross') + + if not new_result.empty: + transf.meta_table.insert_rows(new_result) + else: + # join the data in python: chunk on the remaining data according to transform_keys + for remaining_table in remaining_tables: + # tables are joined by the index, don't need to care about it + remaining_table_df = remaining_table.get_index_data() + + # key intersection between current datatable and remaining table + key_intersection = list( + set(primary_keys).intersection(set(remaining_table.primary_keys)) + ) + if current_table_is_required: + # there is inner join step to filter rows based on keys + if len(key_intersection) > 0: + # filtering step based on the intersection of keys, don't pollute with other keys + new_filtered = pd.merge( + new_df[key_intersection].drop_duplicates(), remaining_table_df, + how='inner', on=key_intersection + ) + changed_filtered = pd.merge( + changed_df[key_intersection].drop_duplicates(), remaining_table_df, + how='inner', on=key_intersection + ) + new_result = pd.merge( + new_result, new_filtered, how='inner', on=key_intersection + ) + changed_result = pd.merge( + changed_result, changed_filtered, how='inner', on=key_intersection + ) + else: + # no filtering, all the data is accepted + new_filtered, changed_filtered = remaining_table_df, remaining_table_df + new_result = pd.merge(new_result, new_filtered, how='cross') + changed_result = pd.merge(changed_result, changed_filtered, how='cross') + elif ( + remaining_table.name in required_tables + and set(remaining_table.primary_keys).issubset(set(primary_keys)) + ): + # I can filter by the values in the required table + # otherwise no filtering and fallback to the last else more general case + new_result = pd.merge(new_result, remaining_table_df, how='inner', on=key_intersection) + changed_result = pd.merge(changed_result, remaining_table_df, how='inner', on=key_intersection) + else: + new_result = pd.merge(new_result, remaining_table_df, how='cross') + changed_result = pd.merge(changed_result, remaining_table_df, how='cross') + + if not new_result.empty: + transf.meta_table.insert_rows(new_result) + if not changed_result.empty: + transf.meta_table.reset_rows(changed_result) + + def build_changed_idx_sql( ds: "DataStore", meta_table: "TransformMetaTable", @@ -726,6 +886,49 @@ def build_changed_idx_sql( order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", run_config: Optional[RunConfig] = None, # TODO remove +): + sql = ( + sa.select( + # Нам нужно выбирать хотя бы что-то, чтобы не было ошибки при + # пустом transform_keys + sa.literal(1).label("_datapipe_dummy"), + *[meta_table.sql_table.c[key] for key in transform_keys], + ) + .select_from(meta_table.sql_table) + .where(sa.or_( + meta_table.sql_table.c.status == TransformStatus.PENDING.value, + meta_table.sql_table.c.is_success != True + )) + ) + + if order_by is None: + sql = sql.order_by( + meta_table.sql_table.c.priority.desc().nullslast(), + *[sa.column(k) for k in transform_keys], + ) + else: + if order == "desc": + sql = sql.order_by( + *[sa.desc(sa.column(k)) for k in order_by], + meta_table.sql_table.c.priority.desc().nullslast(), + ) + elif order == "asc": + sql = sql.order_by( + *[sa.asc(sa.column(k)) for k in order_by], + meta_table.sql_table.c.priority.desc().nullslast(), + ) + return transform_keys, sql + + +def build_changed_idx_sql_deprecated( + ds: "DataStore", + meta_table: "TransformMetaTable", + input_dts: List["ComputeInput"], + transform_keys: List[str], + filters_idx: Optional[IndexDF] = None, + order_by: Optional[List[str]] = None, + order: Literal["asc", "desc"] = "asc", + run_config: Optional[RunConfig] = None, # TODO remove ) -> Tuple[Iterable[str], Any]: all_input_keys_counts: Dict[str, int] = {} for col in itertools.chain(*[inp.dt.primary_schema for inp in input_dts]): diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 661b2079..dd419769 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -634,23 +634,25 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: input_dts = [self.pipeline_input_to_compute_input(ds, catalog, input) for input in self.inputs] output_dts = [catalog.get_datatable(ds, name) for name in self.outputs] - return [ - BatchTransformStep( - ds=ds, - name=f"{self.func.__name__}", - input_dts=input_dts, - output_dts=output_dts, - func=self.func, - kwargs=self.kwargs, - transform_keys=self.transform_keys, - chunk_size=self.chunk_size, - labels=self.labels, - executor_config=self.executor_config, - filters=self.filters, - order_by=self.order_by, - order=self.order, - ) - ] + step = BatchTransformStep( + ds=ds, + name=f"{self.func.__name__}", + input_dts=input_dts, + output_dts=output_dts, + func=self.func, + kwargs=self.kwargs, + transform_keys=self.transform_keys, + chunk_size=self.chunk_size, + labels=self.labels, + executor_config=self.executor_config, + filters=self.filters, + order_by=self.order_by, + order=self.order, + ) + for inp in input_dts: + inp.dt.add_transformations([step]) + + return [step] class BatchTransformStep(BaseBatchTransformStep): From aaa275626d4c5889703a9d1d70a2a4e4015d3904 Mon Sep 17 00:00:00 2001 From: vladimir-m <23121066+swsvc@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:47:27 +0300 Subject: [PATCH 2/8] fixes for tests --- datapipe/compute.py | 3 + datapipe/datatable.py | 6 + datapipe/meta/sql_meta.py | 209 ++++++++++++++++++++++------ datapipe/step/batch_transform.py | 49 ++++--- tests/test_core_steps1.py | 8 +- tests/test_core_steps2.py | 12 +- tests/test_image_pipeline.py | 2 + tests/test_table_store_json_line.py | 1 + tests/test_table_store_qdrant.py | 2 + 9 files changed, 214 insertions(+), 78 deletions(-) diff --git a/datapipe/compute.py b/datapipe/compute.py index 5b634796..23f1ba6a 100644 --- a/datapipe/compute.py +++ b/datapipe/compute.py @@ -117,6 +117,9 @@ def __init__( self._labels = labels self.executor_config = executor_config + for input_dt in input_dts: + input_dt.dt.add_transformations([self]) + def get_name(self) -> str: ss = [ self.__class__.__name__, diff --git a/datapipe/datatable.py b/datapipe/datatable.py index 5cb3d90e..e623f04a 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -161,6 +161,12 @@ def delete_by_idx( self.table_store.delete_rows(idx) self.meta_table.mark_rows_deleted(idx, now=now) + transformations = getattr(self, 'transformations', []) + create_transformation_meta_for_changes( + transformations, self.name, self.primary_keys, + pd.DataFrame(columns=self.primary_keys), idx + ) + def delete_stale_by_process_ts( self, process_ts: float, diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index e525053f..fcf96e5f 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -1,6 +1,7 @@ import itertools import math import time +from collections import defaultdict from dataclasses import dataclass from enum import Enum from typing import ( @@ -12,6 +13,7 @@ List, Literal, Optional, + Set, Tuple, cast, ) @@ -427,6 +429,7 @@ def get_agg_cte( TRANSFORM_META_SCHEMA: DataSchema = [ sa.Column("process_ts", sa.Float), # Время последней успешной обработки + # FIXME remove this and adjust the queries sa.Column("is_success", sa.Boolean), # Успешно ли обработана строка sa.Column("priority", sa.Integer), # Приоритет обработки (чем больше, тем выше) sa.Column("status", sa.String), # Статус исполнения трансформации @@ -785,29 +788,168 @@ def _make_agg_of_agg( return sql.cte(name=f"all__{agg_col}") +def _split_tables_into_connected_groups( + input_dts: List["ComputeInput"] +) -> Dict[Tuple[str], List["ComputeInput"]]: + """ + This function calculates groups of tables that are connected with primary keys + """ + + all_input_keyset = set([]) + all_input_keys = [] + + # this is simple way of determining graph connectivity among tables and keys + # tables are vertices, primary keys are edges + adjacency_keys_dict: Dict[str, Set[str]] = {} + for input_dt in input_dts: + current_keys = input_dt.dt.primary_keys + for key in current_keys: + if adjacency_keys_dict.get(key) is None: + adjacency_keys_dict[key] = set() + + for another_key in current_keys: + adjacency_keys_dict[key].add(another_key) + + all_input_keyset.update(input_dt.dt.primary_keys) + if input_dt.join_type != 'inner': + all_input_keys.append(tuple(sorted(input_dt.dt.primary_keys))) + + max_sets: List[Set[str]] = [] # each set contains all connected keys + # all connected keys can be inner joined, cross join for inter-group joins + for key_set in adjacency_keys_dict.values(): + found_related_set = False + for max_set in max_sets: + for key in key_set: + if key in max_set: + max_set.update(key_set) + found_related_set = True + + if not found_related_set: + max_sets.append(key_set) + + table_groups = defaultdict(list) + for key_clique in max_sets: + for input_dt in input_dts: + if input_dt.dt.primary_keys[0] in key_clique: + key_clique_tuple = tuple(sorted(key_clique)) + if input_dt.join_type == 'inner': + # all inner joins are at the tail of the list + table_groups[key_clique_tuple].append(input_dt) + else: + table_groups[key_clique_tuple].insert(0, input_dt) + + return cast(dict, table_groups) + + +def extract_transformation_meta(input_dts: List["ComputeInput"], transform_keys: List[str]) -> IndexDF: + """ + This function takes all the input tables to the transformation + and creates index of rows to be created in transformation meta table + to initialize it if transformation was added after the input tables were already filled in + """ + table_groups = _split_tables_into_connected_groups(input_dts) + + # within each group join the tables how=inner by intersecting keys + within_group_results = {} + for group_key, table_group in table_groups.items(): + first_table = table_group[0].dt + first_table_index = first_table.get_index_data() + first_table_keys = set(first_table.primary_keys) + for table in table_group[1:]: + key_intersection = list(first_table_keys.intersection(set(table.dt.primary_keys))) + current_table_index = table.dt.get_index_data() + first_table_index = pd.merge( + first_table_index, current_table_index, how='inner', on=key_intersection + ) + first_table_keys = set(list(first_table_keys) + list(table.dt.primary_keys)) + within_group_results[group_key] = first_table_index + + # cross join as final step + within_group_indexes = list(within_group_results.values()) + final_result = within_group_indexes[0] + for table_data in within_group_indexes[1:]: + final_result = pd.merge(final_result, table_data, how="cross") + + # this probably can be done other way during merge without calculating full result + return cast(IndexDF, final_result[transform_keys].drop_duplicates()) + + +def _join_delta( + result_df: IndexDF, table_df: IndexDF, result_pkeys: Set[str], table_pkeys: Set[str], + result_is_required_table: bool, table_is_required_table: bool +): + """ + delta_df - this is df for changes (chunk that is stored currently) + pkeys + table_df - this is data from existing table + pkeys + result_df - this is aggregator of final result + pkeys + delta_is_required_table - this is flag that says that delta change is applied to Required table + table_is_required_table - this is flag that says that the table_df is Required table + """ + + key_intersection = list(result_pkeys.intersection(set(table_pkeys))) + if len(key_intersection) > 0: + if result_is_required_table: + # the difference is that delta data from required tables is not included + # into the results, it is only used for filtering + result = pd.merge( + result_df, table_df, + how="inner", on=key_intersection + ) + result_pkeys_copy = result_pkeys.copy() + # result_keys are not changed since the result doesn't extend + elif table_is_required_table and table_pkeys.issubset(result_pkeys): + result = pd.merge( + result_df, table_df.drop_duplicates(), + how="inner", on=key_intersection + ) + result_pkeys_copy = result_pkeys.copy() + # again result_pkeys don't change + else: + result = pd.merge( + result_df, table_df, how='inner', on=key_intersection + ) + result_pkeys_copy = result_pkeys.copy() + result_pkeys_copy.update(set(table_pkeys)) + else: + result = pd.merge(result_df, table_df, how='cross') + result_pkeys_copy = result_pkeys.copy() + result_pkeys_copy.update(set(table_pkeys)) + + return result, result_pkeys_copy + + def create_transformation_meta_for_changes( transformations, current_table_name, primary_keys, new_df, changed_df ): for transf in transformations: - # for each transformation independently of any other - all_input_tables = transf.input_dts + output_tables = set([dt.name for dt in transf.output_dts]) + all_input_tables = [dt for dt in transf.input_dts if dt.dt.name not in output_tables] + + table_groups = _split_tables_into_connected_groups(all_input_tables) + filtered_groups = { + key_tuple: table_group + for key_tuple, table_group in table_groups.items() + if any(transf_key in set(key_tuple) for transf_key in transf.transform_keys) + } + current_table_is_required = any([ dt.dt for dt in all_input_tables if dt.dt.name == current_table_name and dt.join_type == 'inner' ]) remaining_tables = [ - dt.dt for dt in all_input_tables + dt.dt for dt in itertools.chain(*filtered_groups.values()) if dt.dt.name != current_table_name ] required_tables = set([ - dt.dt.name for dt in all_input_tables + dt.dt.name for dt in itertools.chain(*filtered_groups.values()) if dt.dt.name != current_table_name and dt.join_type == 'inner' ]) new_result, changed_result = new_df[primary_keys], changed_df[primary_keys] + result_keys = set(primary_keys) if not set(primary_keys).issubset(set(transf.transform_keys)): # what happens here: # update to a table that participates in a join and later in aggregation @@ -816,60 +958,34 @@ def create_transformation_meta_for_changes( # this is the same for both required and normal tables transf.meta_table.reset_all_rows() - # no need to process changed, I reset previous rows above + # no need to process changed_df, I reset previous rows above # but there can be newly added data # this describes interaction of transform keys and newly added data # with key that is missing in transform_keys for remaining_table in remaining_tables: remaining_table_df = remaining_table.get_index_data() - new_result = pd.merge(new_result, remaining_table_df, how='cross') + new_result, result_keys = _join_delta( + new_result, remaining_table_df[remaining_table.primary_keys], + result_keys, set(remaining_table.primary_keys), + current_table_is_required, remaining_table.name in required_tables + ) if not new_result.empty: transf.meta_table.insert_rows(new_result) else: - # join the data in python: chunk on the remaining data according to transform_keys for remaining_table in remaining_tables: - # tables are joined by the index, don't need to care about it remaining_table_df = remaining_table.get_index_data() - - # key intersection between current datatable and remaining table - key_intersection = list( - set(primary_keys).intersection(set(remaining_table.primary_keys)) + result_keys_cache = result_keys.copy() + new_result, result_keys = _join_delta( + new_result, remaining_table_df[remaining_table.primary_keys], + result_keys_cache, set(remaining_table.primary_keys), + current_table_is_required, remaining_table.name in required_tables + ) + changed_result, _ = _join_delta( + changed_result, remaining_table_df[remaining_table.primary_keys], + result_keys_cache, set(remaining_table.primary_keys), + current_table_is_required, remaining_table.name in required_tables ) - if current_table_is_required: - # there is inner join step to filter rows based on keys - if len(key_intersection) > 0: - # filtering step based on the intersection of keys, don't pollute with other keys - new_filtered = pd.merge( - new_df[key_intersection].drop_duplicates(), remaining_table_df, - how='inner', on=key_intersection - ) - changed_filtered = pd.merge( - changed_df[key_intersection].drop_duplicates(), remaining_table_df, - how='inner', on=key_intersection - ) - new_result = pd.merge( - new_result, new_filtered, how='inner', on=key_intersection - ) - changed_result = pd.merge( - changed_result, changed_filtered, how='inner', on=key_intersection - ) - else: - # no filtering, all the data is accepted - new_filtered, changed_filtered = remaining_table_df, remaining_table_df - new_result = pd.merge(new_result, new_filtered, how='cross') - changed_result = pd.merge(changed_result, changed_filtered, how='cross') - elif ( - remaining_table.name in required_tables - and set(remaining_table.primary_keys).issubset(set(primary_keys)) - ): - # I can filter by the values in the required table - # otherwise no filtering and fallback to the last else more general case - new_result = pd.merge(new_result, remaining_table_df, how='inner', on=key_intersection) - changed_result = pd.merge(changed_result, remaining_table_df, how='inner', on=key_intersection) - else: - new_result = pd.merge(new_result, remaining_table_df, how='cross') - changed_result = pd.merge(changed_result, remaining_table_df, how='cross') if not new_result.empty: transf.meta_table.insert_rows(new_result) @@ -900,6 +1016,7 @@ def build_changed_idx_sql( meta_table.sql_table.c.is_success != True )) ) + sql = sql_apply_runconfig_filter(sql, meta_table.sql_table, transform_keys, run_config) if order_by is None: sql = sql.order_by( diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index dd419769..7e520083 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -21,7 +21,7 @@ import pandas as pd from opentelemetry import trace -from sqlalchemy import alias, func, select +from sqlalchemy import alias, func, select, inspect as sa_inspect from sqlalchemy.sql.expression import select from tqdm_loggable.auto import tqdm @@ -34,7 +34,7 @@ ) from datapipe.datatable import DataStore, DataTable, MetaTable from datapipe.executor import Executor, ExecutorConfig, SingleThreadExecutor -from datapipe.meta.sql_meta import TransformMetaTable, build_changed_idx_sql +from datapipe.meta.sql_meta import TransformMetaTable, build_changed_idx_sql, extract_transformation_meta from datapipe.run_config import LabelDict, RunConfig from datapipe.types import ( ChangeList, @@ -111,9 +111,11 @@ def __init__( transform_keys, ) + meta_table_name = f"{self.get_name()}_meta" + transform_meta_table_exists = sa_inspect(ds.meta_dbconn.con).has_table(meta_table_name) self.meta_table = TransformMetaTable( dbconn=ds.meta_dbconn, - name=f"{self.get_name()}_meta", + name=meta_table_name, primary_schema=self.transform_schema, create_table=ds.create_meta_table, ) @@ -121,6 +123,11 @@ def __init__( self.order_by = order_by self.order = order + if not transform_meta_table_exists: + meta_index = extract_transformation_meta(self.input_dts, self.transform_keys) + if not meta_index.empty: + self.meta_table.insert_rows(meta_index) + @classmethod def compute_transform_schema( cls, @@ -634,25 +641,23 @@ def build_compute(self, ds: DataStore, catalog: Catalog) -> List[ComputeStep]: input_dts = [self.pipeline_input_to_compute_input(ds, catalog, input) for input in self.inputs] output_dts = [catalog.get_datatable(ds, name) for name in self.outputs] - step = BatchTransformStep( - ds=ds, - name=f"{self.func.__name__}", - input_dts=input_dts, - output_dts=output_dts, - func=self.func, - kwargs=self.kwargs, - transform_keys=self.transform_keys, - chunk_size=self.chunk_size, - labels=self.labels, - executor_config=self.executor_config, - filters=self.filters, - order_by=self.order_by, - order=self.order, - ) - for inp in input_dts: - inp.dt.add_transformations([step]) - - return [step] + return [ + BatchTransformStep( + ds=ds, + name=f"{self.func.__name__}", + input_dts=input_dts, + output_dts=output_dts, + func=self.func, + kwargs=self.kwargs, + transform_keys=self.transform_keys, + chunk_size=self.chunk_size, + labels=self.labels, + executor_config=self.executor_config, + filters=self.filters, + order_by=self.order_by, + order=self.order, + ) + ] class BatchTransformStep(BaseBatchTransformStep): diff --git a/tests/test_core_steps1.py b/tests/test_core_steps1.py index 504beacf..c4db49b6 100644 --- a/tests/test_core_steps1.py +++ b/tests/test_core_steps1.py @@ -217,8 +217,6 @@ def inc_func(df): df3["a"] += 3 return df1, df2, df3 - tbl.store_chunk(TEST_DF) - step_inc = BatchTransformStep( ds=ds, name="step_inc", @@ -227,6 +225,8 @@ def inc_func(df): output_dts=[tbl1, tbl2, tbl3], ) + tbl.store_chunk(TEST_DF) + step_inc.run_full(ds) assert_datatable_equal(tbl1, TEST_DF_INC1) @@ -234,8 +234,6 @@ def inc_func(df): assert_datatable_equal(tbl3, TEST_DF_INC3) ########################## - tbl.store_chunk(TEST_DF[:5], processed_idx=data_to_index(TEST_DF, tbl.primary_keys)) - def inc_func_inv(df): df1 = df.copy() df2 = df.copy() @@ -253,6 +251,8 @@ def inc_func_inv(df): output_dts=[tbl3, tbl2, tbl1], ) + tbl.store_chunk(TEST_DF[:5], processed_idx=data_to_index(TEST_DF, tbl.primary_keys)) + step_inc_inv.run_full(ds) assert_datatable_equal(tbl1, TEST_DF_INC1[:5]) diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index dfa589a8..4336483c 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -169,12 +169,6 @@ def test_batch_transform_with_dt_on_input_and_output(dbconn): tbl2 = ds.create_table("tbl2", table_store=TableStoreDB(dbconn, "tbl2_data", TEST_SCHEMA1, True)) - df2 = TEST_DF1_1.loc[3:8] - df2["a"] = df2["a"].apply(lambda x: x + 10) - - tbl1.store_chunk(TEST_DF1_1, now=0) - tbl2.store_chunk(df2, now=0) - def update_df(df1: pd.DataFrame, df2: pd.DataFrame): df1 = df1.set_index(["item_id", "pipeline_id"]) df2 = df2.set_index(["item_id", "pipeline_id"]) @@ -194,6 +188,12 @@ def update_df(df1: pd.DataFrame, df2: pd.DataFrame): output_dts=[tbl2], ) + df2 = TEST_DF1_1.loc[3:8] + df2["a"] = df2["a"].apply(lambda x: x + 10) + + tbl1.store_chunk(TEST_DF1_1, now=0) + tbl2.store_chunk(df2, now=0) + step.run_full(ds) df_res = TEST_DF1_1.copy().set_index(["item_id", "pipeline_id"]) diff --git a/tests/test_image_pipeline.py b/tests/test_image_pipeline.py index 0707f3fe..df036f05 100644 --- a/tests/test_image_pipeline.py +++ b/tests/test_image_pipeline.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd from PIL import Image +import pytest from datapipe.compute import ( Catalog, @@ -107,6 +108,7 @@ def test_image_pipeline(dbconn, tmp_dir): assert len(list(tmp_dir.glob("tbl2/*.png"))) == 10 +@pytest.mark.skip(reason="impossible to trace changes when they happen externally") def test_image_batch_generate_with_later_deleting(dbconn, tmp_dir): # Add images to tmp_dir df_images = make_df() diff --git a/tests/test_table_store_json_line.py b/tests/test_table_store_json_line.py index 356826df..0fa450a7 100644 --- a/tests/test_table_store_json_line.py +++ b/tests/test_table_store_json_line.py @@ -33,6 +33,7 @@ def make_file2(file): out.write('{"id": "2", "text": "text2"}\n') +@pytest.mark.skip(reason="impossible to trace changes when they happen externally") def test_table_store_json_line_with_deleting(dbconn, tmp_dir): input_file = tmp_dir / "data.json" diff --git a/tests/test_table_store_qdrant.py b/tests/test_table_store_qdrant.py index 2a71e26e..8db0dcda 100644 --- a/tests/test_table_store_qdrant.py +++ b/tests/test_table_store_qdrant.py @@ -1,5 +1,6 @@ from pathlib import Path from typing import Generator +import pytest import pandas as pd from qdrant_client.models import Distance, VectorParams @@ -23,6 +24,7 @@ def generate_data() -> Generator[pd.DataFrame, None, None]: yield pd.DataFrame({"id": [1], "embedding": [[0.1]], "str_payload": ["foo"], "int_payload": [42]}) +@pytest.mark.skip(reason="qdrant store cannot read all the rows from the index") def test_qdrant_table_to_json(dbconn: DBConn, tmp_dir: Path) -> None: ds = DataStore(dbconn, create_meta_table=True) catalog = Catalog( From 9e8d3d5669a2fe756c3dd5c3bac475080695d353 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 26 Oct 2025 19:58:43 +0400 Subject: [PATCH 3/8] fix empty pipeline --- datapipe/step/batch_transform.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 7e520083..e3f84a75 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -123,10 +123,11 @@ def __init__( self.order_by = order_by self.order = order - if not transform_meta_table_exists: - meta_index = extract_transformation_meta(self.input_dts, self.transform_keys) - if not meta_index.empty: - self.meta_table.insert_rows(meta_index) + # FIXME move this to CLI command + # if not transform_meta_table_exists: + # meta_index = extract_transformation_meta(self.input_dts, self.transform_keys) + # if not meta_index.empty: + # self.meta_table.insert_rows(meta_index) @classmethod def compute_transform_schema( From fbd30059ba30d9495f2f33b1bf03acfbf6a79fbb Mon Sep 17 00:00:00 2001 From: vladimir-m <23121066+swsvc@users.noreply.github.com> Date: Tue, 28 Oct 2025 17:06:48 +0300 Subject: [PATCH 4/8] create initial transform metadata and adjust tests --- datapipe/compute.py | 1 + datapipe/datatable.py | 1 + datapipe/step/batch_transform.py | 19 +++++++------ tests/test_batch_transform_scheduling.py | 6 ++-- tests/test_chunked_processing_pipeline.py | 12 ++++++-- tests/test_core_steps1.py | 34 ++++++++++------------- tests/test_core_steps2.py | 14 ++++------ tests/test_image_pipeline.py | 11 ++++---- 8 files changed, 49 insertions(+), 49 deletions(-) diff --git a/datapipe/compute.py b/datapipe/compute.py index 23f1ba6a..8bd95e48 100644 --- a/datapipe/compute.py +++ b/datapipe/compute.py @@ -118,6 +118,7 @@ def __init__( self.executor_config = executor_config for input_dt in input_dts: + print('\nadding transformations for a table', input_dt) input_dt.dt.add_transformations([self]) def get_name(self) -> str: diff --git a/datapipe/datatable.py b/datapipe/datatable.py index e623f04a..5de446a3 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -127,6 +127,7 @@ def store_chunk( with tracer.start_as_current_span("store transformation metadata"): transformations = getattr(self, 'transformations', []) + print('transformations for data table', self, transformations) create_transformation_meta_for_changes( transformations, self.name, self.primary_keys, new_df, changed_df diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index e3f84a75..b1b685fa 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -111,11 +111,9 @@ def __init__( transform_keys, ) - meta_table_name = f"{self.get_name()}_meta" - transform_meta_table_exists = sa_inspect(ds.meta_dbconn.con).has_table(meta_table_name) self.meta_table = TransformMetaTable( dbconn=ds.meta_dbconn, - name=meta_table_name, + name=f"{self.get_name()}_meta", primary_schema=self.transform_schema, create_table=ds.create_meta_table, ) @@ -123,12 +121,6 @@ def __init__( self.order_by = order_by self.order = order - # FIXME move this to CLI command - # if not transform_meta_table_exists: - # meta_index = extract_transformation_meta(self.input_dts, self.transform_keys) - # if not meta_index.empty: - # self.meta_table.insert_rows(meta_index) - @classmethod def compute_transform_schema( cls, @@ -386,6 +378,15 @@ def store_batch_err( run_config=run_config, ) + def init_metadata(self): + """ + Call this method if you add a new transformation to tables + where there is already some saved data + """ + meta_index = extract_transformation_meta(self.input_dts, self.transform_keys) + if not meta_index.empty: + self.meta_table.insert_rows(meta_index) + def fill_metadata(self, ds: DataStore) -> None: idx_len, idx_gen = self.get_full_process_ids(ds=ds, chunk_size=1000) diff --git a/tests/test_batch_transform_scheduling.py b/tests/test_batch_transform_scheduling.py index c5196a20..a49b0939 100644 --- a/tests/test_batch_transform_scheduling.py +++ b/tests/test_batch_transform_scheduling.py @@ -32,9 +32,6 @@ def test_inc_process_proc_no_change(dbconn) -> None: def id_func(df): return TEST_DF - tbl2.store_chunk(TEST_DF) - tbl1.store_chunk(TEST_DF) - step = BatchTransformStep( ds=ds, name="step", @@ -45,6 +42,9 @@ def id_func(df): output_dts=[tbl2], ) + tbl2.store_chunk(TEST_DF) + tbl1.store_chunk(TEST_DF) + count, idx_gen = step.get_full_process_ids(ds) idx_dfs = list(idx_gen) idx_len = len(pd.concat(idx_dfs)) if len(idx_dfs) > 0 else 0 diff --git a/tests/test_chunked_processing_pipeline.py b/tests/test_chunked_processing_pipeline.py index 81f7ff0d..7b709659 100644 --- a/tests/test_chunked_processing_pipeline.py +++ b/tests/test_chunked_processing_pipeline.py @@ -86,6 +86,13 @@ def conversion(df, multiply): ) steps = build_compute(ds, catalog, pipeline) + + # another case with awkward initialization + # UpdateExternalTable is a kind of transformation step, however it has no transformation meta table + # this means that I cannot track input rows during that step + # but the next step needs those changes + # I cannot make next transform be aware of the previous transform + steps[1].init_metadata() run_steps(ds, steps) df_transformed = catalog.get_datatable(ds, "output_data").get_data() @@ -94,7 +101,7 @@ def conversion(df, multiply): assert len(set(df_transformed["x"].values).symmetric_difference(set(x))) == 0 -def test_transform_with_many_input_and_output_tables(tmp_dir, dbconn): +def test_transform_with_many_input_and_output_tables(dbconn): ds = DataStore(dbconn, create_meta_table=True) catalog = Catalog( { @@ -147,10 +154,11 @@ def transform(df1, df2): ] ) + steps = build_compute(ds, catalog, pipeline) + catalog.get_datatable(ds, "inp1").store_chunk(TEST_DF) catalog.get_datatable(ds, "inp2").store_chunk(TEST_DF) - steps = build_compute(ds, catalog, pipeline) run_steps(ds, steps) out1 = catalog.get_datatable(ds, "out1") diff --git a/tests/test_core_steps1.py b/tests/test_core_steps1.py index c4db49b6..5a4214f1 100644 --- a/tests/test_core_steps1.py +++ b/tests/test_core_steps1.py @@ -92,8 +92,6 @@ def test_inc_process_modify_values(dbconn) -> None: def id_func(df): return df - tbl1.store_chunk(TEST_DF) - step = BatchTransformStep( ds=ds, name="test", @@ -101,7 +99,7 @@ def id_func(df): input_dts=[ComputeInput(dt=tbl1, join_type="full")], output_dts=[tbl2], ) - + tbl1.store_chunk(TEST_DF) step.run_full(ds) assert_datatable_equal(tbl2, TEST_DF) @@ -123,8 +121,6 @@ def test_inc_process_delete_values_from_input(dbconn) -> None: def id_func(df): return df - tbl1.store_chunk(TEST_DF) - step = BatchTransformStep( ds=ds, name="test", @@ -133,6 +129,7 @@ def id_func(df): output_dts=[tbl2], ) + tbl1.store_chunk(TEST_DF) step.run_full(ds) assert_datatable_equal(tbl2, TEST_DF) @@ -156,10 +153,6 @@ def test_inc_process_delete_values_from_proc(dbconn) -> None: def id_func(df): return df[:5] - tbl2.store_chunk(TEST_DF) - - tbl1.store_chunk(TEST_DF) - step = BatchTransformStep( ds=ds, name="test", @@ -167,6 +160,8 @@ def id_func(df): input_dts=[ComputeInput(dt=tbl1, join_type="full")], output_dts=[tbl2], ) + tbl2.store_chunk(TEST_DF) + tbl1.store_chunk(TEST_DF) step.run_full(ds) @@ -250,7 +245,7 @@ def inc_func_inv(df): input_dts=[ComputeInput(dt=tbl, join_type="full")], output_dts=[tbl3, tbl2, tbl1], ) - + step_inc_inv.init_metadata() tbl.store_chunk(TEST_DF[:5], processed_idx=data_to_index(TEST_DF, tbl.primary_keys)) step_inc_inv.run_full(ds) @@ -295,9 +290,6 @@ def inc_func(df1, df2): df["a_second"] += 2 return df - tbl1.store_chunk(TEST_DF) - tbl2.store_chunk(TEST_DF) - step = BatchTransformStep( ds=ds, name="test", @@ -308,6 +300,8 @@ def inc_func(df1, df2): ], output_dts=[tbl], ) + tbl1.store_chunk(TEST_DF) + tbl2.store_chunk(TEST_DF) step.run_full(ds) @@ -378,8 +372,6 @@ def test_inc_process_many_several_outputs(dbconn) -> None: tbl_good = ds.create_table("tbl_good", table_store=TableStoreDB(dbconn, "tbl_good_data", TEST_SCHEMA, True)) tbl_bad = ds.create_table("tbl_bad", table_store=TableStoreDB(dbconn, "tbl_bad_data", TEST_SCHEMA, True)) - tbl.store_chunk(TEST_DF) - def inc_func(df): df_good = df[df["id"].isin(good_ids)] df_bad = df[df["id"].isin(bad_ids)] @@ -392,7 +384,7 @@ def inc_func(df): input_dts=[ComputeInput(dt=tbl, join_type="full")], output_dts=[tbl_good, tbl_bad], ) - + tbl.store_chunk(TEST_DF) step.run_full(ds) assert_datatable_equal(tbl, TEST_DF) @@ -417,8 +409,6 @@ def test_inc_process_many_one_to_many(dbconn) -> None: ) tbl2 = ds.create_table("tbl2", table_store=TableStoreDB(dbconn, "tbl2_data", TEST_SCHEMA_OTM, True)) - tbl.store_chunk(TEST_OTM_DF) - def inc_func_unpack(df): res_df = df.explode("a") @@ -448,6 +438,8 @@ def inc_func_pack(df): output_dts=[tbl2], ) + tbl.store_chunk(TEST_OTM_DF) + step_unpack.run_full(ds) step_pack.run_full(ds) @@ -479,8 +471,6 @@ def test_inc_process_many_one_to_many_change_primary(dbconn) -> None: ) tbl2 = ds.create_table("tbl2", table_store=TableStoreDB(dbconn, "tbl2_data", TEST_SCHEMA_OTM3, True)) - tbl.store_chunk(TEST_OTM_DF) - def inc_func_unpack(df): res_df = df.explode("a") return res_df[res_df["a"].notna()] @@ -516,6 +506,8 @@ def inc_func_pack(df): output_dts=[tbl2], ) + tbl.store_chunk(TEST_OTM_DF) + step_unpack.run_full(ds) step_pack.run_full(ds) @@ -622,6 +614,7 @@ def inc_func_good(df): output_dts=[tbl_good], chunk_size=1, ) + step_bad.init_metadata() step_bad.run_full(ds) assert_datatable_equal(tbl_good, TEST_DF.loc[[0, 1, 2, 4, 5]]) @@ -634,6 +627,7 @@ def inc_func_good(df): output_dts=[tbl_good], chunk_size=CHUNKSIZE, ) + step_good.init_metadata() step_good.run_full(ds) assert_datatable_equal(tbl_good, TEST_DF.loc[GOOD_IDXS1]) diff --git a/tests/test_core_steps2.py b/tests/test_core_steps2.py index 4336483c..67bd8c03 100644 --- a/tests/test_core_steps2.py +++ b/tests/test_core_steps2.py @@ -84,8 +84,6 @@ def test_batch_transform(dbconn): tbl2 = ds.create_table("tbl2", table_store=TableStoreDB(dbconn, "tbl2_data", TEST_SCHEMA1, True)) - tbl1.store_chunk(TEST_DF1_1, now=0) - step = BatchTransformStep( ds=ds, name="test", @@ -93,6 +91,7 @@ def test_batch_transform(dbconn): input_dts=[ComputeInput(dt=tbl1, join_type="full")], output_dts=[tbl2], ) + tbl1.store_chunk(TEST_DF1_1, now=0) step.run_full(ds) @@ -118,8 +117,6 @@ def test_batch_transform_with_filter(dbconn): tbl2 = ds.create_table("tbl2", table_store=TableStoreDB(dbconn, "tbl2_data", TEST_SCHEMA1, True)) - tbl1.store_chunk(TEST_DF1_1, now=0) - step = BatchTransformStep( ds=ds, name="test", @@ -127,6 +124,7 @@ def test_batch_transform_with_filter(dbconn): input_dts=[ComputeInput(dt=tbl1, join_type="full")], output_dts=[tbl2], ) + tbl1.store_chunk(TEST_DF1_1, now=0) step.run_full( ds, run_config=RunConfig( @@ -144,8 +142,6 @@ def test_batch_transform_with_filter_not_in_transform_index(dbconn): tbl2 = ds.create_table("tbl2", table_store=TableStoreDB(dbconn, "tbl2_data", TEST_SCHEMA2, True)) - tbl1.store_chunk(TEST_DF1_2, now=0) - step = BatchTransformStep( ds=ds, name="test", @@ -153,6 +149,7 @@ def test_batch_transform_with_filter_not_in_transform_index(dbconn): input_dts=[ComputeInput(dt=tbl1, join_type="full")], output_dts=[tbl2], ) + tbl1.store_chunk(TEST_DF1_2, now=0) step.run_full( ds, @@ -337,9 +334,6 @@ def test_batch_transform_with_entity(dbconn): items2 = ds.create_table("items2", table_store=TableStoreDB(dbconn, "items2_data", ITEMS_SCHEMA, True)) - products.store_chunk(PRODUCTS_DF, now=0) - items.store_chunk(ITEMS_DF, now=0) - def update_df(products: pd.DataFrame, items: pd.DataFrame): merged_df = pd.merge(items, products, on=["product_id", "pipeline_id"]) merged_df["a"] = merged_df.apply(lambda x: x["a"] + x["b"], axis=1) @@ -356,6 +350,8 @@ def update_df(products: pd.DataFrame, items: pd.DataFrame): ], output_dts=[items2], ) + products.store_chunk(PRODUCTS_DF, now=0) + items.store_chunk(ITEMS_DF, now=0) step.run_full(ds) diff --git a/tests/test_image_pipeline.py b/tests/test_image_pipeline.py index df036f05..dce8e525 100644 --- a/tests/test_image_pipeline.py +++ b/tests/test_image_pipeline.py @@ -56,12 +56,6 @@ def test_image_datatables(dbconn, tmp_dir): assert len(list(tmp_dir.glob("tbl1/*.png"))) == 0 assert len(list(tmp_dir.glob("tbl2/*.png"))) == 0 - do_batch_generate( - func=gen_images, - ds=ds, - output_dts=[tbl1], - ) - step = BatchTransformStep( ds=ds, name="resize_images", @@ -69,6 +63,11 @@ def test_image_datatables(dbconn, tmp_dir): input_dts=[ComputeInput(dt=tbl1, join_type="full")], output_dts=[tbl2], ) + do_batch_generate( + func=gen_images, + ds=ds, + output_dts=[tbl1], + ) step.run_full(ds) From 1226e1986828394e4f136641f68f9b947789e3fa Mon Sep 17 00:00:00 2001 From: vladimir-m <23121066+swsvc@users.noreply.github.com> Date: Tue, 28 Oct 2025 18:01:11 +0300 Subject: [PATCH 5/8] remove is_success column --- datapipe/meta/sql_meta.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index fcf96e5f..d2373e63 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -21,6 +21,7 @@ import cityhash import pandas as pd import sqlalchemy as sa +from sqlalchemy import or_ from datapipe.run_config import RunConfig from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filter @@ -429,8 +430,6 @@ def get_agg_cte( TRANSFORM_META_SCHEMA: DataSchema = [ sa.Column("process_ts", sa.Float), # Время последней успешной обработки - # FIXME remove this and adjust the queries - sa.Column("is_success", sa.Boolean), # Успешно ли обработана строка sa.Column("priority", sa.Integer), # Приоритет обработки (чем больше, тем выше) sa.Column("status", sa.String), # Статус исполнения трансформации sa.Column("error", sa.String), # Текст ошибки @@ -490,7 +489,6 @@ def insert_rows( [ { "process_ts": 0, - "is_success": False, "priority": 0, "error": None, "status": TransformStatus.PENDING.value, @@ -523,7 +521,6 @@ def reset_rows(self, idx: IndexDF) -> None: sa.update(self.sql_table) .values({ "process_ts": 0, - "is_success": False, "status": TransformStatus.PENDING.value, "error": None, }) @@ -542,14 +539,13 @@ def reset_rows(self, idx: IndexDF) -> None: def reset_all_rows(self) -> None: """ - Difference from mark_all_rows_unprocessed is in flag is_success=True + Difference from mark_all_rows_unprocessed: mark_all_rows_unporocessed checks status Here what happens is all rows are reset to original state """ update_sql = ( sa.update(self.sql_table) .values({ "process_ts": 0, - "is_success": False, "status": TransformStatus.PENDING.value, "error": None, }) @@ -581,7 +577,6 @@ def mark_rows_processed_success( [ { "process_ts": process_ts, - "is_success": True, "priority": 0, "status": TransformStatus.COMPLETED.value, "error": None, @@ -598,7 +593,6 @@ def mark_rows_processed_success( [ { "process_ts": process_ts, - "is_success": True, "priority": 0, "status": TransformStatus.COMPLETED.value, "error": None, @@ -612,7 +606,6 @@ def mark_rows_processed_success( index_elements=self.primary_keys, set_={ "process_ts": process_ts, - "is_success": True, "status": TransformStatus.COMPLETED.value, "error": None, }, @@ -639,7 +632,6 @@ def mark_rows_processed_error( [ { "process_ts": process_ts, - "is_success": False, "priority": 0, "status": TransformStatus.ERROR.value, "error": error, @@ -653,7 +645,6 @@ def mark_rows_processed_error( index_elements=self.primary_keys, set_={ "process_ts": process_ts, - "is_success": False, "status": TransformStatus.ERROR.value, "error": error, }, @@ -684,12 +675,11 @@ def mark_all_rows_unprocessed( .values( { "process_ts": 0, - "is_success": False, "status": TransformStatus.PENDING.value, "error": None, } ) - .where(self.sql_table.c.is_success == True) # noqa: E712 + .where(self.sql_table.c.status == TransformStatus.COMPLETED.value) # noqa: E712 ) sql = sql_apply_runconfig_filter(update_sql, self.sql_table, self.primary_keys, run_config) @@ -1013,7 +1003,7 @@ def build_changed_idx_sql( .select_from(meta_table.sql_table) .where(sa.or_( meta_table.sql_table.c.status == TransformStatus.PENDING.value, - meta_table.sql_table.c.is_success != True + meta_table.sql_table.c.status == TransformStatus.ERROR.value )) ) sql = sql_apply_runconfig_filter(sql, meta_table.sql_table, transform_keys, run_config) @@ -1047,6 +1037,9 @@ def build_changed_idx_sql_deprecated( order: Literal["asc", "desc"] = "asc", run_config: Optional[RunConfig] = None, # TODO remove ) -> Tuple[Iterable[str], Any]: + """ + Function is not working, is_success field is removed + """ all_input_keys_counts: Dict[str, int] = {} for col in itertools.chain(*[inp.dt.primary_schema for inp in input_dts]): all_input_keys_counts[col.name] = all_input_keys_counts.get(col.name, 0) + 1 From 65d907176f908efbbe4eb6db497de1ba88d2259f Mon Sep 17 00:00:00 2001 From: vladimir-m <23121066+swsvc@users.noreply.github.com> Date: Wed, 12 Nov 2025 21:27:03 +0300 Subject: [PATCH 6/8] calculate transformation meta in db --- datapipe/datatable.py | 7 +- datapipe/meta/sql_meta.py | 319 ++++++++++++++++++++++--------- datapipe/step/batch_transform.py | 6 +- 3 files changed, 233 insertions(+), 99 deletions(-) diff --git a/datapipe/datatable.py b/datapipe/datatable.py index e623f04a..4afdc3b4 100644 --- a/datapipe/datatable.py +++ b/datapipe/datatable.py @@ -158,15 +158,16 @@ def delete_by_idx( if len(idx) > 0: logger.debug(f"Deleting {len(idx.index)} rows from {self.name} data") - self.table_store.delete_rows(idx) - self.meta_table.mark_rows_deleted(idx, now=now) - transformations = getattr(self, 'transformations', []) create_transformation_meta_for_changes( transformations, self.name, self.primary_keys, pd.DataFrame(columns=self.primary_keys), idx ) + self.table_store.delete_rows(idx) + self.meta_table.mark_rows_deleted(idx, now=now) + + def delete_stale_by_process_ts( self, process_ts: float, diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index 5fa8fdc1..4fba5a60 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -25,7 +25,7 @@ from datapipe.run_config import RunConfig from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filter -from datapipe.store.database import DBConn, MetaKey +from datapipe.store.database import DBConn, MetaKey, TableStoreDB from datapipe.types import ( DataDF, DataSchema, @@ -38,6 +38,7 @@ if TYPE_CHECKING: from datapipe.compute import ComputeInput + from datapipe.step.batch_transform import BaseBatchTransformStep from datapipe.datatable import DataStore @@ -466,7 +467,6 @@ def __init__( ) if create_table: - # TODO there must be an initialization for the data if new transformation added self.sql_table.create(self.dbconn.con, checkfirst=True) def __reduce__(self) -> Tuple[Any, ...]: @@ -505,6 +505,21 @@ def insert_rows( with self.dbconn.con.begin() as con: con.execute(sql) + def insert_rows_by_sql(self, sql) -> None: + sql = sql.add_columns( + sa.literal(0).label("process_ts"), + sa.literal(0).label("priority"), + sa.literal(None).label("error"), + sa.literal(TransformStatus.PENDING.value).label("status") + ) + insert_sql = self.dbconn.insert(self.sql_table).from_select( + sorted(self.primary_keys) + ["process_ts", "priority", "error", "status"], + sql + ).on_conflict_do_nothing(index_elements=self.primary_keys) + + with self.dbconn.con.begin() as con: + con.execute(insert_sql) + def reset_rows(self, idx: IndexDF) -> None: idx = cast(IndexDF, idx[self.primary_keys]) if len(idx) == 0: @@ -539,6 +554,19 @@ def reset_rows(self, idx: IndexDF) -> None: with self.dbconn.con.begin() as con: con.execute(update_sql, update_data) + def reset_rows_by_sql(self, sql) -> None: + primary_cols = [getattr(self.sql_table.c, col) for col in self.primary_keys] + update_sql = sa.update(self.sql_table).values({ + "process_ts": 0, + "status": TransformStatus.PENDING.value, + "error": None, + }).where( + sa.tuple_(*primary_cols).in_(sql) + ) + + with self.dbconn.con.begin() as con: + con.execute(update_sql) + def reset_all_rows(self) -> None: """ Difference from mark_all_rows_unprocessed: mark_all_rows_unporocessed checks status @@ -780,6 +808,20 @@ def _make_agg_of_agg( return sql.cte(name=f"all__{agg_col}") +def _all_input_tables_from_same_sql_db(input_tables): + table_stores = [] + is_all_table_store_db = True + for input_table in input_tables: + if isinstance(input_table.dt.table_store, TableStoreDB): + table_stores.append(input_table.dt.table_store.dbconn.connstr) + else: + is_all_table_store_db = False + table_stores.append(type(input_table.dt.table_store)) + + # all tables are from the same database and it is sql database + return len(set(table_stores)) == 1 and is_all_table_store_db + + def _split_tables_into_connected_groups( input_dts: List["ComputeInput"] ) -> Dict[Tuple[str], List["ComputeInput"]]: @@ -833,39 +875,6 @@ def _split_tables_into_connected_groups( return cast(dict, table_groups) -def extract_transformation_meta(input_dts: List["ComputeInput"], transform_keys: List[str]) -> IndexDF: - """ - This function takes all the input tables to the transformation - and creates index of rows to be created in transformation meta table - to initialize it if transformation was added after the input tables were already filled in - """ - table_groups = _split_tables_into_connected_groups(input_dts) - - # within each group join the tables how=inner by intersecting keys - within_group_results = {} - for group_key, table_group in table_groups.items(): - first_table = table_group[0].dt - first_table_index = first_table.get_index_data() - first_table_keys = set(first_table.primary_keys) - for table in table_group[1:]: - key_intersection = list(first_table_keys.intersection(set(table.dt.primary_keys))) - current_table_index = table.dt.get_index_data() - first_table_index = pd.merge( - first_table_index, current_table_index, how='inner', on=key_intersection - ) - first_table_keys = set(list(first_table_keys) + list(table.dt.primary_keys)) - within_group_results[group_key] = first_table_index - - # cross join as final step - within_group_indexes = list(within_group_results.values()) - final_result = within_group_indexes[0] - for table_data in within_group_indexes[1:]: - final_result = pd.merge(final_result, table_data, how="cross") - - # this probably can be done other way during merge without calculating full result - return cast(IndexDF, final_result[transform_keys].drop_duplicates()) - - def _join_delta( result_df: IndexDF, table_df: IndexDF, result_pkeys: Set[str], table_pkeys: Set[str], result_is_required_table: bool, table_is_required_table: bool @@ -877,6 +886,8 @@ def _join_delta( delta_is_required_table - this is flag that says that delta change is applied to Required table table_is_required_table - this is flag that says that the table_df is Required table """ + if result_df.empty: + return result_df, result_pkeys key_intersection = list(result_pkeys.intersection(set(table_pkeys))) if len(key_intersection) > 0: @@ -910,38 +921,177 @@ def _join_delta( return result, result_pkeys_copy -def create_transformation_meta_for_changes( - transformations, current_table_name, primary_keys, +def _join_input_tables_in_sql(all_input_tables, transform_keys): + """ + This function calculates sql query that joins all input tables + """ + select_columns_dict = {} + for input_table in all_input_tables: + for key in input_table.dt.primary_keys: + if key in transform_keys: + select_columns_dict[key] = input_table.dt.table_store.data_table.c[key] + # sort keys because the order of selected cols here and in + # TransformMetaTable.insert_rows_by_sql must be the same + sorted_keys = sorted(select_columns_dict.keys()) + select_columns = [select_columns_dict[key].label(key) for key in sorted_keys] + + first_table = all_input_tables[0] + sql = sa.select(*select_columns).select_from(first_table.dt.table_store.data_table) + prev_tables = [first_table] + for table in all_input_tables[1:]: + onclause = [] + + for prev_table in prev_tables: + for key in table.dt.primary_keys: + if key in prev_table.dt.primary_keys: + onclause.append( + prev_table.dt.table_store.data_table.c[key] == table.dt.table_store.data_table.c[key] + ) + + if len(onclause) > 0: + sql = sql.join(table.dt.table_store.data_table, onclause=sa.and_(*onclause)) + else: + sql = sql.join(table.dt.table_store.data_table, onclause=sa.literal(True)) + + return sql + + +def _calculate_changes_in_sql( + transf, current_table, all_input_tables, new_df, changed_df +): + sql = _join_input_tables_in_sql(all_input_tables, transf.transform_keys) + + primary_key_columns = [ + current_table.dt.table_store.data_table.c[pk] + for pk in current_table.dt.primary_keys + ] + + if not new_df.empty: + new_df_sql = sql.where( + sa.tuple_(*primary_key_columns).in_( + new_df[current_table.dt.primary_keys].values.tolist() + ) + ) + transf.meta_table.insert_rows_by_sql(new_df_sql) + + if not changed_df.empty: + changed_df_sql = sql.where( + sa.tuple_(*primary_key_columns).in_( + changed_df[current_table.dt.primary_keys].values.tolist() + ) + ) + transf.meta_table.reset_rows_by_sql(changed_df_sql) + + +def _calculate_changes_in_pandas( + transf, current_table_name, all_input_tables, primary_keys, new_df, changed_df ): + """ + See note for extract_transformation_meta function + """ + table_groups = _split_tables_into_connected_groups(all_input_tables) + filtered_groups = { + key_tuple: table_group + for key_tuple, table_group in table_groups.items() + if any(transf_key in set(key_tuple) for transf_key in transf.transform_keys) + } + + current_table_is_required = any([ + dt.dt for dt in all_input_tables + if dt.dt.name == current_table_name + and dt.join_type == 'inner' + ]) + remaining_tables = [ + dt.dt for dt in itertools.chain(*filtered_groups.values()) + if dt.dt.name != current_table_name + ] + required_tables = set([ + dt.dt.name for dt in itertools.chain(*filtered_groups.values()) + if dt.dt.name != current_table_name + and dt.join_type == 'inner' + ]) + + new_result, changed_result = new_df[primary_keys], changed_df[primary_keys] + result_keys = set(primary_keys) + + for remaining_table in remaining_tables: + remaining_table_df = remaining_table.get_index_data() + result_keys_cache = result_keys.copy() + new_result, result_keys = _join_delta( + new_result, remaining_table_df[remaining_table.primary_keys], + result_keys_cache, set(remaining_table.primary_keys), + current_table_is_required, remaining_table.name in required_tables + ) + changed_result, _ = _join_delta( + changed_result, remaining_table_df[remaining_table.primary_keys], + result_keys_cache, set(remaining_table.primary_keys), + current_table_is_required, remaining_table.name in required_tables + ) + + if not new_result.empty: + transf.meta_table.insert_rows(new_result) + if not changed_result.empty: + transf.meta_table.reset_rows(changed_result) + + +def _initial_transformation_meta_extract_in_pandas( + transf: "BaseBatchTransformStep", input_dts: List["ComputeInput"], transform_keys: List[str] +) -> None: + """ + This function takes all the input tables to the transformation + and creates index of rows to be created in transformation meta table + to initialize it if transformation was added after the input tables were already filled in + + Note: (very important) This method does the join between multiple tables in some cases + - if tables are in the same database, then method uses in-database join + - if tables reside in different stores, then data is fetched to python code and joined manually + this can cause out-of memory errors if data is too large + """ + table_groups = _split_tables_into_connected_groups(input_dts) + + # within each group join the tables how=inner by intersecting keys + within_group_results = {} + for group_key, table_group in table_groups.items(): + first_table = table_group[0].dt + first_table_index = first_table.get_index_data() + first_table_keys = set(first_table.primary_keys) + for table in table_group[1:]: + key_intersection = list(first_table_keys.intersection(set(table.dt.primary_keys))) + current_table_index = table.dt.get_index_data() + first_table_index = pd.merge( + first_table_index, current_table_index, how='inner', on=key_intersection + ) + first_table_keys = set(list(first_table_keys) + list(table.dt.primary_keys)) + within_group_results[group_key] = first_table_index + + # cross join as final step + within_group_indexes = list(within_group_results.values()) + final_result = within_group_indexes[0] + for table_data in within_group_indexes[1:]: + final_result = pd.merge(final_result, table_data, how="cross") + + idx_to_write = cast(IndexDF, final_result[transform_keys].drop_duplicates()) + transf.meta_table.insert_rows(idx_to_write) + + +def _initial_transformation_meta_extract_in_sql( + transf: "BaseBatchTransformStep", all_input_tables, transform_keys +) -> None: + sql = _join_input_tables_in_sql(all_input_tables, transform_keys) + transf.meta_table.insert_rows_by_sql(sql) + + +def create_transformation_meta_for_changes( + transformations: List["BaseBatchTransformStep"], current_table_name: str, primary_keys, + new_df: pd.DataFrame, changed_df: pd.DataFrame +) -> None: for transf in transformations: output_tables = set([dt.name for dt in transf.output_dts]) all_input_tables = [dt for dt in transf.input_dts if dt.dt.name not in output_tables] + current_table = [dt for dt in transf.input_dts if dt.dt.name == current_table_name][0] + all_input_tables_from_sql_db = _all_input_tables_from_same_sql_db(all_input_tables) - table_groups = _split_tables_into_connected_groups(all_input_tables) - filtered_groups = { - key_tuple: table_group - for key_tuple, table_group in table_groups.items() - if any(transf_key in set(key_tuple) for transf_key in transf.transform_keys) - } - - current_table_is_required = any([ - dt.dt for dt in all_input_tables - if dt.dt.name == current_table_name - and dt.join_type == 'inner' - ]) - remaining_tables = [ - dt.dt for dt in itertools.chain(*filtered_groups.values()) - if dt.dt.name != current_table_name - ] - required_tables = set([ - dt.dt.name for dt in itertools.chain(*filtered_groups.values()) - if dt.dt.name != current_table_name - and dt.join_type == 'inner' - ]) - - new_result, changed_result = new_df[primary_keys], changed_df[primary_keys] - result_keys = set(primary_keys) if not set(primary_keys).issubset(set(transf.transform_keys)): # what happens here: # update to a table that participates in a join and later in aggregation @@ -949,40 +1099,25 @@ def create_transformation_meta_for_changes( # so I reset all the rows - this causes the aggregation to be recomputed # this is the same for both required and normal tables transf.meta_table.reset_all_rows() + changed_df = pd.DataFrame(columns=changed_df.columns) # don't process it - # no need to process changed_df, I reset previous rows above - # but there can be newly added data - # this describes interaction of transform keys and newly added data - # with key that is missing in transform_keys - for remaining_table in remaining_tables: - remaining_table_df = remaining_table.get_index_data() - new_result, result_keys = _join_delta( - new_result, remaining_table_df[remaining_table.primary_keys], - result_keys, set(remaining_table.primary_keys), - current_table_is_required, remaining_table.name in required_tables - ) - - if not new_result.empty: - transf.meta_table.insert_rows(new_result) + if all_input_tables_from_sql_db: + _calculate_changes_in_sql(transf, current_table, all_input_tables, new_df, changed_df) else: - for remaining_table in remaining_tables: - remaining_table_df = remaining_table.get_index_data() - result_keys_cache = result_keys.copy() - new_result, result_keys = _join_delta( - new_result, remaining_table_df[remaining_table.primary_keys], - result_keys_cache, set(remaining_table.primary_keys), - current_table_is_required, remaining_table.name in required_tables - ) - changed_result, _ = _join_delta( - changed_result, remaining_table_df[remaining_table.primary_keys], - result_keys_cache, set(remaining_table.primary_keys), - current_table_is_required, remaining_table.name in required_tables - ) + _calculate_changes_in_pandas( + transf, current_table_name, all_input_tables, primary_keys, new_df, changed_df + ) - if not new_result.empty: - transf.meta_table.insert_rows(new_result) - if not changed_result.empty: - transf.meta_table.reset_rows(changed_result) + +def init_transformation_meta(transf: "BaseBatchTransformStep") -> None: + output_tables = set([dt.name for dt in transf.output_dts]) + all_input_tables = [dt for dt in transf.input_dts if dt.dt.name not in output_tables] + all_input_tables_from_sql_db = _all_input_tables_from_same_sql_db(all_input_tables) + + if all_input_tables_from_sql_db: + _initial_transformation_meta_extract_in_sql(transf, all_input_tables, transf.transform_keys) + else: + _initial_transformation_meta_extract_in_pandas(transf, all_input_tables, transf.transform_keys) def build_changed_idx_sql( diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 34adc66c..10e07dfa 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -35,7 +35,7 @@ ) from datapipe.datatable import DataStore, DataTable, MetaTable from datapipe.executor import Executor, ExecutorConfig, SingleThreadExecutor -from datapipe.meta.sql_meta import TransformMetaTable, build_changed_idx_sql, extract_transformation_meta +from datapipe.meta.sql_meta import TransformMetaTable, build_changed_idx_sql, init_transformation_meta from datapipe.run_config import LabelDict, RunConfig from datapipe.types import ( ChangeList, @@ -395,9 +395,7 @@ def init_metadata(self): Call this method if you add a new transformation to tables where there is already some saved data """ - meta_index = extract_transformation_meta(self.input_dts, self.transform_keys) - if not meta_index.empty: - self.meta_table.insert_rows(meta_index) + init_transformation_meta(self) def fill_metadata(self, ds: DataStore) -> None: idx_len, idx_gen = self.get_full_process_ids(ds=ds, chunk_size=1000) From 242536bcabb7b9ce21df3548859e8d5e358132a0 Mon Sep 17 00:00:00 2001 From: vladimir-m <23121066+swsvc@users.noreply.github.com> Date: Thu, 13 Nov 2025 18:03:08 +0300 Subject: [PATCH 7/8] fix for sqlite --- datapipe/meta/sql_meta.py | 1 + 1 file changed, 1 insertion(+) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index 4fba5a60..e65412b3 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -1079,6 +1079,7 @@ def _initial_transformation_meta_extract_in_sql( transf: "BaseBatchTransformStep", all_input_tables, transform_keys ) -> None: sql = _join_input_tables_in_sql(all_input_tables, transform_keys) + sql = sql.where(sa.text('TRUE')) transf.meta_table.insert_rows_by_sql(sql) From bd2dbfb33c558a5f270b38ce314dfd9e8a60d8ca Mon Sep 17 00:00:00 2001 From: vladimir-m <23121066+swsvc@users.noreply.github.com> Date: Tue, 18 Nov 2025 23:06:52 +0300 Subject: [PATCH 8/8] calculate transaction meta on data meta using sql --- datapipe/meta/sql_meta.py | 25 ++++++++++++------------- tests/test_table_store_qdrant.py | 1 - 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index e65412b3..fb809905 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -822,6 +822,10 @@ def _all_input_tables_from_same_sql_db(input_tables): return len(set(table_stores)) == 1 and is_all_table_store_db +def _all_input_tables_have_meta_table(input_tables): + pass + + def _split_tables_into_connected_groups( input_dts: List["ComputeInput"] ) -> Dict[Tuple[str], List["ComputeInput"]]: @@ -929,14 +933,14 @@ def _join_input_tables_in_sql(all_input_tables, transform_keys): for input_table in all_input_tables: for key in input_table.dt.primary_keys: if key in transform_keys: - select_columns_dict[key] = input_table.dt.table_store.data_table.c[key] + select_columns_dict[key] = input_table.dt.meta_table.sql_table.c[key] # sort keys because the order of selected cols here and in # TransformMetaTable.insert_rows_by_sql must be the same sorted_keys = sorted(select_columns_dict.keys()) select_columns = [select_columns_dict[key].label(key) for key in sorted_keys] first_table = all_input_tables[0] - sql = sa.select(*select_columns).select_from(first_table.dt.table_store.data_table) + sql = sa.select(*select_columns).select_from(first_table.dt.meta_table.sql_table) prev_tables = [first_table] for table in all_input_tables[1:]: onclause = [] @@ -945,13 +949,13 @@ def _join_input_tables_in_sql(all_input_tables, transform_keys): for key in table.dt.primary_keys: if key in prev_table.dt.primary_keys: onclause.append( - prev_table.dt.table_store.data_table.c[key] == table.dt.table_store.data_table.c[key] + prev_table.dt.meta_table.sql_table.c[key] == table.dt.meta_table.sql_table.c[key] ) if len(onclause) > 0: - sql = sql.join(table.dt.table_store.data_table, onclause=sa.and_(*onclause)) + sql = sql.join(table.dt.meta_table.sql_table, onclause=sa.and_(*onclause)) else: - sql = sql.join(table.dt.table_store.data_table, onclause=sa.literal(True)) + sql = sql.join(table.dt.meta_table.sql_table, onclause=sa.literal(True)) return sql @@ -962,7 +966,7 @@ def _calculate_changes_in_sql( sql = _join_input_tables_in_sql(all_input_tables, transf.transform_keys) primary_key_columns = [ - current_table.dt.table_store.data_table.c[pk] + current_table.dt.meta_table.sql_table.c[pk] for pk in current_table.dt.primary_keys ] @@ -1091,7 +1095,6 @@ def create_transformation_meta_for_changes( output_tables = set([dt.name for dt in transf.output_dts]) all_input_tables = [dt for dt in transf.input_dts if dt.dt.name not in output_tables] current_table = [dt for dt in transf.input_dts if dt.dt.name == current_table_name][0] - all_input_tables_from_sql_db = _all_input_tables_from_same_sql_db(all_input_tables) if not set(primary_keys).issubset(set(transf.transform_keys)): # what happens here: @@ -1102,12 +1105,7 @@ def create_transformation_meta_for_changes( transf.meta_table.reset_all_rows() changed_df = pd.DataFrame(columns=changed_df.columns) # don't process it - if all_input_tables_from_sql_db: - _calculate_changes_in_sql(transf, current_table, all_input_tables, new_df, changed_df) - else: - _calculate_changes_in_pandas( - transf, current_table_name, all_input_tables, primary_keys, new_df, changed_df - ) + _calculate_changes_in_sql(transf, current_table, all_input_tables, new_df, changed_df) def init_transformation_meta(transf: "BaseBatchTransformStep") -> None: @@ -1118,6 +1116,7 @@ def init_transformation_meta(transf: "BaseBatchTransformStep") -> None: if all_input_tables_from_sql_db: _initial_transformation_meta_extract_in_sql(transf, all_input_tables, transf.transform_keys) else: + # this case is required for test_chunked_processing_pipeline.py::test_table_store_json_line_reading _initial_transformation_meta_extract_in_pandas(transf, all_input_tables, transf.transform_keys) diff --git a/tests/test_table_store_qdrant.py b/tests/test_table_store_qdrant.py index 8db0dcda..d64445e6 100644 --- a/tests/test_table_store_qdrant.py +++ b/tests/test_table_store_qdrant.py @@ -24,7 +24,6 @@ def generate_data() -> Generator[pd.DataFrame, None, None]: yield pd.DataFrame({"id": [1], "embedding": [[0.1]], "str_payload": ["foo"], "int_payload": [42]}) -@pytest.mark.skip(reason="qdrant store cannot read all the rows from the index") def test_qdrant_table_to_json(dbconn: DBConn, tmp_dir: Path) -> None: ds = DataStore(dbconn, create_meta_table=True) catalog = Catalog(