diff --git a/inventory_foundation_sdk/_modidx.py b/inventory_foundation_sdk/_modidx.py index dacbcd6..9927da2 100644 --- a/inventory_foundation_sdk/_modidx.py +++ b/inventory_foundation_sdk/_modidx.py @@ -26,10 +26,6 @@ 'inventory_foundation_sdk/custom_datasets.py'), 'inventory_foundation_sdk.custom_datasets.DynamicPathJSONDataset._save': ( 'custom_datasets.html#dynamicpathjsondataset._save', 'inventory_foundation_sdk/custom_datasets.py')}, -<<<<<<< HEAD - 'inventory_foundation_sdk.db_mgmt': { 'inventory_foundation_sdk.db_mgmt.check_in_scope_entries': ( 'db_mgmt.html#check_in_scope_entries', - 'inventory_foundation_sdk/db_mgmt.py'), -======= 'inventory_foundation_sdk.db_mgmt': { 'inventory_foundation_sdk.db_mgmt.SQLDatabase': ( 'db_mgmt.html#sqldatabase', 'inventory_foundation_sdk/db_mgmt.py'), 'inventory_foundation_sdk.db_mgmt.SQLDatabase.__init__': ( 'db_mgmt.html#sqldatabase.__init__', @@ -42,7 +38,8 @@ 'inventory_foundation_sdk/db_mgmt.py'), 'inventory_foundation_sdk.db_mgmt.SQLDatabase.execute_query': ( 'db_mgmt.html#sqldatabase.execute_query', 'inventory_foundation_sdk/db_mgmt.py'), ->>>>>>> origin/refactor-if-sdk + 'inventory_foundation_sdk.db_mgmt.check_in_scope_entries': ( 'db_mgmt.html#check_in_scope_entries', + 'inventory_foundation_sdk/db_mgmt.py'), 'inventory_foundation_sdk.db_mgmt.get_db_credentials': ( 'db_mgmt.html#get_db_credentials', 'inventory_foundation_sdk/db_mgmt.py'), 'inventory_foundation_sdk.db_mgmt.insert_multi_rows': ( 'db_mgmt.html#insert_multi_rows', @@ -99,6 +96,8 @@ 'inventory_foundation_sdk/etl_nodes.py'), 'inventory_foundation_sdk.etl_nodes.input_output_node': ( 'etl_nodes.html#input_output_node', 'inventory_foundation_sdk/etl_nodes.py')}, + 'inventory_foundation_sdk.kedro_orchestration': { 'inventory_foundation_sdk.kedro_orchestration.verify_db_write_status': ( 'kedro_orchestration.html#verify_db_write_status', + 'inventory_foundation_sdk/kedro_orchestration.py')}, 'inventory_foundation_sdk.state_mgmnt': { 'inventory_foundation_sdk.state_mgmnt.Flag': ( 'state_mgmt.html#flag', 'inventory_foundation_sdk/state_mgmnt.py'), 'inventory_foundation_sdk.state_mgmnt.Flag.__init__': ( 'state_mgmt.html#flag.__init__', diff --git a/inventory_foundation_sdk/db_mgmt.py b/inventory_foundation_sdk/db_mgmt.py index 793b60c..2017e86 100644 --- a/inventory_foundation_sdk/db_mgmt.py +++ b/inventory_foundation_sdk/db_mgmt.py @@ -3,11 +3,7 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/10_db_mgmt.ipynb. # %% auto 0 -<<<<<<< HEAD -__all__ = ['get_db_credentials', 'insert_multi_rows', 'check_in_scope_entries'] -======= -__all__ = ['get_db_credentials', 'insert_multi_rows', 'SQLDatabase'] ->>>>>>> origin/refactor-if-sdk +__all__ = ['get_db_credentials', 'insert_multi_rows', 'check_in_scope_entries', 'SQLDatabase'] # %% ../nbs/10_db_mgmt.ipynb 3 from kedro.config import OmegaConfigLoader @@ -20,13 +16,7 @@ import psycopg2 -<<<<<<< HEAD -import psycopg2 - -# %% ../nbs/10_db_mgmt.ipynb 5 -======= # %% ../nbs/10_db_mgmt.ipynb 4 ->>>>>>> origin/refactor-if-sdk def get_db_credentials(): """ Fetch PostgreSQL database credentials from the configuration file of the kedro project. @@ -43,11 +33,7 @@ def get_db_credentials(): return db_credentials -<<<<<<< HEAD -# %% ../nbs/10_db_mgmt.ipynb 6 -======= # %% ../nbs/10_db_mgmt.ipynb 5 ->>>>>>> origin/refactor-if-sdk def insert_multi_rows( data_to_insert: pd.DataFrame, table_name: str, @@ -89,11 +75,7 @@ def insert_multi_rows( "Number of types does not match the number of columns in the DataFrame." ) -<<<<<<< HEAD # logger.info("-- in insert multi rows -- converting data to list of tuples") -======= - logger.info("-- in insert multi rows -- converting data to list of tuples") ->>>>>>> origin/refactor-if-sdk # Convert to list of tuples and apply type casting data_values = data_to_insert.values.tolist() @@ -101,11 +83,7 @@ def insert_multi_rows( tuple(typ(val) for typ, val in zip(types, row)) for row in data_values ] -<<<<<<< HEAD # logger.info("-- in insert multi rows -- preparing SQL") -======= - logger.info("-- in insert multi rows -- preparing SQL") ->>>>>>> origin/refactor-if-sdk # Create SQL placeholders and query placeholders = ", ".join(["%s"] * len(column_names)) column_names_str = ", ".join(f'"{col}"' for col in column_names) @@ -171,8 +149,7 @@ def insert_multi_rows( return None -<<<<<<< HEAD -# %% ../nbs/10_db_mgmt.ipynb 7 +# %% ../nbs/10_db_mgmt.ipynb 6 def check_in_scope_entries( target_table, dataset_column, @@ -286,8 +263,124 @@ def check_in_scope_entries( except Exception as e: logger.error(f"Error checking in-scope entries for {target_table}: {e}") raise e -======= -# %% ../nbs/10_db_mgmt.ipynb 6 + +# %% ../nbs/10_db_mgmt.ipynb 7 +def insert_multi_rows( + data_to_insert: pd.DataFrame, + table_name: str, + column_names: list, + types: list, + cur, + conn, + return_with_ids: bool = False, + unique_columns: list = None, # mandatory if return_with_ids is True +) -> pd.DataFrame | None: + """ + Inserts data into the specified database table, with an optional return of database-assigned IDs. + + Args: + data_to_insert (pd.DataFrame): DataFrame containing the data to be inserted. + table_name (str): Name of the target database table. + column_names (list): List of column names for the target table. + types (list): List of Python types (e.g., [int, float]) for data conversion. + cur (psycopg2.cursor): Database cursor for executing SQL commands. + conn (psycopg2.connection): Database connection for committing transactions. + return_with_ids (bool): If True, returns the original DataFrame with an additional "ID" column. + + Returns: + pd.DataFrame | None: Original DataFrame with an "ID" column if `return_with_ids` is True; otherwise, None. + """ + # logger.info("-- in insert multi rows -- checking data") + + # Check for NaN values and log a warning if any are found + if data_to_insert.isnull().values.any(): + logger.warning("There are NaNs in the data") + + # Ensure the DataFrame has the correct number of columns + if len(column_names) != data_to_insert.shape[1]: + raise ValueError( + "Number of column names does not match the number of columns in the DataFrame." + ) + if len(types) != data_to_insert.shape[1]: + raise ValueError( + "Number of types does not match the number of columns in the DataFrame." + ) + + # logger.info("-- in insert multi rows -- converting data to list of tuples") + # Convert to list of tuples and apply type casting + + data_values = data_to_insert.values.tolist() + data_values = [ + tuple(typ(val) for typ, val in zip(types, row)) for row in data_values + ] + + # logger.info("-- in insert multi rows -- preparing SQL") + # Create SQL placeholders and query + placeholders = ", ".join(["%s"] * len(column_names)) + column_names_str = ", ".join(f'"{col}"' for col in column_names) + + batch_size_for_commit = ( + 1_000_000 # Adjust this based on your dataset size and transaction tolerance + ) + row_count = 0 + + if return_with_ids: + if not unique_columns: + raise ValueError( + "unique_columns must be provided when return_with_ids is True" + ) + + unique_columns_str = ", ".join(f'"{col}"' for col in unique_columns) + insert_query = f""" + INSERT INTO {table_name} ({column_names_str}) + VALUES ({placeholders}) + ON CONFLICT ({unique_columns_str}) + DO UPDATE SET "{unique_columns[0]}" = EXCLUDED."{unique_columns[0]}" + RETURNING "ID"; + """ + ids = [] + + # Insert row by row and collect IDs + with tqdm(total=len(data_values), desc="Inserting rows") as pbar: + for row in data_values: + cur.execute(insert_query, row) + row_id = cur.fetchone() + if row_id: + ids.append(row_id[0]) + row_count += 1 + pbar.update(1) # Update progress bar for each row + + # Commit every batch_size_for_commit rows + if row_count % batch_size_for_commit == 0: + conn.commit() # Commit the transaction + conn.commit() + + # Add IDs back to the original DataFrame + data_with_ids = data_to_insert.copy() + data_with_ids["ID"] = ids + return data_with_ids + + else: + insert_query = f""" + INSERT INTO {table_name} ({column_names_str}) + VALUES ({placeholders}) + ON CONFLICT DO NOTHING; + """ + + # Insert row by row without returning IDs + with tqdm(total=len(data_values), desc="Inserting rows") as pbar: + for row in data_values: + cur.execute(insert_query, row) + row_count += 1 + pbar.update(1) # Update progress bar for each row + if row_count % batch_size_for_commit == 0: + conn.commit() # Commit the transaction + + conn.commit() # Commit all changes after processing + + return None + +# %% ../nbs/10_db_mgmt.ipynb 8 class SQLDatabase: """ A class to represent a SQL database. @@ -437,4 +530,3 @@ def execute_multiple_queries( self.connection.commit() return results if fetchrows else None ->>>>>>> origin/refactor-if-sdk diff --git a/nbs/10_db_mgmt.ipynb b/nbs/10_db_mgmt.ipynb index 98c4cb9..2afdecb 100644 --- a/nbs/10_db_mgmt.ipynb +++ b/nbs/10_db_mgmt.ipynb @@ -11,7 +11,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -20,7 +20,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -30,7 +30,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -70,7 +70,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -96,7 +96,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -215,7 +215,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -328,7 +328,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -446,7 +446,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -600,7 +600,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -611,21 +611,9 @@ ], "metadata": { "kernelspec": { - "display_name": "if_sdk", + "display_name": "python3", "language": "python", "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.10" } }, "nbformat": 4, diff --git a/nbs/20_state_mgmt.ipynb b/nbs/20_state_mgmt.ipynb index dcdb212..d015007 100644 --- a/nbs/20_state_mgmt.ipynb +++ b/nbs/20_state_mgmt.ipynb @@ -10,7 +10,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -19,7 +19,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -29,7 +29,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -101,7 +101,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -122,7 +122,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -133,21 +133,9 @@ ], "metadata": { "kernelspec": { - "display_name": "if_sdk", + "display_name": "python3", "language": "python", "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.11" } }, "nbformat": 4,