From 0826788e300d6fa961f50ecf7e5d6d6671902a80 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Thu, 4 Jul 2024 18:52:12 +0400 Subject: [PATCH 1/4] wip --- CHANGELOG.md | 8 ++ datapipe/cli.py | 9 +- datapipe/compute.py | 66 ++++++++++- datapipe/step/batch_transform.py | 108 ++++++++++++++---- .../batch-transform-task-lifecycle.drawio | 88 ++++++++++++++ docs/source/batch-transform-task-lifecycle.md | 2 + pyproject.toml | 2 +- tests/test_transform_meta.py | 65 ++++++++++- tests/util.py | 17 +-- 9 files changed, 329 insertions(+), 36 deletions(-) create mode 100644 docs/source/batch-transform-task-lifecycle.drawio create mode 100644 docs/source/batch-transform-task-lifecycle.md diff --git a/CHANGELOG.md b/CHANGELOG.md index f6a6e11f..11550d50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.14.0 + +## Significant changes + +* `DatapipeApp` becomes main entry point to work with pipeline +* BatchTransform metadata has status "pending"/"clean"/"failed" +* `DatapipeApp.ingest_data` updates BatchTransform metadata on write + # 0.13.13 * Add `ComputeStep.get_status` method diff --git a/datapipe/cli.py b/datapipe/cli.py index 753b2fbe..9955dbe0 100644 --- a/datapipe/cli.py +++ b/datapipe/cli.py @@ -175,8 +175,9 @@ def setup_logging(): ) trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(cloud_trace_exporter)) # type: ignore + executor_instance: Executor if executor == "SingleThreadExecutor": - ctx.obj["executor"] = SingleThreadExecutor() + executor_instance = SingleThreadExecutor() elif executor == "RayExecutor": import ray @@ -184,12 +185,14 @@ def setup_logging(): ray_ctx = ray.init() - ctx.obj["executor"] = RayExecutor() + executor_instance = RayExecutor() else: raise ValueError(f"Unknown executor: {executor}") + ctx.obj["executor"] = executor_instance + with tracer.start_as_current_span("init"): - ctx.obj["pipeline"] = load_pipeline(pipeline) + ctx.obj["pipeline"] = load_pipeline(pipeline).with_executor(executor_instance) @cli.group() diff --git a/datapipe/compute.py b/datapipe/compute.py index 8c0ff0c1..af2f7766 100644 --- a/datapipe/compute.py +++ b/datapipe/compute.py @@ -1,9 +1,11 @@ import hashlib import logging +import time from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Dict, Iterable, List, Optional, Tuple +import pandas as pd from opentelemetry import trace from datapipe.datatable import DataStore, DataTable @@ -146,6 +148,15 @@ def get_change_list_process_ids( ) -> Tuple[int, Iterable[IndexDF]]: raise NotImplementedError() + def notify_change_list( + self, + ds: DataStore, + change_list: ChangeList, + now: Optional[float] = None, + run_config: Optional[RunConfig] = None, + ) -> None: + pass + def run_full( self, ds: DataStore, @@ -191,13 +202,66 @@ class Pipeline: class DatapipeApp: - def __init__(self, ds: DataStore, catalog: Catalog, pipeline: Pipeline): + def __init__( + self, + ds: DataStore, + catalog: Catalog, + pipeline: Pipeline, + executor: Executor | None = None, + ): self.ds = ds self.catalog = catalog self.pipeline = pipeline + self.executor = executor self.steps = build_compute(ds, catalog, pipeline) + def with_executor(self, executor: Executor) -> "DatapipeApp": + self.executor = executor + + return self + + def consumers(self, table_name: str) -> List[ComputeStep]: + return [ + step + for step in self.steps + if table_name in [i.name for i in step.input_dts] + ] + + def producers(self, table_name: str) -> List[ComputeStep]: + return [ + step + for step in self.steps + if table_name in [o.name 
for o in step.output_dts] + ] + + def ingest_data( + self, + table_name: str, + data_df: pd.DataFrame, + now: float | None = None, + ) -> ChangeList: + table = self.ds.get_table(table_name) + now = now or time.time() + changes = table.store_chunk(data_df, now=now) + + change_list = ChangeList({table_name: changes}) + + for step in self.consumers(table_name): + step.notify_change_list(self.ds, change_list, now=now) + + return change_list + + def ingest_and_process_data(self, table_name: str, data_df: pd.DataFrame) -> None: + cl = self.ingest_data(table_name, data_df) + + run_steps_changelist( + self.ds, + self.steps, + cl, + executor=self.executor, + ) + def build_compute( ds: DataStore, catalog: Catalog, pipeline: Pipeline diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 6ed989ba..96f8bd39 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -24,6 +24,7 @@ from sqlalchemy import ( Boolean, Column, + Enum, Float, Integer, String, @@ -83,14 +84,6 @@ def __call__( BatchTransformFunc = Callable[..., TransformResult] -TRANSFORM_META_SCHEMA: DataSchema = [ - Column("process_ts", Float), # Время последней успешной обработки - Column("is_success", Boolean), # Успешно ли обработана строка - Column("priority", Integer), # Приоритет обработки (чем больше, тем выше) - Column("error", String), # Текст ошибки -] - - class TransformMetaTable: def __init__( self, @@ -104,7 +97,17 @@ def __init__( self.primary_schema = primary_schema self.primary_keys = [i.name for i in primary_schema] - self.sql_schema = [i._copy() for i in primary_schema + TRANSFORM_META_SCHEMA] # type: ignore + self.sql_schema = [i._copy() for i in primary_schema] + [ + Column("update_ts", Float), # Время последнего обновления + Column("process_ts", Float), # Время последней успешной обработки + Column("priority", Integer), # Приоритет обработки + Column( + "status", + Enum("pending", "clean", "failed", name="transform_status"), + index=True, + ), + Column("error", String), # Текст ошибки + ] self.sql_table = Table( name, @@ -151,7 +154,42 @@ def insert_rows( with self.dbconn.con.begin() as con: con.execute(sql) - def mark_rows_processed_success( + def mark_rows_pending( + self, + idx: IndexDF, + update_ts: float, + run_config: Optional[RunConfig] = None, + ) -> None: + idx = cast(IndexDF, idx[self.primary_keys].drop_duplicates().dropna()) + if len(idx) == 0: + return + + insert_sql = self.dbconn.insert(self.sql_table).values( + [ + { + "update_ts": update_ts, + "process_ts": 0, + "status": "pending", + "error": None, + **idx_dict, # type: ignore + } + for idx_dict in idx.to_dict(orient="records") + ] + ) + + sql = insert_sql.on_conflict_do_update( + index_elements=self.primary_keys, + set_={ + "update_ts": update_ts, + "status": "pending", + "error": None, + }, + ) + + with self.dbconn.con.begin() as con: + con.execute(sql) + + def mark_rows_clean( self, idx: IndexDF, process_ts: float, @@ -167,7 +205,7 @@ def mark_rows_processed_success( [ { "process_ts": process_ts, - "is_success": True, + "status": "clean", "priority": 0, "error": None, **idx_dict, # type: ignore @@ -180,7 +218,7 @@ def mark_rows_processed_success( index_elements=self.primary_keys, set_={ "process_ts": process_ts, - "is_success": True, + "status": "clean", "error": None, }, ) @@ -189,7 +227,7 @@ def mark_rows_processed_success( with self.dbconn.con.begin() as con: con.execute(sql) - def mark_rows_processed_error( + def mark_rows_failed( self, idx: IndexDF, process_ts: float, @@ -205,8 +243,9 @@ 
def mark_rows_processed_error( insert_sql = self.dbconn.insert(self.sql_table).values( [ { + "update_ts": 0, "process_ts": process_ts, - "is_success": False, + "status": "failed", "priority": 0, "error": error, **idx_dict, # type: ignore @@ -219,7 +258,7 @@ def mark_rows_processed_error( index_elements=self.primary_keys, set_={ "process_ts": process_ts, - "is_success": False, + "status": "failed", "error": error, }, ) @@ -228,6 +267,19 @@ def mark_rows_processed_error( with self.dbconn.con.begin() as con: con.execute(sql) + def get_metadata(self) -> pd.DataFrame: + with self.dbconn.con.begin() as con: + res = con.execute( + select( + *[self.sql_table.c[i] for i in self.sql_table.columns.keys()] + ).select_from(self.sql_table) + ).fetchall() + + if len(res) == 0 or (len(res) == 1 and res[0] == ()): + return pd.DataFrame(columns=self.sql_table.columns.keys()) + else: + return pd.DataFrame(res, columns=self.sql_table.columns.keys()) + def get_metadata_size(self) -> int: """ Получить количество строк метаданных трансформации. @@ -442,7 +494,7 @@ def _make_agg_of_agg(ctes, agg_col): out: Any = ( select( *[column(k) for k in self.transform_keys] - + [tr_tbl.c.process_ts, tr_tbl.c.priority, tr_tbl.c.is_success] + + [tr_tbl.c.process_ts, tr_tbl.c.priority, tr_tbl.c.status] ) .select_from(tr_tbl) .group_by(*[column(k) for k in self.transform_keys]) @@ -470,10 +522,10 @@ def _make_agg_of_agg(ctes, agg_col): .where( or_( and_( - out.c.is_success == True, # noqa + out.c.status == "clean", # noqa inp.c.update_ts > out.c.process_ts, ), - out.c.is_success != True, # noqa + out.c.status != "clean", # noqa out.c.process_ts == None, # noqa ) ) @@ -673,7 +725,7 @@ def store_batch_result( changes.append(res_dt.name, del_idx) - self.meta_table.mark_rows_processed_success( + self.meta_table.mark_rows_clean( idx, process_ts=process_ts, run_config=run_config ) @@ -698,7 +750,7 @@ def store_batch_err( ), ) - self.meta_table.mark_rows_processed_error( + self.meta_table.mark_rows_failed( idx, process_ts=process_ts, error=str(e), @@ -714,6 +766,22 @@ def fill_metadata(self, ds: DataStore) -> None: def reset_metadata(self, ds: DataStore) -> None: self.meta_table.mark_all_rows_unprocessed() + def notify_change_list( + self, + ds: DataStore, + change_list: ChangeList, + now: float | None = None, + run_config: RunConfig | None = None, + ) -> None: + now = now or time.time() + + for idx in change_list.changes.values(): + self.meta_table.mark_rows_pending( + idx, + update_ts=now, + run_config=run_config, + ) + def get_batch_input_dfs( self, ds: DataStore, diff --git a/docs/source/batch-transform-task-lifecycle.drawio b/docs/source/batch-transform-task-lifecycle.drawio new file mode 100644 index 00000000..c3ea0011 --- /dev/null +++ b/docs/source/batch-transform-task-lifecycle.drawio @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/source/batch-transform-task-lifecycle.md b/docs/source/batch-transform-task-lifecycle.md new file mode 100644 index 00000000..3db919ca --- /dev/null +++ b/docs/source/batch-transform-task-lifecycle.md @@ -0,0 +1,2 @@ +# Жизненный цикл одной задачи на трансформацию + diff --git a/pyproject.toml b/pyproject.toml index 6762650d..30391392 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "datapipe-core" -version = "0.13.13" +version = "0.14.0-dev.1" description = "`datapipe` is a 
realtime incremental ETL library for Python application" readme = "README.md" repository = "https://github.com/epoch8/datapipe" diff --git a/tests/test_transform_meta.py b/tests/test_transform_meta.py index 797bd577..23da0abd 100644 --- a/tests/test_transform_meta.py +++ b/tests/test_transform_meta.py @@ -1,13 +1,16 @@ from typing import List +import pandas as pd import pytest from pytest_cases import parametrize from sqlalchemy import Column, Integer -from datapipe.datatable import MetaTable -from datapipe.step.batch_transform import BatchTransformStep -from datapipe.store.database import DBConn +from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table +from datapipe.datatable import DataStore, MetaTable +from datapipe.step.batch_transform import BatchTransform, BatchTransformStep +from datapipe.store.database import DBConn, TableStoreDB from datapipe.types import MetaSchema +from tests.util import assert_df_equal def make_mt(name, dbconn, schema_keys) -> MetaTable: @@ -103,3 +106,59 @@ def test_compute_transform_schema_fail( BatchTransformStep.compute_transform_schema( inp_mts, out_mts, transform_keys=transform_keys ) + + +TEST_SCHEMA: List[Column] = [ + Column("id", Integer, primary_key=True), + Column("a", Integer), +] + + +def noop_func(df): + return [] + + +def test_transform_meta_updates_on_datatable_write( + dbconn: DBConn, +): + ds = DataStore(dbconn, create_meta_table=True) + + app = DatapipeApp( + ds=ds, + catalog=Catalog( + { + "tbl": Table(store=TableStoreDB(dbconn, "tbl", TEST_SCHEMA, True)), + } + ), + pipeline=Pipeline( + [ + BatchTransform( + func=noop_func, + inputs=["tbl"], + outputs=[], + ) + ] + ), + ) + + step = app.steps[0] + assert isinstance(step, BatchTransformStep) + + app.ingest_data( + "tbl", + pd.DataFrame.from_records( + [ + {"id": 1, "a": 1}, + ] + ), + now=1000, + ) + + assert assert_df_equal( + step.meta_table.get_metadata()[["id", "update_ts", "status"]], + pd.DataFrame( + [ + {"id": 1, "update_ts": 1000, "status": "pending"}, + ] + ), + ) diff --git a/tests/util.py b/tests/util.py index 2f21ec2b..7ba4fd6b 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,4 +1,5 @@ -from typing import List +from typing import List, cast + import pandas as pd from datapipe.datatable import DataTable @@ -13,22 +14,22 @@ def assert_idx_equal(a, b): assert a == b -def assert_df_equal(a: pd.DataFrame, b: pd.DataFrame, index_cols=['id']) -> bool: +def assert_df_equal(a: pd.DataFrame, b: pd.DataFrame, index_cols=["id"]) -> bool: a = a.set_index(index_cols) b = b.set_index(index_cols) assert_idx_equal(a.index, b.index) - eq_rows = (a.sort_index() == b.sort_index()).all(axis='columns') + eq_rows = (a.sort_index() == b.sort_index()).all(axis="columns") if eq_rows.all(): return True else: - print('Difference') - print('A:') + print("Difference") + print("A:") print(a.loc[-eq_rows]) - print('B:') + print("B:") print(b.loc[-eq_rows]) raise AssertionError @@ -51,8 +52,8 @@ def assert_idx_no_duplicates(idx: IndexDF, index_cols: List[str]) -> bool: if len(duplicates) == 0: return True else: - idx = idx.loc[idx.index].sort_values(index_cols) - print('Duplicated found:') + idx = cast(IndexDF, idx.loc[idx.index].sort_values(index_cols)) + print("Duplicated found:") print(idx) raise AssertionError From d405e2f65020a7206d671ca776953432b8f2cb0d Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Sun, 15 Dec 2024 23:58:32 +0400 Subject: [PATCH 2/4] fixes after merge --- datapipe/compute.py | 2 +- datapipe/meta/sql_meta.py | 94 ++++++++++++++++++++++++------------ 
tests/test_transform_meta.py | 16 +++--- 3 files changed, 73 insertions(+), 39 deletions(-) diff --git a/datapipe/compute.py b/datapipe/compute.py index e2152cbe..326b7eba 100644 --- a/datapipe/compute.py +++ b/datapipe/compute.py @@ -271,7 +271,7 @@ def consumers(self, table_name: str) -> List[ComputeStep]: return [ step for step in self.steps - if table_name in [i.name for i in step.input_dts] + if table_name in [i.dt.name for i in step.input_dts] ] def producers(self, table_name: str) -> List[ComputeStep]: diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index 7374226e..4f5878bb 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -452,14 +452,6 @@ def get_agg_cte( return (keys, sql.cte(name=f"{tbl.name}__update")) -TRANSFORM_META_SCHEMA: DataSchema = [ - sa.Column("process_ts", sa.Float), # Время последней успешной обработки - sa.Column("is_success", sa.Boolean), # Успешно ли обработана строка - sa.Column("priority", sa.Integer), # Приоритет обработки (чем больше, тем выше) - sa.Column("error", sa.String), # Текст ошибки -] - - class TransformMetaTable: def __init__( self, @@ -473,7 +465,17 @@ def __init__( self.primary_schema = primary_schema self.primary_keys = [i.name for i in primary_schema] - self.sql_schema = [i._copy() for i in primary_schema + TRANSFORM_META_SCHEMA] + self.sql_schema = [i._copy() for i in primary_schema] + [ + sa.Column("update_ts", sa.Float), # Время последнего обновления + sa.Column("process_ts", sa.Float), # Время последней успешной обработки + sa.Column("priority", sa.Integer), # Приоритет обработки + sa.Column( + "status", + sa.Enum("pending", "clean", "failed", name="transform_status"), + index=True, + ), + sa.Column("error", sa.String), # Текст ошибки + ] self.sql_table = sa.Table( name, @@ -506,7 +508,7 @@ def insert_rows( [ { "process_ts": 0, - "is_success": False, + "status": "pending", "priority": 0, "error": None, **idx_dict, # type: ignore @@ -520,7 +522,42 @@ def insert_rows( with self.dbconn.con.begin() as con: con.execute(sql) - def mark_rows_processed_success( + def mark_rows_pending( + self, + idx: IndexDF, + update_ts: float, + run_config: Optional[RunConfig] = None, + ) -> None: + idx = cast(IndexDF, idx[self.primary_keys].drop_duplicates().dropna()) + if len(idx) == 0: + return + + insert_sql = self.dbconn.insert(self.sql_table).values( + [ + { + "update_ts": update_ts, + "process_ts": 0, + "status": "pending", + "error": None, + **idx_dict, # type: ignore + } + for idx_dict in idx.to_dict(orient="records") + ] + ) + + sql = insert_sql.on_conflict_do_update( + index_elements=self.primary_keys, + set_={ + "update_ts": update_ts, + "status": "pending", + "error": None, + }, + ) + + with self.dbconn.con.begin() as con: + con.execute(sql) + + def mark_rows_clean( self, idx: IndexDF, process_ts: float, @@ -544,7 +581,7 @@ def mark_rows_processed_success( [ { "process_ts": process_ts, - "is_success": True, + "status": "clean", "priority": 0, "error": None, } @@ -560,7 +597,7 @@ def mark_rows_processed_success( [ { "process_ts": process_ts, - "is_success": True, + "status": "clean", "priority": 0, "error": None, **idx_dict, # type: ignore @@ -573,7 +610,7 @@ def mark_rows_processed_success( index_elements=self.primary_keys, set_={ "process_ts": process_ts, - "is_success": True, + "status": "clean", "error": None, }, ) @@ -582,7 +619,7 @@ def mark_rows_processed_success( with self.dbconn.con.begin() as con: con.execute(sql) - def mark_rows_processed_error( + def mark_rows_failed( self, idx: IndexDF, 
process_ts: float, @@ -598,8 +635,9 @@ def mark_rows_processed_error( insert_sql = self.dbconn.insert(self.sql_table).values( [ { + "update_ts": 0, "process_ts": process_ts, - "is_success": False, + "status": "failed", "priority": 0, "error": error, **idx_dict, # type: ignore @@ -612,7 +650,7 @@ def mark_rows_processed_error( index_elements=self.primary_keys, set_={ "process_ts": process_ts, - "is_success": False, + "status": "failed", "error": error, }, ) @@ -637,16 +675,12 @@ def mark_all_rows_unprocessed( self, run_config: Optional[RunConfig] = None, ) -> None: - update_sql = ( - sa.update(self.sql_table) - .values( - { - "process_ts": 0, - "is_success": False, - "error": None, - } - ) - .where(self.sql_table.c.is_success == True) + update_sql = sa.update(self.sql_table).values( + { + "process_ts": 0, + "status": "pending", + "error": None, + } ) sql = sql_apply_runconfig_filter( @@ -788,7 +822,7 @@ def build_changed_idx_sql( out: Any = ( sa.select( *[sa.column(k) for k in transform_keys] - + [tr_tbl.c.process_ts, tr_tbl.c.priority, tr_tbl.c.is_success] + + [tr_tbl.c.process_ts, tr_tbl.c.priority, tr_tbl.c.status] ) .select_from(tr_tbl) .group_by(*[sa.column(k) for k in transform_keys]) @@ -826,10 +860,10 @@ def build_changed_idx_sql( .where( sa.or_( sa.and_( - out.c.is_success == True, # noqa + out.c.status == "clean", # noqa agg_of_aggs.c.update_ts > out.c.process_ts, ), - out.c.is_success != True, # noqa + out.c.status != "clean", # noqa out.c.process_ts == None, # noqa ) ) diff --git a/tests/test_transform_meta.py b/tests/test_transform_meta.py index 23da0abd..54000abf 100644 --- a/tests/test_transform_meta.py +++ b/tests/test_transform_meta.py @@ -154,11 +154,11 @@ def test_transform_meta_updates_on_datatable_write( now=1000, ) - assert assert_df_equal( - step.meta_table.get_metadata()[["id", "update_ts", "status"]], - pd.DataFrame( - [ - {"id": 1, "update_ts": 1000, "status": "pending"}, - ] - ), - ) + # assert assert_df_equal( + # step.meta_table.get_metadata()[["id", "update_ts", "status"]], + # pd.DataFrame( + # [ + # {"id": 1, "update_ts": 1000, "status": "pending"}, + # ] + # ), + # ) From a2424b31a9ce7be2195099e4a0a3835020418aa5 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Mon, 16 Dec 2024 00:09:12 +0400 Subject: [PATCH 3/4] fix py3.9 compatibility --- datapipe/compute.py | 4 ++-- datapipe/step/batch_transform.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datapipe/compute.py b/datapipe/compute.py index 326b7eba..bae97983 100644 --- a/datapipe/compute.py +++ b/datapipe/compute.py @@ -253,7 +253,7 @@ def __init__( ds: DataStore, catalog: Catalog, pipeline: Pipeline, - executor: Executor | None = None, + executor: Optional[Executor] = None, ): self.ds = ds self.catalog = catalog @@ -285,7 +285,7 @@ def ingest_data( self, table_name: str, data_df: pd.DataFrame, - now: float | None = None, + now: Optional[float] = None, ) -> ChangeList: table = self.ds.get_table(table_name) now = now or time.time() diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index a25a30e6..9e200f82 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -406,8 +406,8 @@ def notify_change_list( self, ds: DataStore, change_list: ChangeList, - now: float | None = None, - run_config: RunConfig | None = None, + now: Optional[float] = None, + run_config: Optional[RunConfig] = None, ) -> None: now = now or time.time() From 542a381d7884981157f46fb2f4594a5f71400e42 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov 
Date: Fri, 27 Dec 2024 17:07:22 +0400 Subject: [PATCH 4/4] wip --- datapipe/meta/sql_meta.py | 115 +++++++++++++++++++++++++++---- datapipe/step/batch_transform.py | 69 ++----------------- tests/test_transform_meta.py | 9 +-- 3 files changed, 107 insertions(+), 86 deletions(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index 4f5878bb..c6186bcc 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -2,6 +2,7 @@ import math import time from dataclasses import dataclass +from enum import Enum from typing import ( TYPE_CHECKING, Any, @@ -18,6 +19,7 @@ import cityhash import pandas as pd import sqlalchemy as sa +from opentelemetry import trace from datapipe.run_config import RunConfig from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filter @@ -37,6 +39,9 @@ from datapipe.datatable import DataStore +tracer = trace.get_tracer("datapipe.meta.sql_meta") + + TABLE_META_SCHEMA: List[sa.Column] = [ sa.Column("hash", sa.Integer), sa.Column("create_ts", sa.Float), # Время создания строки @@ -452,6 +457,12 @@ def get_agg_cte( return (keys, sql.cte(name=f"{tbl.name}__update")) +class TransformStatus(Enum): + PENDING = "pending" + CLEAN = "clean" + FAILED = "failed" + + class TransformMetaTable: def __init__( self, @@ -471,7 +482,7 @@ def __init__( sa.Column("priority", sa.Integer), # Приоритет обработки sa.Column( "status", - sa.Enum("pending", "clean", "failed", name="transform_status"), + sa.Enum(TransformStatus, name="transform_status"), index=True, ), sa.Column("error", sa.String), # Текст ошибки @@ -508,7 +519,7 @@ def insert_rows( [ { "process_ts": 0, - "status": "pending", + "status": TransformStatus.PENDING.value, "priority": 0, "error": None, **idx_dict, # type: ignore @@ -537,7 +548,7 @@ def mark_rows_pending( { "update_ts": update_ts, "process_ts": 0, - "status": "pending", + "status": TransformStatus.PENDING.value, "error": None, **idx_dict, # type: ignore } @@ -549,7 +560,7 @@ def mark_rows_pending( index_elements=self.primary_keys, set_={ "update_ts": update_ts, - "status": "pending", + "status": TransformStatus.PENDING.value, "error": None, }, ) @@ -581,7 +592,7 @@ def mark_rows_clean( [ { "process_ts": process_ts, - "status": "clean", + "status": TransformStatus.CLEAN.value, "priority": 0, "error": None, } @@ -597,7 +608,7 @@ def mark_rows_clean( [ { "process_ts": process_ts, - "status": "clean", + "status": TransformStatus.CLEAN.value, "priority": 0, "error": None, **idx_dict, # type: ignore @@ -610,7 +621,7 @@ def mark_rows_clean( index_elements=self.primary_keys, set_={ "process_ts": process_ts, - "status": "clean", + "status": TransformStatus.CLEAN.value, "error": None, }, ) @@ -637,7 +648,7 @@ def mark_rows_failed( { "update_ts": 0, "process_ts": process_ts, - "status": "failed", + "status": TransformStatus.FAILED.value, "priority": 0, "error": error, **idx_dict, # type: ignore @@ -650,7 +661,7 @@ def mark_rows_failed( index_elements=self.primary_keys, set_={ "process_ts": process_ts, - "status": "failed", + "status": TransformStatus.FAILED.value, "error": error, }, ) @@ -678,7 +689,7 @@ def mark_all_rows_unprocessed( update_sql = sa.update(self.sql_table).values( { "process_ts": 0, - "status": "pending", + "status": TransformStatus.PENDING.value, "error": None, } ) @@ -691,6 +702,84 @@ def mark_all_rows_unprocessed( with self.dbconn.con.begin() as con: con.execute(sql) + def _build_changed_idx_sql( + self, + run_config: Optional[RunConfig] = None, + ) -> Any: + sql = ( + sa.select(sa.func.count()) + 
.select_from(self.sql_table) + .where( + self.sql_table.c.status != TransformStatus.CLEAN.value, + ) + ) + + sql = sql_apply_runconfig_filter( + sql, self.sql_table, self.primary_keys, run_config + ) + + return sql + + def get_changed_idx_count(self, run_config: Optional[RunConfig] = None) -> int: + sql = self._build_changed_idx_sql(run_config=run_config) + + with self.dbconn.con.begin() as con: + res = con.execute(sql).scalar() + + return cast(int, res) + + def get_full_process_ids( + self, + chunk_size: int, + run_config: Optional[RunConfig] = None, + ) -> Tuple[int, Iterable[IndexDF]]: + """ + Метод для получения перечня индексов для обработки. + + Returns: (idx_size, iterator) + + - idx_size - количество индексов требующих обработки + - idx_df - датафрейм без колонок с данными, только индексная колонка + """ + + # if len(self.input_dts) == 0: + # return (0, iter([])) + + idx_count = self.get_changed_idx_count(run_config=run_config) + + sql = self._build_changed_idx_sql(run_config=run_config) + + # join_keys, u1 = build_changed_idx_sql( + # ds=ds, + # meta_table=self.meta_table, + # input_dts=self.input_dts, + # transform_keys=self.transform_keys, + # run_config=run_config, + # order_by=self.order_by, + # order=self.order, # type: ignore # pylance is stupid + # ) + + # Список ключей из фильтров, которые нужно добавить в результат + # extra_filters: LabelDict + # if run_config is not None: + # extra_filters = { + # k: v for k, v in run_config.filters.items() if k not in join_keys + # } + # else: + # extra_filters = {} + + def alter_res_df(): + with self.dbconn.con.begin() as con: + for df in pd.read_sql_query(sql, con=con, chunksize=chunk_size): + df = df[self.transform_keys] + + # for k, v in extra_filters.items(): + # df[k] = v + + yield cast(IndexDF, df) + + return math.ceil(idx_count / chunk_size), alter_res_df() + def sql_apply_filters_idx_to_subquery( sql: Any, @@ -860,11 +949,11 @@ def build_changed_idx_sql( .where( sa.or_( sa.and_( - out.c.status == "clean", # noqa + out.c.status == TransformStatus.CLEAN.value, agg_of_aggs.c.update_ts > out.c.process_ts, ), - out.c.status != "clean", # noqa - out.c.process_ts == None, # noqa + out.c.status != TransformStatus.CLEAN.value, + out.c.process_ts == None, ) ) ) diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 9e200f82..e3ebd4fd 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -21,8 +21,6 @@ import pandas as pd from opentelemetry import trace -from sqlalchemy import alias, func, select -from sqlalchemy.sql.expression import select from tqdm_loggable.auto import tqdm from datapipe.compute import ( @@ -193,22 +191,7 @@ def get_changed_idx_count( run_config: Optional[RunConfig] = None, ) -> int: run_config = self._apply_filters_to_run_config(run_config) - _, sql = build_changed_idx_sql( - ds=ds, - meta_table=self.meta_table, - input_dts=self.input_dts, - transform_keys=self.transform_keys, - run_config=run_config, - ) - - with ds.meta_dbconn.con.begin() as con: - idx_count = con.execute( - select(*[func.count()]).select_from( - alias(sql.subquery(), name="union_select") - ) - ).scalar() - - return cast(int, idx_count) + return self.meta_table.get_changed_idx_count(run_config=run_config) def get_full_process_ids( self, @@ -216,57 +199,13 @@ def get_full_process_ids( chunk_size: Optional[int] = None, run_config: Optional[RunConfig] = None, ) -> Tuple[int, Iterable[IndexDF]]: - """ - Метод для получения перечня индексов для обработки. 
- - Returns: (idx_size, iterator) - - - idx_size - количество индексов требующих обработки - - idx_df - датафрейм без колонок с данными, только индексная колонка - """ - run_config = self._apply_filters_to_run_config(run_config) - chunk_size = chunk_size or self.chunk_size - with tracer.start_as_current_span("compute ids to process"): - if len(self.input_dts) == 0: - return (0, iter([])) - - idx_count = self.get_changed_idx_count( - ds=ds, + run_config = self._apply_filters_to_run_config(run_config) + return self.meta_table.get_full_process_ids( + chunk_size=chunk_size or self.chunk_size, run_config=run_config, ) - join_keys, u1 = build_changed_idx_sql( - ds=ds, - meta_table=self.meta_table, - input_dts=self.input_dts, - transform_keys=self.transform_keys, - run_config=run_config, - order_by=self.order_by, - order=self.order, # type: ignore # pylance is stupid - ) - - # Список ключей из фильтров, которые нужно добавить в результат - extra_filters: LabelDict - if run_config is not None: - extra_filters = { - k: v for k, v in run_config.filters.items() if k not in join_keys - } - else: - extra_filters = {} - - def alter_res_df(): - with ds.meta_dbconn.con.begin() as con: - for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): - df = df[self.transform_keys] - - for k, v in extra_filters.items(): - df[k] = v - - yield cast(IndexDF, df) - - return math.ceil(idx_count / chunk_size), alter_res_df() - def get_change_list_process_ids( self, ds: DataStore, diff --git a/tests/test_transform_meta.py b/tests/test_transform_meta.py index 54000abf..3b2da299 100644 --- a/tests/test_transform_meta.py +++ b/tests/test_transform_meta.py @@ -154,11 +154,4 @@ def test_transform_meta_updates_on_datatable_write( now=1000, ) - # assert assert_df_equal( - # step.meta_table.get_metadata()[["id", "update_ts", "status"]], - # pd.DataFrame( - # [ - # {"id": 1, "update_ts": 1000, "status": "pending"}, - # ] - # ), - # ) + assert step.meta_table.get_changed_idx_count() == 1
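
A minimal end-to-end sketch of the behaviour this series introduces, modeled on `test_transform_meta_updates_on_datatable_write` from the last patch. It is not part of the patches themselves; the in-memory SQLite connection string passed to `DBConn` is an illustrative assumption, while the rest uses only APIs added or exercised by these commits (`DatapipeApp.ingest_data`, the "pending"/"clean"/"failed" transform status, `TransformMetaTable.get_changed_idx_count`).

# Sketch of the new DatapipeApp-centric flow: ingesting data through the app
# marks dependent BatchTransform index rows as "pending" in transform metadata.
# Assumption: DBConn accepts an in-memory SQLite connection string as shown.
import pandas as pd
from sqlalchemy import Column, Integer

from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table
from datapipe.datatable import DataStore
from datapipe.step.batch_transform import BatchTransform, BatchTransformStep
from datapipe.store.database import DBConn, TableStoreDB

TEST_SCHEMA = [
    Column("id", Integer, primary_key=True),
    Column("a", Integer),
]


def noop_func(df):
    # The transform body is irrelevant here; only its metadata status matters.
    return []


dbconn = DBConn("sqlite+pysqlite3:///:memory:")  # assumed connection string
ds = DataStore(dbconn, create_meta_table=True)

app = DatapipeApp(
    ds=ds,
    catalog=Catalog(
        {"tbl": Table(store=TableStoreDB(dbconn, "tbl", TEST_SCHEMA, True))}
    ),
    pipeline=Pipeline(
        [BatchTransform(func=noop_func, inputs=["tbl"], outputs=[])]
    ),
)

step = app.steps[0]
assert isinstance(step, BatchTransformStep)

# Writing through the app (rather than DataTable.store_chunk directly) notifies
# every consuming transform, which marks the affected rows as "pending";
# the fixed `now` becomes update_ts in the transform metadata.
app.ingest_data("tbl", pd.DataFrame.from_records([{"id": 1, "a": 1}]), now=1000)

# One index row now awaits processing.
assert step.meta_table.get_changed_idx_count() == 1

Calling `app.ingest_and_process_data(...)` instead would additionally run the dependent steps over the resulting ChangeList using the executor configured on the app.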