diff --git a/.vscode/launch.json b/.vscode/launch.json index c7890906..1edc02ff 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -267,8 +267,8 @@ //"args": ["-v", "1", "--noinput", "--num_cores", "8"] //"args": ["-v", "1", "--noinput", "--no_cleanup", "--mission", "2011.256.02", "--clobber"] //"args": ["-v", "1", "--noinput", "--no_cleanup", "--mission", "2010.341.00", "--download_process", "--local"] - //"args": ["-v", "1", "--noinput", "--no_cleanup", "--mission", "2020.337.00", "--local"] - "args": ["-v", "1", "--mission", "2023.123.00", "--noinput", "--no_cleanup"] + "args": ["-v", "1", "--noinput", "--no_cleanup", "--mission", "2020.337.00", "--clobber"] + }, ] } diff --git a/src/data/archive.py b/src/data/archive.py index 05f7cd57..0005288a 100755 --- a/src/data/archive.py +++ b/src/data/archive.py @@ -11,7 +11,7 @@ import argparse import logging -import subprocess +import shutil import sys import time from pathlib import Path @@ -46,12 +46,11 @@ def copy_to_AUVTCD(self, nc_file_base: Path, freq: str = FREQ) -> None: # noqa: year = self.args.mission.split(".")[0] surveynetcdfs_dir = Path(surveys_dir, year, "netcdf") - # To avoid "fchmod failed: Permission denied" message use rsync instead - # of cp: https://apple.stackexchange.com/a/206251 + # To avoid "fchmod failed: Permission denied" message use shutil.copyfile if not self.args.archive_only_products: self.logger.info("Archiving %s files to %s", nc_file_base, surveynetcdfs_dir) - # Rsync netCDF files to AUVCTD/surveys/YYYY/netcdf + # Copy netCDF files to AUVCTD/surveys/YYYY/netcdf if hasattr(self.args, "flash_threshold"): if self.args.flash_threshold and self.args.resample: ft_ending = f"{freq}_ft{self.args.flash_threshold:.0E}.nc".replace( @@ -71,11 +70,8 @@ def copy_to_AUVTCD(self, nc_file_base: Path, freq: str = FREQ) -> None: # noqa: self.logger.info("Removing %s", dst_file) dst_file.unlink() if src_file.exists(): - subprocess.run( # noqa: S603 - ["/usr/bin/rsync", str(src_file), str(surveynetcdfs_dir)], - check=True, - ) - self.logger.info("rsync %s %s done.", src_file, surveynetcdfs_dir) + shutil.copyfile(src_file, dst_file) + self.logger.info("copyfile %s %s done.", src_file, surveynetcdfs_dir) else: self.logger.info( "%26s exists, but is not being archived because --clobber is not specified.", # noqa: E501 @@ -83,7 +79,7 @@ def copy_to_AUVTCD(self, nc_file_base: Path, freq: str = FREQ) -> None: # noqa: ) if not hasattr(self.args, "resample") or not self.args.resample: - # Rsync intermediate files to AUVCTD/missionnetcdfs/YYYY/YYYYJJJ + # Copy intermediate files to AUVCTD/missionnetcdfs/YYYY/YYYYJJJ YYYYJJJ = "".join(self.args.mission.split(".")[:2]) missionnetcdfs_dir = Path( AUVCTD_VOL, @@ -100,18 +96,15 @@ def copy_to_AUVTCD(self, nc_file_base: Path, freq: str = FREQ) -> None: # noqa: src_file = Path(src_dir, f"{log.replace('.log', '')}.nc") if self.args.clobber: if src_file.exists(): - subprocess.run( # noqa: S603 - ["/usr/bin/rsync", str(src_file), str(missionnetcdfs_dir)], - check=True, - ) - self.logger.info("rsync %s %s done.", src_file, missionnetcdfs_dir) + shutil.copyfile(src_file, missionnetcdfs_dir / src_file.name) + self.logger.info("copyfile %s %s done.", src_file, missionnetcdfs_dir) else: self.logger.info( "%26s exists, but is not being archived because --clobber is not specified.", # noqa: E501 src_file.name, ) - # Rsync files created by create_products.py + # Copy files created by create_products.py self.logger.info("Archiving product files") for src_dir, dst_dir in ((MISSIONODVS, "odv"), (MISSIONIMAGES, "images")): src_dir = Path( # noqa: PLW2901 @@ -124,10 +117,20 @@ def copy_to_AUVTCD(self, nc_file_base: Path, freq: str = FREQ) -> None: # noqa: dst_dir = Path(surveys_dir, year, dst_dir) # noqa: PLW2901 Path(dst_dir).mkdir(parents=True, exist_ok=True) if self.args.clobber: - subprocess.run( # noqa: S603 - ["/usr/bin/rsync", "-r", f"{src_dir}/", f"{dst_dir}/"], check=True - ) - self.logger.info("rsync %s/* %s done.", src_dir, dst_dir) + # Copy files individually to avoid permission issues with copytree. + # This will not copy subdirectories, but we don't expect any. + for src_file in src_dir.glob("*"): + dst_file = Path(dst_dir, src_file.name) + if dst_file.exists(): + self.logger.info("Removing %s", dst_file) + dst_file.unlink() + shutil.copyfile(src_file, dst_file) + self.logger.info("copyfile %s %s done.", src_file, dst_dir) + + # shutil.copytree( + # src_dir, dst_dir, dirs_exist_ok=True, copy_function=shutil.copy + # ) + # self.logger.info("copytree %s/* %s done.", src_dir, dst_dir) else: self.logger.info( "%26s exists, but is not being archived because --clobber is not specified.", # noqa: E501 @@ -136,23 +139,20 @@ def copy_to_AUVTCD(self, nc_file_base: Path, freq: str = FREQ) -> None: # noqa: else: self.logger.debug("%s not found", src_dir) if self.args.create_products or (hasattr(self.args, "resample") and self.args.resample): - # Do not rsync processing.log file if only partial processing was done + # Do not copy processing.log file if only partial processing was done self.logger.info( "Partial processing, not archiving %s", f"{nc_file_base}_{LOG_NAME}", ) else: - # Rsync the processing.log file last so that we get everything + # Copy the processing.log file last so that we get everything src_file = Path(f"{nc_file_base}_{LOG_NAME}") dst_file = Path(surveynetcdfs_dir, src_file.name) if src_file.exists(): if self.args.clobber: - self.logger.info("rsync %s %s", src_file, surveynetcdfs_dir) - subprocess.run( # noqa: S603 - ["/usr/bin/rsync", str(src_file), str(surveynetcdfs_dir)], - check=True, - ) - self.logger.info("rsync %s %s done.", src_file, surveynetcdfs_dir) + self.logger.info("copyfile %s %s", src_file, surveynetcdfs_dir) + shutil.copyfile(src_file, dst_file) + self.logger.info("copyfile %s %s done.", src_file, surveynetcdfs_dir) else: self.logger.info( "%26s exists, but is not being archived because --clobber is not specified.", # noqa: E501 @@ -203,12 +203,12 @@ def process_command_line(self): parser.add_argument( "--clobber", action="store_true", - help="Remove existing netCDF files before rsyncing to the AUVCTD directory", + help="Remove existing netCDF files before copying to the AUVCTD directory", ) parser.add_argument( "--archive_only_products", action="store_true", - help="Rsync to AUVCTD directory only the products, not the netCDF files", + help="Copy to AUVCTD directory only the products, not the netCDF files", ) parser.add_argument( "--create_products", diff --git a/src/data/process.py b/src/data/process.py index 95b1b0d2..4205f8c4 100755 --- a/src/data/process.py +++ b/src/data/process.py @@ -639,7 +639,7 @@ def process_command_line(self): action="store_true", help="Use with --noinput to overwrite existing downloaded" " log files and to remove existing netCDF files before" - " rsyncing to the AUVCTD directory", + " copying to the AUVCTD directory", ) parser.add_argument( "--noinput", diff --git a/src/data/resample.py b/src/data/resample.py index 162a5028..8a6e2c87 100755 --- a/src/data/resample.py +++ b/src/data/resample.py @@ -461,7 +461,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915 flash_threshold: float = FLASH_THRESHOLD, proxy_ratio_adinos: float = 3.9811e13, # 4-Oct-2010 to 2-Dec-2020 value proxy_cal_factor=0.00470, # Same as used in 5.2-mpm-bg_biolume-PiO-paper.ipynb - ) -> None: + ) -> tuple[pd.Series, list[datetime], list[datetime]]: # Add variables via the calculations according to Appendix B in # "Using fluorescence and bioluminescence sensors to characterize # auto- and heterotrophic plankton communities" by Messie et al." @@ -610,6 +610,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915 self.df_r["biolume_bg_biolume"].attrs["units"] = "photons/liter" self.df_r["biolume_bg_biolume"].attrs["comment"] = zero_note + fluo = None nighttime_bl_raw, sunsets, sunrises = self.select_nighttime_bl_raw() if nighttime_bl_raw.empty: self.logger.info( @@ -621,7 +622,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915 self.logger.info( "No hs2_fl700 data. Not computing adinos, diatoms, and hdinos", ) - return + return fluo, sunsets, sunrises fluo = ( self.resampled_nc["hs2_fl700"] .where( @@ -664,6 +665,232 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915 f" = {proxy_ratio_adinos:.4e} and proxy_cal_factor = {proxy_cal_factor:.6f}" ) + return fluo, sunsets, sunrises + + def correct_biolume_proxies( # noqa: PLR0913, PLR0915 + self, + biolume_fluo: pd.Series, # from add_biolume_proxies + biolume_sunsets: list[datetime], # from add_biolume_proxies + biolume_sunrises: list[datetime], # from add_biolume_proxies + adinos_threshold: float = 0.1, + correction_threshold: int = 3, + fluo_bl_threshold: float = 0.35, # use 0.45 for pearson corr + corr_type: str = "pearson", # "spearman" or "pearson" + depth_threshold: float = 2.0, + minutes_from_surface_threshold: int = 5, + ) -> None: + variables = [ + "biolume_proxy_diatoms", + "biolume_proxy_adinos", + "biolume_proxy_hdinos", + "biolume_bg_biolume", + ] + try: + df_p = self.df_r[variables].copy(deep=True) + except KeyError: + # We didn't add biolum proxies this round... + return + + df_p["biolume_fluo"] = biolume_fluo + df_p["fluoBL_corr"] = np.full_like(df_p.biolume_fluo, np.nan) + + depth_series = self.resampled_nc["depth"].to_series() + # df_p["depth"] = depth_series.reindex(df_p.index, method="ffill") + df_p = pd.merge_asof( + df_p, depth_series.to_frame(), left_index=True, right_index=True, direction="nearest" + ) + + self.logger.info( + "correct proxies: df_p depth count=%d nans=%d, resampled_nc depth count=%d nans=%d", + df_p.depth.count(), + df_p.depth.isna().sum(), + self.resampled_nc.depth.count(), + self.resampled_nc.depth.isnull().sum(), # noqa: PD003 + ) + + profile_series = self.resampled_nc["profile_number"].to_series() + # df_p["profile_number"] = profile_series.reindex(df_p.index, method="ffill") + df_p = pd.merge_asof( + df_p, profile_series.to_frame(), left_index=True, right_index=True, direction="nearest" + ) + + self.logger.info( + "correct proxies: df_p max profile=%d (nans=%d), resampled_nc max profile=%d (nans=%d)", + df_p.profile_number.max(), + df_p.profile_number.isna().sum(), + self.resampled_nc.profile_number.max(), + self.resampled_nc.profile_number.isnull().sum(), # noqa: PD003 + ) + + # new proxies are the "N" fields + for new, old in zip( + ["diatomsN", "adinosN", "hdinosN"], + ["biolume_proxy_diatoms", "biolume_proxy_adinos", "biolume_proxy_hdinos"], + strict=False, + ): + df_p[new] = np.full_like(df_p[old], np.nan) + + def _interval_contains_sunevent( + start: pd.Timestamp, end: pd.Timestamp, events: pd.DatetimeIndex + ) -> bool: + mask = (events >= start) & (events <= end) + return mask.any() + + biolume_sunsets = pd.DatetimeIndex(biolume_sunsets).sort_values() + biolume_sunrises = pd.DatetimeIndex(biolume_sunrises).sort_values() + profile_intervals = ( + df_p.groupby("profile_number") + .apply(lambda g: (g.index.min(), g.index.max())) # pandas2.2.3: include_groups=False + .rename("interval") + .apply(pd.Series) + .rename(columns={0: "start", 1: "end"}) + ) + profile_intervals["has_sunset"] = profile_intervals.apply( + lambda row: _interval_contains_sunevent(row["start"], row["end"], biolume_sunsets), + axis=1, + ) + profile_intervals["has_sunrise"] = profile_intervals.apply( + lambda row: _interval_contains_sunevent(row["start"], row["end"], biolume_sunrises), + axis=1, + ) + profile_intervals["has_sunevent"] = ( + profile_intervals["has_sunrise"] | profile_intervals["has_sunset"] + ) + df_p["has_sunset"] = df_p["profile_number"].map(profile_intervals["has_sunset"]) + df_p["has_sunrise"] = df_p["profile_number"].map(profile_intervals["has_sunrise"]) + df_p["has_sunevent"] = df_p["profile_number"].map(profile_intervals["has_sunevent"]) + + # compute correlation per profil and then correct proxies + profil = df_p.profile_number + dt_5mins = np.timedelta64(timedelta(minutes=minutes_from_surface_threshold)) + for iprofil_ in range(1, int(np.max(profil)) + 1): + iprofil = profil == iprofil_ + has_sunevent = df_p.loc[iprofil, "has_sunevent"].any() + if has_sunevent: # set proxies for this profile to NaN + self.logger.info( + "Processing profile=%d for proxy correction: found sun event -- set NaN", + iprofil_, + ) + target_indices = df_p.index[iprofil] + self.df_r.loc[target_indices, "biolume_proxy_adinos"] = np.nan + self.df_r.loc[target_indices, "biolume_proxy_diatoms"] = np.nan + self.df_r.loc[target_indices, "biolume_proxy_hdinos"] = np.nan + continue + # excludes surface, must be within 5 min of it + ideep = iprofil & (df_p.depth > depth_threshold) + itime = (df_p.index > (df_p.index[ideep].min() - dt_5mins)) & ( + df_p.index < (df_p.index[ideep].max() + dt_5mins) + ) + iprofil = iprofil & itime + if not np.any(iprofil): + # print(f'no corrections possible for {iprofil_=}') + continue + auv_profil = df_p.loc[iprofil] + self.logger.info( + "Processing profile=%d for proxy correction: total_points=%d > thresh=%d ?", + iprofil_, + auv_profil.shape[0], + correction_threshold, + ) + if auv_profil.shape[0] > correction_threshold: + if ( + np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold) + < correction_threshold + ): + if auv_profil.biolume_proxy_adinos.count() == 0: # all proxies are NaN so skip + self.logger.info( + "Correcting proxies: valid adinos=%d < thresh=%d -- all NaN so skip", + np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold), + correction_threshold, + ) + continue + # no correction for low fluo & biolum values + fluoBL_corr = 1.0 + self.logger.info( + "Correcting proxies: valid adinos=%d < thresh=%d" + " -- using fluoBL_corr=%.4f, total_size_adinos=%d, nans=%d", + np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold), + correction_threshold, + fluoBL_corr, + auv_profil.biolume_proxy_adinos.shape[0], + auv_profil.biolume_proxy_adinos.isna().sum(), + ) + else: + # correlation between fluo and bg_biolum computed on high + # adino values for each profile + idepth = ( + auv_profil.depth + <= auv_profil.depth[ + auv_profil.biolume_proxy_adinos > adinos_threshold + ].max() + ) + auv_profil_idepth = auv_profil[ + ["biolume_fluo", "biolume_bg_biolume", "depth"] + ].loc[idepth] + # pandas' corr ignores NaN + fluoBL_corr = auv_profil_idepth.biolume_fluo.corr( + auv_profil_idepth.biolume_bg_biolume, method=corr_type + ) + self.logger.info( + "Correcting proxies: valid adinos=%d > thresh=%d" + " -- using fluoBL_corr=%.4f, total_size_idepth=%d, nans=%d," + " min_depth=%.4f, max_depth=%.4f", + np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold), + correction_threshold, + fluoBL_corr, + auv_profil_idepth.shape[0], + auv_profil.biolume_proxy_adinos.isna().sum(), + auv_profil_idepth.depth.min(), + auv_profil_idepth.depth.max(), + ) + + # save correlation + df_p.loc[iprofil, "fluoBL_corr"] = fluoBL_corr + # self.logger.info( + # "Correcting proxies for profile=%d using fluoBL_corr=%.4f", + # iprofil_, + # fluoBL_corr, + # ) + + # scale between 0 and 1 first + fluoBL_correctionfactor = (fluoBL_corr + 1.0) / 2.0 + + # then scale between fluo_bl_threshold and 1 + fluoBL_correctionfactor = ( + fluoBL_correctionfactor * (1.0 - fluo_bl_threshold) + fluo_bl_threshold + ) + + # can happen if fluo_bl_threshold is negative + fluoBL_correctionfactor = max(fluoBL_correctionfactor, 0.0) + + df_p.loc[iprofil, "adinosN"] = ( + df_p.biolume_proxy_adinos[iprofil] * fluoBL_correctionfactor + ) + + # preserving adinos+diatoms + df_p.loc[iprofil, "diatomsN"] = ( + df_p.biolume_proxy_adinos[iprofil] + + df_p.biolume_proxy_diatoms[iprofil] + - df_p.adinosN[iprofil] + ) + + # preserving adinos+hdinos + df_p.loc[iprofil, "hdinosN"] = ( + df_p.biolume_proxy_adinos[iprofil] + + df_p.biolume_proxy_hdinos[iprofil] + - df_p.adinosN[iprofil] + ) + + target_indices = df_p.index[iprofil] + self.df_r.loc[target_indices, "biolume_proxy_adinos"] = df_p.adinosN.loc[iprofil] + self.df_r.loc[target_indices, "biolume_proxy_diatoms"] = df_p.diatomsN.loc[iprofil] + self.df_r.loc[target_indices, "biolume_proxy_hdinos"] = df_p.hdinosN.loc[iprofil] + else: + self.logger.info( + "profile=%d skipped for proxy correction", + iprofil_, + ) + def resample_variable( # noqa: PLR0913 self, instr: str, @@ -678,7 +905,8 @@ def resample_variable( # noqa: PLR0913 if instr == "biolume" and variable == "biolume_raw": # Only biolume_avg_biolume and biolume_flow treated like other data # All other biolume variables in self.df_r[] are computed from biolume_raw - self.add_biolume_proxies(freq) + biolume_fluo, biolume_sunsets, biolume_sunrises = self.add_biolume_proxies(freq) + self.correct_biolume_proxies(biolume_fluo, biolume_sunsets, biolume_sunrises) else: self.df_o[variable] = self.ds[variable].to_pandas() self.df_o[f"{variable}_mf"] = (