diff --git a/src/iotswarm/processing.py b/src/iotswarm/processing.py
index f6df412..227c232 100644
--- a/src/iotswarm/processing.py
+++ b/src/iotswarm/processing.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import List, Optional
+from typing import Callable, List, Optional
 import pandas as pd
 import sqlite3
 from glob import glob
@@ -59,19 +59,29 @@ def build_database_from_csv(
     print("Writing complete.")

-def _read_csv_file(file_path):
+def _read_cosmos_status_file(file_path):
     return pd.read_csv(file_path, delimiter=",", skiprows=[0,2,3])

-def _write_batch_to_file(batch_df, dst, mode='a', header=False):
+def _write_batch_to_csv(batch_df, dst, mode='a', header=False):
     batch_df.to_csv(dst, mode=mode, index=False, header=header)

-def process_csv_files_parallel(src, dst, batch_size=1000, sort_columns: Optional[List[str]|str]=None, extension:str=".dat"):
+def process_csv_files_parallel(
+    src,
+    dst,
+    batch_size=1000,
+    sort_columns: Optional[List[str] | str] = None,
+    extension: str = ".dat",
+    read_method: Callable = _read_cosmos_status_file,
+    write_method: Callable = _write_batch_to_csv):
     """Converts a directory of .dat files into a combined .csv file

     Args:
         src: The source directory
         dst: The output file
+        batch_size: The number of files to read per batch
         sort_columns: Column to sort the values by
         extension: The file extension to match
+        read_method: The method used to read the files
+        write_method: The method used to write the files
     """

     if not isinstance(src, Path):
@@ -95,13 +105,13 @@ def process_csv_files_parallel(src, dst, batch_size=1000, sort_columns: Optional
         batch_files = files[i:i + batch_size]

         # Read the files in parallel
-        batch_dfs = list(executor.map(_read_csv_file, batch_files))
+        batch_dfs = list(executor.map(read_method, batch_files))

         # Concatenate the batch into one DataFrame
         combined_batch_df = pd.concat(batch_dfs, ignore_index=True)

         # Write the batch to the output file (only write header once)
-        _write_batch_to_file(combined_batch_df, dst, mode='a', header=not header_written)
+        write_method(combined_batch_df, dst, mode='a', header=not header_written)
         header_written = True  # Header written after the first batch

         # Update the progress bar
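
A minimal usage sketch of the new `read_method`/`write_method` hooks this patch introduces. The paths, column name, and the two custom callables below are hypothetical, not part of the patch: any single-argument reader that returns a DataFrame, and any writer matching the `(batch_df, dst, mode, header)` call made in the batch loop, should work.

```python
from pathlib import Path

import pandas as pd

from iotswarm.processing import process_csv_files_parallel


def read_tab_separated(file_path):
    # Custom reader: same single-argument signature as the default
    # _read_cosmos_status_file, returning a DataFrame per file.
    return pd.read_csv(file_path, delimiter="\t")


def write_tab_separated(batch_df, dst, mode="a", header=False):
    # Custom writer: must accept the (batch_df, dst, mode, header)
    # arguments the batch loop passes when appending each batch.
    batch_df.to_csv(dst, sep="\t", mode=mode, index=False, header=header)


process_csv_files_parallel(
    src=Path("data/raw"),            # hypothetical source directory
    dst=Path("data/combined.tsv"),   # hypothetical output file
    batch_size=500,
    sort_columns="DATE_TIME",        # hypothetical column name
    extension=".tsv",
    read_method=read_tab_separated,    # overrides the COSMOS default reader
    write_method=write_tab_separated,  # overrides the default CSV writer
)
```

Because both defaults preserve the old behaviour, existing callers are unaffected; the hooks only matter when a caller needs a different file format.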