From 22920b81dc236df3d2f2a0f2e50a59d134097ec1 Mon Sep 17 00:00:00 2001
From: Lewis Chambers
Date: Mon, 21 Oct 2024 11:58:26 +0100
Subject: [PATCH 1/4] Added cosmos_status enum and methods for joining loose
 .dat files into one CSV file

---
 src/iotswarm/processing.py | 116 ++++++++++++++++++++++++++++++++++++
 src/iotswarm/queries.py    |   1 +
 src/iotswarm/utils.py      |  56 +-----------------
 3 files changed, 118 insertions(+), 55 deletions(-)
 create mode 100644 src/iotswarm/processing.py

diff --git a/src/iotswarm/processing.py b/src/iotswarm/processing.py
new file mode 100644
index 0000000..f6df412
--- /dev/null
+++ b/src/iotswarm/processing.py
@@ -0,0 +1,116 @@
+from pathlib import Path
+from typing import List, Optional
+import pandas as pd
+import sqlite3
+from glob import glob
+from concurrent.futures import ProcessPoolExecutor
+from tqdm import tqdm
+
+def build_database_from_csv(
+    csv_file: str | Path,
+    database: str | Path,
+    table_name: str,
+    timestamp_header: str,
+    sort_by: str | None = None,
+    date_time_format: str = r"%d-%b-%y %H.%M.%S",
+) -> None:
+    """Adds a database table using a csv file with headers.
+
+    Args:
+        csv_file: A path to the csv.
+        database: Output destination of the database. File is created if not
+            existing.
+        table_name: Name of the table to add into database.
+        timestamp_header: Name of the column with a timestamp.
+        sort_by: Column to sort by.
+        date_time_format: Format of the datetime column.
+    """
+
+    if not isinstance(csv_file, Path):
+        csv_file = Path(csv_file)
+
+    if not isinstance(database, Path):
+        database = Path(database)
+
+    if not csv_file.exists():
+        raise FileNotFoundError(f'csv_file does not exist: "{csv_file}"')
+
+    if not database.parent.exists():
+        raise NotADirectoryError(f'Database directory not found: "{database.parent}"')
+
+    with sqlite3.connect(database) as conn:
+        print(
+            f'Writing table: "{table_name}" from csv_file: "{csv_file}" to db: "{database}"'
+        )
+        print("Loading csv")
+        df = pd.read_csv(csv_file)
+        print("Done")
+        print("Formatting dates")
+        df[timestamp_header] = pd.to_datetime(df[timestamp_header], format=date_time_format)
+        print("Done")
+        if sort_by is not None:
+            print("Sorting.")
+            df = df.sort_values(by=sort_by)
+            print("Done")
+
+        print("Writing to db.")
+        df.to_sql(table_name, conn, if_exists="replace", index=False)
+        print("Writing complete.")
+
+
+def _read_csv_file(file_path):
+    return pd.read_csv(file_path, delimiter=",", skiprows=[0,2,3])
+
+def _write_batch_to_file(batch_df, dst, mode='a', header=False):
+    batch_df.to_csv(dst, mode=mode, index=False, header=header)
+
+def process_csv_files_parallel(src, dst, batch_size=1000, sort_columns: Optional[List[str]|str]=None, extension:str=".dat"):
+    """Converts a directory of .dat files into a combined .csv file
+    Args:
+        src: The source directory
+        dst: The output file
+        sort_columns: Column to sort the values by
+        extension: The file extension to match
+    """
+
+    if not isinstance(src, Path):
+        src = Path(src)
+    if not isinstance(dst, Path):
+        dst = Path(dst)
+    if not isinstance(sort_columns, list) and sort_columns is not None:
+        sort_columns = [sort_columns]
+
+    # Get the list of all CSV files
+    files = [Path(x) for x in glob(f"{src}/**/*{extension}", recursive=True)]
+    # Create the output file and write the header from the first file
+    header_written = False
+    total_files = len(files)
+
+    # Use a ProcessPoolExecutor to parallelize the loading of files
+    with ProcessPoolExecutor() as executor, tqdm(total=total_files, desc="Processing files") as progress_bar:
+        # Process in batches
+        for i in range(0, total_files, batch_size):
+            # Select a batch of files
+            batch_files = files[i:i + batch_size]
+
+            # Read the files in parallel
+            batch_dfs = list(executor.map(_read_csv_file, batch_files))
+
+            # Concatenate the batch into one DataFrame
+            combined_batch_df = pd.concat(batch_dfs, ignore_index=True)
+
+            # Write the batch to the output file (only write header once)
+            _write_batch_to_file(combined_batch_df, dst, mode='a', header=not header_written)
+            header_written = True  # Header written after the first batch
+
+            # Update the progress bar
+            progress_bar.update(len(batch_files))
+
+            # Optionally clear memory if batches are large
+            del combined_batch_df, batch_dfs
+
+    if sort_columns is not None:
+        print(f"Sorting by {sort_columns}")
+        df = pd.read_csv(dst)
+        df = df.sort_values(by=sort_columns)
+        df.to_csv(dst, index=False, header=True, mode="w")
\ No newline at end of file
diff --git a/src/iotswarm/queries.py b/src/iotswarm/queries.py
index dfda5c8..2c7fd2a 100644
--- a/src/iotswarm/queries.py
+++ b/src/iotswarm/queries.py
@@ -12,6 +12,7 @@ class CosmosTable(StrEnum):
     LEVEL_1_NMDB_1HOUR = "LEVEL1_NMDB_1HOUR"
     LEVEL_1_PRECIP_1MIN = "LEVEL1_PRECIP_1MIN"
     LEVEL_1_PRECIP_RAINE_1MIN = "LEVEL1_PRECIP_RAINE_1MIN"
+    COSMOS_STATUS = "COSMOS_STATUS_1HOUR"
 
 
 @enum.unique
diff --git a/src/iotswarm/utils.py b/src/iotswarm/utils.py
index e1f8a9e..c199a4b 100644
--- a/src/iotswarm/utils.py
+++ b/src/iotswarm/utils.py
@@ -1,10 +1,6 @@
 """Module for handling commonly reused utility functions."""
 
 from datetime import date, datetime
-from pathlib import Path
-import pandas
-import sqlite3
-
 
 def json_serial(obj: object):
     """Serializes an unknown object into a json format."""
@@ -15,54 +11,4 @@ def json_serial(obj: object):
     if obj.__class__.__module__ != "builtins":
         return obj.__json__()
 
-    raise TypeError(f"Type {type(obj)} is not serializable.")
-
-
-def build_database_from_csv(
-    csv_file: str | Path,
-    database: str | Path,
-    table_name: str,
-    sort_by: str | None = None,
-    date_time_format: str = r"%d-%b-%y %H.%M.%S",
-) -> None:
-    """Adds a database table using a csv file with headers.
-
-    Args:
-        csv_file: A path to the csv.
-        database: Output destination of the database. File is created if not
-            existing.
-        table_name: Name of the table to add into database.
-        sort_by: Column to sort by
-        date_time_format: Format of datetime column
-    """
-
-    if not isinstance(csv_file, Path):
-        csv_file = Path(csv_file)
-
-    if not isinstance(database, Path):
-        database = Path(database)
-
-    if not csv_file.exists():
-        raise FileNotFoundError(f'csv_file does not exist: "{csv_file}"')
-
-    if not database.parent.exists():
-        raise NotADirectoryError(f'Database directory not found: "{database.parent}"')
-
-    with sqlite3.connect(database) as conn:
-        print(
-            f'Writing table: "{table_name}" from csv_file: "{csv_file}" to db: "{database}"'
-        )
-        print("Loading csv")
-        df = pandas.read_csv(csv_file)
-        print("Done")
-        print("Formatting dates")
-        df["DATE_TIME"] = pandas.to_datetime(df["DATE_TIME"], format=date_time_format)
-        print("Done")
-        if sort_by is not None:
-            print("Sorting.")
-            df = df.sort_values(by=sort_by)
-            print("Done")
-
-        print("Writing to db.")
-        df.to_sql(table_name, conn, if_exists="replace", index=False)
-        print("Writing complete.")
+    raise TypeError(f"Type {type(obj)} is not serializable.")
\ No newline at end of file

From ec5d87958604ced16670c0bcfbd502ca80bae3ae Mon Sep 17 00:00:00 2001
From: Lewis Chambers
Date: Mon, 21 Oct 2024 13:45:05 +0100
Subject: [PATCH 2/4] Added read/write method arguments to make parallel
 processing easier to extend

---
 src/iotswarm/processing.py | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/src/iotswarm/processing.py b/src/iotswarm/processing.py
index f6df412..227c232 100644
--- a/src/iotswarm/processing.py
+++ b/src/iotswarm/processing.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import List, Optional
+from typing import Callable, List, Optional
 import pandas as pd
 import sqlite3
 from glob import glob
@@ -58,19 +58,29 @@ def build_database_from_csv(
         print("Writing complete.")
 
 
-def _read_csv_file(file_path):
+def _read_cosmos_status_file(file_path):
     return pd.read_csv(file_path, delimiter=",", skiprows=[0,2,3])
 
-def _write_batch_to_file(batch_df, dst, mode='a', header=False):
+def _write_batch_to_csv(batch_df, dst, mode='a', header=False):
     batch_df.to_csv(dst, mode=mode, index=False, header=header)
 
-def process_csv_files_parallel(src, dst, batch_size=1000, sort_columns: Optional[List[str]|str]=None, extension:str=".dat"):
+def process_csv_files_parallel(
+    src,
+    dst,
+    batch_size=1000,
+    sort_columns: Optional[List[str]|str]=None,
+    extension:str=".dat",
+    read_method:Callable=_read_cosmos_status_file,
+    write_method:Callable=_write_batch_to_csv):
     """Converts a directory of .dat files into a combined .csv file
     Args:
         src: The source directory
         dst: The output file
+        batch_size: The number of files to read per batch
         sort_columns: Column to sort the values by
         extension: The file extension to match
+        read_method: The method used to read the files
+        write_method: The method used to write the files
     """
 
     if not isinstance(src, Path):
@@ -94,13 +104,13 @@ def process_csv_files_parallel(src, dst, batch_size=1000, sort_columns: Optional
             batch_files = files[i:i + batch_size]
 
             # Read the files in parallel
-            batch_dfs = list(executor.map(_read_csv_file, batch_files))
+            batch_dfs = list(executor.map(read_method, batch_files))
 
             # Concatenate the batch into one DataFrame
             combined_batch_df = pd.concat(batch_dfs, ignore_index=True)
 
             # Write the batch to the output file (only write header once)
-            _write_batch_to_file(combined_batch_df, dst, mode='a', header=not header_written)
+            write_method(combined_batch_df, dst, mode='a', header=not header_written)
             header_written = True  # Header written after the first batch
 
             # Update the progress bar
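Usage note: with the read_method/write_method hooks above, callers can swap in
their own reader and writer without touching the batching loop. A minimal
sketch, assuming a hypothetical plain-CSV reader that is not part of this
patch; both callables must be defined at module level so ProcessPoolExecutor
can pickle them for the worker processes:

    from iotswarm.processing import process_csv_files_parallel
    import pandas as pd

    def read_plain_csv(file_path):
        # Hypothetical reader: files with a single header row and no
        # metadata rows to skip.
        return pd.read_csv(file_path)

    def write_csv_batch(batch_df, dst, mode="a", header=False):
        # Same behaviour as the default _write_batch_to_csv: append each
        # batch to a single output file.
        batch_df.to_csv(dst, mode=mode, index=False, header=header)

    if __name__ == "__main__":
        process_csv_files_parallel(
            "data/raw",            # hypothetical source directory
            "data/combined.csv",   # hypothetical output file
            extension=".csv",
            read_method=read_plain_csv,
            write_method=write_csv_batch,
        )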
From bcbb6da747ee847529ddc9169e78275bff6800b2 Mon Sep 17 00:00:00 2001
From: Lewis Chambers
Date: Mon, 21 Oct 2024 14:31:12 +0100
Subject: [PATCH 3/4] Renamed COSMOS_STATUS enum member to match the table
 name

---
 src/iotswarm/queries.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/iotswarm/queries.py b/src/iotswarm/queries.py
index 2c7fd2a..7b57459 100644
--- a/src/iotswarm/queries.py
+++ b/src/iotswarm/queries.py
@@ -12,7 +12,7 @@ class CosmosTable(StrEnum):
     LEVEL_1_NMDB_1HOUR = "LEVEL1_NMDB_1HOUR"
     LEVEL_1_PRECIP_1MIN = "LEVEL1_PRECIP_1MIN"
     LEVEL_1_PRECIP_RAINE_1MIN = "LEVEL1_PRECIP_RAINE_1MIN"
-    COSMOS_STATUS = "COSMOS_STATUS_1HOUR"
+    COSMOS_STATUS_1HOUR = "COSMOS_STATUS_1HOUR"
 
 
 @enum.unique

From 784859464409103c76ce6819b43ee558c2dce71c Mon Sep 17 00:00:00 2001
From: Lewis Chambers
Date: Mon, 21 Oct 2024 16:30:56 +0100
Subject: [PATCH 4/4] Added setuptools dependency

---
 pyproject.toml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pyproject.toml b/pyproject.toml
index d3bcc7a..b928bad 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,6 +17,7 @@ dependencies = [
     "oracledb",
     "pandas",
     "platformdirs",
+    "setuptools",
 ]
 name = "iot-swarm"
 dynamic = ["version"]
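Taken together, the series supports joining the loose COSMOS status files and
loading them into a database. An end-to-end sketch (paths are illustrative;
"DATE_TIME" is the timestamp column the old utils.py helper hardcoded, and
date_time_format may need overriding if the status files use a different
format):

    from iotswarm.processing import (
        build_database_from_csv,
        process_csv_files_parallel,
    )

    if __name__ == "__main__":
        # 1. Join the loose .dat files into one CSV, sorted by timestamp.
        process_csv_files_parallel(
            "data/cosmos-status",      # hypothetical source directory
            "data/cosmos_status.csv",  # hypothetical combined output
            sort_columns="DATE_TIME",
        )

        # 2. Load the combined CSV into a SQLite table named after the
        #    CosmosTable.COSMOS_STATUS_1HOUR enum value.
        build_database_from_csv(
            "data/cosmos_status.csv",
            "data/cosmos.db",
            table_name="COSMOS_STATUS_1HOUR",
            timestamp_header="DATE_TIME",
            sort_by="DATE_TIME",
        )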