modularised cpdAssign #68

Draft · wants to merge 23 commits into base: ustar_cp_refactor_main
1 change: 1 addition & 0 deletions oneflux_steps/ustar_cp_python/__init__.py
@@ -12,6 +12,7 @@
from .cpdFmax2pCore import interpolate_FmaxCritical, calculate_p_low, calculate_p_interpolate
from .fcDatenum import datenum
from .fcBin import fcBin
from .cpdAssignUStarTh import identifyOutliers

from os.path import dirname, basename, isfile, join
import glob
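Note: with this re-export, identifyOutliers becomes importable directly from the package. A minimal sketch (the scores and threshold below are invented, and the package is assumed to be on the import path):

import numpy as np
from oneflux_steps.ustar_cp_python import identifyOutliers

scores = np.array([0.5, 7.2, 1.1])          # hypothetical standardized scores
f_out, i_out = identifyOutliers(scores, 5)  # i_out == [1], the only score above the threshold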
76 changes: 76 additions & 0 deletions oneflux_steps/ustar_cp_python/aggregateSeasonalAndAnnualValues.py
@@ -0,0 +1,76 @@
import numpy as np

def aggregateSeasonalAndAnnualValues(
xCp,
iSelect,
nDim,
nWindows,
nStrata,
nBoot
):
"""
Python equivalent of the MATLAB function aggregateSeasonalAndAnnualValues.

Parameters
----------
xCp : array-like
In MATLAB, can be 2D [nWindows, nBoot] or 3D [nWindows, nStrata, nBoot].
In Python, pass a nested list or NumPy array with the same shape.
    iSelect : array-like
        Selection mask aligned with the leading axis of xCp: entries equal to 0
        are excluded (set to NaN) before aggregation. (The MATLAB original uses
        1-based linear indices; this port expects a mask instead.)
nDim : int
2 or 3 (to indicate if xCp is 2D or 3D).
nWindows : int
nStrata : int
nBoot : int

Returns
-------
CpA : np.ndarray
Aggregated mean of selected change points along the appropriate dimension.
        Shape is (nBoot,) for both nDim=2 and nDim=3.
nA : np.ndarray
Count of non-NaN selected points. Same shape as CpA.
xCpSelect : np.ndarray
Same shape as xCp, with NaN everywhere except the selected positions.
"""
    # Convert inputs to float arrays so NaN masking also works on nested lists
    xCp = np.asarray(xCp, dtype=float)
    iSelect_array = np.asarray(iSelect, dtype=int)

    # Mask xCp with iSelect: positions where iSelect is 0 are set to NaN
    xCpSelect = xCp.copy()
    xCpSelect[iSelect_array == 0] = np.nan

    xCpGF = xCpSelect  # "gap-filled" alias, mirroring the MATLAB code

# Aggregate values based on dimensions
if nDim == 2:
# xCp shape = [nWindows, nBoot]
# MATLAB default nanmean(xCpGF) => mean across axis=0 in Python
CpA = np.nanmean(xCpGF, axis=0)
nA = np.sum(~np.isnan(xCpSelect), axis=0)
elif nDim == 3:
# xCp shape = [nWindows, nStrata, nBoot]
# reshape => (nWindows*nStrata, nBoot) in column-major
xCpGF_reshaped = xCpGF.reshape(nWindows * nStrata, nBoot, order='F')
CpA = np.nanmean(xCpGF_reshaped, axis=0)
nA = np.sum(~np.isnan(xCpGF_reshaped), axis=0)
else:
raise ValueError("Invalid number of dimensions: Expected 2D or 3D Stats.")

return CpA, nA, xCpSelect
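Note: a minimal usage sketch for the 2D case of this helper; the shapes, values, and mask below are invented for illustration:

import numpy as np
from oneflux_steps.ustar_cp_python.aggregateSeasonalAndAnnualValues import aggregateSeasonalAndAnnualValues

nWindows, nStrata, nBoot = 4, 1, 3
xCp = np.arange(12, dtype=float).reshape(nWindows, nBoot)  # hypothetical change points
iSelect = np.array([1, 0, 1, 1])                           # exclude the second window

CpA, nA, xCpSelect = aggregateSeasonalAndAnnualValues(xCp, iSelect, 2, nWindows, nStrata, nBoot)
# CpA has shape (nBoot,); nA counts the retained windows per bootstrap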
109 changes: 109 additions & 0 deletions oneflux_steps/ustar_cp_python/aggregateSeasonalMeans.py
@@ -0,0 +1,109 @@
from typing import Tuple
import numpy as np
from numpy.typing import NDArray
from oneflux_steps.ustar_cp_python.fcBin import fcBin

def aggregateSeasonalMeans(
    mt: NDArray, Cp: NDArray, xmt: NDArray, iSelect: NDArray,
    nWindows: int, nStrata: int, nBoot: int
) -> Tuple[NDArray, NDArray]:
"""
Python equivalent of the MATLAB function aggregateSeasonalMeans.
Aggregates seasonal means for time and change points.

Parameters
----------
mt : array-like
The time data.
Cp : array-like
The corresponding change point values.
xmt : array-like
A reference array that must reshape cleanly to (nWindows, nStrata * nBoot).
Used to compute the median number of windows (nW).
iSelect : array-like of int
Numeric indices (1-based in MATLAB) of selected measurements to use.
For Python (0-based), ensure you pass valid 0-based indices.
nWindows : int
nStrata : int
nBoot : int

Returns
-------
tW : np.ndarray
The seasonal mean times for each bin.
CpW : np.ndarray
The seasonal mean change points for each bin.
"""

# ----------------------------------------------------------
# 1) Calculate Median Number of Windows
# In MATLAB:
# reshape(xmt, nWindows, nStrata * nBoot)
# Here we reshape the 1D array xmt => shape (nWindows, nStrata*nBoot)
# ----------------------------------------------------------
try:
xmt_reshaped = xmt.reshape(nWindows, nStrata * nBoot)
except ValueError:
raise ValueError(
f"Cannot reshape xmt of length {xmt.size} "
f"to shape ({nWindows}, {nStrata*nBoot})."
)

    # MATLAB computes nW as the median of the per-column counts of non-NaN
    # entries; MATLAB's sum(...) with no dim argument sums down columns,
    # which is axis=0 here.
    nW_array = np.sum(~np.isnan(xmt_reshaped), axis=0)  # shape: (nStrata*nBoot,)
    nW = np.nanmedian(nW_array)

# ----------------------------------------------------------
# 2) Sort Selected Measurements
# MATLAB:
# [mtSelect, i] = sort(mt(iSelect));
# CpSelect = Cp(iSelect(i));
# ----------------------------------------------------------
# *In Python*, iSelect is an array of *0-based* indices.
# We then do:
iSelect = iSelect.astype(int)
selected_mt = mt[iSelect]
# Sort them while keeping track of the sorted order
sort_order = np.argsort(selected_mt) # array of int
mtSelect = selected_mt[sort_order]
# Now reorder Cp similarly
selected_Cp = Cp[iSelect]
CpSelect = selected_Cp[sort_order]

    # ----------------------------------------------------------
    # 3) Define Bins Based on Percentiles
    #    MATLAB: xBins = prctile(mtSelect, 0:(100/nW):100);
    #    nW may be a float, so enumerate 0, step, 2*step, ... up to 100
    #    explicitly instead of relying on np.arange endpoint handling.
    # ----------------------------------------------------------
    if nW < 1:
        # Fallback: if nW < 1 for some reason, use a single bin [0, 100]
        pct_array = [0.0, 100.0]
    else:
        step = 100.0 / nW  # step size between percentile edges
        pct_array = []
        current = 0.0
        while current < 100.0 + 1e-9:
            pct_array.append(current)
            current += step
        # Snap the last edge back to exactly 100 if floating-point
        # arithmetic overshot it slightly
        if abs(pct_array[-1] - 100.0) < 1e-6:
            pct_array[-1] = 100.0

# Now compute bin edges from these percentiles
xBins = np.percentile(mtSelect, pct_array)

    # ----------------------------------------------------------
    # 4) Bin the sorted data
    #    MATLAB: [~, tW, CpW] = fcBin(mtSelect, CpSelect, xBins, 0);
    # ----------------------------------------------------------
    nCount, tW, CpW = fcBin(mtSelect, CpSelect, xBins, 0)

return tW, CpW
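Note: a rough end-to-end sketch of calling aggregateSeasonalMeans on synthetic data; all values below are invented, and fcBin is assumed to behave as in the existing module:

import numpy as np
from oneflux_steps.ustar_cp_python.aggregateSeasonalMeans import aggregateSeasonalMeans

nWindows, nStrata, nBoot = 4, 2, 5
rng = np.random.default_rng(0)

mt = np.linspace(1.0, 365.0, nWindows * nStrata * nBoot)  # synthetic times
Cp = 0.3 + 0.05 * rng.standard_normal(mt.size)            # synthetic change points
xmt = mt.copy()                                           # reshaped internally to (nWindows, nStrata * nBoot)
iSelect = np.arange(0, mt.size, 2)                        # 0-based indices of selected points

tW, CpW = aggregateSeasonalMeans(mt, Cp, xmt, iSelect, nWindows, nStrata, nBoot)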
85 changes: 85 additions & 0 deletions oneflux_steps/ustar_cp_python/cpdAssignUStarTh.py
@@ -0,0 +1,85 @@
import numpy as np
from scipy.optimize import curve_fit
from oneflux_steps.ustar_cp_python.fcNaniqr import fcNaniqr
from oneflux_steps.ustar_cp_python.fcEqnAnnualSine import fcEqnAnnualSine
from oneflux_steps.ustar_cp_python.fcr2Calc import fcr2Calc




def identifyOutliers(x_norm_x, threshold):
"""
Identifies outliers based on standardized scores.

Parameters:
x_norm_x (numpy.ndarray): Array of normalized values.
threshold (float): Threshold value for outlier detection.

Returns:
tuple: A boolean array (f_out) indicating outliers and an array (i_out) of outlier indices.
"""
f_out = (x_norm_x > threshold)
i_out = np.where(f_out)[0] # Indices of outliers

return f_out, i_out

def computeStandardizedScores(x):
    """
    Standardizes the matrix x by its column-wise median and interquartile
    range (both computed ignoring NaNs), then returns the maximum absolute
    standardized score per row as a column vector.
    """
    mx = np.nanmedian(x, axis=0)  # Column medians, ignoring NaNs
    iqr = fcNaniqr(x)             # Interquartile range, ignoring NaNs
    x_norm = (x - mx) / iqr       # Standardize

    return np.nanmax(np.abs(x_norm), axis=1, keepdims=True)  # Max absolute standardized score per row



def fitAnnualSineCurve(mt, Cp, iSelect):
"""
Fits an annual sine curve to the selected data points and returns
the sine coefficients and R-squared value.

Parameters
----------
mt : array-like
The time data (e.g., day of year).
Cp : array-like
The corresponding values to be fitted.
iSelect : array-like of bool
A boolean mask or indices that indicate which points to use
for the fitting.

Returns
-------
sSine : np.ndarray
A 1D array of length 4:
[offset, amplitude, phase, rSquared]
"""
# Prepare the data
xdata = np.array(mt)[iSelect]
ydata = np.array(Cp)[iSelect]

# Define a local function for curve_fit: curve_fit expects
# a callable f(t, b0, b1, b2) with first arg = x, subsequent = params
def _annual_sine_for_curve_fit(t, b0, b1, b2):
return fcEqnAnnualSine(np.asarray([b0, b1, b2]), t)

# Initial guess for [offset, amplitude, phase]
initial_guess = [1.0, 1.0, 1.0]

# Perform the fit via non-linear regression
popt, _ = curve_fit(_annual_sine_for_curve_fit, xdata, ydata, p0=initial_guess)

# Compute predicted values for the fitted parameters
predictedCp = fcEqnAnnualSine(np.asarray(popt), xdata)

# Compute R-squared
r2 = fcr2Calc(ydata, predictedCp)

# Adjust phase to be in the range [0, 365.25)
popt[2] = popt[2] % 365.25

    # Return the fitted coefficients plus the R-squared value as a flat 1D
    # array so the output layout matches the MATLAB version for comparative testing
    sSine = np.array([popt[0], popt[1], popt[2], r2], dtype=float)

return sSine
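Note: a minimal sketch of the outlier-screening helpers in this file; the data, planted outlier, and threshold are all invented, and fcNaniqr is taken from the existing module:

import numpy as np
from oneflux_steps.ustar_cp_python.cpdAssignUStarTh import computeStandardizedScores, identifyOutliers

rng = np.random.default_rng(1)
x = rng.normal(0.3, 0.05, size=(100, 4))  # hypothetical per-window statistics
x[7, 2] = 5.0                             # plant an obvious outlier in row 7

x_norm_x = computeStandardizedScores(x)       # max |standardized score| per row
f_out, i_out = identifyOutliers(x_norm_x, 5)  # rows whose score exceeds the threshold; should contain 7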
46 changes: 46 additions & 0 deletions oneflux_steps/ustar_cp_python/test.py
@@ -0,0 +1,46 @@
import numpy as np
from scipy.stats import f
from scipy.interpolate import PchipInterpolator


# Ad-hoc check: interpolate the critical Fmax values for sample size n,
# then map the observed Fmax onto a p-value
Fmax = 6.89397364185411
n = 53


""" The critical F-max values as a 2D NumPy array. """
FmaxTable : np.ndarray = np.array([
[3.9293, 6.2992, 9.1471, 18.2659],
[3.7734, 5.6988, 7.8770, 13.8100],
[3.7516, 5.5172, 7.4426, 12.6481],
[3.7538, 5.3224, 7.0306, 11.4461],
[3.7941, 5.3030, 6.8758, 10.6635],
[3.8548, 5.3480, 6.8883, 10.5026],
[3.9798, 5.4465, 6.9184, 10.4527],
[4.0732, 5.5235, 6.9811, 10.3859],
[4.1467, 5.6136, 7.0624, 10.5596],
[4.2770, 5.7391, 7.2005, 10.6871],
[4.4169, 5.8733, 7.3421, 10.6751],
[4.5556, 6.0591, 7.5627, 11.0072],
[4.7356, 6.2738, 7.7834, 11.2319]
])

""" A 1D NumPy array containing sample sizes. """
# tmp run to 200 1e5 reps
nTable : np.ndarray = np.array([10, 15, 20, 30, 50, 70, 100
, 150, 200, 300, 500, 700, 1000])

""" A 1D NumPy array containing significance levels. """
pTable : np.ndarray = np.array([0.80, 0.90, 0.95, 0.99])

pTableSize = len(pTable)

# Interpolate the critical Fmax at sample size n for each significance level
FmaxCritical = np.zeros(pTableSize)
for ip in range(pTableSize):
    interpolator = PchipInterpolator(nTable, FmaxTable[:, ip])
    FmaxCritical[ip] = interpolator(n)

# Map the observed Fmax onto a p-value by interpolating 1 - p over the
# critical values; fall back to NaN if the interpolator cannot be built
try:
    interpolator = PchipInterpolator(FmaxCritical, 1 - pTable, extrapolate=True)
    y = interpolator(Fmax)
except ValueError:
    y = np.nan

print(y)