Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datapipe/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ def __init__(
self._labels = labels
self.executor_config = executor_config

for input_dt in input_dts:
dt = input_dt if isinstance(input_dt, DataTable) else input_dt.dt
dt.add_transformations([self])

def get_name(self) -> str:
ss = [
self.__class__.__name__,
Expand Down
29 changes: 28 additions & 1 deletion datapipe/datatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opentelemetry import trace

from datapipe.event_logger import EventLogger
from datapipe.meta.sql_meta import MetaTable
from datapipe.meta.sql_meta import MetaTable, create_transformation_meta_for_changes
from datapipe.run_config import RunConfig
from datapipe.store.database import DBConn
from datapipe.store.table_store import TableStore
Expand Down Expand Up @@ -46,6 +46,7 @@ def get_metadata(self, idx: Optional[IndexDF] = None) -> MetadataDF:
def get_data(self, idx: Optional[IndexDF] = None) -> DataDF:
    """Read the data rows for ``idx`` from the underlying table store.

    ``idx`` is first narrowed to the rows that actually exist according to
    the meta table, so the store is only asked for known rows.  With
    ``idx=None`` the existing index of the whole table is used.
    """
    existing_idx = self.meta_table.get_existing_idx(idx)
    return self.table_store.read_rows(existing_idx)


def reset_metadata(self):
    """Reset processing metadata for every row of this table.

    Zeroes ``process_ts`` and ``update_ts`` in the meta table inside a
    single transaction, which marks all rows as never-processed so that
    downstream transformations will pick them up again.
    """
    # One transaction for the whole bulk UPDATE; committed on context exit.
    with self.meta_dbconn.con.begin() as con:
        con.execute(self.meta_table.sql_table.update().values(process_ts=0, update_ts=0))
Expand All @@ -56,6 +57,17 @@ def get_size(self) -> int:
"""
return self.meta_table.get_metadata_size(idx=None, include_deleted=False)

def add_transformations(self, transformations):
    """Register transformation steps that read from this table.

    The backing ``transformations`` list is created lazily on first call,
    so instances built by older code paths (without the attribute) keep
    working unchanged.
    """
    try:
        registered = self.transformations
    except AttributeError:
        # First registration on this instance: start an empty registry.
        registered = self.transformations = []
    for step in transformations:
        registered.append(step)

def get_index_data(self) -> DataDF:
    """Return only the primary-key columns of the table's data.

    NOTE: a dedicated table_store method that reads just the index
    columns would avoid materializing full rows, but that requires a
    larger refactoring, so for now all rows are read and projected.
    """
    full_df = self.table_store.read_rows()
    return full_df[self.primary_keys]

def store_chunk(
self,
data_df: DataDF,
Expand Down Expand Up @@ -112,6 +124,14 @@ def store_chunk(
changes.append(data_to_index(new_df, self.primary_keys))
if not changed_df.empty:
changes.append(data_to_index(changed_df, self.primary_keys))

with tracer.start_as_current_span("store transformation metadata"):
transformations = getattr(self, 'transformations', [])
create_transformation_meta_for_changes(
transformations, self.name, self.primary_keys,
new_df, changed_df
)

else:
data_df = pd.DataFrame(columns=self.primary_keys)

Expand All @@ -138,9 +158,16 @@ def delete_by_idx(
if len(idx) > 0:
logger.debug(f"Deleting {len(idx.index)} rows from {self.name} data")

transformations = getattr(self, 'transformations', [])
create_transformation_meta_for_changes(
transformations, self.name, self.primary_keys,
pd.DataFrame(columns=self.primary_keys), idx
)

self.table_store.delete_rows(idx)
self.meta_table.mark_rows_deleted(idx, now=now)


def delete_stale_by_process_ts(
self,
process_ts: float,
Expand Down
Loading