Skip to content

Commit

Permalink
run nbdev_prepare
Browse files Browse the repository at this point in the history
  • Loading branch information
majoma7 committed Jan 28, 2025
1 parent 75510db commit bbd0581
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 73 deletions.
9 changes: 4 additions & 5 deletions inventory_foundation_sdk/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
'inventory_foundation_sdk/custom_datasets.py'),
'inventory_foundation_sdk.custom_datasets.DynamicPathJSONDataset._save': ( 'custom_datasets.html#dynamicpathjsondataset._save',
'inventory_foundation_sdk/custom_datasets.py')},
<<<<<<< HEAD
'inventory_foundation_sdk.db_mgmt': { 'inventory_foundation_sdk.db_mgmt.check_in_scope_entries': ( 'db_mgmt.html#check_in_scope_entries',
'inventory_foundation_sdk/db_mgmt.py'),
=======
'inventory_foundation_sdk.db_mgmt': { 'inventory_foundation_sdk.db_mgmt.SQLDatabase': ( 'db_mgmt.html#sqldatabase',
'inventory_foundation_sdk/db_mgmt.py'),
'inventory_foundation_sdk.db_mgmt.SQLDatabase.__init__': ( 'db_mgmt.html#sqldatabase.__init__',
Expand All @@ -42,7 +38,8 @@
'inventory_foundation_sdk/db_mgmt.py'),
'inventory_foundation_sdk.db_mgmt.SQLDatabase.execute_query': ( 'db_mgmt.html#sqldatabase.execute_query',
'inventory_foundation_sdk/db_mgmt.py'),
>>>>>>> origin/refactor-if-sdk
'inventory_foundation_sdk.db_mgmt.check_in_scope_entries': ( 'db_mgmt.html#check_in_scope_entries',
'inventory_foundation_sdk/db_mgmt.py'),
'inventory_foundation_sdk.db_mgmt.get_db_credentials': ( 'db_mgmt.html#get_db_credentials',
'inventory_foundation_sdk/db_mgmt.py'),
'inventory_foundation_sdk.db_mgmt.insert_multi_rows': ( 'db_mgmt.html#insert_multi_rows',
Expand Down Expand Up @@ -99,6 +96,8 @@
'inventory_foundation_sdk/etl_nodes.py'),
'inventory_foundation_sdk.etl_nodes.input_output_node': ( 'etl_nodes.html#input_output_node',
'inventory_foundation_sdk/etl_nodes.py')},
'inventory_foundation_sdk.kedro_orchestration': { 'inventory_foundation_sdk.kedro_orchestration.verify_db_write_status': ( 'kedro_orchestration.html#verify_db_write_status',
'inventory_foundation_sdk/kedro_orchestration.py')},
'inventory_foundation_sdk.state_mgmnt': { 'inventory_foundation_sdk.state_mgmnt.Flag': ( 'state_mgmt.html#flag',
'inventory_foundation_sdk/state_mgmnt.py'),
'inventory_foundation_sdk.state_mgmnt.Flag.__init__': ( 'state_mgmt.html#flag.__init__',
Expand Down
148 changes: 120 additions & 28 deletions inventory_foundation_sdk/db_mgmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/10_db_mgmt.ipynb.

# %% auto 0
<<<<<<< HEAD
__all__ = ['get_db_credentials', 'insert_multi_rows', 'check_in_scope_entries']
=======
__all__ = ['get_db_credentials', 'insert_multi_rows', 'SQLDatabase']
>>>>>>> origin/refactor-if-sdk
__all__ = ['get_db_credentials', 'insert_multi_rows', 'check_in_scope_entries', 'SQLDatabase']

# %% ../nbs/10_db_mgmt.ipynb 3
from kedro.config import OmegaConfigLoader
Expand All @@ -20,13 +16,7 @@

import psycopg2

<<<<<<< HEAD
import psycopg2

# %% ../nbs/10_db_mgmt.ipynb 5
=======
# %% ../nbs/10_db_mgmt.ipynb 4
>>>>>>> origin/refactor-if-sdk
def get_db_credentials():
"""
Fetch PostgreSQL database credentials from the configuration file of the kedro project.
Expand All @@ -43,11 +33,7 @@ def get_db_credentials():

return db_credentials

<<<<<<< HEAD
# %% ../nbs/10_db_mgmt.ipynb 6
=======
# %% ../nbs/10_db_mgmt.ipynb 5
>>>>>>> origin/refactor-if-sdk
def insert_multi_rows(
data_to_insert: pd.DataFrame,
table_name: str,
Expand Down Expand Up @@ -89,23 +75,15 @@ def insert_multi_rows(
"Number of types does not match the number of columns in the DataFrame."
)

<<<<<<< HEAD
# logger.info("-- in insert multi rows -- converting data to list of tuples")
=======
logger.info("-- in insert multi rows -- converting data to list of tuples")
>>>>>>> origin/refactor-if-sdk
# Convert to list of tuples and apply type casting

data_values = data_to_insert.values.tolist()
data_values = [
tuple(typ(val) for typ, val in zip(types, row)) for row in data_values
]

<<<<<<< HEAD
# logger.info("-- in insert multi rows -- preparing SQL")
=======
logger.info("-- in insert multi rows -- preparing SQL")
>>>>>>> origin/refactor-if-sdk
# Create SQL placeholders and query
placeholders = ", ".join(["%s"] * len(column_names))
column_names_str = ", ".join(f'"{col}"' for col in column_names)
Expand Down Expand Up @@ -171,8 +149,7 @@ def insert_multi_rows(

return None

<<<<<<< HEAD
# %% ../nbs/10_db_mgmt.ipynb 7
# %% ../nbs/10_db_mgmt.ipynb 6
def check_in_scope_entries(
target_table,
dataset_column,
Expand Down Expand Up @@ -286,8 +263,124 @@ def check_in_scope_entries(
except Exception as e:
logger.error(f"Error checking in-scope entries for {target_table}: {e}")
raise e
=======
# %% ../nbs/10_db_mgmt.ipynb 6

# %% ../nbs/10_db_mgmt.ipynb 7
def insert_multi_rows(
data_to_insert: pd.DataFrame,
table_name: str,
column_names: list,
types: list,
cur,
conn,
return_with_ids: bool = False,
unique_columns: list = None, # mandatory if return_with_ids is True
) -> pd.DataFrame | None:
"""
Inserts data into the specified database table, with an optional return of database-assigned IDs.
Args:
data_to_insert (pd.DataFrame): DataFrame containing the data to be inserted.
table_name (str): Name of the target database table.
column_names (list): List of column names for the target table.
types (list): List of Python types (e.g., [int, float]) for data conversion.
cur (psycopg2.cursor): Database cursor for executing SQL commands.
conn (psycopg2.connection): Database connection for committing transactions.
return_with_ids (bool): If True, returns the original DataFrame with an additional "ID" column.
Returns:
pd.DataFrame | None: Original DataFrame with an "ID" column if `return_with_ids` is True; otherwise, None.
"""
# logger.info("-- in insert multi rows -- checking data")

# Check for NaN values and log a warning if any are found
if data_to_insert.isnull().values.any():
logger.warning("There are NaNs in the data")

# Ensure the DataFrame has the correct number of columns
if len(column_names) != data_to_insert.shape[1]:
raise ValueError(
"Number of column names does not match the number of columns in the DataFrame."
)
if len(types) != data_to_insert.shape[1]:
raise ValueError(
"Number of types does not match the number of columns in the DataFrame."
)

# logger.info("-- in insert multi rows -- converting data to list of tuples")
# Convert to list of tuples and apply type casting

data_values = data_to_insert.values.tolist()
data_values = [
tuple(typ(val) for typ, val in zip(types, row)) for row in data_values
]

# logger.info("-- in insert multi rows -- preparing SQL")
# Create SQL placeholders and query
placeholders = ", ".join(["%s"] * len(column_names))
column_names_str = ", ".join(f'"{col}"' for col in column_names)

batch_size_for_commit = (
1_000_000 # Adjust this based on your dataset size and transaction tolerance
)
row_count = 0

if return_with_ids:
if not unique_columns:
raise ValueError(
"unique_columns must be provided when return_with_ids is True"
)

unique_columns_str = ", ".join(f'"{col}"' for col in unique_columns)
insert_query = f"""
INSERT INTO {table_name} ({column_names_str})
VALUES ({placeholders})
ON CONFLICT ({unique_columns_str})
DO UPDATE SET "{unique_columns[0]}" = EXCLUDED."{unique_columns[0]}"
RETURNING "ID";
"""
ids = []

# Insert row by row and collect IDs
with tqdm(total=len(data_values), desc="Inserting rows") as pbar:
for row in data_values:
cur.execute(insert_query, row)
row_id = cur.fetchone()
if row_id:
ids.append(row_id[0])
row_count += 1
pbar.update(1) # Update progress bar for each row

# Commit every batch_size_for_commit rows
if row_count % batch_size_for_commit == 0:
conn.commit() # Commit the transaction
conn.commit()

# Add IDs back to the original DataFrame
data_with_ids = data_to_insert.copy()
data_with_ids["ID"] = ids
return data_with_ids

else:
insert_query = f"""
INSERT INTO {table_name} ({column_names_str})
VALUES ({placeholders})
ON CONFLICT DO NOTHING;
"""

# Insert row by row without returning IDs
with tqdm(total=len(data_values), desc="Inserting rows") as pbar:
for row in data_values:
cur.execute(insert_query, row)
row_count += 1
pbar.update(1) # Update progress bar for each row
if row_count % batch_size_for_commit == 0:
conn.commit() # Commit the transaction

conn.commit() # Commit all changes after processing

return None

# %% ../nbs/10_db_mgmt.ipynb 8
class SQLDatabase:
"""
A class to represent a SQL database.
Expand Down Expand Up @@ -437,4 +530,3 @@ def execute_multiple_queries(
self.connection.commit()

return results if fetchrows else None
>>>>>>> origin/refactor-if-sdk
32 changes: 10 additions & 22 deletions nbs/10_db_mgmt.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -20,7 +20,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -30,7 +30,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"metadata": {},
"outputs": [
{
Expand Down Expand Up @@ -70,7 +70,7 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -96,7 +96,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -215,7 +215,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -328,7 +328,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -446,7 +446,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -600,7 +600,7 @@
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
Expand All @@ -611,21 +611,9 @@
],
"metadata": {
"kernelspec": {
"display_name": "if_sdk",
"display_name": "python3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.10"
}
},
"nbformat": 4,
Expand Down
Loading

0 comments on commit bbd0581

Please sign in to comment.