Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
428 changes: 428 additions & 0 deletions docs/pre_executed/demo_one-band.ipynb

Large diffs are not rendered by default.

144 changes: 118 additions & 26 deletions src/uncle_val/learning/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import math
from collections.abc import Callable, Sequence
from functools import cached_property
from pathlib import Path

import torch

from uncle_val.utils.mag_flux import mag2flux
from uncle_val.utils.mag_flux import fluxerr2magerr, mag2flux, magerr2fluxerr


class UncleScaler:
Expand Down Expand Up @@ -78,21 +79,11 @@ def normalizers(self, input_names) -> dict[int, Callable]:
return normalizers


class UncleModel(torch.nn.Module):
"""Base class for u-function learning
class BaseUncleModel(torch.nn.Module):
"""Abstract base class for u-function learning

You must re-implement __init__() to call super().__init__()
and to assign `.module`.

The output is either 1-d or 2-d:
- 0th index is -ln(uu), so u = exp(-output[0])
- 1st index is s
The original residual is defined as: (flux - avg_flux) / err,
where avg_flux is sum(flux / err^2) / sum(1 / err^2).
The transformed uncertainty is defined is corrected_err = u*err,
the transformed flux is corrected_flux = flux + s * u * err,
so the transformed residual is (flux + s * u * err - avg_flux) / (u * err),
where avg_flux is sum[(flux + s * u * err) / (u * err)^2] / sum(1 / (u * err)^2).
Provides common initialization and utilities. Subclasses must implement
``forward()``.

Parameters
----------
Expand All @@ -104,8 +95,6 @@ class UncleModel(torch.nn.Module):
`u` and `s`.
"""

module: torch.nn.Module

def __init__(self, *, input_names: Sequence[str], outputs_s: bool) -> None:
super().__init__()

Expand All @@ -124,14 +113,6 @@ def norm_inputs(self, inputs: torch.Tensor) -> torch.Tensor:
normed[..., idx] = f(inputs[..., idx])
return normed

def forward(self, inputs: torch.Tensor) -> torch.Tensor:
"""Compute the output of the model"""
output = self.module(self.norm_inputs(inputs))
# u, uncertainty underestimation
output[..., 0] = torch.exp(-output[..., 0])
# We don't scale s, so nothing to do here
return output

def save_onnx(self, path: Path | str) -> None:
"""Save the model to an ONNX file.

Expand All @@ -154,6 +135,43 @@ def save_onnx(self, path: Path | str) -> None:
)


class UncleModel(BaseUncleModel):
    """Base class for u-function learning

    You must re-implement __init__() to call super().__init__()
    and to assign `.module`.

    The output is either 1-d or 2-d:
    - 0th index is -ln(u), so u = exp(-output[0])
    - 1st index is s
    The original residual is defined as: (flux - avg_flux) / err,
    where avg_flux is sum(flux / err^2) / sum(1 / err^2).
    The transformed uncertainty is defined as corrected_err = u*err,
    the transformed flux is corrected_flux = flux + s * u * err,
    so the transformed residual is (flux + s * u * err - avg_flux) / (u * err),
    where avg_flux is sum[(flux + s * u * err) / (u * err)^2] / sum(1 / (u * err)^2).

    Parameters
    ----------
    input_names : list of str
        Names of input dimensions, used for defining normalizers and for the
        dimensionality of the first model layer.
    outputs_s : bool
        False would make the model to return `u` only, True would return both
        `u` and `s`.
    """

    # Trainable submodule producing the raw output (-ln(u) and, optionally, s);
    # must be assigned by subclasses.
    module: torch.nn.Module

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Compute the output of the model"""
        output = self.module(self.norm_inputs(inputs))
        # u, uncertainty underestimation: map the raw -ln(u) channel to u,
        # in place on the module output.
        output[..., 0] = torch.exp(-output[..., 0])
        # We don't scale s, so nothing to do here
        return output


class MLPModel(UncleModel):
"""Multi-layer Perceptron (MLP) model for the Uncle function

Expand Down Expand Up @@ -235,4 +253,78 @@ def norm_inputs(self, inputs: torch.Tensor) -> torch.Tensor:
def module(self, inputs: torch.Tensor) -> torch.Tensor:
"""Compute the output of the model"""
shape = inputs.shape[:-1]
return self.vector.repeat(*shape, 1)
return self.vector.repeat(shape + (self.d_output,))


class MagErrModel(BaseUncleModel):
    """Base class for magnitude-error physics models

    Subclasses must assign ``self.module`` to a ``torch.nn.Module`` whose
    ``forward()`` returns the systematic magnitude error in centi-magnitudes
    (i.e. the value is multiplied by ``1e-2`` to get magnitudes).

    The corrected flux error is computed by adding the systematic magnitude
    error in quadrature to the photon-noise magnitude error:

        new_mag_err = hypot(mag_err, systematic_mag_err)
        u = magerr2fluxerr(new_mag_err) / flux_err

    Parameters
    ----------
    input_names : list of str
        Names of input dimensions. Must include a flux column (``'flux'`` or
        ``'x'``) and an error column (``'err'``).

    Raises
    ------
    ValueError
        If ``input_names`` lacks a flux column ('flux'/'x') or an 'err' column.
    """

    # Fluxes are floored at the flux of a 30-mag source so the
    # flux -> magnitude conversion stays finite for tiny/non-positive fluxes.
    flux_floor = mag2flux(30.0)
    # 0.4 * ln(10): mag-to-ln-flux conversion factor.
    # NOTE(review): unused in this class — presumably kept for subclasses;
    # confirm before removing.
    ln10_0_4 = 0.4 * math.log(10.0)

    # Must be assigned by subclasses; returns systematic mag error in centi-mag.
    module: torch.nn.Module

    def __init__(self, *, input_names: Sequence[str]) -> None:
        super().__init__(input_names=input_names, outputs_s=False)

        # Locate the flux column, accepting either naming convention.
        if "flux" in input_names:
            self.flux_column = self.input_names.index("flux")
        elif "x" in input_names:
            self.flux_column = self.input_names.index("x")
        else:
            raise ValueError("input_names must include flux name, either 'flux' or 'x'")

        if "err" in input_names:
            self.err_column = self.input_names.index("err")
        else:
            raise ValueError("input_names must include 'err'")

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Compute the uncertainty-correction factor ``u``.

        Parameters
        ----------
        inputs : torch.Tensor
            Tensor whose last axis is indexed per ``input_names``.

        Returns
        -------
        torch.Tensor
            ``u`` values with a trailing singleton axis appended.
        """
        # module() outputs centi-magnitudes; convert to magnitudes.
        systematic_mag_err = 1e-2 * self.module(self.norm_inputs(inputs))

        # clamp() with a scalar min is elementwise-identical to
        # torch.maximum(), but avoids allocating a fresh tensor on every
        # forward call and preserves the input dtype (torch.maximum against
        # a float64 scalar tensor would silently promote float32 inputs).
        flux = torch.clamp(inputs[..., self.flux_column], min=self.flux_floor)
        flux_err = inputs[..., self.err_column]
        mag_err = fluxerr2magerr(flux=flux, flux_err=flux_err)
        # Add the systematic term in quadrature, in magnitude space.
        new_mag_err = torch.hypot(mag_err, systematic_mag_err)
        new_flux_err = magerr2fluxerr(flux=flux, mag_err=new_mag_err)
        u = new_flux_err / flux_err
        return u[..., None]


class ConstantMagErrModel(MagErrModel):
    """Uncle function adds a constant systematic magnitude error in quadrature

    Holds a single trainable scalar, expressed in centi-magnitudes, that the
    parent class combines in quadrature with the photon-noise magnitude error.

    Parameters
    ----------
    input_names : list of str
        Names of input dimensions. Must include a flux column (``'flux'`` or
        ``'x'``) and an error column (``'err'``).
    """

    def __init__(self, input_names: Sequence[str]) -> None:
        super().__init__(input_names=input_names)
        # One trainable value, initialized to 1 centi-mag (0.01 mag).
        self.addition_centi_mag_err = torch.nn.Parameter(torch.full((1,), 1.0))

    def module(self, inputs: torch.Tensor) -> torch.Tensor:
        """Trainable systematic magnitude error addition in centi-magnitudes."""
        # The additive term is constant, so the inputs are deliberately ignored.
        return self.addition_centi_mag_err
10 changes: 5 additions & 5 deletions src/uncle_val/learning/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from torch.optim import Optimizer

from uncle_val.learning.losses import UncleLoss
from uncle_val.learning.models import UncleModel
from uncle_val.learning.models import BaseUncleModel


def train_step(
*,
optimizer: Optimizer,
model: UncleModel,
model: BaseUncleModel,
loss: UncleLoss,
batch: Tensor,
) -> Tensor:
Expand All @@ -20,7 +20,7 @@ def train_step(
----------
optimizer : torch.optim.Optimizer
Optimizer to use for training
model : UncleModel
model : BaseUncleModel
Model to train, input vector size is d_input.
loss : callable, udf(flux, err) -> loss_value
Loss function to call on corrected fluxes and errors.
Expand Down Expand Up @@ -50,7 +50,7 @@ def train_step(

def evaluate_loss(
*,
model: UncleModel,
model: BaseUncleModel,
loss: Callable[[Tensor, Tensor, Tensor], object],
batch: Tensor,
) -> object:
Expand All @@ -60,7 +60,7 @@ def evaluate_loss(

Parameters
----------
model : UncleModel
model : BaseUncleModel
Model to evaluate.
loss : callable
func(flux, err, model_output) -> loss_value
Expand Down
2 changes: 2 additions & 0 deletions src/uncle_val/pipelines/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from .dp1_constant_magerr import run_dp1_constant_magerr
from .dp1_linear_flux_err import run_dp1_linear_flux_err
from .dp1_mlp import run_dp1_mlp
from .plotting import make_plots

__all__ = (
"make_plots",
"run_dp1_constant_magerr",
"run_dp1_linear_flux_err",
"run_dp1_mlp",
)
106 changes: 106 additions & 0 deletions src/uncle_val/pipelines/dp1_constant_magerr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from pathlib import Path

import torch

from uncle_val.datasets import dp1_catalog_single_band
from uncle_val.learning.losses import UncleLoss
from uncle_val.learning.models import ConstantMagErrModel
from uncle_val.pipelines.training_loop import training_loop


def run_dp1_constant_magerr(
    *,
    dp1_root: str | Path,
    band: str,
    non_extended_only: bool,
    n_workers: int,
    n_src: int,
    n_lcs: int,
    train_batch_size: int,
    val_batch_size: int,
    output_root: str | Path,
    loss_fn: UncleLoss,
    val_losses: dict[str, UncleLoss] | None = None,
    start_tfboard: bool = False,
    log_activations: bool = False,
    snapshot_every: int = 128,
    device: str | torch.device = "cpu",
) -> Path:
    """Run the DP1 training with the constant magnitude-error model

    Trains a ``ConstantMagErrModel`` on single-band DP1 forced-PSF photometry,
    using the ``'lc.x'`` (flux) and ``'lc.err'`` light-curve columns.

    Parameters
    ----------
    dp1_root : str or Path
        The root directory of the DP1 HATS catalogs.
    band : str
        Passband to train the model on.
    non_extended_only : bool
        Whether to filter out extended sources.
    n_workers : int
        Number of Dask workers to use.
    n_src : int
        Number of sources to use per light curve.
    n_lcs : int
        Number of light curves to train on.
    train_batch_size : int
        Batch size for training.
    val_batch_size : int
        Batch size for validation.
    output_root : str or Path
        Where to save the intermediate results.
    loss_fn : UncleLoss
        Loss function to use.
    val_losses : dict[str, UncleLoss] or None
        Extra losses to compute on validation set and record, it maps name to
        loss function. If None, an empty dictionary is used.
    start_tfboard : bool
        Whether to start a TensorBoard session.
    log_activations : bool
        Whether to log validation activations with TensorBoard session.
    snapshot_every : int
        Snapshot model and metrics every this much training batches.
    device : str or torch.device, optional
        Torch device to use for training, default is "cpu".

    Returns
    -------
    Path
        Path to the output model.
    """
    catalog = dp1_catalog_single_band(
        root=dp1_root,
        band=band,
        obj="science",
        img="cal",
        phot="PSF",
        mode="forced",
    )

    if non_extended_only:
        catalog = catalog.query("extendedness == 0.0")
    # Keep only the columns the model consumes.
    catalog = catalog.map_partitions(lambda df: df[["id", "lc.x", "lc.err"]])

    model = ConstantMagErrModel(["x", "err"]).to(device=device)

    if val_losses is None:
        val_losses = {}

    return training_loop(
        catalog=catalog,
        columns=None,
        model=model,
        loss_fn=loss_fn,
        val_losses=val_losses,
        lr=3e-3,
        n_workers=n_workers,
        n_src=n_src,
        n_lcs=n_lcs,
        train_batch_size=train_batch_size,
        val_batch_size=val_batch_size,
        snapshot_every=snapshot_every,
        output_root=output_root,
        device=device,
        start_tfboard=start_tfboard,
        log_activations=log_activations,
        model_name="constant_magerr",
    )
4 changes: 2 additions & 2 deletions src/uncle_val/pipelines/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from scipy.stats import norm

from uncle_val.datasets.dp1 import dp1_catalog_multi_band
from uncle_val.learning.models import UncleModel
from uncle_val.learning.models import BaseUncleModel
from uncle_val.utils.hashing import uniform_hash
from uncle_val.whitening import whiten_data

Expand Down Expand Up @@ -309,7 +309,7 @@ def make_plots(
min_n_src: int,
non_extended_only: bool,
n_workers: int,
model_path: str | Path | UncleModel | None,
model_path: str | Path | BaseUncleModel | None,
model_columns: Sequence[str] = ("lc.x", "lc.err"),
n_samples: int,
device: torch.device | str = "cpu",
Expand Down
6 changes: 3 additions & 3 deletions src/uncle_val/pipelines/training_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from uncle_val.learning.losses import UncleLoss
from uncle_val.learning.lsdb_dataset import LSDBIterableDataset
from uncle_val.learning.models import UncleModel
from uncle_val.learning.models import BaseUncleModel
from uncle_val.learning.training import train_step
from uncle_val.pipelines.splits import TRAIN_SPLIT, VALIDATION_SPLIT
from uncle_val.pipelines.utils import _launch_tfboard
Expand Down Expand Up @@ -47,7 +47,7 @@ def training_loop(
*,
catalog: lsdb.Catalog,
columns: list[str] | None,
model: UncleModel,
model: BaseUncleModel,
n_workers: int,
n_src: int,
n_lcs: int,
Expand All @@ -71,7 +71,7 @@ def training_loop(
Catalog to train on.
columns : list[str]
Columns to train on.
model : UncleModel
model : BaseUncleModel
Model to train.
n_workers : int
Number of Dask workers to use.
Expand Down
Loading