Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 107 additions & 10 deletions python/lsst/pipe/tasks/prettyPictureMaker/_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from __future__ import annotations

__all__ = (
"ChannelRGBConfig",
"PrettyPictureTask",
"PrettyPictureConnections",
"PrettyPictureConfig",
Expand All @@ -40,9 +41,8 @@
from typing import TYPE_CHECKING, cast, Any
from lsst.skymap import BaseSkyMap

from scipy.stats import norm
from scipy.stats import norm, halfnorm, mode
from scipy.ndimage import binary_dilation
from scipy.optimize import minimize
from scipy.interpolate import RBFInterpolator
from skimage.restoration import inpaint_biharmonic

Expand Down Expand Up @@ -84,7 +84,7 @@ class PrettyPictureConnections(
"Model of the static sky, used to find temporal artifacts. Typically a PSF-Matched, "
"sigma-clipped coadd. Written if and only if assembleStaticSkyModel.doWrite=True"
),
name="{coaddTypeName}CoaddPsfMatched",
name="pretty_coadd",
storageClass="ExposureF",
dimensions=("tract", "patch", "skymap", "band"),
multiple=True,
Expand Down Expand Up @@ -252,6 +252,17 @@ class PrettyPictureConfig(PipelineTaskConfig, pipelineConnections=PrettyPictureC
"inpaint": "Use surrounding pixels to determine likely value",
},
)
recenterNoise = Field[float](
doc="Recenter the noise away from zero. Supplied value is in units of sigma",
optional=True,
default=None,
)
noiseSearchThreshold = Field[float](
doc=(
"Flux threshold below which most flux will be considered noise, used to estimate noise properties"
),
default=10,
)

def setDefaults(self):
self.channelConfig["i"] = ChannelRGBConfig(r=1, g=0, b=0)
Expand All @@ -268,6 +279,92 @@ class PrettyPictureTask(PipelineTask):

config: ConfigClass

def _find_normal_stats(self, array):
"""Calculate standard deviation from negative values using half-normal distribution.

Parameters
----------
array : `numpy.array`
Input array of numerical values.

Returns
-------
mean : `float`
The central moment of the distribution
sigma : `float`
Estimated standard deviation from negative values. Returns np.inf if:
- No negative values exist in the array
- Half-normal fitting fails
"""
# Extract negative values efficiently
values_noise = array[array < self.config.noiseSearchThreshold]

# find the mode
center = mode(np.round(values_noise, 2)).mode

# extract the negative values
values_neg = array[array < center]

# Return infinity if no negative values found
if values_neg.size == 0:
return 0, np.inf

try:
# Fit half-normal distribution to absolute negative values
mu, sigma = halfnorm.fit(np.abs(values_neg))
except (ValueError, RuntimeError):
# Handle fitting failures (e.g., constant data, optimization issues)
return 0, np.inf

return center, sigma

def _match_sigmas_and_recenter(self, *arrays, factor=1):
"""Scale array values to match minimum standard deviation across arrays
and recenter noise.

Adjusts values below each array's sigma by scaling and shifting them to
align with the minimum sigma value across all input arrays. This operates
in-place for efficiency.

Parameters
----------
*arrays : any number of `numpy.array`
Variable number of input arrays to process.
factor : float, optional
Scaling factor for adjustments (default: 1).

"""
# Calculate standard deviations for all arrays
sigmas = []
mus = []
for arr in arrays:
m, s = self._find_normal_stats(arr)
mus.append(m)
sigmas.append(s)
mus = np.array(mus)
sigmas = np.array(sigmas)

# If no sigmas could be determined, return the original
# arrays.
if not np.any(np.isfinite(sigmas)):
return

min_sig = np.min(sigmas)

for mu, sigma, array in zip(mus, sigmas, arrays):
# Identify values below the array's sigma threshold
lower_pos = (array - mu) < sigma

# Skip processing if sigma is invalid
if not np.isfinite(sigma):
continue

# Calculate scaling ratio relative to minimum sigma
sigma_ratio = min_sig / sigma

# Apply adjustment to qualifying values
array[lower_pos] = (array[lower_pos] - mu) * sigma_ratio + min_sig * factor

def run(self, images: Mapping[str, Exposure]) -> Struct:
"""Turns the input arguments in arguments into an RGB array.

Expand Down Expand Up @@ -337,6 +434,11 @@ def run(self, images: Mapping[str, Exposure]) -> Struct:
except Exception:
psf = None

if self.config.recenterNoise:
self._match_sigmas_and_recenter(
imageRArray, imageGArray, imageBArray, factor=self.config.recenterNoise
)

# assert for typing reasons
assert jointMask is not None
# Run any image level correction plugins
Expand Down Expand Up @@ -635,13 +737,8 @@ def fixBackground(self, image):
if np.any(mask):
# use a minimizer to determine best mu and sigma for a Gaussian
# given only samples below the mean of the Gaussian.
result = minimize(
self._neg_log_likelihood,
(maxLikely, initial_std),
args=(image[mask]),
bounds=((maxLikely, None), (1e-8, None)),
)
mu_hat, sigma_hat = result.x
_, sigma_hat = halfnorm.fit(np.abs(image[mask] - maxLikely))
mu_hat = maxLikely
else:
mu_hat, sigma_hat = (maxLikely, 2 * initial_std)

Expand Down