From 7824a2f60377b1e1ade9fa7adca3fe9807293cfd Mon Sep 17 00:00:00 2001 From: Ben Mares Date: Wed, 4 Feb 2026 19:16:44 +0100 Subject: [PATCH 1/4] Add Generalized Pareto (GPD) and Extended GPD distributions - Add GenPareto distribution for peaks-over-threshold extreme value modeling - Add ExtGenPareto distribution following Naveau et al. (2016) for modeling entire distributions without threshold selection - ExtGPD uses CDF G(x) = H(x)^kappa where H is the GPD CDF and kappa > 0 controls lower tail behavior (reduces to GPD when kappa = 1) - Include comprehensive tests for logp, logcdf, support_point, and random sampling --- pymc_extras/distributions/__init__.py | 12 +- pymc_extras/distributions/continuous.py | 474 ++++++++++++++++++++++++ tests/distributions/test_continuous.py | 223 ++++++++++- 3 files changed, 706 insertions(+), 3 deletions(-) diff --git a/pymc_extras/distributions/__init__.py b/pymc_extras/distributions/__init__.py index 2afabad2b..65988c3dd 100644 --- a/pymc_extras/distributions/__init__.py +++ b/pymc_extras/distributions/__init__.py @@ -17,7 +17,13 @@ Experimental probability distributions for stochastic nodes in PyMC. """ -from pymc_extras.distributions.continuous import Chi, GenExtreme, Maxwell +from pymc_extras.distributions.continuous import ( + Chi, + ExtGenPareto, + GenExtreme, + GenPareto, + Maxwell, +) from pymc_extras.distributions.discrete import ( BetaNegativeBinomial, GeneralizedPoisson, @@ -29,14 +35,16 @@ from pymc_extras.distributions.transforms import PartialOrder __all__ = [ - "R2D2M2CP", "BetaNegativeBinomial", "Chi", "DiscreteMarkovChain", + "ExtGenPareto", "GenExtreme", + "GenPareto", "GeneralizedPoisson", "Maxwell", "PartialOrder", + "R2D2M2CP", "Skellam", "histogram_approximation", ] diff --git a/pymc_extras/distributions/continuous.py b/pymc_extras/distributions/continuous.py index 0bcda193e..d40f83561 100644 --- a/pymc_extras/distributions/continuous.py +++ b/pymc_extras/distributions/continuous.py @@ -34,6 +34,30 @@ from scipy import stats +class GenParetoRV(RandomVariable): + name: str = "Generalized Pareto" + signature = "(),(),()->()" + dtype: str = "floatX" + _print_name: tuple[str, str] = ("Generalized Pareto", "\\operatorname{GPD}") + + def __call__(self, mu=0.0, sigma=1.0, xi=0.0, size=None, **kwargs) -> TensorVariable: + return super().__call__(mu, sigma, xi, size=size, **kwargs) + + @classmethod + def rng_fn( + cls, + rng: np.random.RandomState | np.random.Generator, + mu: np.ndarray, + sigma: np.ndarray, + xi: np.ndarray, + size: tuple[int, ...], + ) -> np.ndarray: + return stats.genpareto.rvs(c=xi, loc=mu, scale=sigma, random_state=rng, size=size) + + +gpd = GenParetoRV() + + class GenExtremeRV(RandomVariable): name: str = "Generalized Extreme Value" signature = "(),(),()->()" @@ -219,6 +243,456 @@ def support_point(rv, size, mu, sigma, xi): return mode +class GenPareto(Continuous): + r""" + Univariate Generalized Pareto log-likelihood. + + The Generalized Pareto Distribution (GPD) is used for modeling exceedances over + a threshold in extreme value analysis. It is the natural distribution for + peaks-over-threshold modeling, complementing the GEV distribution which is used + for block maxima. + + The cdf of this distribution is + + .. math:: + + G(x \mid \mu, \sigma, \xi) = 1 - \left(1 + \xi \frac{x - \mu}{\sigma}\right)^{-\frac{1}{\xi}} + + for :math:`\xi \neq 0`, and + + .. math:: + + G(x \mid \mu, \sigma, \xi) = 1 - \exp\left(-\frac{x - \mu}{\sigma}\right) + + for :math:`\xi = 0`. + + .. plot:: + + import matplotlib.pyplot as plt + import numpy as np + import scipy.stats as st + import arviz as az + plt.style.use('arviz-darkgrid') + x = np.linspace(0, 10, 200) + mus = [0., 0., 0.] + sigmas = [1., 2., 1.] + xis = [0.0, 0.5, -0.5] + for mu, sigma, xi in zip(mus, sigmas, xis): + pdf = st.genpareto.pdf(x, c=xi, loc=mu, scale=sigma) + plt.plot(x, pdf, label=rf'$\mu$ = {mu}, $\sigma$ = {sigma}, $\xi$={xi}') + plt.xlabel('x', fontsize=12) + plt.ylabel('f(x)', fontsize=12) + plt.legend(loc=1) + plt.show() + + + ======== ========================================================================= + Support * :math:`x \geq \mu`, when :math:`\xi \geq 0` + * :math:`\mu \leq x \leq \mu - \sigma/\xi`, when :math:`\xi < 0` + Mean * :math:`\mu + \sigma/(1 - \xi)`, when :math:`\xi < 1` + * :math:`\infty`, when :math:`\xi \geq 1` + Variance * :math:`\sigma^2 / ((1-\xi)^2 (1-2\xi))`, when :math:`\xi < 0.5` + * :math:`\infty`, when :math:`\xi \geq 0.5` + ======== ========================================================================= + + Parameters + ---------- + mu : float + Location parameter (threshold). + sigma : float + Scale parameter (sigma > 0). + xi : float + Shape parameter. Controls tail behavior: + + * :math:`\xi > 0`: heavy tail (Pareto-like) + * :math:`\xi = 0`: exponential tail + * :math:`\xi < 0`: bounded upper tail + + References + ---------- + .. [1] Coles, S.G. (2001). + An Introduction to the Statistical Modeling of Extreme Values + Springer-Verlag, London + + .. [2] Pickands, J. (1975). + Statistical Inference Using Extreme Order Statistics. + Annals of Statistics, 3(1), 119-131. + + Examples + -------- + .. code-block:: python + + import pymc as pm + from pymc_extras.distributions import GenPareto + + # Peaks-over-threshold model + threshold = 10.0 + exceedances = data[data > threshold] - threshold + + with pm.Model(): + sigma = pm.HalfNormal("sigma", sigma=1) + xi = pm.Normal("xi", mu=0, sigma=0.5) + obs = GenPareto("obs", mu=0, sigma=sigma, xi=xi, observed=exceedances) + """ + + rv_op = gpd + + @classmethod + def dist(cls, mu=0, sigma=1, xi=0, **kwargs): + mu = pt.as_tensor_variable(floatX(mu)) + sigma = pt.as_tensor_variable(floatX(sigma)) + xi = pt.as_tensor_variable(floatX(xi)) + + return super().dist([mu, sigma, xi], **kwargs) + + def logp(value, mu, sigma, xi): + """ + Calculate log-probability of Generalized Pareto distribution + at specified value. + + Parameters + ---------- + value: numeric + Value(s) for which log-probability is calculated. If the log probabilities for multiple + values are desired the values must be provided in a numpy array or Pytensor tensor + + Returns + ------- + TensorVariable + """ + scaled = (value - mu) / sigma + + logp_expression = pt.switch( + pt.isclose(xi, 0), + -pt.log(sigma) - scaled, + -pt.log(sigma) - (1 + 1 / xi) * pt.log1p(xi * scaled), + ) + + # Check support: scaled >= 0, and for xi < 0, also scaled <= -1/xi + in_support = pt.switch( + pt.ge(xi, 0), + pt.ge(scaled, 0), + pt.and_(pt.ge(scaled, 0), pt.le(scaled, -1 / xi)), + ) + + logp = pt.switch( + pt.and_(in_support, pt.gt(1 + xi * scaled, 0)), + logp_expression, + -np.inf, + ) + + return check_parameters(logp, sigma > 0, msg="sigma > 0") + + def logcdf(value, mu, sigma, xi): + """ + Compute the log of the cumulative distribution function for Generalized Pareto + distribution at the specified value. + + Parameters + ---------- + value: numeric or np.ndarray or `TensorVariable` + Value(s) for which log CDF is calculated. If the log CDF for + multiple values are desired the values must be provided in a numpy + array or `TensorVariable`. + + Returns + ------- + TensorVariable + """ + scaled = (value - mu) / sigma + + logcdf_expression = pt.switch( + pt.isclose(xi, 0), + pt.log1p(-pt.exp(-scaled)), + pt.log1p(-pt.pow(1 + xi * scaled, -1 / xi)), + ) + + # Handle bounds + logcdf = pt.switch( + pt.lt(scaled, 0), + -np.inf, + pt.switch( + pt.and_(pt.lt(xi, 0), pt.gt(scaled, -1 / xi)), + 0.0, # log(1) = 0 at upper bound for xi < 0 + logcdf_expression, + ), + ) + + return check_parameters(logcdf, sigma > 0, msg="sigma > 0") + + def support_point(rv, size, mu, sigma, xi): + r""" + Using the median as the support point, since mean can be infinite when :math:`\xi \geq 1`. + + Median = mu + sigma * (2^xi - 1) / xi for xi != 0 + Median = mu + sigma * log(2) for xi = 0 + """ + median = pt.switch( + pt.isclose(xi, 0), + mu + sigma * pt.log(2), + mu + sigma * (pt.pow(2, xi) - 1) / xi, + ) + if not rv_size_is_none(size): + median = pt.full(size, median) + return median + + +class ExtGenParetoRV(RandomVariable): + name: str = "Extended Generalized Pareto" + signature = "(),(),(),()->()" + dtype: str = "floatX" + _print_name: tuple[str, str] = ("Extended Generalized Pareto", "\\operatorname{ExtGPD}") + + def __call__(self, mu=0.0, sigma=1.0, xi=0.0, kappa=1.0, size=None, **kwargs) -> TensorVariable: + return super().__call__(mu, sigma, xi, kappa, size=size, **kwargs) + + @classmethod + def rng_fn( + cls, + rng: np.random.RandomState | np.random.Generator, + mu: np.ndarray, + sigma: np.ndarray, + xi: np.ndarray, + kappa: np.ndarray, + size: tuple[int, ...], + ) -> np.ndarray: + # Use inverse transform sampling: X = H^{-1}(U^{1/kappa}) + # where H^{-1} is the GPD quantile function + u = rng.uniform(size=size) + # Transform uniform: v = u^{1/kappa} gives CDF v^kappa + v = np.power(u, 1.0 / kappa) + # Apply GPD quantile function + return stats.genpareto.ppf(v, c=xi, loc=mu, scale=sigma) + + +ext_gpd = ExtGenParetoRV() + + +class ExtGenPareto(Continuous): + r""" + Univariate Extended Generalized Pareto log-likelihood. + + The Extended Generalized Pareto Distribution (ExtGPD) is a transformation of the + GPD that provides a smooth connection between the upper tail and the main body + of the function. This approach avoids the need for threshold specification and + helps in sampling the entire time series for modeling extremes. + + The cdf of this distribution is + + .. math:: + + G(x \mid \mu, \sigma, \xi, \kappa) = \left[H\left(\frac{x - \mu}{\sigma}\right)\right]^\kappa + + where :math:`H` is the GPD cdf: + + .. math:: + + H(z) = 1 - (1 + \xi z)^{-1/\xi} + + for :math:`\xi \neq 0`, and :math:`H(z) = 1 - e^{-z}` for :math:`\xi = 0`. + + The parameter :math:`\kappa > 0` controls the lower tail behavior, while + :math:`\xi` controls the upper tail behavior. When :math:`\kappa = 1`, + this reduces to the standard GPD. + + .. plot:: + + import matplotlib.pyplot as plt + import numpy as np + import scipy.stats as st + import arviz as az + plt.style.use('arviz-darkgrid') + x = np.linspace(0.01, 5, 200) + sigma, xi = 1.0, 0.2 + for kappa in [0.5, 1.0, 2.0, 5.0]: + # ExtGPD pdf: g(x) = kappa * H(x)^(kappa-1) * h(x) + H = st.genpareto.cdf(x, c=xi, loc=0, scale=sigma) + h = st.genpareto.pdf(x, c=xi, loc=0, scale=sigma) + g = kappa * np.power(H, kappa - 1) * h + plt.plot(x, g, label=rf'$\kappa$ = {kappa}') + plt.xlabel('x', fontsize=12) + plt.ylabel('f(x)', fontsize=12) + plt.title(rf'ExtGPD with $\sigma$={sigma}, $\xi$={xi}') + plt.legend(loc=1) + plt.show() + + + ======== ========================================================================= + Support * :math:`x \geq \mu`, when :math:`\xi \geq 0` + * :math:`\mu \leq x \leq \mu - \sigma/\xi`, when :math:`\xi < 0` + ======== ========================================================================= + + Parameters + ---------- + mu : float + Location parameter (threshold). + sigma : float + Scale parameter (sigma > 0). + xi : float + Shape parameter controlling upper tail behavior: + + * :math:`\xi > 0`: heavy tail (Pareto-like) + * :math:`\xi = 0`: exponential tail + * :math:`\xi < 0`: bounded upper tail + kappa : float + Lower tail parameter (kappa > 0). Controls the behavior near zero. + When kappa = 1, reduces to standard GPD. + + References + ---------- + .. [1] Naveau, P., Huser, R., Ribereau, P., and Hannart, A. (2016). + Modeling jointly low, moderate, and heavy rainfall intensities without + a threshold selection. Water Resources Research, 52(4), 2753-2769. + + .. [2] Papastathopoulos, I. and Tawn, J. A. (2013). + Extended generalised Pareto models for tail estimation. + Journal of Statistical Planning and Inference, 143(1), 131-143. + + Examples + -------- + .. code-block:: python + + import pymc as pm + from pymc_extras.distributions import ExtGenPareto + + # Model entire distribution without threshold selection + with pm.Model(): + sigma = pm.HalfNormal("sigma", sigma=1) + xi = pm.Normal("xi", mu=0, sigma=0.5) + kappa = pm.HalfNormal("kappa", sigma=1) + obs = ExtGenPareto("obs", mu=0, sigma=sigma, xi=xi, kappa=kappa, observed=data) + """ + + rv_op = ext_gpd + + @classmethod + def dist(cls, mu=0, sigma=1, xi=0, kappa=1, **kwargs): + mu = pt.as_tensor_variable(floatX(mu)) + sigma = pt.as_tensor_variable(floatX(sigma)) + xi = pt.as_tensor_variable(floatX(xi)) + kappa = pt.as_tensor_variable(floatX(kappa)) + + return super().dist([mu, sigma, xi, kappa], **kwargs) + + def logp(value, mu, sigma, xi, kappa): + """ + Calculate log-probability of Extended Generalized Pareto distribution + at specified value. + + The pdf is: g(x) = kappa * H(x)^(kappa-1) * h(x) + where H is the GPD cdf and h is the GPD pdf. + + log g(x) = log(kappa) + (kappa-1)*log(H(x)) + log(h(x)) + + Parameters + ---------- + value: numeric + Value(s) for which log-probability is calculated. + + Returns + ------- + TensorVariable + """ + scaled = (value - mu) / sigma + + # GPD cdf: H(z) = 1 - (1 + xi*z)^(-1/xi) for xi != 0 + # H(z) = 1 - exp(-z) for xi = 0 + H = pt.switch( + pt.isclose(xi, 0), + 1 - pt.exp(-scaled), + 1 - pt.pow(1 + xi * scaled, -1 / xi), + ) + + # GPD log-pdf: log h(z) = -log(sigma) - (1 + 1/xi)*log(1 + xi*z) for xi != 0 + # log h(z) = -log(sigma) - z for xi = 0 + log_h = pt.switch( + pt.isclose(xi, 0), + -pt.log(sigma) - scaled, + -pt.log(sigma) - (1 + 1 / xi) * pt.log1p(xi * scaled), + ) + + # ExtGPD log-pdf: log(kappa) + (kappa-1)*log(H) + log(h) + # Need to handle H near 0 carefully for numerical stability + logp_expression = pt.log(kappa) + (kappa - 1) * pt.log(H) + log_h + + # Check support: scaled >= 0, and for xi < 0, also scaled <= -1/xi + in_support = pt.switch( + pt.ge(xi, 0), + pt.ge(scaled, 0), + pt.and_(pt.ge(scaled, 0), pt.le(scaled, -1 / xi)), + ) + + # Also need H > 0 and 1 + xi*scaled > 0 + logp = pt.switch( + pt.and_(pt.and_(in_support, pt.gt(1 + xi * scaled, 0)), pt.gt(H, 0)), + logp_expression, + -np.inf, + ) + + return check_parameters(logp, sigma > 0, kappa > 0, msg="sigma > 0, kappa > 0") + + def logcdf(value, mu, sigma, xi, kappa): + """ + Compute the log of the cumulative distribution function for Extended + Generalized Pareto distribution at the specified value. + + G(x) = H(x)^kappa, so log G(x) = kappa * log(H(x)) + + Parameters + ---------- + value: numeric or np.ndarray or `TensorVariable` + Value(s) for which log CDF is calculated. + + Returns + ------- + TensorVariable + """ + scaled = (value - mu) / sigma + + # GPD cdf: H(z) + H = pt.switch( + pt.isclose(xi, 0), + 1 - pt.exp(-scaled), + 1 - pt.pow(1 + xi * scaled, -1 / xi), + ) + + # ExtGPD log-cdf: kappa * log(H) + logcdf_expression = kappa * pt.log(H) + + # Handle bounds + logcdf = pt.switch( + pt.lt(scaled, 0), + -np.inf, + pt.switch( + pt.and_(pt.lt(xi, 0), pt.gt(scaled, -1 / xi)), + 0.0, # log(1) = 0 at upper bound for xi < 0 + logcdf_expression, + ), + ) + + return check_parameters(logcdf, sigma > 0, kappa > 0, msg="sigma > 0, kappa > 0") + + def support_point(rv, size, mu, sigma, xi, kappa): + r""" + Using the median as the support point. + + For ExtGPD, the median satisfies G(m) = 0.5, i.e., H(m)^kappa = 0.5 + So H(m) = 0.5^(1/kappa) and m = H^{-1}(0.5^{1/kappa}) + + For GPD: H^{-1}(p) = sigma * [(1-p)^(-xi) - 1] / xi for xi != 0 + H^{-1}(p) = -sigma * log(1-p) for xi = 0 + """ + p = pt.pow(0.5, 1 / kappa) # H(median) = 0.5^(1/kappa) + median = pt.switch( + pt.isclose(xi, 0), + mu - sigma * pt.log(1 - p), + mu + sigma * (pt.pow(1 - p, -xi) - 1) / xi, + ) + if not rv_size_is_none(size): + median = pt.full(size, median) + return median + + class Chi: r""" :math:`\chi` log-likelihood. diff --git a/tests/distributions/test_continuous.py b/tests/distributions/test_continuous.py index aec310eaa..061facf2d 100644 --- a/tests/distributions/test_continuous.py +++ b/tests/distributions/test_continuous.py @@ -34,7 +34,7 @@ ) # the distributions to be tested -from pymc_extras.distributions import Chi, GenExtreme, Maxwell +from pymc_extras.distributions import Chi, ExtGenPareto, GenExtreme, GenPareto, Maxwell pytestmark = pytest.mark.filterwarnings( "ignore:Numba will use object mode to run Generalized Extreme Value:UserWarning" @@ -143,6 +143,227 @@ class TestGenExtreme(BaseTestDistributionRandom): ] +class TestGenParetoClass: + """ + Wrapper class so that tests of experimental additions can be dropped into + PyMC directly on adoption. + """ + + def test_logp(self): + def ref_logp(value, mu, sigma, xi): + scaled = (value - mu) / sigma + if scaled < 0: + return -np.inf + if xi < 0 and scaled > -1 / xi: + return -np.inf + if 1 + xi * scaled <= 0: + return -np.inf + return sp.genpareto.logpdf(value, c=xi, loc=mu, scale=sigma) + + check_logp( + GenPareto, + R, + { + "mu": R, + "sigma": Rplusbig, + "xi": Domain([-0.5, -0.1, 0, 0.1, 0.5, 1]), + }, + ref_logp, + # GPD has no mathematical constraint on xi (unlike GEV), only sigma > 0 + skip_paramdomain_outside_edge_test=True, + ) + + def test_logcdf(self): + def ref_logcdf(value, mu, sigma, xi): + scaled = (value - mu) / sigma + if scaled < 0: + return -np.inf + if xi < 0 and scaled > -1 / xi: + return 0.0 # log(1) at upper bound + return sp.genpareto.logcdf(value, c=xi, loc=mu, scale=sigma) + + check_logcdf( + GenPareto, + R, + { + "mu": R, + "sigma": Rplusbig, + "xi": Domain([-0.5, -0.1, 0, 0.1, 0.5, 1]), + }, + ref_logcdf, + decimal=select_by_precision(float64=6, float32=2), + # GPD has no mathematical constraint on xi (unlike GEV), only sigma > 0 + skip_paramdomain_outside_edge_test=True, + ) + + @pytest.mark.parametrize( + "mu, sigma, xi, size, expected", + [ + (0, 1, 0, None, np.log(2)), # Exponential case: median = log(2) + (0, 1, 0.5, None, (2**0.5 - 1) / 0.5), # median = (2^xi - 1) / xi + (0, 1, -0.5, None, (2**-0.5 - 1) / -0.5), + (1, 2, 0, None, 1 + 2 * np.log(2)), # mu + sigma * log(2) + (0, 1, 0.5, 5, np.full(5, (2**0.5 - 1) / 0.5)), + ( + np.arange(3), + np.arange(1, 4), + 0.5, + (2, 3), + np.full((2, 3), np.arange(3) + np.arange(1, 4) * (2**0.5 - 1) / 0.5), + ), + ], + ) + def test_genpareto_support_point(self, mu, sigma, xi, size, expected): + with pm.Model() as model: + GenPareto("x", mu=mu, sigma=sigma, xi=xi, size=size) + assert_support_point_is_expected(model, expected) + + +class TestGenPareto(BaseTestDistributionRandom): + pymc_dist = GenPareto + pymc_dist_params = {"mu": 0, "sigma": 1, "xi": 0.1} + expected_rv_op_params = {"mu": 0, "sigma": 1, "xi": 0.1} + # GenPareto uses same xi sign convention as scipy + reference_dist_params = {"loc": 0, "scale": 1, "c": 0.1} + reference_dist = seeded_scipy_distribution_builder("genpareto") + tests_to_run = [ + "check_pymc_params_match_rv_op", + "check_pymc_draws_match_reference", + "check_rv_size", + ] + + +class TestExtGenParetoClass: + """ + Tests for the Extended Generalized Pareto Distribution (ExtGPD). + ExtGPD has CDF G(x) = H(x)^kappa where H is the GPD CDF. + """ + + def test_logp(self): + def ref_logp(value, mu, sigma, xi, kappa): + # ExtGPD pdf: g(x) = kappa * H(x)^(kappa-1) * h(x) + # where H is GPD CDF and h is GPD PDF + scaled = (value - mu) / sigma + if scaled < 0: + return -np.inf + if xi < 0 and scaled > -1 / xi: + return -np.inf + if 1 + xi * scaled <= 0: + return -np.inf + + H = sp.genpareto.cdf(value, c=xi, loc=mu, scale=sigma) + if H <= 0: + return -np.inf + log_h = sp.genpareto.logpdf(value, c=xi, loc=mu, scale=sigma) + return np.log(kappa) + (kappa - 1) * np.log(H) + log_h + + check_logp( + ExtGenPareto, + Rplus, + { + "mu": Domain([0, 0, 0, 0], edges=(0, 0)), + "sigma": Rplusbig, + "xi": Domain([-0.3, 0, 0.1, 0.5]), + "kappa": Domain([0.5, 1.0, 2.0, 5.0]), + }, + ref_logp, + skip_paramdomain_outside_edge_test=True, + ) + + def test_logcdf(self): + def ref_logcdf(value, mu, sigma, xi, kappa): + # ExtGPD CDF: G(x) = H(x)^kappa + # log CDF = kappa * log(H(x)) + scaled = (value - mu) / sigma + if scaled < 0: + return -np.inf + if xi < 0 and scaled > -1 / xi: + return 0.0 # log(1) at upper bound + + H = sp.genpareto.cdf(value, c=xi, loc=mu, scale=sigma) + if H <= 0: + return -np.inf + return kappa * np.log(H) + + check_logcdf( + ExtGenPareto, + Rplus, + { + "mu": Domain([0, 0, 0, 0], edges=(0, 0)), + "sigma": Rplusbig, + "xi": Domain([-0.3, 0, 0.1, 0.5]), + "kappa": Domain([0.5, 1.0, 2.0, 5.0]), + }, + ref_logcdf, + decimal=select_by_precision(float64=6, float32=2), + skip_paramdomain_outside_edge_test=True, + ) + + def test_kappa_one_equals_gpd(self): + """When kappa=1, ExtGPD should equal GPD.""" + + # Create ExtGPD with kappa=1 + ext_dist = ExtGenPareto.dist(mu=0, sigma=1, xi=0.2, kappa=1) + gpd_dist = GenPareto.dist(mu=0, sigma=1, xi=0.2) + + test_values = np.array([0.1, 0.5, 1.0, 2.0, 5.0]) + + ext_logp = pm.logp(ext_dist, test_values).eval() + gpd_logp = pm.logp(gpd_dist, test_values).eval() + + np.testing.assert_allclose(ext_logp, gpd_logp, rtol=1e-6) + + @pytest.mark.parametrize( + "mu, sigma, xi, kappa, size, expected", + [ + # kappa=1 should give GPD median: sigma * (2^xi - 1) / xi + (0, 1, 0.5, 1.0, None, (2**0.5 - 1) / 0.5), + # kappa=1, xi=0: exponential median = log(2) + (0, 1, 0, 1.0, None, np.log(2)), + # kappa=2: H(m) = 0.5^0.5, so m = sigma * [(1-0.5^0.5)^(-xi) - 1] / xi + (0, 1, 0.5, 2.0, None, ((1 - 0.5**0.5) ** (-0.5) - 1) / 0.5), + # With size + (0, 1, 0.5, 1.0, 5, np.full(5, (2**0.5 - 1) / 0.5)), + ], + ) + def test_extgenpareto_support_point(self, mu, sigma, xi, kappa, size, expected): + with pm.Model() as model: + ExtGenPareto("x", mu=mu, sigma=sigma, xi=xi, kappa=kappa, size=size) + assert_support_point_is_expected(model, expected) + + +class TestExtGenPareto(BaseTestDistributionRandom): + """Test random sampling for ExtGPD.""" + + pymc_dist = ExtGenPareto + pymc_dist_params = {"mu": 0, "sigma": 1, "xi": 0.1, "kappa": 2.0} + expected_rv_op_params = {"mu": 0, "sigma": 1, "xi": 0.1, "kappa": 2.0} + tests_to_run = [ + "check_pymc_params_match_rv_op", + "check_rv_size", + ] + + def test_random_samples_follow_distribution(self): + """Test that random samples follow the ExtGPD distribution using KS test.""" + rng = np.random.default_rng(42) + mu, sigma, xi, kappa = 0, 1, 0.2, 2.0 + + # Generate samples + dist = ExtGenPareto.dist(mu=mu, sigma=sigma, xi=xi, kappa=kappa) + samples = pm.draw(dist, draws=1000, random_seed=rng) + + # Define ExtGPD CDF for KS test + def ext_gpd_cdf(x, mu, sigma, xi, kappa): + H = sp.genpareto.cdf(x, c=xi, loc=mu, scale=sigma) + return np.power(H, kappa) + + # KS test + from scipy.stats import kstest + + stat, pvalue = kstest(samples, lambda x: ext_gpd_cdf(x, mu, sigma, xi, kappa)) + assert pvalue > 0.01, f"KS test failed with p-value {pvalue}" + + class TestChiClass: """ Wrapper class so that tests of experimental additions can be dropped into From 2929f635518a3efe9b137c33ad36da52c6c144d6 Mon Sep 17 00:00:00 2001 From: Ben Mares Date: Thu, 5 Feb 2026 00:03:57 +0100 Subject: [PATCH 2/4] Add icdf method to ExtGenPareto distribution This enables PyMC's Truncated distribution to use inverse CDF sampling instead of rejection sampling, which is critical when the truncation probability is high (e.g., when kappa is small and most mass is below the truncation threshold). The inverse CDF for ExtGPD is: G^{-1}(p) = H^{-1}(p^{1/kappa}) where H^{-1} is the GPD quantile function. --- pymc_extras/distributions/continuous.py | 37 ++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/pymc_extras/distributions/continuous.py b/pymc_extras/distributions/continuous.py index d40f83561..da96dcdb1 100644 --- a/pymc_extras/distributions/continuous.py +++ b/pymc_extras/distributions/continuous.py @@ -24,7 +24,11 @@ from pymc import ChiSquared, CustomDist from pymc.distributions import transforms -from pymc.distributions.dist_math import check_parameters +from pymc.distributions.dist_math import ( + check_icdf_parameters, + check_icdf_value, + check_parameters, +) from pymc.distributions.distribution import Continuous from pymc.distributions.shape_utils import rv_size_is_none from pymc.logprob.utils import CheckParameterValue @@ -672,6 +676,37 @@ def logcdf(value, mu, sigma, xi, kappa): return check_parameters(logcdf, sigma > 0, kappa > 0, msg="sigma > 0, kappa > 0") + def icdf(value, mu, sigma, xi, kappa): + """ + Compute the inverse CDF (quantile function) for Extended Generalized Pareto distribution. + + For ExtGPD: G(x) = H(x)^kappa, so G^{-1}(p) = H^{-1}(p^{1/kappa}) + + For GPD: H^{-1}(p) = mu + sigma * [(1-p)^(-xi) - 1] / xi for xi != 0 + H^{-1}(p) = mu - sigma * log(1-p) for xi = 0 + + Parameters + ---------- + value: numeric or np.ndarray or `TensorVariable` + Probability value(s) in [0, 1] for which to compute quantiles. + + Returns + ------- + TensorVariable + """ + # Transform p to get the GPD quantile input: p_gpd = p^(1/kappa) + p_gpd = pt.pow(value, 1 / kappa) + + # GPD inverse CDF: H^{-1}(p) = mu + sigma * [(1-p)^(-xi) - 1] / xi + res = pt.switch( + pt.isclose(xi, 0), + mu - sigma * pt.log(1 - p_gpd), + mu + sigma * (pt.pow(1 - p_gpd, -xi) - 1) / xi, + ) + + res = check_icdf_value(res, value) + return check_icdf_parameters(res, sigma > 0, kappa > 0, msg="sigma > 0, kappa > 0") + def support_point(rv, size, mu, sigma, xi, kappa): r""" Using the median as the support point. From 5ae11b11ea2c741d6dd0d784b567aee12a7812e6 Mon Sep 17 00:00:00 2001 From: Ben Mares Date: Thu, 5 Feb 2026 14:07:26 +0100 Subject: [PATCH 3/4] refactor: convert GenParetoRV and ExtGenParetoRV to SymbolicRandomVariable Switch from scipy-based RandomVariable to symbolic inverse CDF sampling. This enables backend-agnostic random sampling (JAX, NumPy, etc.) by implementing the inverse CDF transformation directly in PyTensor. The inverse CDF for GPD is: - mu + sigma * [(1-u)^(-xi) - 1] / xi for xi != 0 - mu - sigma * log(1-u) for xi = 0 For ExtGPD, we use the transformation u^{1/kappa} before applying the GPD inverse CDF. Co-authored-by: Claude AI Co-authored-by: Cursor --- pymc_extras/distributions/continuous.py | 117 ++++++++++++++---------- 1 file changed, 71 insertions(+), 46 deletions(-) diff --git a/pymc_extras/distributions/continuous.py b/pymc_extras/distributions/continuous.py index da96dcdb1..ce14f6340 100644 --- a/pymc_extras/distributions/continuous.py +++ b/pymc_extras/distributions/continuous.py @@ -29,37 +29,52 @@ check_icdf_value, check_parameters, ) -from pymc.distributions.distribution import Continuous -from pymc.distributions.shape_utils import rv_size_is_none +from pymc.distributions.distribution import Continuous, SymbolicRandomVariable +from pymc.distributions.shape_utils import ( + implicit_size_from_params, + rv_size_is_none, +) from pymc.logprob.utils import CheckParameterValue -from pymc.pytensorf import floatX +from pymc.pytensorf import floatX, normalize_rng_param +from pytensor.tensor.random.basic import uniform from pytensor.tensor.random.op import RandomVariable +from pytensor.tensor.random.utils import normalize_size_param from pytensor.tensor.variable import TensorVariable from scipy import stats -class GenParetoRV(RandomVariable): - name: str = "Generalized Pareto" - signature = "(),(),()->()" - dtype: str = "floatX" - _print_name: tuple[str, str] = ("Generalized Pareto", "\\operatorname{GPD}") +class GenParetoRV(SymbolicRandomVariable): + """Symbolic random variable for Generalized Pareto Distribution using inverse CDF sampling.""" - def __call__(self, mu=0.0, sigma=1.0, xi=0.0, size=None, **kwargs) -> TensorVariable: - return super().__call__(mu, sigma, xi, size=size, **kwargs) + name = "genpareto" + extended_signature = "[rng],[size],(),(),()->[rng],()" + _print_name = ("Generalized Pareto", "\\operatorname{GPD}") @classmethod - def rng_fn( - cls, - rng: np.random.RandomState | np.random.Generator, - mu: np.ndarray, - sigma: np.ndarray, - xi: np.ndarray, - size: tuple[int, ...], - ) -> np.ndarray: - return stats.genpareto.rvs(c=xi, loc=mu, scale=sigma, random_state=rng, size=size) + def rv_op(cls, mu, sigma, xi, *, size=None, rng=None): + mu = pt.as_tensor(mu) + sigma = pt.as_tensor(sigma) + xi = pt.as_tensor(xi) + rng = normalize_rng_param(rng) + size = normalize_size_param(size) + if rv_size_is_none(size): + size = implicit_size_from_params(mu, sigma, xi, ndims_params=cls.ndims_params) + + next_rng, u = uniform(size=size, rng=rng).owner.outputs + + # GPD inverse CDF: mu + sigma * [(1-u)^(-xi) - 1] / xi for xi != 0 + # mu - sigma * log(1-u) for xi = 0 + draws = pt.switch( + pt.isclose(xi, 0), + mu - sigma * pt.log(1 - u), + mu + sigma * (pt.pow(1 - u, -xi) - 1) / xi, + ) -gpd = GenParetoRV() + return cls( + inputs=[rng, size, mu, sigma, xi], + outputs=[next_rng, draws], + )(rng, size, mu, sigma, xi) class GenExtremeRV(RandomVariable): @@ -339,7 +354,8 @@ class GenPareto(Continuous): obs = GenPareto("obs", mu=0, sigma=sigma, xi=xi, observed=exceedances) """ - rv_op = gpd + rv_type = GenParetoRV + rv_op = GenParetoRV.rv_op @classmethod def dist(cls, mu=0, sigma=1, xi=0, **kwargs): @@ -441,35 +457,43 @@ def support_point(rv, size, mu, sigma, xi): return median -class ExtGenParetoRV(RandomVariable): - name: str = "Extended Generalized Pareto" - signature = "(),(),(),()->()" - dtype: str = "floatX" - _print_name: tuple[str, str] = ("Extended Generalized Pareto", "\\operatorname{ExtGPD}") +class ExtGenParetoRV(SymbolicRandomVariable): + """Symbolic random variable for Extended Generalized Pareto using inverse CDF sampling.""" - def __call__(self, mu=0.0, sigma=1.0, xi=0.0, kappa=1.0, size=None, **kwargs) -> TensorVariable: - return super().__call__(mu, sigma, xi, kappa, size=size, **kwargs) + name = "extgenpareto" + extended_signature = "[rng],[size],(),(),(),()->[rng],()" + _print_name = ("Extended Generalized Pareto", "\\operatorname{ExtGPD}") @classmethod - def rng_fn( - cls, - rng: np.random.RandomState | np.random.Generator, - mu: np.ndarray, - sigma: np.ndarray, - xi: np.ndarray, - kappa: np.ndarray, - size: tuple[int, ...], - ) -> np.ndarray: - # Use inverse transform sampling: X = H^{-1}(U^{1/kappa}) - # where H^{-1} is the GPD quantile function - u = rng.uniform(size=size) - # Transform uniform: v = u^{1/kappa} gives CDF v^kappa - v = np.power(u, 1.0 / kappa) - # Apply GPD quantile function - return stats.genpareto.ppf(v, c=xi, loc=mu, scale=sigma) + def rv_op(cls, mu, sigma, xi, kappa, *, size=None, rng=None): + mu = pt.as_tensor(mu) + sigma = pt.as_tensor(sigma) + xi = pt.as_tensor(xi) + kappa = pt.as_tensor(kappa) + rng = normalize_rng_param(rng) + size = normalize_size_param(size) + if rv_size_is_none(size): + size = implicit_size_from_params(mu, sigma, xi, kappa, ndims_params=cls.ndims_params) + + next_rng, u = uniform(size=size, rng=rng).owner.outputs + + # ExtGPD inverse CDF: G^{-1}(p) = H^{-1}(p^{1/kappa}) + # Transform uniform: p_gpd = u^{1/kappa} + p_gpd = pt.pow(u, 1 / kappa) + + # GPD inverse CDF: mu + sigma * [(1-p)^(-xi) - 1] / xi for xi != 0 + # mu - sigma * log(1-p) for xi = 0 + draws = pt.switch( + pt.isclose(xi, 0), + mu - sigma * pt.log(1 - p_gpd), + mu + sigma * (pt.pow(1 - p_gpd, -xi) - 1) / xi, + ) -ext_gpd = ExtGenParetoRV() + return cls( + inputs=[rng, size, mu, sigma, xi, kappa], + outputs=[next_rng, draws], + )(rng, size, mu, sigma, xi, kappa) class ExtGenPareto(Continuous): @@ -567,7 +591,8 @@ class ExtGenPareto(Continuous): obs = ExtGenPareto("obs", mu=0, sigma=sigma, xi=xi, kappa=kappa, observed=data) """ - rv_op = ext_gpd + rv_type = ExtGenParetoRV + rv_op = ExtGenParetoRV.rv_op @classmethod def dist(cls, mu=0, sigma=1, xi=0, kappa=1, **kwargs): From 660eeaa82fac544bccd1019699626809853d72a2 Mon Sep 17 00:00:00 2001 From: Ben Mares Date: Thu, 5 Feb 2026 14:39:38 +0100 Subject: [PATCH 4/4] Use numerically stable inverse survival function for GPD sampling - Add _exprel helper for (exp(t)-1)/t with correct gradient at t=0 - Use survival probability directly to avoid 1-u cancellation in upper tail - For ExtGPD, compute GPD survival probability via -expm1(log(u)/kappa) Co-authored-by: Cursor --- pymc_extras/distributions/continuous.py | 49 ++++++++++++++----------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/pymc_extras/distributions/continuous.py b/pymc_extras/distributions/continuous.py index ce14f6340..5e9128a93 100644 --- a/pymc_extras/distributions/continuous.py +++ b/pymc_extras/distributions/continuous.py @@ -43,8 +43,22 @@ from scipy import stats +def _exprel(t): + """Compute exprel(t) = (exp(t) - 1) / t with stable gradient at t=0.""" + taylor = 1 + t * (0.5 + t * (1 / 6 + t / 24)) + direct = pt.expm1(t) / t + return pt.switch(pt.abs(t) < 1e-10, taylor, direct) + + +def _gpd_isf(v, mu, sigma, xi): + """GPD inverse survival function: x such that P(X > x) = v.""" + log_v = pt.log(v) + t = -xi * log_v + return mu + sigma * (-log_v) * _exprel(t) + + class GenParetoRV(SymbolicRandomVariable): - """Symbolic random variable for Generalized Pareto Distribution using inverse CDF sampling.""" + """Generalized Pareto random variable.""" name = "genpareto" extended_signature = "[rng],[size],(),(),()->[rng],()" @@ -61,15 +75,9 @@ def rv_op(cls, mu, sigma, xi, *, size=None, rng=None): if rv_size_is_none(size): size = implicit_size_from_params(mu, sigma, xi, ndims_params=cls.ndims_params) - next_rng, u = uniform(size=size, rng=rng).owner.outputs - - # GPD inverse CDF: mu + sigma * [(1-u)^(-xi) - 1] / xi for xi != 0 - # mu - sigma * log(1-u) for xi = 0 - draws = pt.switch( - pt.isclose(xi, 0), - mu - sigma * pt.log(1 - u), - mu + sigma * (pt.pow(1 - u, -xi) - 1) / xi, - ) + # Sample v ~ Uniform(0,1) as survival probability (avoids 1-u cancellation) + next_rng, v = uniform(size=size, rng=rng).owner.outputs + draws = _gpd_isf(v, mu, sigma, xi) return cls( inputs=[rng, size, mu, sigma, xi], @@ -458,7 +466,7 @@ def support_point(rv, size, mu, sigma, xi): class ExtGenParetoRV(SymbolicRandomVariable): - """Symbolic random variable for Extended Generalized Pareto using inverse CDF sampling.""" + """Extended Generalized Pareto random variable.""" name = "extgenpareto" extended_signature = "[rng],[size],(),(),(),()->[rng],()" @@ -479,16 +487,15 @@ def rv_op(cls, mu, sigma, xi, kappa, *, size=None, rng=None): next_rng, u = uniform(size=size, rng=rng).owner.outputs # ExtGPD inverse CDF: G^{-1}(p) = H^{-1}(p^{1/kappa}) - # Transform uniform: p_gpd = u^{1/kappa} - p_gpd = pt.pow(u, 1 / kappa) - - # GPD inverse CDF: mu + sigma * [(1-p)^(-xi) - 1] / xi for xi != 0 - # mu - sigma * log(1-p) for xi = 0 - draws = pt.switch( - pt.isclose(xi, 0), - mu - sigma * pt.log(1 - p_gpd), - mu + sigma * (pt.pow(1 - p_gpd, -xi) - 1) / xi, - ) + # where H is the GPD CDF. We need the GPD survival probability: + # v_gpd = 1 - p_gpd = 1 - u^{1/kappa} + # + # Compute v_gpd = 1 - u^{1/kappa} = 1 - exp(log(u)/kappa) = -expm1(log(u)/kappa) + # This avoids catastrophic cancellation when u^{1/kappa} is close to 1 + v_gpd = -pt.expm1(pt.log(u) / kappa) + + # Apply GPD inverse survival function + draws = _gpd_isf(v_gpd, mu, sigma, xi) return cls( inputs=[rng, size, mu, sigma, xi, kappa],