Added methods to make parallel reads/writes easier to extend
lewis-chambers committed Oct 21, 2024
1 parent 22920b8 commit ec5d879
Showing 1 changed file with 15 additions and 6 deletions.
src/iotswarm/processing.py (21 changes: 15 additions & 6 deletions)
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import List, Optional
+from typing import Callable, List, Optional
 import pandas as pd
 import sqlite3
 from glob import glob
@@ -59,19 +59,28 @@ def build_database_from_csv(
print("Writing complete.")


def _read_csv_file(file_path):
def _read_cosmos_status_file(file_path):
return pd.read_csv(file_path, delimiter=",", skiprows=[0,2,3])

def _write_batch_to_file(batch_df, dst, mode='a', header=False):
def _write_batch_to_csv(batch_df, dst, mode='a', header=False):
batch_df.to_csv(dst, mode=mode, index=False, header=header)

def process_csv_files_parallel(src, dst, batch_size=1000, sort_columns: Optional[List[str]|str]=None, extension:str=".dat"):
def process_csv_files_parallel(
src,
dst,
batch_size=1000,
sort_columns: Optional[List[str]|str]=None,
extension:str=".dat",
read_method:Callable=_read_cosmos_status_file,
write_method:Callable=_write_batch_to_csv):
"""Converts a directory of .dat files into a combined .csv file
Args:
src: The source directory
dst: The output file
sort_columns: Column to sort the values by
extension: The file extension to match
read_method: The method used to read the files
write_method: The method used to write the files
"""

if not isinstance(src, Path):
@@ -95,13 +104,13 @@ def process_csv_files_parallel(
         batch_files = files[i:i + batch_size]
 
         # Read the files in parallel
-        batch_dfs = list(executor.map(_read_csv_file, batch_files))
+        batch_dfs = list(executor.map(read_method, batch_files))
 
         # Concatenate the batch into one DataFrame
         combined_batch_df = pd.concat(batch_dfs, ignore_index=True)
 
         # Write the batch to the output file (only write header once)
-        _write_batch_to_file(combined_batch_df, dst, mode='a', header=not header_written)
+        write_method(combined_batch_df, dst, mode='a', header=not header_written)
         header_written = True  # Header written after the first batch
 
         # Update the progress bar
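The new read_method and write_method parameters make the read and write steps pluggable. A minimal usage sketch of how a caller might swap in its own reader (the file paths and read_plain_csv below are hypothetical, not part of the commit):

    import pandas as pd
    from iotswarm.processing import process_csv_files_parallel

    # Hypothetical reader for plain CSV files with a single header row,
    # rather than the COSMOS status layout the default reader skips rows for.
    # Defined at module level so it stays picklable if the executor is
    # process-based.
    def read_plain_csv(file_path):
        return pd.read_csv(file_path)

    # Defaults: read COSMOS status .dat files, append batches to one CSV.
    process_csv_files_parallel("raw_data", "combined.csv")

    # Same pipeline, with the reader swapped in via read_method.
    process_csv_files_parallel(
        "raw_data",
        "combined.csv",
        extension=".csv",
        read_method=read_plain_csv,
    )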
