diff --git a/pymc_extras/distributions/__init__.py b/pymc_extras/distributions/__init__.py
index 2afabad2b..65988c3dd 100644
--- a/pymc_extras/distributions/__init__.py
+++ b/pymc_extras/distributions/__init__.py
@@ -17,7 +17,13 @@
 Experimental probability distributions for stochastic nodes in PyMC.
 """

-from pymc_extras.distributions.continuous import Chi, GenExtreme, Maxwell
+from pymc_extras.distributions.continuous import (
+    Chi,
+    ExtGenPareto,
+    GenExtreme,
+    GenPareto,
+    Maxwell,
+)
 from pymc_extras.distributions.discrete import (
     BetaNegativeBinomial,
     GeneralizedPoisson,
@@ -29,14 +35,16 @@
 from pymc_extras.distributions.transforms import PartialOrder

 __all__ = [
-    "R2D2M2CP",
     "BetaNegativeBinomial",
     "Chi",
     "DiscreteMarkovChain",
+    "ExtGenPareto",
     "GenExtreme",
+    "GenPareto",
     "GeneralizedPoisson",
     "Maxwell",
     "PartialOrder",
+    "R2D2M2CP",
     "Skellam",
     "histogram_approximation",
 ]
diff --git a/pymc_extras/distributions/continuous.py b/pymc_extras/distributions/continuous.py
index 0bcda193e..5e9128a93 100644
--- a/pymc_extras/distributions/continuous.py
+++ b/pymc_extras/distributions/continuous.py
@@ -24,16 +24,67 @@
 from pymc import ChiSquared, CustomDist
 from pymc.distributions import transforms
-from pymc.distributions.dist_math import check_parameters
-from pymc.distributions.distribution import Continuous
-from pymc.distributions.shape_utils import rv_size_is_none
+from pymc.distributions.dist_math import (
+    check_icdf_parameters,
+    check_icdf_value,
+    check_parameters,
+)
+from pymc.distributions.distribution import Continuous, SymbolicRandomVariable
+from pymc.distributions.shape_utils import (
+    implicit_size_from_params,
+    rv_size_is_none,
+)
 from pymc.logprob.utils import CheckParameterValue
-from pymc.pytensorf import floatX
+from pymc.pytensorf import floatX, normalize_rng_param
+from pytensor.tensor.random.basic import uniform
 from pytensor.tensor.random.op import RandomVariable
+from pytensor.tensor.random.utils import normalize_size_param
 from pytensor.tensor.variable import TensorVariable
 from scipy import stats


+def _exprel(t):
+    """Compute exprel(t) = (exp(t) - 1) / t with stable gradient at t=0."""
+    taylor = 1 + t * (0.5 + t * (1 / 6 + t / 24))
+    direct = pt.expm1(t) / t
+    return pt.switch(pt.abs(t) < 1e-10, taylor, direct)
+
+
+def _gpd_isf(v, mu, sigma, xi):
+    """GPD inverse survival function: x such that P(X > x) = v."""
+    log_v = pt.log(v)
+    t = -xi * log_v
+    return mu + sigma * (-log_v) * _exprel(t)
+
+
+class GenParetoRV(SymbolicRandomVariable):
+    """Generalized Pareto random variable."""
+
+    name = "genpareto"
+    extended_signature = "[rng],[size],(),(),()->[rng],()"
+    _print_name = ("Generalized Pareto", "\\operatorname{GPD}")
+
+    @classmethod
+    def rv_op(cls, mu, sigma, xi, *, size=None, rng=None):
+        mu = pt.as_tensor(mu)
+        sigma = pt.as_tensor(sigma)
+        xi = pt.as_tensor(xi)
+        rng = normalize_rng_param(rng)
+        size = normalize_size_param(size)
+
+        if rv_size_is_none(size):
+            size = implicit_size_from_params(mu, sigma, xi, ndims_params=cls.ndims_params)
+
+        # Sample v ~ Uniform(0,1) as survival probability (avoids 1-u cancellation)
+        next_rng, v = uniform(size=size, rng=rng).owner.outputs
+        draws = _gpd_isf(v, mu, sigma, xi)
+
+        return cls(
+            inputs=[rng, size, mu, sigma, xi],
+            outputs=[next_rng, draws],
+        )(rng, size, mu, sigma, xi)
+
+
 class GenExtremeRV(RandomVariable):
     name: str = "Generalized Extreme Value"
     signature = "(),(),()->()"
@@ -219,6 +270,496 @@ def support_point(rv, size, mu, sigma, xi):
         return mode


+class GenPareto(Continuous):
+    r"""
+    Univariate Generalized Pareto log-likelihood.
+
+    The Generalized Pareto Distribution (GPD) is used for modeling exceedances over
+    a threshold in extreme value analysis. It is the natural distribution for
+    peaks-over-threshold modeling, complementing the GEV distribution which is used
+    for block maxima.
+
+    The cdf of this distribution is
+
+    .. math::
+
+        G(x \mid \mu, \sigma, \xi) = 1 - \left(1 + \xi \frac{x - \mu}{\sigma}\right)^{-\frac{1}{\xi}}
+
+    for :math:`\xi \neq 0`, and
+
+    .. math::
+
+        G(x \mid \mu, \sigma, \xi) = 1 - \exp\left(-\frac{x - \mu}{\sigma}\right)
+
+    for :math:`\xi = 0`.
+
+    .. plot::
+
+        import matplotlib.pyplot as plt
+        import numpy as np
+        import scipy.stats as st
+        import arviz as az
+        plt.style.use('arviz-darkgrid')
+        x = np.linspace(0, 10, 200)
+        mus = [0., 0., 0.]
+        sigmas = [1., 2., 1.]
+        xis = [0.0, 0.5, -0.5]
+        for mu, sigma, xi in zip(mus, sigmas, xis):
+            pdf = st.genpareto.pdf(x, c=xi, loc=mu, scale=sigma)
+            plt.plot(x, pdf, label=rf'$\mu$ = {mu}, $\sigma$ = {sigma}, $\xi$={xi}')
+        plt.xlabel('x', fontsize=12)
+        plt.ylabel('f(x)', fontsize=12)
+        plt.legend(loc=1)
+        plt.show()
+
+
+    ======== =========================================================================
+    Support  * :math:`x \geq \mu`, when :math:`\xi \geq 0`
+             * :math:`\mu \leq x \leq \mu - \sigma/\xi`, when :math:`\xi < 0`
+    Mean     * :math:`\mu + \sigma/(1 - \xi)`, when :math:`\xi < 1`
+             * :math:`\infty`, when :math:`\xi \geq 1`
+    Variance * :math:`\sigma^2 / ((1-\xi)^2 (1-2\xi))`, when :math:`\xi < 0.5`
+             * :math:`\infty`, when :math:`\xi \geq 0.5`
+    ======== =========================================================================
+
+    Parameters
+    ----------
+    mu : float
+        Location parameter (threshold).
+    sigma : float
+        Scale parameter (sigma > 0).
+    xi : float
+        Shape parameter. Controls tail behavior:
+
+        * :math:`\xi > 0`: heavy tail (Pareto-like)
+        * :math:`\xi = 0`: exponential tail
+        * :math:`\xi < 0`: bounded upper tail
+
+    References
+    ----------
+    .. [1] Coles, S.G. (2001).
+        An Introduction to the Statistical Modeling of Extreme Values.
+        Springer-Verlag, London.
+
+    .. [2] Pickands, J. (1975).
+        Statistical Inference Using Extreme Order Statistics.
+        Annals of Statistics, 3(1), 119-131.
+
+    Examples
+    --------
+    .. code-block:: python
+
+        import pymc as pm
+        from pymc_extras.distributions import GenPareto
+
+        # Peaks-over-threshold model
+        threshold = 10.0
+        exceedances = data[data > threshold] - threshold
+
+        with pm.Model():
+            sigma = pm.HalfNormal("sigma", sigma=1)
+            xi = pm.Normal("xi", mu=0, sigma=0.5)
+            obs = GenPareto("obs", mu=0, sigma=sigma, xi=xi, observed=exceedances)
+    """
+
+    rv_type = GenParetoRV
+    rv_op = GenParetoRV.rv_op
+
+    @classmethod
+    def dist(cls, mu=0, sigma=1, xi=0, **kwargs):
+        mu = pt.as_tensor_variable(floatX(mu))
+        sigma = pt.as_tensor_variable(floatX(sigma))
+        xi = pt.as_tensor_variable(floatX(xi))
+
+        return super().dist([mu, sigma, xi], **kwargs)
+
+    def logp(value, mu, sigma, xi):
+        """
+        Calculate log-probability of Generalized Pareto distribution
+        at specified value.
+
+        Parameters
+        ----------
+        value: numeric
+            Value(s) for which log-probability is calculated.
+            If the log probabilities for multiple values are desired, the values must
+            be provided in a numpy array or PyTensor tensor.
+
+        Returns
+        -------
+        TensorVariable
+        """
+        scaled = (value - mu) / sigma
+
+        logp_expression = pt.switch(
+            pt.isclose(xi, 0),
+            -pt.log(sigma) - scaled,
+            -pt.log(sigma) - (1 + 1 / xi) * pt.log1p(xi * scaled),
+        )
+
+        # Check support: scaled >= 0, and for xi < 0, also scaled <= -1/xi
+        in_support = pt.switch(
+            pt.ge(xi, 0),
+            pt.ge(scaled, 0),
+            pt.and_(pt.ge(scaled, 0), pt.le(scaled, -1 / xi)),
+        )
+
+        logp = pt.switch(
+            pt.and_(in_support, pt.gt(1 + xi * scaled, 0)),
+            logp_expression,
+            -np.inf,
+        )
+
+        return check_parameters(logp, sigma > 0, msg="sigma > 0")
+
+    def logcdf(value, mu, sigma, xi):
+        """
+        Compute the log of the cumulative distribution function for Generalized Pareto
+        distribution at the specified value.
+
+        Parameters
+        ----------
+        value: numeric or np.ndarray or `TensorVariable`
+            Value(s) for which log CDF is calculated. If the log CDF for
+            multiple values are desired, the values must be provided in a numpy
+            array or `TensorVariable`.
+
+        Returns
+        -------
+        TensorVariable
+        """
+        scaled = (value - mu) / sigma
+
+        logcdf_expression = pt.switch(
+            pt.isclose(xi, 0),
+            pt.log1p(-pt.exp(-scaled)),
+            pt.log1p(-pt.pow(1 + xi * scaled, -1 / xi)),
+        )
+
+        # Handle bounds
+        logcdf = pt.switch(
+            pt.lt(scaled, 0),
+            -np.inf,
+            pt.switch(
+                pt.and_(pt.lt(xi, 0), pt.gt(scaled, -1 / xi)),
+                0.0,  # log(1) = 0 at upper bound for xi < 0
+                logcdf_expression,
+            ),
+        )
+
+        return check_parameters(logcdf, sigma > 0, msg="sigma > 0")
+
+    def support_point(rv, size, mu, sigma, xi):
+        r"""
+        Using the median as the support point, since the mean can be infinite when :math:`\xi \geq 1`.
+
+        Median = mu + sigma * (2^xi - 1) / xi  for xi != 0
+        Median = mu + sigma * log(2)           for xi = 0
+        """
+        median = pt.switch(
+            pt.isclose(xi, 0),
+            mu + sigma * pt.log(2),
+            mu + sigma * (pt.pow(2, xi) - 1) / xi,
+        )
+        if not rv_size_is_none(size):
+            median = pt.full(size, median)
+        return median
+
+
+class ExtGenParetoRV(SymbolicRandomVariable):
+    """Extended Generalized Pareto random variable."""
+
+    name = "extgenpareto"
+    extended_signature = "[rng],[size],(),(),(),()->[rng],()"
+    _print_name = ("Extended Generalized Pareto", "\\operatorname{ExtGPD}")
+
+    @classmethod
+    def rv_op(cls, mu, sigma, xi, kappa, *, size=None, rng=None):
+        mu = pt.as_tensor(mu)
+        sigma = pt.as_tensor(sigma)
+        xi = pt.as_tensor(xi)
+        kappa = pt.as_tensor(kappa)
+        rng = normalize_rng_param(rng)
+        size = normalize_size_param(size)
+
+        if rv_size_is_none(size):
+            size = implicit_size_from_params(mu, sigma, xi, kappa, ndims_params=cls.ndims_params)
+
+        next_rng, u = uniform(size=size, rng=rng).owner.outputs
+
+        # ExtGPD inverse CDF: G^{-1}(p) = H^{-1}(p^{1/kappa})
+        # where H is the GPD CDF. We need the GPD survival probability:
+        #   v_gpd = 1 - p_gpd = 1 - u^{1/kappa}
+        #
+        # Compute v_gpd = 1 - u^{1/kappa} = 1 - exp(log(u)/kappa) = -expm1(log(u)/kappa)
+        # This avoids catastrophic cancellation when u^{1/kappa} is close to 1
+        v_gpd = -pt.expm1(pt.log(u) / kappa)
+
+        # Apply GPD inverse survival function
+        draws = _gpd_isf(v_gpd, mu, sigma, xi)
+
+        return cls(
+            inputs=[rng, size, mu, sigma, xi, kappa],
+            outputs=[next_rng, draws],
+        )(rng, size, mu, sigma, xi, kappa)
+
+
+class ExtGenPareto(Continuous):
+    r"""
+    Univariate Extended Generalized Pareto log-likelihood.
+
+    The Extended Generalized Pareto Distribution (ExtGPD) is a transformation of the
+    GPD that provides a smooth connection between the upper tail and the main body
+    of the distribution. This avoids the need to select a threshold and allows the
+    full range of the data to be modeled when studying extremes.
+
+    The cdf of this distribution is
+
+    .. math::
+
+        G(x \mid \mu, \sigma, \xi, \kappa) = \left[H\left(\frac{x - \mu}{\sigma}\right)\right]^\kappa
+
+    where :math:`H` is the GPD cdf:
+
+    .. math::
+
+        H(z) = 1 - (1 + \xi z)^{-1/\xi}
+
+    for :math:`\xi \neq 0`, and :math:`H(z) = 1 - e^{-z}` for :math:`\xi = 0`.
+
+    The parameter :math:`\kappa > 0` controls the lower tail behavior, while
+    :math:`\xi` controls the upper tail behavior. When :math:`\kappa = 1`,
+    this reduces to the standard GPD.
+
+    .. plot::
+
+        import matplotlib.pyplot as plt
+        import numpy as np
+        import scipy.stats as st
+        import arviz as az
+        plt.style.use('arviz-darkgrid')
+        x = np.linspace(0.01, 5, 200)
+        sigma, xi = 1.0, 0.2
+        for kappa in [0.5, 1.0, 2.0, 5.0]:
+            # ExtGPD pdf: g(x) = kappa * H(x)^(kappa-1) * h(x)
+            H = st.genpareto.cdf(x, c=xi, loc=0, scale=sigma)
+            h = st.genpareto.pdf(x, c=xi, loc=0, scale=sigma)
+            g = kappa * np.power(H, kappa - 1) * h
+            plt.plot(x, g, label=rf'$\kappa$ = {kappa}')
+        plt.xlabel('x', fontsize=12)
+        plt.ylabel('f(x)', fontsize=12)
+        plt.title(rf'ExtGPD with $\sigma$={sigma}, $\xi$={xi}')
+        plt.legend(loc=1)
+        plt.show()
+
+
+    ======== =========================================================================
+    Support  * :math:`x \geq \mu`, when :math:`\xi \geq 0`
+             * :math:`\mu \leq x \leq \mu - \sigma/\xi`, when :math:`\xi < 0`
+    ======== =========================================================================
+
+    Parameters
+    ----------
+    mu : float
+        Location parameter (threshold).
+    sigma : float
+        Scale parameter (sigma > 0).
+    xi : float
+        Shape parameter controlling upper tail behavior:
+
+        * :math:`\xi > 0`: heavy tail (Pareto-like)
+        * :math:`\xi = 0`: exponential tail
+        * :math:`\xi < 0`: bounded upper tail
+    kappa : float
+        Lower tail parameter (kappa > 0). Controls the behavior near zero.
+        When kappa = 1, the distribution reduces to the standard GPD.
+
+    References
+    ----------
+    .. [1] Naveau, P., Huser, R., Ribereau, P., and Hannart, A. (2016).
+        Modeling jointly low, moderate, and heavy rainfall intensities without
+        a threshold selection. Water Resources Research, 52(4), 2753-2769.
+
+    .. [2] Papastathopoulos, I. and Tawn, J. A. (2013).
+        Extended generalised Pareto models for tail estimation.
+        Journal of Statistical Planning and Inference, 143(1), 131-143.
+
+    Examples
+    --------
+    .. code-block:: python
+
+        import pymc as pm
+        from pymc_extras.distributions import ExtGenPareto
+
+        # Model the entire distribution without threshold selection
+        with pm.Model():
+            sigma = pm.HalfNormal("sigma", sigma=1)
+            xi = pm.Normal("xi", mu=0, sigma=0.5)
+            kappa = pm.HalfNormal("kappa", sigma=1)
+            obs = ExtGenPareto("obs", mu=0, sigma=sigma, xi=xi, kappa=kappa, observed=data)
+    """
+
+    rv_type = ExtGenParetoRV
+    rv_op = ExtGenParetoRV.rv_op
+
+    @classmethod
+    def dist(cls, mu=0, sigma=1, xi=0, kappa=1, **kwargs):
+        mu = pt.as_tensor_variable(floatX(mu))
+        sigma = pt.as_tensor_variable(floatX(sigma))
+        xi = pt.as_tensor_variable(floatX(xi))
+        kappa = pt.as_tensor_variable(floatX(kappa))
+
+        return super().dist([mu, sigma, xi, kappa], **kwargs)
+
+    def logp(value, mu, sigma, xi, kappa):
+        """
+        Calculate log-probability of Extended Generalized Pareto distribution
+        at specified value.
+
+        The pdf is: g(x) = kappa * H(x)^(kappa-1) * h(x)
+        where H is the GPD cdf and h is the GPD pdf.
+
+        log g(x) = log(kappa) + (kappa-1)*log(H(x)) + log(h(x))
+
+        Parameters
+        ----------
+        value: numeric
+            Value(s) for which log-probability is calculated.
+
+        Returns
+        -------
+        TensorVariable
+        """
+        scaled = (value - mu) / sigma
+
+        # GPD cdf: H(z) = 1 - (1 + xi*z)^(-1/xi) for xi != 0
+        #          H(z) = 1 - exp(-z)            for xi = 0
+        H = pt.switch(
+            pt.isclose(xi, 0),
+            1 - pt.exp(-scaled),
+            1 - pt.pow(1 + xi * scaled, -1 / xi),
+        )
+
+        # GPD log-pdf: log h(z) = -log(sigma) - (1 + 1/xi)*log(1 + xi*z) for xi != 0
+        #              log h(z) = -log(sigma) - z                        for xi = 0
+        log_h = pt.switch(
+            pt.isclose(xi, 0),
+            -pt.log(sigma) - scaled,
+            -pt.log(sigma) - (1 + 1 / xi) * pt.log1p(xi * scaled),
+        )
+
+        # ExtGPD log-pdf: log(kappa) + (kappa-1)*log(H) + log(h)
+        # Need to handle H near 0 carefully for numerical stability
+        logp_expression = pt.log(kappa) + (kappa - 1) * pt.log(H) + log_h
+
+        # Check support: scaled >= 0, and for xi < 0, also scaled <= -1/xi
+        in_support = pt.switch(
+            pt.ge(xi, 0),
+            pt.ge(scaled, 0),
+            pt.and_(pt.ge(scaled, 0), pt.le(scaled, -1 / xi)),
+        )
+
+        # Also need H > 0 and 1 + xi*scaled > 0
+        logp = pt.switch(
+            pt.and_(pt.and_(in_support, pt.gt(1 + xi * scaled, 0)), pt.gt(H, 0)),
+            logp_expression,
+            -np.inf,
+        )
+
+        return check_parameters(logp, sigma > 0, kappa > 0, msg="sigma > 0, kappa > 0")
+
+    def logcdf(value, mu, sigma, xi, kappa):
+        """
+        Compute the log of the cumulative distribution function for Extended
+        Generalized Pareto distribution at the specified value.
+
+        G(x) = H(x)^kappa, so log G(x) = kappa * log(H(x))
+
+        Parameters
+        ----------
+        value: numeric or np.ndarray or `TensorVariable`
+            Value(s) for which log CDF is calculated.
+
+        Returns
+        -------
+        TensorVariable
+        """
+        scaled = (value - mu) / sigma
+
+        # GPD cdf: H(z)
+        H = pt.switch(
+            pt.isclose(xi, 0),
+            1 - pt.exp(-scaled),
+            1 - pt.pow(1 + xi * scaled, -1 / xi),
+        )
+
+        # ExtGPD log-cdf: kappa * log(H)
+        logcdf_expression = kappa * pt.log(H)
+
+        # Handle bounds
+        logcdf = pt.switch(
+            pt.lt(scaled, 0),
+            -np.inf,
+            pt.switch(
+                pt.and_(pt.lt(xi, 0), pt.gt(scaled, -1 / xi)),
+                0.0,  # log(1) = 0 at upper bound for xi < 0
+                logcdf_expression,
+            ),
+        )
+
+        return check_parameters(logcdf, sigma > 0, kappa > 0, msg="sigma > 0, kappa > 0")
+
+    def icdf(value, mu, sigma, xi, kappa):
+        """
+        Compute the inverse CDF (quantile function) for Extended Generalized Pareto distribution.
+
+        For ExtGPD: G(x) = H(x)^kappa, so G^{-1}(p) = H^{-1}(p^{1/kappa})
+
+        For GPD: H^{-1}(p) = mu + sigma * [(1-p)^(-xi) - 1] / xi  for xi != 0
+                 H^{-1}(p) = mu - sigma * log(1-p)                for xi = 0
+
+        Parameters
+        ----------
+        value: numeric or np.ndarray or `TensorVariable`
+            Probability value(s) in [0, 1] for which to compute quantiles.
+
+        Returns
+        -------
+        TensorVariable
+        """
+        # Transform p to get the GPD quantile input: p_gpd = p^(1/kappa)
+        p_gpd = pt.pow(value, 1 / kappa)
+
+        # GPD inverse CDF: H^{-1}(p) = mu + sigma * [(1-p)^(-xi) - 1] / xi
+        res = pt.switch(
+            pt.isclose(xi, 0),
+            mu - sigma * pt.log(1 - p_gpd),
+            mu + sigma * (pt.pow(1 - p_gpd, -xi) - 1) / xi,
+        )
+
+        res = check_icdf_value(res, value)
+        return check_icdf_parameters(res, sigma > 0, kappa > 0, msg="sigma > 0, kappa > 0")
+
+    def support_point(rv, size, mu, sigma, xi, kappa):
+        r"""
+        Using the median as the support point.
+
+        For ExtGPD, the median satisfies G(m) = 0.5, i.e., H(m)^kappa = 0.5,
+        so H(m) = 0.5^(1/kappa) and m = H^{-1}(0.5^{1/kappa}).
+
+        For GPD: H^{-1}(p) = mu + sigma * [(1-p)^(-xi) - 1] / xi  for xi != 0
+                 H^{-1}(p) = mu - sigma * log(1-p)                for xi = 0
+        """
+        p = pt.pow(0.5, 1 / kappa)  # H(median) = 0.5^(1/kappa)
+        median = pt.switch(
+            pt.isclose(xi, 0),
+            mu - sigma * pt.log(1 - p),
+            mu + sigma * (pt.pow(1 - p, -xi) - 1) / xi,
+        )
+        if not rv_size_is_none(size):
+            median = pt.full(size, median)
+        return median
+
+
 class Chi:
     r"""
     :math:`\chi` log-likelihood.
diff --git a/tests/distributions/test_continuous.py b/tests/distributions/test_continuous.py
index aec310eaa..061facf2d 100644
--- a/tests/distributions/test_continuous.py
+++ b/tests/distributions/test_continuous.py
@@ -34,7 +34,7 @@
 )

 # the distributions to be tested
-from pymc_extras.distributions import Chi, GenExtreme, Maxwell
+from pymc_extras.distributions import Chi, ExtGenPareto, GenExtreme, GenPareto, Maxwell

 pytestmark = pytest.mark.filterwarnings(
     "ignore:Numba will use object mode to run Generalized Extreme Value:UserWarning"
 )
@@ -143,6 +143,227 @@ class TestGenExtreme(BaseTestDistributionRandom):
     ]


+class TestGenParetoClass:
+    """
+    Wrapper class so that tests of experimental additions can be dropped into
+    PyMC directly on adoption.
+    """
+
+    def test_logp(self):
+        def ref_logp(value, mu, sigma, xi):
+            scaled = (value - mu) / sigma
+            if scaled < 0:
+                return -np.inf
+            if xi < 0 and scaled > -1 / xi:
+                return -np.inf
+            if 1 + xi * scaled <= 0:
+                return -np.inf
+            return sp.genpareto.logpdf(value, c=xi, loc=mu, scale=sigma)
+
+        check_logp(
+            GenPareto,
+            R,
+            {
+                "mu": R,
+                "sigma": Rplusbig,
+                "xi": Domain([-0.5, -0.1, 0, 0.1, 0.5, 1]),
+            },
+            ref_logp,
+            # GPD has no mathematical constraint on xi (unlike GEV), only sigma > 0
+            skip_paramdomain_outside_edge_test=True,
+        )
+
+    def test_logcdf(self):
+        def ref_logcdf(value, mu, sigma, xi):
+            scaled = (value - mu) / sigma
+            if scaled < 0:
+                return -np.inf
+            if xi < 0 and scaled > -1 / xi:
+                return 0.0  # log(1) at upper bound
+            return sp.genpareto.logcdf(value, c=xi, loc=mu, scale=sigma)
+
+        check_logcdf(
+            GenPareto,
+            R,
+            {
+                "mu": R,
+                "sigma": Rplusbig,
+                "xi": Domain([-0.5, -0.1, 0, 0.1, 0.5, 1]),
+            },
+            ref_logcdf,
+            decimal=select_by_precision(float64=6, float32=2),
+            # GPD has no mathematical constraint on xi (unlike GEV), only sigma > 0
+            skip_paramdomain_outside_edge_test=True,
+        )
+
+    @pytest.mark.parametrize(
+        "mu, sigma, xi, size, expected",
+        [
+            (0, 1, 0, None, np.log(2)),  # Exponential case: median = log(2)
+            (0, 1, 0.5, None, (2**0.5 - 1) / 0.5),  # median = (2^xi - 1) / xi
+            (0, 1, -0.5, None, (2**-0.5 - 1) / -0.5),
+            (1, 2, 0, None, 1 + 2 * np.log(2)),  # mu + sigma * log(2)
+            (0, 1, 0.5, 5, np.full(5, (2**0.5 - 1) / 0.5)),
+            (
+                np.arange(3),
+                np.arange(1, 4),
+                0.5,
+                (2, 3),
+                np.full((2, 3), np.arange(3) + np.arange(1, 4) * (2**0.5 - 1) / 0.5),
+            ),
+        ],
+    )
+    def test_genpareto_support_point(self, mu, sigma, xi, size, expected):
+        with pm.Model() as model:
+            GenPareto("x", mu=mu, sigma=sigma, xi=xi, size=size)
+        assert_support_point_is_expected(model, expected)
+
+
+class TestGenPareto(BaseTestDistributionRandom):
+    pymc_dist = GenPareto
+    pymc_dist_params = {"mu": 0, "sigma": 1, "xi": 0.1}
+    expected_rv_op_params = {"mu": 0, "sigma": 1, "xi": 0.1}
+    # GenPareto uses same xi sign convention as scipy
+    reference_dist_params = {"loc": 0, "scale": 1, "c": 0.1}
+    reference_dist = seeded_scipy_distribution_builder("genpareto")
+    tests_to_run = [
"check_pymc_params_match_rv_op", + "check_pymc_draws_match_reference", + "check_rv_size", + ] + + +class TestExtGenParetoClass: + """ + Tests for the Extended Generalized Pareto Distribution (ExtGPD). + ExtGPD has CDF G(x) = H(x)^kappa where H is the GPD CDF. + """ + + def test_logp(self): + def ref_logp(value, mu, sigma, xi, kappa): + # ExtGPD pdf: g(x) = kappa * H(x)^(kappa-1) * h(x) + # where H is GPD CDF and h is GPD PDF + scaled = (value - mu) / sigma + if scaled < 0: + return -np.inf + if xi < 0 and scaled > -1 / xi: + return -np.inf + if 1 + xi * scaled <= 0: + return -np.inf + + H = sp.genpareto.cdf(value, c=xi, loc=mu, scale=sigma) + if H <= 0: + return -np.inf + log_h = sp.genpareto.logpdf(value, c=xi, loc=mu, scale=sigma) + return np.log(kappa) + (kappa - 1) * np.log(H) + log_h + + check_logp( + ExtGenPareto, + Rplus, + { + "mu": Domain([0, 0, 0, 0], edges=(0, 0)), + "sigma": Rplusbig, + "xi": Domain([-0.3, 0, 0.1, 0.5]), + "kappa": Domain([0.5, 1.0, 2.0, 5.0]), + }, + ref_logp, + skip_paramdomain_outside_edge_test=True, + ) + + def test_logcdf(self): + def ref_logcdf(value, mu, sigma, xi, kappa): + # ExtGPD CDF: G(x) = H(x)^kappa + # log CDF = kappa * log(H(x)) + scaled = (value - mu) / sigma + if scaled < 0: + return -np.inf + if xi < 0 and scaled > -1 / xi: + return 0.0 # log(1) at upper bound + + H = sp.genpareto.cdf(value, c=xi, loc=mu, scale=sigma) + if H <= 0: + return -np.inf + return kappa * np.log(H) + + check_logcdf( + ExtGenPareto, + Rplus, + { + "mu": Domain([0, 0, 0, 0], edges=(0, 0)), + "sigma": Rplusbig, + "xi": Domain([-0.3, 0, 0.1, 0.5]), + "kappa": Domain([0.5, 1.0, 2.0, 5.0]), + }, + ref_logcdf, + decimal=select_by_precision(float64=6, float32=2), + skip_paramdomain_outside_edge_test=True, + ) + + def test_kappa_one_equals_gpd(self): + """When kappa=1, ExtGPD should equal GPD.""" + + # Create ExtGPD with kappa=1 + ext_dist = ExtGenPareto.dist(mu=0, sigma=1, xi=0.2, kappa=1) + gpd_dist = GenPareto.dist(mu=0, sigma=1, xi=0.2) + + test_values = np.array([0.1, 0.5, 1.0, 2.0, 5.0]) + + ext_logp = pm.logp(ext_dist, test_values).eval() + gpd_logp = pm.logp(gpd_dist, test_values).eval() + + np.testing.assert_allclose(ext_logp, gpd_logp, rtol=1e-6) + + @pytest.mark.parametrize( + "mu, sigma, xi, kappa, size, expected", + [ + # kappa=1 should give GPD median: sigma * (2^xi - 1) / xi + (0, 1, 0.5, 1.0, None, (2**0.5 - 1) / 0.5), + # kappa=1, xi=0: exponential median = log(2) + (0, 1, 0, 1.0, None, np.log(2)), + # kappa=2: H(m) = 0.5^0.5, so m = sigma * [(1-0.5^0.5)^(-xi) - 1] / xi + (0, 1, 0.5, 2.0, None, ((1 - 0.5**0.5) ** (-0.5) - 1) / 0.5), + # With size + (0, 1, 0.5, 1.0, 5, np.full(5, (2**0.5 - 1) / 0.5)), + ], + ) + def test_extgenpareto_support_point(self, mu, sigma, xi, kappa, size, expected): + with pm.Model() as model: + ExtGenPareto("x", mu=mu, sigma=sigma, xi=xi, kappa=kappa, size=size) + assert_support_point_is_expected(model, expected) + + +class TestExtGenPareto(BaseTestDistributionRandom): + """Test random sampling for ExtGPD.""" + + pymc_dist = ExtGenPareto + pymc_dist_params = {"mu": 0, "sigma": 1, "xi": 0.1, "kappa": 2.0} + expected_rv_op_params = {"mu": 0, "sigma": 1, "xi": 0.1, "kappa": 2.0} + tests_to_run = [ + "check_pymc_params_match_rv_op", + "check_rv_size", + ] + + def test_random_samples_follow_distribution(self): + """Test that random samples follow the ExtGPD distribution using KS test.""" + rng = np.random.default_rng(42) + mu, sigma, xi, kappa = 0, 1, 0.2, 2.0 + + # Generate samples + dist = ExtGenPareto.dist(mu=mu, sigma=sigma, xi=xi, 
+                                 kappa=kappa)
+        samples = pm.draw(dist, draws=1000, random_seed=rng)
+
+        # Define ExtGPD CDF for KS test
+        def ext_gpd_cdf(x, mu, sigma, xi, kappa):
+            H = sp.genpareto.cdf(x, c=xi, loc=mu, scale=sigma)
+            return np.power(H, kappa)
+
+        # KS test
+        from scipy.stats import kstest
+
+        stat, pvalue = kstest(samples, lambda x: ext_gpd_cdf(x, mu, sigma, xi, kappa))
+        assert pvalue > 0.01, f"KS test failed with p-value {pvalue}"
+
+
 class TestChiClass:
     """
     Wrapper class so that tests of experimental additions can be dropped into