Skip to content
232 changes: 229 additions & 3 deletions src/data/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915
flash_threshold: float = FLASH_THRESHOLD,
proxy_ratio_adinos: float = 3.9811e13, # 4-Oct-2010 to 2-Dec-2020 value
proxy_cal_factor=0.00470, # Same as used in 5.2-mpm-bg_biolume-PiO-paper.ipynb
) -> None:
) -> tuple[pd.Series, list[datetime], list[datetime]]:
# Add variables via the calculations according to Appendix B in
# "Using fluorescence and bioluminescence sensors to characterize
# auto- and heterotrophic plankton communities" by Messie et al."
Expand Down Expand Up @@ -610,6 +610,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915
self.df_r["biolume_bg_biolume"].attrs["units"] = "photons/liter"
self.df_r["biolume_bg_biolume"].attrs["comment"] = zero_note

fluo = None
nighttime_bl_raw, sunsets, sunrises = self.select_nighttime_bl_raw()
if nighttime_bl_raw.empty:
self.logger.info(
Expand All @@ -621,7 +622,7 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915
self.logger.info(
"No hs2_fl700 data. Not computing adinos, diatoms, and hdinos",
)
return
return fluo, sunsets, sunrises
fluo = (
self.resampled_nc["hs2_fl700"]
.where(
Expand Down Expand Up @@ -664,6 +665,230 @@ def add_biolume_proxies( # noqa: PLR0913, PLR0915
f" = {proxy_ratio_adinos:.4e} and proxy_cal_factor = {proxy_cal_factor:.6f}"
)

return fluo, sunsets, sunrises

def correct_biolume_proxies( # noqa: PLR0913, PLR0915
self,
biolume_fluo: pd.Series, # from add_biolume_proxies
biolume_sunsets: list[datetime], # from add_biolume_proxies
biolume_sunrises: list[datetime], # from add_biolume_proxies
adinos_threshold: float = 0.1,
correction_threshold: int = 3,
fluo_bl_threshold: float = 0.35, # use 0.45 for pearson corr
corr_type: str = "pearson", # "spearman" or "pearson"
depth_threshold: float = 2.0,
minutes_from_surface_threshold: int = 5,
) -> None:
variables = [
"biolume_proxy_diatoms",
"biolume_proxy_adinos",
"biolume_proxy_hdinos",
"biolume_bg_biolume",
]
try:
df_p = self.df_r[variables].copy(deep=True)
except KeyError:
# We didn't add biolum proxies this round...
return

df_p["biolume_fluo"] = biolume_fluo
df_p["fluoBL_corr"] = np.full_like(df_p.biolume_fluo, np.nan)

depth_series = self.resampled_nc["depth"].to_series()
# df_p["depth"] = depth_series.reindex(df_p.index, method="ffill")
df_p = pd.merge_asof(
df_p, depth_series.to_frame(), left_index=True, right_index=True, direction="nearest"
)

self.logger.info(
"correct proxies: df_p depth count=%d nans=%d, resampled_nc depth count=%d nans=%d",
df_p.depth.count(),
df_p.depth.isna().sum(),
self.resampled_nc.depth.count(),
self.resampled_nc.depth.isnull().sum(), # noqa: PD003
)

profile_series = self.resampled_nc["profile_number"].to_series()
# df_p["profile_number"] = profile_series.reindex(df_p.index, method="ffill")
df_p = pd.merge_asof(
df_p, profile_series.to_frame(), left_index=True, right_index=True, direction="nearest"
)

self.logger.info(
"correct proxies: df_p max profile=%d (nans=%d), resampled_nc max profile=%d (nans=%d)",
df_p.profile_number.max(),
df_p.profile_number.isna().sum(),
self.resampled_nc.profile_number.max(),
self.resampled_nc.profile_number.isnull().sum(), # noqa: PD003
)

# new proxies are the "N" fields
for new, old in zip(
["diatomsN", "adinosN", "hdinosN"],
["biolume_proxy_diatoms", "biolume_proxy_adinos", "biolume_proxy_hdinos"],
strict=False,
):
df_p[new] = np.full_like(df_p[old], np.nan)

def _interval_contains_sunevent(
start: pd.Timestamp, end: pd.Timestamp, events: pd.DatetimeIndex
) -> bool:
mask = (events >= start) & (events <= end)
return mask.any()

biolume_sunsets = pd.DatetimeIndex(biolume_sunsets).sort_values()
biolume_sunrises = pd.DatetimeIndex(biolume_sunrises).sort_values()
profile_intervals = (
df_p.groupby("profile_number")
.apply(lambda g: (g.index.min(), g.index.max())) # pandas2.2.3: include_groups=False
.rename("interval")
.apply(pd.Series)
.rename(columns={0: "start", 1: "end"})
)
profile_intervals["has_sunset"] = profile_intervals.apply(
lambda row: _interval_contains_sunevent(row["start"], row["end"], biolume_sunsets),
axis=1,
)
profile_intervals["has_sunrise"] = profile_intervals.apply(
lambda row: _interval_contains_sunevent(row["start"], row["end"], biolume_sunrises),
axis=1,
)
profile_intervals["has_sunevent"] = (
profile_intervals["has_sunrise"] | profile_intervals["has_sunset"]
)
df_p["has_sunset"] = df_p["profile_number"].map(profile_intervals["has_sunset"])
df_p["has_sunrise"] = df_p["profile_number"].map(profile_intervals["has_sunrise"])
df_p["has_sunevent"] = df_p["profile_number"].map(profile_intervals["has_sunevent"])

# compute correlation per profil and then correct proxies
profil = df_p.profile_number
dt_5mins = np.timedelta64(timedelta(minutes=minutes_from_surface_threshold))
for iprofil_ in range(1, int(np.max(profil)) + 1):
iprofil = profil == iprofil_
has_sunevent = df_p.loc[iprofil, "has_sunevent"].any()
if has_sunevent: # set proxies for this profile to NaN
self.logger.info(
"Processing profile=%d for proxy correction: found sun event -- set NaN",
iprofil_,
)
target_indices = df_p.index[iprofil]
self.df_r.loc[target_indices, "biolume_proxy_adinos"] = np.nan
self.df_r.loc[target_indices, "biolume_proxy_diatoms"] = np.nan
self.df_r.loc[target_indices, "biolume_proxy_hdinos"] = np.nan
continue
# excludes surface, must be within 5 min of it
ideep = iprofil & (df_p.depth > depth_threshold)
itime = (df_p.index > (df_p.index[ideep].min() - dt_5mins)) & (
df_p.index < (df_p.index[ideep].max() + dt_5mins)
)
iprofil = iprofil & itime
if not np.any(iprofil):
# print(f'no corrections possible for {iprofil_=}')
continue
auv_profil = df_p.loc[iprofil]
self.logger.info(
"Processing profile=%d for proxy correction: total_points=%d > thresh=%d ?",
iprofil_,
auv_profil.shape[0],
correction_threshold,
)
if auv_profil.shape[0] > correction_threshold:
if (
np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold)
< correction_threshold
):
if auv_profil.biolume_proxy_adinos.count() == 0: # all proxies are NaN so skip
self.logger.info(
"Correcting proxies: valid adinos=%d < thresh=%d -- all NaN so skip",
np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold),
correction_threshold,
)
continue
# no correction for low fluo & biolum values
fluoBL_corr = 1.0
self.logger.info(
"Correcting proxies: valid adinos=%d < thresh=%d"
" -- using fluoBL_corr=%.4f, total_size_adinos=%d, nans=%d",
np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold),
correction_threshold,
fluoBL_corr,
auv_profil.biolume_proxy_adinos.shape[0],
auv_profil.biolume_proxy_adinos.isna().sum(),
)
else:
# correlation between fluo and bg_biolum computed on high
# adino values for each profile
idepth = (
auv_profil.depth
<= auv_profil.depth[auv_profil.biolume_proxy_adinos > adinos_threshold].max()
)
auv_profil_idepth = auv_profil[
["biolume_fluo", "biolume_bg_biolume", "depth"]
].loc[idepth]
# pandas' corr ignores NaN
fluoBL_corr = auv_profil_idepth.biolume_fluo.corr(
auv_profil_idepth.biolume_bg_biolume, method=corr_type
)
self.logger.info(
"Correcting proxies: valid adinos=%d > thresh=%d"
" -- using fluoBL_corr=%.4f, total_size_idepth=%d, nans=%d,"
" min_depth=%.4f, max_depth=%.4f",
np.sum(auv_profil.biolume_proxy_adinos > adinos_threshold),
correction_threshold,
fluoBL_corr,
auv_profil_idepth.shape[0],
auv_profil.biolume_proxy_adinos.isna().sum(),
auv_profil_idepth.depth.min(),
auv_profil_idepth.depth.max(),
)

# save correlation
df_p.loc[iprofil, "fluoBL_corr"] = fluoBL_corr
# self.logger.info(
# "Correcting proxies for profile=%d using fluoBL_corr=%.4f",
# iprofil_,
# fluoBL_corr,
# )

# scale between 0 and 1 first
fluoBL_correctionfactor = (fluoBL_corr + 1.0) / 2.0

# then scale between fluo_bl_threshold and 1
fluoBL_correctionfactor = (
fluoBL_correctionfactor * (1.0 - fluo_bl_threshold) + fluo_bl_threshold
)

# can happen if fluo_bl_threshold is negative
fluoBL_correctionfactor = max(fluoBL_correctionfactor, 0.0)

df_p.loc[iprofil, "adinosN"] = (
df_p.biolume_proxy_adinos[iprofil] * fluoBL_correctionfactor
)

# preserving adinos+diatoms
df_p.loc[iprofil, "diatomsN"] = (
df_p.biolume_proxy_adinos[iprofil]
+ df_p.biolume_proxy_diatoms[iprofil]
- df_p.adinosN[iprofil]
)

# preserving adinos+hdinos
df_p.loc[iprofil, "hdinosN"] = (
df_p.biolume_proxy_adinos[iprofil]
+ df_p.biolume_proxy_hdinos[iprofil]
- df_p.adinosN[iprofil]
)

target_indices = df_p.index[iprofil]
self.df_r.loc[target_indices, "biolume_proxy_adinos"] = df_p.adinosN.loc[iprofil]
self.df_r.loc[target_indices, "biolume_proxy_diatoms"] = df_p.diatomsN.loc[iprofil]
self.df_r.loc[target_indices, "biolume_proxy_hdinos"] = df_p.hdinosN.loc[iprofil]
else:
self.logger.info(
"profile=%d skipped for proxy correction",
iprofil_,
)

def resample_variable( # noqa: PLR0913
self,
instr: str,
Expand All @@ -678,7 +903,8 @@ def resample_variable( # noqa: PLR0913
if instr == "biolume" and variable == "biolume_raw":
# Only biolume_avg_biolume and biolume_flow treated like other data
# All other biolume variables in self.df_r[] are computed from biolume_raw
self.add_biolume_proxies(freq)
biolume_fluo, biolume_sunsets, biolume_sunrises = self.add_biolume_proxies(freq)
self.correct_biolume_proxies(biolume_fluo, biolume_sunsets, biolume_sunrises)
else:
self.df_o[variable] = self.ds[variable].to_pandas()
self.df_o[f"{variable}_mf"] = (
Expand Down
Loading