From 2ab25d2475e817fb6ba95244133b810ed6801813 Mon Sep 17 00:00:00 2001
From: sergeyz06
Date: Sat, 26 Oct 2024 19:51:08 +0300
Subject: [PATCH 1/4] bq: insert_rows implemented

---
 datapipe/store/bigquery.py | 162 +++++++++++++++++++++++++++++++++++++
 pyproject.toml             |   4 +
 2 files changed, 166 insertions(+)
 create mode 100644 datapipe/store/bigquery.py

diff --git a/datapipe/store/bigquery.py b/datapipe/store/bigquery.py
new file mode 100644
index 00000000..fa517934
--- /dev/null
+++ b/datapipe/store/bigquery.py
@@ -0,0 +1,162 @@
+from typing import Iterator, List, Optional, Union
+
+from datapipe.store.table_store import TableStore
+from datapipe.run_config import RunConfig
+from datapipe.types import DataDF, DataSchema, IndexDF, MetaSchema, data_to_index
+
+from google.cloud import bigquery
+from google.oauth2 import service_account
+from google.cloud.exceptions import NotFound
+
+from sqlalchemy import Column
+
+
+# sqlalchemy types to GoogleSQL data types
+SCHEMA_MAPPING = {
+    "ARRAY": "ARRAY",
+    "BIGINT": "INT64",
+    "BINARY": "BYTES",
+    "BLOB": "STRING",
+    "BOOLEAN": "BOOL",
+    "CHAR": "STRING",
+    "CLOB": "STRING",
+    "DATE": "DATE",
+    "DATETIME": "DATETIME",
+    "DECIMAL": "FLOAT64",
+    "DOUBLE": "FLOAT64",
+    "DOUBLE_PRECISION": "FLOAT64",
+    "FLOAT": "FLOAT64",
+    "INT": "INT64",
+    "JSON": "JSON",
+    "INTEGER": "INT64",
+    "NCHAR": "STRING",
+    "NVARCHAR": "STRING",
+    "NUMERIC": "NUMERIC",
+    "REAL": "FLOAT64",
+    "SMALLINT": "INT64",
+    "TEXT": "STRING",
+    "TIME": "TIME",
+    "TIMESTAMP": "TIMESTAMP",
+    "UUID": "STRING",
+    "VARBINARY": "BYTES",
+    "VARCHAR": "STRING",
+}
+
+
+def is_table_exists(client: bigquery.Client, table) -> bool:
+    try:
+        client.get_table(table)
+        return True
+    except NotFound:
+        return False
+
+
+def is_dataset_exists(
+    client: bigquery.Client, dataset: bigquery.Client.dataset
+) -> bool:
+    try:
+        client.get_dataset(dataset)
+        return True
+    except NotFound:
+        return False
+
+
+class BQClient:
+    def __init__(self, service_account_file):
+        self.bq_client = bigquery.Client(
+            credentials=service_account.Credentials.from_service_account_file(
+                service_account_file
+            )
+        )
+
+    def __call__(self, *args, **kwds):
+        return self.bq_client(*args, **kwds)
+
+
+class TableStoreBQ(TableStore):
+    def __init__(
+        self,
+        bq_client: str,
+        name: str,
+        data_sql_schema: List[Column],
+        dataset_id: str,
+        table_id: str,
+    ) -> None:
+        self.bq_client = bq_client
+        self.name = name
+        self.data_sql_schema = data_sql_schema
+        self.prim_keys = [
+            column for column in self.data_sql_schema if column.primary_key
+        ]
+        self.value_cols = [
+            column for column in self.data_sql_schema if not column.primary_key
+        ]
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+
+
+    def get_primary_schema(self) -> DataSchema:
+        raise NotImplementedError
+
+
+    def get_meta_schema(self) -> MetaSchema:
+        raise NotImplementedError
+
+
+    def get_schema(self) -> DataSchema:
+        raise NotImplementedError
+
+
+    @property
+    def primary_keys(self) -> List[str]:
+        return [i.name for i in self.get_primary_schema()]
+
+
+    def delete_rows(self, idx: IndexDF) -> None:
+        raise NotImplementedError
+
+
+    def insert_rows(self, df: DataDF) -> None:
+        dataset_ref = self.bq_client.dataset(self.dataset_id)
+        table_ref = dataset_ref.table(str(self.table_id))
+
+        if not is_dataset_exists(self.bq_client, dataset_ref):
+            self.bq_client.create_dataset(dataset_ref)
+
+        schema_prim_keys = [bigquery.SchemaField(column.name, SCHEMA_MAPPING.get(f"{column.type}", "STRING"), mode="REQUIRED") for column in self.prim_keys]
+        schema_value_cols = [bigquery.SchemaField(column.name, SCHEMA_MAPPING.get(f"{column.type}", "STRING"), mode="NULLABLE") for column in self.value_cols]
+
+        table = bigquery.Table(
+            table_ref=table_ref,
+            schema=schema_prim_keys+schema_value_cols
+        )
+
+        table.clustering_fields = [column.name for column in self.data_sql_schema if column.primary_key][0:4]
+        table = self.bq_client.create_table(table, exists_ok=True)
+
+        job_config = bigquery.LoadJobConfig()
+        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
+
+        self.bq_client.load_table_from_dataframe(
+            df,
+            table,
+            job_config=job_config,
+        ).result()
+
+
+    def update_rows(self, df: DataDF) -> None:
+        if df.empty:
+            return
+        self.delete_rows(data_to_index(df, self.primary_keys))
+        self.insert_rows(df)
+
+
+    def read_rows(self, idx: Optional[IndexDF] = None) -> DataDF:
+        raise NotImplementedError
+
+
+    def read_rows_meta_pseudo_df(
+        self, chunksize: int = 1000, run_config: Optional[RunConfig] = None
+    ) -> Iterator[DataDF]:
+        # FIXME: implement proper chunked reading in all stores
+        yield self.read_rows()
diff --git a/pyproject.toml b/pyproject.toml
index 4557b41a..661cc98c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -52,6 +52,10 @@ click = ">=7.1.2"
 rich = "^13.3.2"
 
 ray = { version = "^2.5.0", optional = true, extras = ["default"] }
+google = "^3.0.0"
+google-auth = "^2.35.0"
+google-cloud-bigquery = "^3.26.0"
+pyarrow = "^17.0.0"
 
 [tool.poetry.extras]
 

From 91d93516381b36f09e05101a20c6171416e0fb70 Mon Sep 17 00:00:00 2001
From: sergeyz06
Date: Sat, 26 Oct 2024 20:07:00 +0300
Subject: [PATCH 2/4] bq test

---
 .gitignore            |  1 +
 .vscode/settings.json |  2 +-
 test.py               | 44 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 46 insertions(+), 1 deletion(-)
 create mode 100644 test.py

diff --git a/.gitignore b/.gitignore
index 1ea9651f..690cce51 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ dist/
 test_data/
 
 poetry.lock
+client_secrets.json
diff --git a/.vscode/settings.json b/.vscode/settings.json
index b4767b67..b2f56af5 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -10,7 +10,7 @@
     ],
     "[python]": {
         "editor.defaultFormatter": "ms-python.black-formatter",
-        "editor.formatOnSave": true,
+        "editor.formatOnSave": false,
         "editor.codeActionsOnSave": {
            "source.organizeImports": "explicit"
        },
diff --git a/test.py b/test.py
new file mode 100644
index 00000000..859f6744
--- /dev/null
+++ b/test.py
@@ -0,0 +1,44 @@
+from datapipe.store.bigquery import BQClient
+from datapipe.store.bigquery import TableStoreBQ
+
+import pandas as pd
+
+from sqlalchemy import Column
+from sqlalchemy import types
+
+
+BQ_CREDENTIALS = r"./client_secrets.json"
+
+STORE_NAME = r"test_transformation"
+STORE_DATA_SQL_SCHEMA = [
+    Column("col_1", types.BIGINT, primary_key=True),
+    Column("col_2", types.CHAR),
+    Column("col_3", types.BOOLEAN),
+]
+STORE_DATASET_ID = r"datapipe_test"
+STORE_TABLE_ID = r"test"
+
+
+bq_client = BQClient(service_account_file=BQ_CREDENTIALS)
+
+table_store_bq = TableStoreBQ(
+    bq_client=bq_client.bq_client,
+    name=STORE_NAME,
+    data_sql_schema=STORE_DATA_SQL_SCHEMA,
+    dataset_id=STORE_DATASET_ID,
+    table_id=STORE_TABLE_ID,
+)
+
+
+df = pd.DataFrame(
+    data={
+        "col_1": [1, 2, 3],
+        "col_2": ["a", "b", "c"],
+        "col_3": [False, True, None],
+    }
+)
+
+print(df)
+
+
+table_store_bq.insert_rows(df)
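
Note (not part of the patches): to illustrate the type mapping above, insert_rows in PATCH 1 renders each SQLAlchemy column type with f"{column.type}" and looks it up in SCHEMA_MAPPING, falling back to "STRING"; primary-key columns become REQUIRED fields, the rest NULLABLE. A minimal sketch of what that should produce for the STORE_DATA_SQL_SCHEMA defined in test.py (expected_schema is an illustrative name, not something defined in the patches):

    from google.cloud import bigquery

    # col_1 (BIGINT, primary key) -> INT64 / REQUIRED
    # col_2 (CHAR) -> STRING / NULLABLE, col_3 (BOOLEAN) -> BOOL / NULLABLE
    expected_schema = [
        bigquery.SchemaField("col_1", "INT64", mode="REQUIRED"),
        bigquery.SchemaField("col_2", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("col_3", "BOOL", mode="NULLABLE"),
    ]
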
From 96105f3c8fa2ede004e2f99056596730c7781c73 Mon Sep 17 00:00:00 2001
From: sergeyz06
Date: Fri, 1 Nov 2024 18:50:32 +0300
Subject: [PATCH 3/4] bq: + read, delete, update rows

---
 datapipe/store/bigquery.py | 63 ++++++++++++++++++++------------------
 pyproject.toml             |  1 +
 test.py                    |  5 ++-
 3 files changed, 39 insertions(+), 30 deletions(-)

diff --git a/datapipe/store/bigquery.py b/datapipe/store/bigquery.py
index fa517934..6dff72b9 100644
--- a/datapipe/store/bigquery.py
+++ b/datapipe/store/bigquery.py
@@ -76,7 +76,7 @@ def __call__(self, *args, **kwds):
 class TableStoreBQ(TableStore):
     def __init__(
         self,
-        bq_client: str,
+        bq_client: bigquery.Client,
         name: str,
         data_sql_schema: List[Column],
         dataset_id: str,
@@ -85,14 +85,33 @@ def __init__(
         self.bq_client = bq_client
         self.name = name
         self.data_sql_schema = data_sql_schema
-        self.prim_keys = [
+
+        dataset_ref = self.bq_client.dataset(dataset_id)
+        table_ref = dataset_ref.table(str(table_id))
+
+        if not is_dataset_exists(self.bq_client, dataset_ref):
+            self.bq_client.create_dataset(dataset_ref)
+
+        prim_keys = [
             column for column in self.data_sql_schema if column.primary_key
         ]
-        self.value_cols = [
+        value_cols = [
             column for column in self.data_sql_schema if not column.primary_key
         ]
-        self.dataset_id = dataset_id
-        self.table_id = table_id
+
+        schema_prim_keys = [bigquery.SchemaField(column.name, SCHEMA_MAPPING.get(f"{column.type}", "STRING"), mode="REQUIRED") for column in prim_keys]
+        schema_value_cols = [bigquery.SchemaField(column.name, SCHEMA_MAPPING.get(f"{column.type}", "STRING"), mode="NULLABLE") for column in value_cols]
+
+        self.table = bigquery.Table(
+            table_ref=table_ref,
+            schema=schema_prim_keys+schema_value_cols
+        )
+
+        self.table.clustering_fields = [column.name for column in self.data_sql_schema if column.primary_key][0:4]
+        self.table = self.bq_client.create_table(self.table, exists_ok=True)
+
+        self.job_config = bigquery.LoadJobConfig()
+        self.job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
 
 
     def get_primary_schema(self) -> DataSchema:
@@ -113,46 +132,32 @@ def primary_keys(self) -> List[str]:
 
 
     def delete_rows(self, idx: IndexDF) -> None:
-        raise NotImplementedError
+        dml = f"DELETE FROM `{self.table.project}`.`{self.table.dataset_id}`.`{self.table.table_id}` WHERE TRUE;"
+        self.bq_client.query(dml)
 
 
     def insert_rows(self, df: DataDF) -> None:
-        dataset_ref = self.bq_client.dataset(self.dataset_id)
-        table_ref = dataset_ref.table(str(self.table_id))
-
-        if not is_dataset_exists(self.bq_client, dataset_ref):
-            self.bq_client.create_dataset(dataset_ref)
-
-        schema_prim_keys = [bigquery.SchemaField(column.name, SCHEMA_MAPPING.get(f"{column.type}", "STRING"), mode="REQUIRED") for column in self.prim_keys]
-        schema_value_cols = [bigquery.SchemaField(column.name, SCHEMA_MAPPING.get(f"{column.type}", "STRING"), mode="NULLABLE") for column in self.value_cols]
-
-        table = bigquery.Table(
-            table_ref=table_ref,
-            schema=schema_prim_keys+schema_value_cols
-        )
-
-        table.clustering_fields = [column.name for column in self.data_sql_schema if column.primary_key][0:4]
-        table = self.bq_client.create_table(table, exists_ok=True)
-
-        job_config = bigquery.LoadJobConfig()
-        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
-
         self.bq_client.load_table_from_dataframe(
             df,
-            table,
-            job_config=job_config,
+            self.table,
+            job_config=self.job_config,
         ).result()
 
 
     def update_rows(self, df: DataDF) -> None:
         if df.empty:
             return
+
         self.delete_rows(data_to_index(df, self.primary_keys))
         self.insert_rows(df)
 
 
     def read_rows(self, idx: Optional[IndexDF] = None) -> DataDF:
-        raise NotImplementedError
+        sql = f"SELECT * FROM `{self.table.project}`.`{self.table.dataset_id}`.`{self.table.table_id}`;"
+        result = self.bq_client.query_and_wait(sql)
+        df = result.to_dataframe()
+
+        return df
 
 
     def read_rows_meta_pseudo_df(
diff --git a/pyproject.toml b/pyproject.toml
index 661cc98c..49893121 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -56,6 +56,7 @@ google = "^3.0.0"
 google-auth = "^2.35.0"
 google-cloud-bigquery = "^3.26.0"
 pyarrow = "^17.0.0"
+db-dtypes = "^1.3.0"
 
 [tool.poetry.extras]
 
diff --git a/test.py b/test.py
index 859f6744..68c40e40 100644
--- a/test.py
+++ b/test.py
@@ -41,4 +41,7 @@
 print(df)
 
 
-table_store_bq.insert_rows(df)
+# table_store_bq.insert_rows(df)
+
+df = table_store_bq.read_rows()
+print(df)

From 6f2dc65b42bfc64cad22833c5df0348ea0f729d1 Mon Sep 17 00:00:00 2001
From: sergeyz06
Date: Mon, 10 Nov 2025 23:07:24 +0300
Subject: [PATCH 4/4] bq: + comments

---
 datapipe/store/bigquery.py | 15 +++++++++++++++
 test.py                    | 10 +++++-----
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/datapipe/store/bigquery.py b/datapipe/store/bigquery.py
index 6dff72b9..bbaa5335 100644
--- a/datapipe/store/bigquery.py
+++ b/datapipe/store/bigquery.py
@@ -86,6 +86,12 @@ def __init__(
         self.name = name
         self.data_sql_schema = data_sql_schema
 
+        # The pipeline operates on BQ partitions - this is the most efficient and economical approach.
+        # For the pipeline it is an index, for BQ it is a partition.
+        # IndexDF is a pandas dataframe listing the indexes of all rows that have changed and need to be recomputed.
+
+        # Clustering keys can be specified when initializing TableStoreBQ for further optimization.
+
         dataset_ref = self.bq_client.dataset(dataset_id)
         table_ref = dataset_ref.table(str(table_id))
 
@@ -115,6 +121,7 @@ def __init__(
 
 
     def get_primary_schema(self) -> DataSchema:
+        # Needs to be implemented
         raise NotImplementedError
 
 
@@ -123,6 +130,7 @@ def get_meta_schema(self) -> MetaSchema:
 
 
     def get_schema(self) -> DataSchema:
+        # Needs to be implemented
         raise NotImplementedError
 
 
@@ -164,4 +172,11 @@ def read_rows_meta_pseudo_df(
         self, chunksize: int = 1000, run_config: Optional[RunConfig] = None
     ) -> Iterator[DataDF]:
         # FIXME: implement proper chunked reading in all stores
+
+        # Keeping table metadata up to date for a table that may be updated by something other than the pipeline - this may be useful for BQ.
+        # Compute a checksum and compare it to see whether the data has changed.
+        # That is, if some partitions of the source table have changed, those partitions need to be recomputed.
+
+        # Figure out how to cheaply get information about changes in partitions.
+
         yield self.read_rows()
diff --git a/test.py b/test.py
index 68c40e40..966f203b 100644
--- a/test.py
+++ b/test.py
@@ -16,7 +16,7 @@
     Column("col_3", types.BOOLEAN),
 ]
 STORE_DATASET_ID = r"datapipe_test"
-STORE_TABLE_ID = r"test"
+STORE_TABLE_ID = r"test_2"
 
 
 bq_client = BQClient(service_account_file=BQ_CREDENTIALS)
@@ -33,7 +33,7 @@
 df = pd.DataFrame(
     data={
         "col_1": [1, 2, 3],
-        "col_2": ["a", "b", "c"],
+        "col_2": ["a", "b", "cc"],
         "col_3": [False, True, None],
     }
 )
@@ -41,7 +41,7 @@
 print(df)
 
 
-# table_store_bq.insert_rows(df)
+table_store_bq.insert_rows(df)
 
-df = table_store_bq.read_rows()
-print(df)
+# df = table_store_bq.read_rows()
+# print(df)
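
Note (not part of the patches): delete_rows in PATCH 3 currently issues DELETE ... WHERE TRUE and ignores idx, so update_rows effectively rewrites the whole table. A minimal sketch of an index-aware variant, assuming idx is an IndexDF that contains the primary-key columns (delete_rows_sketch and its parameters are illustrative names, not part of the patches):

    def delete_rows_sketch(bq_client, table, primary_keys, idx):
        if idx.empty:
            return

        # One predicate per changed row: (`k1` = v1 AND `k2` = v2) OR ...
        # Values are inlined for brevity; real code should use query parameters.
        row_predicates = []
        for _, row in idx[primary_keys].iterrows():
            parts = [
                f"`{key}` = '{row[key]}'" if isinstance(row[key], str) else f"`{key}` = {row[key]}"
                for key in primary_keys
            ]
            row_predicates.append("(" + " AND ".join(parts) + ")")

        dml = (
            f"DELETE FROM `{table.project}`.`{table.dataset_id}`.`{table.table_id}` "
            f"WHERE {' OR '.join(row_predicates)};"
        )
        bq_client.query_and_wait(dml)

This only suits small IndexDFs; for large ones a MERGE against a staging table, or deleting whole partitions as the PATCH 4 comments suggest, would scale better.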