
Commit 2563130
Pushed in progress for friend to pull.
nikothomas committed Jul 18, 2024
1 parent 5bf5392 commit 2563130
Showing 3 changed files with 577 additions and 1,148 deletions.
cli.py: 61 changes (22 additions, 39 deletions)
@@ -5,15 +5,15 @@
 from concurrent.futures import ProcessPoolExecutor, as_completed
 from contextlib import ExitStack
 import argparse
-import pandas as pd
+import polars as pl
 import loggersetup
 import ctdfjorder
 from ctdfjorder import CTDError
 import signal
 import psutil
 import os
 import enlighten
 
 import pandas as pd
 
 def handler(signal_received, frame):
     if signal_received == signal.SIGINT:
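
This hunk swaps the top-level pandas import for polars; the rest of the diff replaces each pandas call with its polars counterpart. A rough map of the equivalents involved, as a minimal runnable sketch:

import polars as pl

# Rough pandas -> polars equivalents used in this commit:
#   pd.DataFrame()      -> pl.DataFrame()
#   pd.read_excel(path) -> pl.read_excel(path)
#   df.duplicated()     -> df.is_duplicated()  (marks ALL occurrences, not just repeats)
df = pl.DataFrame({"depth": [1.0, 2.0, 2.0]})
print(df.is_duplicated().to_list())  # [False, True, True]
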
@@ -28,48 +28,25 @@ def handler(signal_received, frame):
 manager = enlighten.get_manager()
 
 
-def _process_ctd_file(file, plot=False, cached_master_sheet=pd.DataFrame(), master_sheet_path=None, verbosity=0):
+def _process_ctd_file(file, plot=False, cached_master_sheet=pl.DataFrame(), master_sheet_path=None, verbosity=0):
     logger = loggersetup.setup_logging(verbosity)
     try:
         my_data = ctdfjorder.CTD(file, cached_master_sheet=cached_master_sheet, master_sheet_path=master_sheet_path,
                                  add_unique_id=True)
         file = os.path.basename(file)
         my_data.remove_upcasts()
-        if my_data.get_pandas_df().duplicated().any():
-            logger.critical(f"{file} - Duplicated after remove_upcasts")
         my_data.remove_non_positive_samples()
-        if my_data.get_pandas_df().duplicated().any():
-            logger.critical(f"{file} - Duplicated after remove_non_positive_samples")
         my_data.remove_invalid_salinity_values()
-        if my_data.get_pandas_df().duplicated().any():
-            logger.critical(f"{file} - Duplicated after remove_invalid_salinity_values")
-        my_data.clean("practicalsalinity", 'autoencoder')
-        if my_data.get_pandas_df().duplicated().any():
-            logger.critical(f"{file} - Duplicated after clean")
+        my_data.clean('salinity_ai')
         my_data.add_surface_salinity_temp_meltwater()
-        if my_data.get_pandas_df().duplicated().any():
-            logger.critical(f"{file} - Duplicated duplicated after surface_density")
         my_data.add_absolute_salinity()
-        if my_data.get_pandas_df().duplicated().any():
-            logger.critical(f"{file} - Duplicated after add_absolute_salinity")
         my_data.add_density()
-        if my_data.get_pandas_df().duplicated().any():
-            logger.critical(f"{file} - Duplicated after add_density")
         my_data.add_potential_density()
-        if my_data.get_pandas_df()['depth_00'].duplicated().any():
-            logger.critical(f"{file} - Duplicated after add_potential density")
-        my_data.add_mld(20, "potentialdensityavg")
-        if my_data.get_pandas_df()['depth_00'].duplicated().any():
-            logger.critical(f"{file} - Duplicated after add_mld")
+        my_data.add_mld(20, "potential_density_avg")
         my_data.add_stratification(20)
-        if my_data.get_pandas_df()['depth_00'].duplicated().any():
-            logger.critical(f"{file} - Duplicated after add_stratification")
         if plot:
-            my_data.plot('potentialdensity')
+            my_data.plot('potential_density')
             my_data.plot('salinity')
-        if my_data.get_pandas_df()['depth_00'].duplicated().any():
-            logger.critical(f"{file} - Duplicated after plotting")
-        return my_data.get_pandas_df()
+        return my_data.get_df()
     except CTDError as e:
         logger.error(e)
     except Exception as e:
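
The deleted guard rails above relied on pandas' DataFrame.duplicated(); the commit removes them rather than porting them. If equivalent checks are wanted on the new polars frames, a minimal sketch (the helper name is invented, and a depth_00 column is assumed as in the removed lines):

import polars as pl

def warn_if_duplicated(df: pl.DataFrame, stage: str) -> None:
    # is_duplicated() returns a boolean Series marking every row that occurs more than once
    if df.is_duplicated().any():
        print(f"duplicated rows after {stage}")
    # single-column variant, mirroring the removed depth_00 checks
    if df.get_column("depth_00").is_duplicated().any():
        print(f"duplicated depth_00 values after {stage}")
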
@@ -78,9 +78,17 @@ def _process_ctd_file(file, plot=False, cached_master_sheet=pd.DataFrame(), master_sheet_path=None, verbosity=0):

 def _run_default(plot=False, master_sheet_path=None, max_workers=1, verbosity=0):
     logger = loggersetup.setup_logging(verbosity)
-    ctd_files_list = _get_rsk_filenames_in_dir(get_cwd())
-    ctd_files_list.extend(_get_csv_filenames_in_dir(get_cwd()))
-    cached_master_sheet = pd.read_excel(master_sheet_path)
+    # Gather the RSK and CSV filenames in the current working directory
+    rsk_files = _get_rsk_filenames_in_dir(get_cwd())
+    csv_files = _get_csv_filenames_in_dir(get_cwd())
+
+    # Combine them into a single list of CTD files to process
+    ctd_files_list = rsk_files
+    ctd_files_list.extend(csv_files)
+    cached_master_sheet = pl.read_excel(master_sheet_path, infer_schema_length=None,
+                                        schema_overrides={"time_local": pl.String,
+                                                          "date_local": pl.String,
+                                                          "time (UTC)": pl.String,
+                                                          "date (UTC)": pl.String})
+    cached_master_sheet = ctdfjorder.CTD.Utility.load_master_sheet(cached_master_sheet)
     total_files = len(ctd_files_list)
     bar_format = u'{desc}{desc_pad}{percentage:3.0f}%|{bar}| ' + \
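
The rewritten loader pins the master sheet's date and time columns to strings via schema_overrides, so Excel's mixed cell formats cannot skew type inference, while infer_schema_length=None makes polars scan every row when inferring the remaining columns. The same pattern in isolation (file name hypothetical):

import polars as pl

sheet = pl.read_excel(
    "master_sheet.xlsx",  # hypothetical path
    infer_schema_length=None,  # scan all rows when inferring the other columns
    schema_overrides={"date_local": pl.String, "time_local": pl.String},
)
print(sheet.schema)
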
@@ -91,7 +76,7 @@ def _run_default(plot=False, master_sheet_path=None, max_workers=1, verbosity=0):
                               color='green', bar_format=bar_format)
     errors = success.add_subcounter('red')
     executor = ProcessPoolExecutor(max_workers=max_workers)
-    results = []
+    results: list[pl.DataFrame] = []
     if not ctd_files_list:
         logger.debug("No files to process")
         return
@@ -107,15 +92,13 @@ def _run_default(plot=False, master_sheet_path=None, max_workers=1, verbosity=0):
                                      verbosity=verbosity): file for file in ctd_files_list}
             for future in as_completed(futures):
                 result = future.result()
-                if type(result) is not type(None):
+                if type(result) is type(pl.DataFrame()):
                     results.append(result)
                     success.update(1)
                 else:
                     errors.update(1)
                 futures.pop(future)
                 gc.collect()
-            df = ctdfjorder.merge_dataframes(results)
-            ctdfjorder.CTD.Utility.save_to_csv(df, "outputclean.csv")
+            df = pl.concat(results, how='diagonal_relaxed')
+            ctdfjorder.CTD.Utility.save_to_csv(df, 'outputclean.csv')
     except KeyboardInterrupt:
         loggersetup.setup_logging(0)
         print()
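
The replaced merge step uses pl.concat with how='diagonal_relaxed', which unions the columns of the input frames, fills missing values with nulls, and relaxes mismatched dtypes to a common supertype. A toy sketch with invented column names:

import polars as pl

a = pl.DataFrame({"depth": [1.0, 2.0], "salinity": [33.1, 33.4]})
b = pl.DataFrame({"depth": [3, 4], "temperature": [1.2, 1.5]})

# Columns are unioned, gaps become null, and b's integer
# depth column is relaxed to float to match a's.
merged = pl.concat([a, b], how="diagonal_relaxed")
print(merged)

One stylistic note: the new filter type(result) is type(pl.DataFrame()) constructs a throwaway frame just to compare types; isinstance(result, pl.DataFrame) expresses the same check more idiomatically.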
