Added cosmos_status enum and methods for joining loose .dat into one CSV file
lewis-chambers committed Oct 21, 2024
1 parent 3c25279 commit 22920b8
Showing 3 changed files with 119 additions and 55 deletions.
117 changes: 117 additions & 0 deletions src/iotswarm/processing.py
@@ -0,0 +1,117 @@
from pathlib import Path
from typing import List, Optional
import pandas as pd
import sqlite3
from glob import glob
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm

def build_database_from_csv(
    csv_file: str | Path,
    database: str | Path,
    table_name: str,
    timestamp_header: str,
    sort_by: str | None = None,
    date_time_format: str = r"%d-%b-%y %H.%M.%S",
) -> None:
    """Adds a database table using a csv file with headers.

    Args:
        csv_file: A path to the csv.
        database: Output destination of the database. File is created if not
            existing.
        table_name: Name of the table to add into the database.
        timestamp_header: Name of the column containing timestamps.
        sort_by: Column to sort by.
        date_time_format: Format of the datetime column.
    """

    if not isinstance(csv_file, Path):
        csv_file = Path(csv_file)

    if not isinstance(database, Path):
        database = Path(database)

    if not csv_file.exists():
        raise FileNotFoundError(f'csv_file does not exist: "{csv_file}"')

    if not database.parent.exists():
        raise NotADirectoryError(f'Database directory not found: "{database.parent}"')

    with sqlite3.connect(database) as conn:
        print(
            f'Writing table: "{table_name}" from csv_file: "{csv_file}" to db: "{database}"'
        )
        print("Loading csv")
        df = pd.read_csv(csv_file)
        print("Done")

        print("Formatting dates")
        df[timestamp_header] = pd.to_datetime(df[timestamp_header], format=date_time_format)
        print("Done")

        if sort_by is not None:
            print("Sorting.")
            df = df.sort_values(by=sort_by)
            print("Done")

        print("Writing to db.")
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        print("Writing complete.")


def _read_csv_file(file_path):
    # Row 1 holds the column headers; rows 0, 2 and 3 are metadata and are skipped.
    return pd.read_csv(file_path, delimiter=",", skiprows=[0, 2, 3])


def _write_batch_to_file(batch_df, dst, mode="a", header=False):
    batch_df.to_csv(dst, mode=mode, index=False, header=header)


def process_csv_files_parallel(
    src: str | Path,
    dst: str | Path,
    batch_size: int = 1000,
    sort_columns: Optional[List[str] | str] = None,
    extension: str = ".dat",
) -> None:
    """Converts a directory of .dat files into a combined .csv file.

    Args:
        src: The source directory.
        dst: The output file.
        batch_size: Number of files to read per batch.
        sort_columns: Column(s) to sort the combined values by.
        extension: The file extension to match.
    """

    if not isinstance(src, Path):
        src = Path(src)
    if not isinstance(dst, Path):
        dst = Path(dst)
    if not isinstance(sort_columns, list) and sort_columns is not None:
        sort_columns = [sort_columns]

    # Get the list of all matching files
    files = [Path(x) for x in glob(f"{src}/**/*{extension}", recursive=True)]
    # The header is written once, with the first batch
    header_written = False
    total_files = len(files)

    # Use a ProcessPoolExecutor to parallelize the loading of files
    with ProcessPoolExecutor() as executor, tqdm(total=total_files, desc="Processing files") as progress_bar:
        # Process in batches
        for i in range(0, total_files, batch_size):
            # Select a batch of files
            batch_files = files[i : i + batch_size]

            # Read the files in parallel
            batch_dfs = list(executor.map(_read_csv_file, batch_files))

            # Concatenate the batch into one DataFrame
            combined_batch_df = pd.concat(batch_dfs, ignore_index=True)

            # Write the batch to the output file (only write header once)
            _write_batch_to_file(combined_batch_df, dst, mode="a", header=not header_written)
            header_written = True

            # Update the progress bar
            progress_bar.update(len(batch_files))

            # Free memory between batches
            del combined_batch_df, batch_dfs

    if sort_columns is not None:
        print(f"Sorting by {sort_columns}")
        df = pd.read_csv(dst)
        df = df.sort_values(by=sort_columns)
        df.to_csv(dst, index=False, header=True, mode="w")
1 change: 1 addition & 0 deletions src/iotswarm/queries.py
@@ -12,6 +12,7 @@ class CosmosTable(StrEnum):
    LEVEL_1_NMDB_1HOUR = "LEVEL1_NMDB_1HOUR"
    LEVEL_1_PRECIP_1MIN = "LEVEL1_PRECIP_1MIN"
    LEVEL_1_PRECIP_RAINE_1MIN = "LEVEL1_PRECIP_RAINE_1MIN"
    COSMOS_STATUS = "COSMOS_STATUS_1HOUR"


@enum.unique
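Since CosmosTable is a StrEnum, the new member compares equal to, and formats as, its string value, so it can be dropped straight into query text. A quick illustration (the query itself is hypothetical):

from iotswarm.queries import CosmosTable

table = CosmosTable.COSMOS_STATUS
print(table == "COSMOS_STATUS_1HOUR")  # True: StrEnum members are strings
query = f"SELECT * FROM {table}"       # formats as "SELECT * FROM COSMOS_STATUS_1HOUR"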
56 changes: 1 addition & 55 deletions src/iotswarm/utils.py
@@ -1,10 +1,6 @@
"""Module for handling commonly reused utility functions."""

from datetime import date, datetime
from pathlib import Path
import pandas
import sqlite3


def json_serial(obj: object):
"""Serializes an unknown object into a json format."""
Expand All @@ -15,54 +11,4 @@ def json_serial(obj: object):
if obj.__class__.__module__ != "builtins":
return obj.__json__()

raise TypeError(f"Type {type(obj)} is not serializable.")


def build_database_from_csv(
    csv_file: str | Path,
    database: str | Path,
    table_name: str,
    sort_by: str | None = None,
    date_time_format: str = r"%d-%b-%y %H.%M.%S",
) -> None:
    """Adds a database table using a csv file with headers.

    Args:
        csv_file: A path to the csv.
        database: Output destination of the database. File is created if not
            existing.
        table_name: Name of the table to add into database.
        sort_by: Column to sort by
        date_time_format: Format of datetime column
    """

    if not isinstance(csv_file, Path):
        csv_file = Path(csv_file)

    if not isinstance(database, Path):
        database = Path(database)

    if not csv_file.exists():
        raise FileNotFoundError(f'csv_file does not exist: "{csv_file}"')

    if not database.parent.exists():
        raise NotADirectoryError(f'Database directory not found: "{database.parent}"')

    with sqlite3.connect(database) as conn:
        print(
            f'Writing table: "{table_name}" from csv_file: "{csv_file}" to db: "{database}"'
        )
        print("Loading csv")
        df = pandas.read_csv(csv_file)
        print("Done")
        print("Formatting dates")
        df["DATE_TIME"] = pandas.to_datetime(df["DATE_TIME"], format=date_time_format)
        print("Done")
        if sort_by is not None:
            print("Sorting.")
            df = df.sort_values(by=sort_by)
            print("Done")

        print("Writing to db.")
        df.to_sql(table_name, conn, if_exists="replace", index=False)
        print("Writing complete.")
raise TypeError(f"Type {type(obj)} is not serializable.")
