diff --git a/src/data/resample.py b/src/data/resample.py index 162a5028..64b7b73e 100755 --- a/src/data/resample.py +++ b/src/data/resample.py @@ -461,7 +461,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915 flash_threshold: float = FLASH_THRESHOLD, proxy_ratio_adinos: float = 3.9811e13, # 4-Oct-2010 to 2-Dec-2020 value proxy_cal_factor=0.00470, # Same as used in 5.2-mpm-bg_biolume-PiO-paper.ipynb - ) -> None: + ) -> tuple[pd.Series, list[datetime], list[datetime]]: # Add variables via the calculations according to Appendix B in # "Using fluorescence and bioluminescence sensors to characterize # auto- and heterotrophic plankton communities" by Messie et al." @@ -610,6 +610,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915 self.df_r["biolume_bg_biolume"].attrs["units"] = "photons/liter" self.df_r["biolume_bg_biolume"].attrs["comment"] = zero_note + fluo = None nighttime_bl_raw, sunsets, sunrises = self.select_nighttime_bl_raw() if nighttime_bl_raw.empty: self.logger.info( @@ -621,7 +622,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915 self.logger.info( "No hs2_fl700 data. Not computing adinos, diatoms, and hdinos", ) - return + return fluo, sunsets, sunrises fluo = ( self.resampled_nc["hs2_fl700"] .where( @@ -664,6 +665,230 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915 f" = {proxy_ratio_adinos:.4e} and proxy_cal_factor = {proxy_cal_factor:.6f}" ) + return fluo, sunsets, sunrises + + def correct_biolume_proxies( # noqa: PLR0913, PLR0915 + self, + biolume_fluo: pd.Series, # from add_biolume_proxies + biolume_sunsets: list[datetime], # from add_biolume_proxies + biolume_sunrises: list[datetime], # from add_biolume_proxies + adinos_threshold: float = 0.1, + correction_threshold: int = 3, + fluo_bl_threshold: float = 0.35, # use 0.45 for pearson corr + corr_type: str = "pearson", # "spearman" or "pearson" + depth_threshold: float = 2.0, + minutes_from_surface_threshold: int = 5, + ) -> None: + variables = [ + "biolume_proxy_diatoms", + "biolume_proxy_adinos", + "biolume_proxy_hdinos", + "biolume_bg_biolume", + ] + try: + df_p = self.df_r[variables].copy(deep=True) + except KeyError: + # We didn't add biolum proxies this round... + return + + df_p["biolume_fluo"] = biolume_fluo + df_p["fluoBL_corr"] = np.full_like(df_p.biolume_fluo, np.nan) + + depth_series = self.resampled_nc["depth"].to_series() + # df_p["depth"] = depth_series.reindex(df_p.index, method="ffill") + df_p = pd.merge_asof( + df_p, depth_series.to_frame(), left_index=True, right_index=True, direction="nearest" + ) + + self.logger.info( + "correct proxies: df_p depth count=%d nans=%d, resampled_nc depth count=%d nans=%d", + df_p.depth.count(), + df_p.depth.isna().sum(), + self.resampled_nc.depth.count(), + self.resampled_nc.depth.isnull().sum(), # noqa: PD003 + ) + + profile_series = self.resampled_nc["profile_number"].to_series() + # df_p["profile_number"] = profile_series.reindex(df_p.index, method="ffill") + df_p = pd.merge_asof( + df_p, profile_series.to_frame(), left_index=True, right_index=True, direction="nearest" + ) + + self.logger.info( + "correct proxies: df_p max profile=%d (nans=%d), resampled_nc max profile=%d (nans=%d)", + df_p.profile_number.max(), + df_p.profile_number.isna().sum(), + self.resampled_nc.profile_number.max(), + self.resampled_nc.profile_number.isnull().sum(), # noqa: PD003 + ) + + # new proxies are the "N" fields + for new, old in zip( + ["diatomsN", "adinosN", "hdinosN"], + ["biolume_proxy_diatoms", "biolume_proxy_adinos", "biolume_proxy_hdinos"], + strict=False, + ): + df_p[new] = np.full_like(df_p[old], np.nan) + + def _interval_contains_sunevent( + start: pd.Timestamp, end: pd.Timestamp, events: pd.DatetimeIndex + ) -> bool: + mask = (events >= start) & (events <= end) + return mask.any() + + biolume_sunsets = pd.DatetimeIndex(biolume_sunsets).sort_values() + biolume_sunrises = pd.DatetimeIndex(biolume_sunrises).sort_values() + profile_intervals = ( + df_p.groupby("profile_number") + .apply(lambda g: (g.index.min(), g.index.max())) # pandas2.2.3: include_groups=False + .rename("interval") + .apply(pd.Series) + .rename(columns={0: "start", 1: "end"}) + ) + profile_intervals["has_sunset"] = profile_intervals.apply( + lambda row: _interval_contains_sunevent(row["start"], row["end"], biolume_sunsets), + axis=1, + ) + profile_intervals["has_sunrise"] = profile_intervals.apply( + lambda row: _interval_contains_sunevent(row["start"], row["end"], biolume_sunrises), + axis=1, + ) + profile_intervals["has_sunevent"] = ( + profile_intervals["has_sunrise"] | profile_intervals["has_sunset"] + ) + df_p["has_sunset"] = df_p["profile_number"].map(profile_intervals["has_sunset"]) + df_p["has_sunrise"] = df_p["profile_number"].map(profile_intervals["has_sunrise"]) + df_p["has_sunevent"] = df_p["profile_number"].map(profile_intervals["has_sunevent"]) + + # compute correlation per profil and then correct proxies + profil = df_p.profile_number + dt_5mins = np.timedelta64(timedelta(minutes=minutes_from_surface_threshold)) + for iprofil_ in range(1, int(np.max(profil)) + 1): + iprofil = profil == iprofil_ + has_sunevent = df_p.loc[iprofil, "has_sunevent"].any() + if has_sunevent: # set proxies for this profile to NaN + self.logger.info( + "Processing profile=%d for proxy correction: found sun event -- set NaN", + iprofil_, + ) + target_indices = df_p.index[iprofil] + self.df_r.loc[target_indices, "biolume_proxy_adinos"] = np.nan + self.df_r.loc[target_indices, "biolume_proxy_diatoms"] = np.nan + self.df_r.loc[target_indices, "biolume_proxy_hdinos"] = np.nan + continue + # excludes surface, must be within 5 min of it + ideep = iprofil & (df_p.depth > depth_threshold) + itime = (df_p.index > (df_p.index[ideep].min() - dt_5mins)) & ( + df_p.index < (df_p.index[ideep].max() + dt_5mins) + ) + iprofil = iprofil & itime + if not np.any(iprofil): + # print(f'no corrections possible for {iprofil_=}') + continue + auv_profil = df_p.loc[iprofil] + self.logger.info( + "Processing profile=%d for proxy correction: total_points=%d > thresh=%d ?", + iprofil_, + auv_profil.shape[0], + correction_threshold, + ) + if auv_profil.shape[0] > correction_threshold: + if ( + np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold) + < correction_threshold + ): + if auv_profil.biolume_proxy_adinos.count() == 0: # all proxies are NaN so skip + self.logger.info( + "Correcting proxies: valid adinos=%d < thresh=%d -- all NaN so skip", + np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold), + correction_threshold, + ) + continue + # no correction for low fluo & biolum values + fluoBL_corr = 1.0 + self.logger.info( + "Correcting proxies: valid adinos=%d < thresh=%d" + " -- using fluoBL_corr=%.4f, total_size_adinos=%d, nans=%d", + np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold), + correction_threshold, + fluoBL_corr, + auv_profil.biolume_proxy_adinos.shape[0], + auv_profil.biolume_proxy_adinos.isna().sum(), + ) + else: + # correlation between fluo and bg_biolum computed on high + # adino values for each profile + idepth = ( + auv_profil.depth + <= auv_profil.depth[auv_profil.biolume_proxy_adinos > adinos_threshold].max() + ) + auv_profil_idepth = auv_profil[ + ["biolume_fluo", "biolume_bg_biolume", "depth"] + ].loc[idepth] + # pandas' corr ignores NaN + fluoBL_corr = auv_profil_idepth.biolume_fluo.corr( + auv_profil_idepth.biolume_bg_biolume, method=corr_type + ) + self.logger.info( + "Correcting proxies: valid adinos=%d > thresh=%d" + " -- using fluoBL_corr=%.4f, total_size_idepth=%d, nans=%d," + " min_depth=%.4f, max_depth=%.4f", + np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold), + correction_threshold, + fluoBL_corr, + auv_profil_idepth.shape[0], + auv_profil.biolume_proxy_adinos.isna().sum(), + auv_profil_idepth.depth.min(), + auv_profil_idepth.depth.max(), + ) + + # save correlation + df_p.loc[iprofil, "fluoBL_corr"] = fluoBL_corr + # self.logger.info( + # "Correcting proxies for profile=%d using fluoBL_corr=%.4f", + # iprofil_, + # fluoBL_corr, + # ) + + # scale between 0 and 1 first + fluoBL_correctionfactor = (fluoBL_corr + 1.0) / 2.0 + + # then scale between fluo_bl_threshold and 1 + fluoBL_correctionfactor = ( + fluoBL_correctionfactor * (1.0 - fluo_bl_threshold) + fluo_bl_threshold + ) + + # can happen if fluo_bl_threshold is negative + fluoBL_correctionfactor = max(fluoBL_correctionfactor, 0.0) + + df_p.loc[iprofil, "adinosN"] = ( + df_p.biolume_proxy_adinos[iprofil] * fluoBL_correctionfactor + ) + + # preserving adinos+diatoms + df_p.loc[iprofil, "diatomsN"] = ( + df_p.biolume_proxy_adinos[iprofil] + + df_p.biolume_proxy_diatoms[iprofil] + - df_p.adinosN[iprofil] + ) + + # preserving adinos+hdinos + df_p.loc[iprofil, "hdinosN"] = ( + df_p.biolume_proxy_adinos[iprofil] + + df_p.biolume_proxy_hdinos[iprofil] + - df_p.adinosN[iprofil] + ) + + target_indices = df_p.index[iprofil] + self.df_r.loc[target_indices, "biolume_proxy_adinos"] = df_p.adinosN.loc[iprofil] + self.df_r.loc[target_indices, "biolume_proxy_diatoms"] = df_p.diatomsN.loc[iprofil] + self.df_r.loc[target_indices, "biolume_proxy_hdinos"] = df_p.hdinosN.loc[iprofil] + else: + self.logger.info( + "profile=%d skipped for proxy correction", + iprofil_, + ) + def resample_variable( # noqa: PLR0913 self, instr: str, @@ -678,7 +903,8 @@ def resample_variable( # noqa: PLR0913 if instr == "biolume" and variable == "biolume_raw": # Only biolume_avg_biolume and biolume_flow treated like other data # All other biolume variables in self.df_r[] are computed from biolume_raw - self.add_biolume_proxies(freq) + biolume_fluo, biolume_sunsets, biolume_sunrises = self.add_biolume_proxies(freq) + self.correct_biolume_proxies(biolume_fluo, biolume_sunsets, biolume_sunrises) else: self.df_o[variable] = self.ds[variable].to_pandas() self.df_o[f"{variable}_mf"] = (