From 79549901b0b718d6efa4c172a0e501d53ca82ec9 Mon Sep 17 00:00:00 2001 From: "Brandon T. Willard" Date: Tue, 18 May 2021 23:46:40 -0500 Subject: [PATCH] Convert custom Distributions to PyMC v4 format --- pymc3_hmm/distributions.py | 623 +++++++++++++++++------------------- pymc3_hmm/step_methods.py | 5 +- setup.py | 2 +- tests/test_distributions.py | 260 ++++++--------- 4 files changed, 399 insertions(+), 491 deletions(-) diff --git a/pymc3_hmm/distributions.py b/pymc3_hmm/distributions.py index 4554beb..50c81df 100644 --- a/pymc3_hmm/distributions.py +++ b/pymc3_hmm/distributions.py @@ -1,108 +1,142 @@ -import warnings +from copy import copy +from typing import Sequence +import aesara +import aesara.tensor as at import numpy as np +import pymc3 as pm +from aesara.compile.builders import OpFromGraph +from aesara.graph.basic import Constant + +# from aesara.compile.ops import ViewOp +from aesara.graph.fg import FunctionGraph +from aesara.scalar import upcast +from aesara.tensor.basic import make_vector +from aesara.tensor.extra_ops import broadcast_shape +from aesara.tensor.random.basic import categorical +from aesara.tensor.random.op import RandomVariable +from aesara.tensor.random.opt import local_subtensor_rv_lift +from aesara.tensor.random.utils import normalize_size_param +from aesara.tensor.var import TensorVariable +from pymc3.distributions.logp import _logp, logpt + +from pymc3_hmm.utils import tt_expand_dims + + +def naive_bcast_rv_lift(node): + """Lift a `BroadcastTo` through a `RandomVariable` `Op`. + + XXX: This implementation simply broadcasts the `RandomVariable`'s + parameters, which won't always work (e.g. multivariate distributions). + + TODO: Instead, it should use `RandomVariable.ndim_supp`--and the like--to + determine which dimensions of each parameter need to be broadcasted. + Also, this doesn't need to remove `size` to perform the lifting, like it + currently does. + """ + from aesara.graph.op import compute_test_value + from aesara.tensor.extra_ops import BroadcastTo + from aesara.tensor.random.opt import lift_rv_shapes -try: # pragma: no cover - import aesara - import aesara.tensor as at - from aesara.graph.op import get_test_value - from aesara.graph.utils import TestValueError - from aesara.scalar import upcast - from aesara.tensor.extra_ops import broadcast_to as at_broadcast_to -except ImportError: # pragma: no cover - import theano as aesara - import theano.tensor as at - from theano.graph.op import get_test_value - from theano.graph.utils import TestValueError - from theano.scalar import upcast - from theano.tensor.extra_ops import broadcast_to as at_broadcast_to + if not ( + isinstance(node.op, BroadcastTo) + and node.inputs[0].owner + and isinstance(node.inputs[0].owner.op, RandomVariable) + ): + return -import pymc3 as pm -from pymc3.distributions.distribution import _DrawValuesContext, draw_values -from pymc3.distributions.mixture import _conversion_map, all_discrete - -from pymc3_hmm.utils import tt_broadcast_arrays, tt_expand_dims, vsearchsorted - - -def distribution_subset_args(dist, shape, idx, point=None): - """Obtain subsets of a distribution parameters via indexing. - - This is used to effectively "lift" slices/`Subtensor` `Op`s up to a - distribution's parameters. In other words, `pm.Normal(mu, sigma)[idx]` - becomes `pm.Normal(mu[idx], sigma[idx])`. In computations, the former - requires the entire evaluation of `pm.Normal(mu, sigma)` (e.g. 
its `.logp` - or a sample from `.random`), which could be very complex, while the latter - only evaluates the subset of interest. - - XXX: this lifting isn't appropriate for every distribution. It's fine for - most scalar distributions and even some multivariate distributions, but - some required functionality is missing in order to handle even the latter. - - Parameters - ---------- - dist : Distribution - The distribution object with the parameters to be indexed. - shape : tuple or Shape - The shape of the distribution's output/support. This is used - to (naively) determine the parameters' broadcasting pattern. - idx : ndarray or TensorVariable - The indices applied to the parameters of `dist`. - point : dict (optional) - A dictionary keyed on the `str` names of each parameter in `dist`, - which are mapped to NumPy values for the corresponding parameter. When - this is given, the Theano parameters are replaced by their values in the - dictionary. - - Returns - ------- - res: list - An ordered set of broadcasted and indexed parameters for `dist`. + bcast_shape = node.inputs[1:] + assert len(bcast_shape) > 0 - """ + rv_node = node.inputs[0].owner + lifted_node = lift_rv_shapes(rv_node) - dist_param_names = dist._distr_parameters_for_repr() + rng, size, dtype, *dist_params = lifted_node.inputs - if point: - # Try to get a concrete/NumPy value if a `point` parameter was - # given. - try: - idx = get_test_value(idx) - except TestValueError: # pragma: no cover - pass + new_dist_params = [at.broadcast_to(param, bcast_shape) for param in dist_params] + bcasted_node = lifted_node.op.make_node(rng, size, dtype, *new_dist_params) - res = [] - for param in dist_param_names: + if aesara.config.compute_test_value != "off": + compute_test_value(bcasted_node) - # Use the (sampled) point, if present - if point is None or param not in point: - x = getattr(dist, param, None) + return bcasted_node - if x is None: - continue - else: - x = point[param] - bcast_res = at_broadcast_to(x, shape) +class SwitchingProcessFactory(OpFromGraph): + pass - res.append(bcast_res[idx]) - return res +def non_constant(x): + x = at.as_tensor_variable(x) + if isinstance(x, Constant): + res = x.type() + res.tag = copy(res.tag) + if aesara.config.compute_test_value != "off": + res.tag.test_value = x.data + res.name = x.name + return res + else: + return x -def get_and_check_comp_value(x): - if isinstance(x, pm.Distribution): - try: - return x.default() - except AttributeError: - pass +def create_switching_process_op(size, states, comp_rvs): + size_param = make_vector( + *[non_constant(size[i]) for i in range(at.get_vector_length(size))] + ) + size_param.name = "size" + states_param = non_constant(states) + comp_rv_params = [non_constant(rv) for rv in comp_rvs] - return x.random() - else: - raise TypeError( - "Component distributions must be PyMC3 Distributions. " - "Got {}".format(type(x)) - ) + output_shape = broadcast_shape( + tuple(states_param.shape), + *[tuple(rv.shape) for rv in comp_rv_params], + arrays_are_shapes=True + ) + + assert at.get_vector_length(output_shape) > 0 + + bcast_states = at.broadcast_to(states_param, output_shape) + + dtype = upcast(*[rv.type.dtype for rv in comp_rv_params]) + + # TODO: This can be done without a loop/`Scan`. 
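+    # For each mixture component `i`, the loop below broadcasts that
+    # component's random variable to the common output shape, lifts the
+    # `states == i` indexing into the `RandomVariable` node when possible
+    # (so only the required subset is actually sampled), and writes the
+    # result into the output at those positions.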
+ def loop_fn(output_shape, states, *comp_rvs): + + res = at.empty(output_shape, dtype=dtype) + + for i, comp_rv in enumerate(comp_rvs): + i_idx = at.nonzero(at.eq(states, i)) + + bcasted_comp_rv = at.broadcast_to(comp_rv, output_shape) + + if comp_rv.owner and isinstance(comp_rv.owner.op, RandomVariable): + + bcasted_node = naive_bcast_rv_lift(bcasted_comp_rv.owner) + indexed_comp_rv = bcasted_node.outputs[1][i_idx] + fgraph = FunctionGraph(outputs=[indexed_comp_rv], clone=False) + + (lifted_comp_rv,) = local_subtensor_rv_lift.transform( + fgraph, fgraph.outputs[0].owner + ) + else: + lifted_comp_rv = bcasted_comp_rv[i_idx] + + res = at.set_subtensor(res[i_idx], lifted_comp_rv) + + return res + + res, _ = aesara.scan( + loop_fn, + non_sequences=[output_shape, bcast_states] + list(comp_rv_params), + n_steps=at.prod(size_param), + ) + + res = res.reshape(tuple(size) + tuple(output_shape)) + + return SwitchingProcessFactory( + [size_param, states_param] + list(comp_rv_params), [res], inline=True + ) class SwitchingProcess(pm.Distribution): @@ -112,149 +146,69 @@ class SwitchingProcess(pm.Distribution): """ # noqa: E501 - def __init__(self, comp_dists, states, *args, **kwargs): + @classmethod + def dist( + cls, + comp_rvs: Sequence[TensorVariable], + states: TensorVariable, + *args, + size=None, + **kwargs + ): """Initialize a `SwitchingProcess` instance. - Each `Distribution` object in `comp_dists` must have a - `Distribution.random_subset` method that takes a list of indices and - returns a sample for only that subset. Unfortunately, since PyMC3 - doesn't provide such a method, you'll have to implement it yourself and - monkey patch a `Distribution` class. - Parameters ---------- - comp_dists : list of Distribution - A list containing `Distribution` objects for each mixture component. - These are essentially the emissions distributions. - states : DiscreteMarkovChain + comp_rvs: + A list containing `RandomVariable` objects for each mixture component. + states: The hidden state sequence. It should have a number of states equal to the size of `comp_dists`. """ - self.states = at.as_tensor_variable(pm.intX(states)) - if len(comp_dists) > 31: - warnings.warn( - "There are too many mixture distributions to properly" - " determine their combined shape." 
- ) + testval = kwargs.pop("testval", None) - self.comp_dists = comp_dists + size = normalize_size_param(size) - states_tv = get_test_value(self.states) - bcast_comps = np.broadcast( - states_tv, *[get_and_check_comp_value(x) for x in comp_dists[:31]] - ) - shape = bcast_comps.shape - - defaults = kwargs.pop("defaults", []) - - out_dtype = upcast(*[x.type.dtype for x in comp_dists]) - dtype = kwargs.pop("dtype", out_dtype) - - if not all_discrete(comp_dists): - try: - bcast_means = tt_broadcast_arrays( - *([self.states] + [d.mean.astype(dtype) for d in self.comp_dists]) - ) - self.mean = at.choose(self.states, bcast_means[1:]) - - if "mean" not in defaults: - defaults.append("mean") - - except (AttributeError, ValueError, IndexError): # pragma: no cover - pass - - try: - bcast_modes = tt_broadcast_arrays( - *([self.states] + [d.mode.astype(dtype) for d in self.comp_dists]) - ) - self.mode = at.choose(self.states, bcast_modes[1:]) + SwitchingProcessOp = create_switching_process_op(size, states, comp_rvs) + rv_var = SwitchingProcessOp(*([size, states] + list(comp_rvs))) - if "mode" not in defaults: - defaults.append("mode") + if testval is not None: + rv_var.tag.test_value = testval - except (AttributeError, ValueError, IndexError): # pragma: no cover - pass + return rv_var - super().__init__(shape=shape, dtype=dtype, defaults=defaults, **kwargs) - def logp(self, obs): - """Return the scalar Theano log-likelihood at a point.""" +@_logp.register(SwitchingProcessFactory) +def switching_process_logp(op, var, rvs_to_values, *dist_params, **kwargs): + obs = rvs_to_values.get(var, var) + size, states, *comp_rvs = dist_params[: len(op.inputs)] - obs_tt = at.as_tensor_variable(obs) + obs_tt = at.as_tensor_variable(obs) - logp_val = at.alloc(-np.inf, *obs.shape) + logp_val = at.alloc(-np.inf, *tuple(obs_tt.shape)) - for i, dist in enumerate(self.comp_dists): - i_mask = at.eq(self.states, i) - obs_i = obs_tt[i_mask] - subset_dist = dist.dist(*distribution_subset_args(dist, obs.shape, i_mask)) - logp_val = at.set_subtensor(logp_val[i_mask], subset_dist.logp(obs_i)) + for i, comp_rv in enumerate(comp_rvs): + i_idx = at.nonzero(at.eq(states, i)) + obs_i = obs_tt[i_idx] - return logp_val + bcasted_comp_rv = at.broadcast_to(comp_rv, obs_tt.shape) - def random(self, point=None, size=None): - """Sample from this distribution conditional on a given set of values. + if comp_rv.owner and isinstance(comp_rv.owner.op, RandomVariable): + bcasted_node = naive_bcast_rv_lift(bcasted_comp_rv.owner) + indexed_comp_rv = bcasted_node.outputs[1][i_idx] + fgraph = FunctionGraph(outputs=[indexed_comp_rv], clone=False) - Parameters - ---------- - point: dict, optional - Dict of variable values on which random values are to be - conditioned (uses default point if not specified). - size: int, optional - Desired size of random sample (returns one sample if not - specified). - - Returns - ------- - array - """ - with _DrawValuesContext(): - (states,) = draw_values([self.states], point=point, size=size) - - # This is a terrible thing to have to do here, but it's better than - # having to (know to) update `Distribution.shape` when/if dimensions - # change (e.g. when sampling new state sequences). 
- bcast_comps = np.broadcast( - states, *[dist.random(point=point) for dist in self.comp_dists] - ) - self_shape = bcast_comps.shape - - if size: - # `draw_values` will not honor the `size` parameter if its arguments - # don't contain random variables, so, when our `self.states` are - # constants, we have to broadcast `states` so that it matches `size + - # self.shape`. - expanded_states = np.broadcast_to( - states, tuple(np.atleast_1d(size)) + self_shape + (lifted_comp_rv,) = local_subtensor_rv_lift.transform( + fgraph, fgraph.outputs[0].owner ) else: - expanded_states = np.broadcast_to(states, self_shape) - - samples = np.empty(expanded_states.shape) - - for i, dist in enumerate(self.comp_dists): - # We want to sample from only the parts of our component - # distributions that are active given the states. - # This is only really relevant when the component distributions - # change over the state space (e.g. Poisson means that change - # over time). - # We could always sample such components over the entire space - # (e.g. time), but, for spaces with large dimension, that would - # be extremely costly and wasteful. - i_idx = np.where(expanded_states == i) - i_size = len(i_idx[0]) - if i_size > 0: - subset_args = distribution_subset_args( - dist, expanded_states.shape, i_idx, point=point - ) - state_dist = dist.dist(*subset_args) + lifted_comp_rv = bcasted_comp_rv[i_idx] - sample = state_dist.random(point=point) - samples[i_idx] = sample + logp_val = at.set_subtensor(logp_val[i_idx], logpt(lifted_comp_rv, obs_i)) - return samples + return logp_val class PoissonZeroProcess(SwitchingProcess): @@ -264,7 +218,8 @@ class PoissonZeroProcess(SwitchingProcess): the second mixture component is the Poisson random variable. """ - def __init__(self, mu=None, states=None, **kwargs): + @classmethod + def dist(cls, mu=None, states=None, rng=None, **kwargs): """Initialize a `PoissonZeroProcess` object. Parameters @@ -275,13 +230,67 @@ def __init__(self, mu=None, states=None, **kwargs): A vector of integer 0-1 states that indicate which component of the mixture is active at each point/time. 
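+        rng: optional
+            Random number generator (e.g. a shared NumPy `RandomState`) used
+            by the Poisson mixture component.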
""" - self.mu = at.as_tensor_variable(pm.floatX(mu)) - self.states = at.as_tensor_variable(states) + mu = at.as_tensor_variable(pm.floatX(mu)) + states = at.as_tensor_variable(states) + + return super().dist( + [pm.Constant.dist(0), pm.Poisson.dist(mu, rng=rng)], states, **kwargs + ) + + +class DiscreteMarkovChainFactory(OpFromGraph): + pass + - super().__init__([pm.Constant.dist(0), pm.Poisson.dist(mu)], states, **kwargs) +def create_discrete_mc_op(size, Gammas, gamma_0): + size_param = make_vector( + *[non_constant(size[i]) for i in range(at.get_vector_length(size))] + ) + size_param.name = "size" + Gammas_param = non_constant(Gammas) + gamma_0_param = non_constant(gamma_0) -class DiscreteMarkovChain(pm.Discrete): + ind_dims = tuple(Gammas_param.shape[:-3]) + bcast_gamma_0 = at.broadcast_to(gamma_0_param, ind_dims + (Gammas_param.shape[-1],)) + + # Sample state 0 in each state sequence + state_0 = categorical( + bcast_gamma_0, size=tuple(size_param) + (Gammas_param.shape[-1],) + ) + + N = Gammas_param.shape[-3] + + def loop_fn(n, state_nm1, Gammas): + # gamma_t = Gammas[..., n, state_nm1, :] + gamma_t = Gammas[..., n, :, :] + slices_tuple = tuple( + at.ogrid[[slice(None, d) for d in tuple(size_param) + ind_dims]] + ) + gamma_t = gamma_t[tuple(slices_tuple) + (state_nm1,)] + if gamma_t.ndim == 0: + gamma_t = gamma_t.dimshuffle("x") + state_n = categorical(gamma_t) + return state_n + + res, res_updates = aesara.scan( + loop_fn, + outputs_info=[{"initial": state_0, "taps": [-1]}], + sequences=[at.arange(N)], + non_sequences=[Gammas_param], + # TODO: Add `rng` to this + # strict=True, + ) + + return SwitchingProcessFactory( + [size_param, Gammas_param, gamma_0_param], + [res.T], + inline=True, + on_unused_input="ignore", + ) + + +class DiscreteMarkovChain(pm.Distribution): """A first-order discrete Markov chain distribution. This class characterizes vector random variables consisting of state @@ -290,7 +299,8 @@ class DiscreteMarkovChain(pm.Discrete): """ - def __init__(self, Gammas, gamma_0, shape, **kwargs): + @classmethod + def dist(cls, Gammas, gamma_0, size=None, **kwargs): """Initialize an `DiscreteMarkovChain` object. Parameters @@ -304,145 +314,100 @@ def __init__(self, Gammas, gamma_0, shape, **kwargs): gamma_0: TensorVariable The initial state probabilities. The last dimension should be length `M`, i.e. the number of distinct states. - shape: tuple of int - Shape of the state sequence. The last dimension is `N`, i.e. the - length of the state sequence(s). """ - self.gamma_0 = at.as_tensor_variable(pm.floatX(gamma_0)) + gamma_0 = at.as_tensor_variable(pm.floatX(gamma_0)) assert Gammas.ndim >= 3 - self.Gammas = at.as_tensor_variable(pm.floatX(Gammas)) + Gammas = at.as_tensor_variable(pm.floatX(Gammas)) - shape = np.atleast_1d(shape) + size = normalize_size_param(size) - dtype = _conversion_map[aesara.config.floatX] - self.mode = np.zeros(tuple(shape), dtype=dtype) + testval = kwargs.pop("testval", None) - super().__init__(shape=shape, **kwargs) + # rv_var = create_discrete_mc_op(size, Gammas, gamma_0) + DiscreteMarkovChainOp = create_discrete_mc_op(size, Gammas, gamma_0) + rv_var = DiscreteMarkovChainOp(size, Gammas, gamma_0) - def logp(self, states): - r"""Create a Theano graph that computes the log-likelihood for a discrete Markov chain. + if testval is not None: + rv_var.tag.test_value = testval - This is the log-likelihood for the joint distribution of states, :math:`S_t`, conditional - on state samples, :math:`s_t`, given by the following: + return rv_var - .. 
math:: - \int_{S_0} P(S_1 = s_1 \mid S_0) dP(S_0) \prod^{T}_{t=2} P(S_t = s_t \mid S_{t-1} = s_{t-1}) +@_logp.register(DiscreteMarkovChainFactory) +def discrete_mc_logp(op, var, rvs_to_values, *dist_params, **kwargs): + r"""Create a Theano graph that computes the log-likelihood for a discrete Markov chain. - The first term (i.e. the integral) simply computes the marginal :math:`P(S_1 = s_1)`, so - another way to express this result is as follows: + This is the log-likelihood for the joint distribution of states, :math:`S_t`, conditional + on state samples, :math:`s_t`, given by the following: - .. math:: + .. math:: - P(S_1 = s_1) \prod^{T}_{t=2} P(S_t = s_t \mid S_{t-1} = s_{t-1}) + \int_{S_0} P(S_1 = s_1 \mid S_0) dP(S_0) \prod^{T}_{t=2} P(S_t = s_t \mid S_{t-1} = s_{t-1}) - """ # noqa: E501 + The first term (i.e. the integral) simply computes the marginal :math:`P(S_1 = s_1)`, so + another way to express this result is as follows: - Gammas = at.shape_padleft(self.Gammas, states.ndim - (self.Gammas.ndim - 2)) + .. math:: - # Multiply the initial state probabilities by the first transition - # matrix by to get the marginal probability for state `S_1`. - # The integral that produces the marginal is essentially - # `gamma_0.dot(Gammas[0])` - Gamma_1 = Gammas[..., 0:1, :, :] - gamma_0 = tt_expand_dims(self.gamma_0, (-3, -1)) - P_S_1 = at.sum(gamma_0 * Gamma_1, axis=-2) + P(S_1 = s_1) \prod^{T}_{t=2} P(S_t = s_t \mid S_{t-1} = s_{t-1}) - # The `tt.switch`s allow us to broadcast the indexing operation when - # the replication dimensions of `states` and `Gammas` don't match - # (e.g. `states.shape[0] > Gammas.shape[0]`) - S_1_slices = tuple( - slice( - at.switch(at.eq(P_S_1.shape[i], 1), 0, 0), - at.switch(at.eq(P_S_1.shape[i], 1), 1, d), - ) - for i, d in enumerate(states.shape) - ) - S_1_slices = (tuple(at.ogrid[S_1_slices]) if S_1_slices else tuple()) + ( - states[..., 0:1], - ) - logp_S_1 = at.log(P_S_1[S_1_slices]).sum(axis=-1) - - # These are slices for the extra dimensions--including the state - # sequence dimension (e.g. "time")--along which which we need to index - # the transition matrix rows using the "observed" `states`. - trans_slices = tuple( - slice( - at.switch( - at.eq(Gammas.shape[i], 1), 0, 1 if i == states.ndim - 1 else 0 - ), - at.switch(at.eq(Gammas.shape[i], 1), 1, d), - ) - for i, d in enumerate(states.shape) - ) - trans_slices = (tuple(at.ogrid[trans_slices]) if trans_slices else tuple()) + ( - states[..., :-1], - ) - - # Select the transition matrix row of each observed state; this yields - # `P(S_t | S_{t-1} = s_{t-1})` - P_S_2T = Gammas[trans_slices] + """ # noqa: E501 - obs_slices = tuple(slice(None, d) for d in P_S_2T.shape[:-1]) - obs_slices = (tuple(at.ogrid[obs_slices]) if obs_slices else tuple()) + ( - states[..., 1:], + states = rvs_to_values.get(var, var) + size, Gammas, gamma_0 = dist_params[: len(op.inputs)] + + Gammas = at.shape_padleft(Gammas, states.ndim - (Gammas.ndim - 2)) + + # Multiply the initial state probabilities by the first transition + # matrix by to get the marginal probability for state `S_1`. + # The integral that produces the marginal is essentially + # `gamma_0.dot(Gammas[0])` + Gamma_1 = Gammas[..., 0:1, :, :] + gamma_0 = tt_expand_dims(gamma_0, (-3, -1)) + P_S_1 = at.sum(gamma_0 * Gamma_1, axis=-2) + + # The `tt.switch`s allow us to broadcast the indexing operation when + # the replication dimensions of `states` and `Gammas` don't match + # (e.g. 
`states.shape[0] > Gammas.shape[0]`) + S_1_slices = tuple( + slice( + at.switch(at.eq(P_S_1.shape[i], 1), 0, 0), + at.switch(at.eq(P_S_1.shape[i], 1), 1, d), ) - logp_S_1T = at.log(P_S_2T[obs_slices]) - - res = logp_S_1 + at.sum(logp_S_1T, axis=-1) - res.name = "DiscreteMarkovChain_logp" - - return res - - def random(self, point=None, size=None): - """Sample from a discrete Markov chain conditional on a given set of values. - - Parameters - ---------- - point: dict, optional - Dict of variable values on which random values are to be - conditioned (uses default point if not specified). - size: int, optional - Desired size of random sample (returns one sample if not - specified). - - Returns - ------- - array - """ - terms = [self.gamma_0, self.Gammas] - - with _DrawValuesContext(): - gamma_0, Gamma = draw_values(terms, point=point) - - # Sample state 0 in each state sequence - state_n = pm.Categorical.dist(gamma_0, shape=self.shape[:-1]).random( - point=point, size=size + for i, d in enumerate(states.shape) + ) + S_1_slices = (tuple(at.ogrid[S_1_slices]) if S_1_slices else tuple()) + ( + states[..., 0:1], + ) + logp_S_1 = at.log(P_S_1[S_1_slices]).sum(axis=-1) + + # These are slices for the extra dimensions--including the state + # sequence dimension (e.g. "time")--along which which we need to index + # the transition matrix rows using the "observed" `states`. + trans_slices = tuple( + slice( + at.switch(at.eq(Gammas.shape[i], 1), 0, 1 if i == states.ndim - 1 else 0), + at.switch(at.eq(Gammas.shape[i], 1), 1, d), ) - state_shape = state_n.shape - - N = self.shape[-1] + for i, d in enumerate(states.shape) + ) + trans_slices = (tuple(at.ogrid[trans_slices]) if trans_slices else tuple()) + ( + states[..., :-1], + ) - states = np.empty(state_shape + (N,), dtype=self.dtype) + # Select the transition matrix row of each observed state; this yields + # `P(S_t | S_{t-1} = s_{t-1})` + P_S_2T = Gammas[trans_slices] - unif_samples = np.random.uniform(size=states.shape) + obs_slices = tuple(slice(None, d) for d in P_S_2T.shape[:-1]) + obs_slices = (tuple(at.ogrid[obs_slices]) if obs_slices else tuple()) + ( + states[..., 1:], + ) + logp_S_1T = at.log(P_S_2T[obs_slices]) - # Make sure we have a transition matrix for each element in a state - # sequence - Gamma = np.broadcast_to(Gamma, tuple(states.shape) + Gamma.shape[-2:]) + res = logp_S_1 + at.sum(logp_S_1T, axis=-1) + res.name = "DiscreteMarkovChain_logp" - # Slices across each independent/replication dimension - slices_tuple = tuple(np.ogrid[[slice(None, d) for d in state_shape]]) - - for n in range(0, N): - gamma_t = Gamma[..., n, :, :] - gamma_t = gamma_t[slices_tuple + (state_n,)] - state_n = vsearchsorted(gamma_t.cumsum(axis=-1), unif_samples[..., n]) - states[..., n] = state_n - - return states - - def _distr_parameters_for_repr(self): - return ["Gammas", "gamma_0"] + return res diff --git a/pymc3_hmm/step_methods.py b/pymc3_hmm/step_methods.py index d7c470c..2b85db6 100644 --- a/pymc3_hmm/step_methods.py +++ b/pymc3_hmm/step_methods.py @@ -28,7 +28,6 @@ from theano.tensor.var import TensorConstant import pymc3 as pm -from pymc3.distributions.distribution import draw_values from pymc3.step_methods.arraystep import ArrayStep, BlockedStep, Competence from pymc3.util import get_untransformed_name @@ -94,6 +93,7 @@ def ffbs_step( log_lik_n: np.ndarray = log_lik[..., n] np.exp(log_lik_n - log_lik_n.max(), out=lik_n) np.dot(alpha_nm1, Gamma[n], out=alpha_n) + breakpoint() alpha_n *= lik_n alpha_n_sum: float = np.sum(alpha_n) @@ -184,7 +184,8 @@ def 
__init__(self, vars, values=None, model=None): comp_logp_stacked = at.sum(dep_comps_logp_stacked, axis=0) - (M,) = draw_values([var.distribution.gamma_0.shape[-1]], point=model.test_point) + # XXX: This isn't correct. + M = var.owner.inputs[2].eval(model.test_point) N = model.test_point[var.name].shape[-1] self.alphas = np.empty((M, N), dtype=float) diff --git a/setup.py b/setup.py index 01306ad..c474bdb 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ install_requires=[ "numpy>=1.18.1", "scipy>=1.4.0", - "pymc3>=3.11.1,<4.0.0", + "pymc3>=4.0.0", ], tests_require=["pytest"], long_description=open("README.md").read() if exists("README.md") else "", diff --git a/tests/test_distributions.py b/tests/test_distributions.py index d7551fe..1399447 100644 --- a/tests/test_distributions.py +++ b/tests/test_distributions.py @@ -8,72 +8,56 @@ import theano.tensor as at import pymc3 as pm -import pytest +from pymc3.distributions.logp import logpt from pymc3_hmm.distributions import ( DiscreteMarkovChain, PoissonZeroProcess, SwitchingProcess, - distribution_subset_args, ) from tests.utils import simulate_poiszero_hmm -def test_DiscreteMarkovChain_str(): - Gammas = at.as_tensor(np.eye(2)[None, ...], name="Gammas") - gamma_0 = at.as_tensor(np.r_[0, 1], name="gamma_0") - - with pm.Model(): - test_dist = DiscreteMarkovChain("P_rv", Gammas, gamma_0, shape=(2,)) - - assert str(test_dist) == "P_rv ~ DiscreteMarkovChain" - - +@aesara.config.change_flags(compute_test_value="raise", cxx="") def test_DiscreteMarkovChain_random(): # A single transition matrix and initial probabilities vector for each # element in the state sequence - test_Gamma = np.array([[[1.0, 0.0], [0.0, 1.0]]]) + test_Gamma_base = np.array([[[1.0, 0.0], [0.0, 1.0]]]) + test_Gamma = np.broadcast_to(test_Gamma_base, (10, 2, 2)) test_gamma_0 = np.r_[0.0, 1.0] - test_sample = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=10).random() + test_sample = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0).eval() assert np.all(test_sample == 1) test_sample = DiscreteMarkovChain.dist( - test_Gamma, 1.0 - test_gamma_0, shape=10 - ).random() + test_Gamma, 1.0 - test_gamma_0, size=10 + ).eval() assert np.all(test_sample == 0) - test_sample = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=10).random( - size=12 - ) - assert test_sample.shape == ( - 12, - 10, - ) + test_sample = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, size=12).eval() + assert test_sample.shape == (12, 10) - test_sample = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=10).random( - size=2 - ) + test_sample = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, size=2).eval() assert np.array_equal( test_sample, np.stack([np.ones(10), np.ones(10)], 0).astype(int) ) # Now, the same set-up, but--this time--generate two state sequences # samples - test_Gamma = np.array([[[0.8, 0.2], [0.2, 0.8]]]) + test_Gamma_base = np.array([[[0.8, 0.2], [0.2, 0.8]]]) + test_Gamma = np.broadcast_to(test_Gamma_base, (10, 2, 2)) test_gamma_0 = np.r_[0.2, 0.8] - test_sample = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=10).random( - size=2 - ) + test_sample = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, size=2).eval() # TODO: Fix the seed, and make sure there's at least one 0 and 1? assert test_sample.shape == (2, 10) # Two transition matrices--for two distinct state sequences--and one vector # of initial probs. 
- test_Gamma = np.stack( + test_Gamma_base = np.stack( [np.array([[[1.0, 0.0], [0.0, 1.0]]]), np.array([[[1.0, 0.0], [0.0, 1.0]]])] ) + test_Gamma = np.broadcast_to(test_Gamma_base, (2, 10, 2, 2)) test_gamma_0 = np.r_[0.0, 1.0] - test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=(2, 10)) - test_sample = test_dist.random() + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0) + test_sample = test_dist.eval() assert np.array_equal( test_sample, np.stack([np.ones(10), np.ones(10)], 0).astype(int) ) @@ -81,7 +65,8 @@ def test_DiscreteMarkovChain_random(): # Now, the same set-up, but--this time--generate three state sequence # samples - test_sample = test_dist.random(size=3) + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, size=3) + test_sample = test_dist.eval() assert np.array_equal( test_sample, np.tile(np.stack([np.ones(10), np.ones(10)], 0).astype(int), (3, 1, 1)), @@ -90,12 +75,13 @@ def test_DiscreteMarkovChain_random(): # Two transition matrices and initial probs. for two distinct state # sequences - test_Gamma = np.stack( + test_Gamma_base = np.stack( [np.array([[[1.0, 0.0], [0.0, 1.0]]]), np.array([[[1.0, 0.0], [0.0, 1.0]]])] ) + test_Gamma = np.broadcast_to(test_Gamma_base, (2, 10, 2, 2)) test_gamma_0 = np.stack([np.r_[0.0, 1.0], np.r_[1.0, 0.0]]) - test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=(2, 10)) - test_sample = test_dist.random() + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0) + test_sample = test_dist.eval() assert np.array_equal( test_sample, np.stack([np.ones(10), np.zeros(10)], 0).astype(int) ) @@ -103,7 +89,8 @@ def test_DiscreteMarkovChain_random(): # Now, the same set-up, but--this time--generate three state sequence # samples - test_sample = test_dist.random(size=3) + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, size=3) + test_sample = test_dist.eval() assert np.array_equal( test_sample, np.tile(np.stack([np.ones(10), np.zeros(10)], 0).astype(int), (3, 1, 1)), @@ -122,13 +109,14 @@ def test_DiscreteMarkovChain_random(): ) test_gamma_0 = np.r_[1, 0] - test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=3) - test_sample = test_dist.random() + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0) + test_sample = test_dist.eval() assert np.array_equal(test_sample, np.r_[1, 0, 0]) # Now, the same set-up, but--this time--generate three state sequence # samples - test_sample = test_dist.random(size=3) + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, size=3) + test_sample = test_dist.eval() assert np.array_equal(test_sample, np.tile(np.r_[1, 0, 0].astype(int), (3, 1))) # "Time"-varying transition matrices with two initial @@ -143,13 +131,14 @@ def test_DiscreteMarkovChain_random(): ) test_gamma_0 = np.array([[1, 0], [0, 1]]) - test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=(2, 3)) - test_sample = test_dist.random() + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0) + test_sample = test_dist.eval() assert np.array_equal(test_sample, np.array([[1, 0, 0], [0, 1, 1]])) # Now, the same set-up, but--this time--generate three state sequence # samples - test_sample = test_dist.random(size=3) + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, size=3) + test_sample = test_dist.eval() assert np.array_equal( test_sample, np.tile(np.array([[1, 0, 0], [0, 1, 1]]).astype(int), (3, 1, 1)) ) @@ -173,13 +162,14 @@ def test_DiscreteMarkovChain_random(): ) test_gamma_0 = np.array([[1, 0], [0, 1]]) - 
test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, shape=(2, 3)) - test_sample = test_dist.random() + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0) + test_sample = test_dist.eval() assert np.array_equal(test_sample, np.array([[1, 0, 0], [1, 1, 0]])) # Now, the same set-up, but--this time--generate three state sequence # samples - test_sample = test_dist.random(size=3) + test_dist = DiscreteMarkovChain.dist(test_Gamma, test_gamma_0, size=3) + test_sample = test_dist.eval() assert np.array_equal( test_sample, np.tile(np.array([[1, 0, 0], [1, 1, 0]]).astype(int), (3, 1, 1)) ) @@ -371,32 +361,32 @@ def test_SwitchingProcess_random(): test_states = np.r_[0, 0, 1, 1, 0, 1] mu_zero_nonzero = [pm.Constant.dist(0), pm.Constant.dist(1)] test_dist = SwitchingProcess.dist(mu_zero_nonzero, test_states) - assert np.array_equal(test_dist.shape, test_states.shape) - test_sample = test_dist.random() + assert np.array_equal(test_dist.shape.eval(), test_states.shape) + test_sample = test_dist.eval() assert test_sample.shape == (test_states.shape[0],) assert np.all(test_sample[test_states > 0] > 0) - test_sample = test_dist.random(size=5) + test_sample = SwitchingProcess.dist(mu_zero_nonzero, test_states, size=5).eval() assert np.array_equal(test_sample.shape, (5,) + test_states.shape) assert np.all(test_sample[..., test_states > 0] > 0) test_states = np.r_[0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0] test_dist = SwitchingProcess.dist(mu_zero_nonzero, test_states) - assert np.array_equal(test_dist.shape, test_states.shape) - test_sample = test_dist.random(size=1) + assert np.array_equal(test_dist.shape.eval(), test_states.shape) + test_sample = SwitchingProcess.dist(mu_zero_nonzero, test_states, size=1).eval() assert np.array_equal(test_sample.shape, (1,) + test_states.shape) assert np.all(test_sample[..., test_states > 0] > 0) test_states = np.r_[0, 0, 1, 1, 0, 1] test_mus = [pm.Constant.dist(i) for i in range(6)] test_dist = SwitchingProcess.dist(test_mus, test_states) - assert np.array_equal(test_dist.shape, test_states.shape) - test_sample = test_dist.random() + assert np.array_equal(test_dist.shape.eval(), test_states.shape) + test_sample = test_dist.eval() assert np.array_equal(test_sample.shape, test_states.shape) assert np.all(test_sample[..., test_states > 0] > 0) test_states = np.c_[0, 0, 1, 1, 0, 1].T - test_mus = np.arange(1, 6).astype(float) + test_mus = np.arange(1, 6).astype(np.float64) # One of the states has emissions that are a sequence of five Dirac delta # distributions on the values 1 to 5 (i.e. the one with values # `test_mus`), and the other is just a single delta at 0. 
A single state @@ -408,22 +398,23 @@ def test_SwitchingProcess_random(): test_dist = SwitchingProcess.dist( [pm.Constant.dist(0), pm.Constant.dist(test_mus)], test_states ) - assert np.array_equal(test_dist.shape, (6, 5)) - test_sample = test_dist.random() - assert np.array_equal(test_sample.shape, test_dist.shape) - assert np.all(test_sample[np.where(test_states > 0)[0]] == test_mus) + assert np.array_equal(test_dist.shape.eval(), (6, 5)) + test_sample = test_dist.eval() + assert np.array_equal(test_sample.shape, test_dist.shape.eval()) + sample_mus = test_sample[np.where(test_states > 0)[0]] + assert np.all(sample_mus == test_mus) test_states = np.c_[0, 0, 1, 1, 0, 1] - test_mus = np.arange(1, 7).astype(float) + test_mus = np.arange(1, 7).astype(np.float64) test_dist = SwitchingProcess.dist( [pm.Constant.dist(0), pm.Constant.dist(test_mus)], test_states ) - assert np.array_equal(test_dist.shape, test_states.shape) + assert np.array_equal(test_dist.shape.eval(), test_states.shape) test_states = np.r_[0, 0, 1, 1, 0, 1] test_sample = SwitchingProcess.dist( - [pm.Constant.dist(0), pm.Constant.dist(test_mus)], test_states - ).random(size=3) + [pm.Constant.dist(0), pm.Constant.dist(test_mus)], test_states, size=3 + ).eval() assert np.array_equal(test_sample.shape, (3,) + test_mus.shape) assert np.all(test_sample.sum(0)[..., test_states > 0] > 0) @@ -433,35 +424,28 @@ def test_PoissonZeroProcess_point(): with pm.Model(): test_mean = pm.Constant("c", 1000.0) - test_point = {"c": 100.0} - test_sample = PoissonZeroProcess.dist(test_mean, test_states).random( - point=test_point - ) - - assert np.all(0 < test_sample[..., test_states > 0]) - assert np.all(test_sample[..., test_states > 0] < 200) + test_rv = PoissonZeroProcess.dist(test_mean, test_states) + test_sample = test_rv.eval() -def test_random_PoissonZeroProcess_DiscreteMarkovChain(): - poiszero_sim, test_model = simulate_poiszero_hmm(30, 5000) - - y_test = poiszero_sim["Y_t"].squeeze() - nonzeros_idx = poiszero_sim["S_t"] > 0 - - assert np.all(y_test[nonzeros_idx] > 0) - assert np.all(y_test[~nonzeros_idx] == 0) + assert np.all(0 < test_sample[..., test_states > 0]) + assert np.all(test_sample[..., test_states > 0] < 10000) def test_SwitchingProcess(): - np.random.seed(2023532) + rng = aesara.shared(np.random.RandomState(2023532), borrow=True) test_states = np.r_[2, 0, 1, 2, 0, 1] - test_dists = [pm.Constant.dist(0), pm.Poisson.dist(100.0), pm.Poisson.dist(1000.0)] + test_dists = [ + pm.Constant.dist(0), + pm.Poisson.dist(100.0, rng=rng), + pm.Poisson.dist(1000.0, rng=rng), + ] test_dist = SwitchingProcess.dist(test_dists, test_states) - assert np.array_equal(test_dist.shape, test_states.shape) + assert np.array_equal(test_dist.shape.eval(), test_states.shape) - test_sample = test_dist.random() + test_sample = test_dist.eval() assert test_sample.shape == (test_states.shape[0],) assert np.all(test_sample[test_states == 0] == 0) assert np.all(0 < test_sample[test_states == 1]) @@ -471,37 +455,40 @@ def test_SwitchingProcess(): test_mus = np.r_[100, 100, 500, 100, 100, 100] test_dists = [ pm.Constant.dist(0), - pm.Poisson.dist(test_mus), - pm.Poisson.dist(10000.0), + pm.Poisson.dist(test_mus, rng=rng), + pm.Poisson.dist(10000.0, rng=rng), ] test_dist = SwitchingProcess.dist(test_dists, test_states) - assert np.array_equal(test_dist.shape, test_states.shape) + assert np.array_equal(test_dist.shape.eval(), test_states.shape) - test_sample = test_dist.random() + test_sample = test_dist.eval() assert test_sample.shape == (test_states.shape[0],) 
assert np.all(200 < test_sample[2] < 600) assert np.all(0 < test_sample[5] < 200) assert np.all(5000 < test_sample[test_states == 2]) - test_dists = [pm.Constant.dist(0), pm.Poisson.dist(100.0), pm.Poisson.dist(1000.0)] + test_dists = [ + pm.Constant.dist(0), + pm.Poisson.dist(100.0, rng=rng), + pm.Poisson.dist(1000.0, rng=rng), + ] test_dist = SwitchingProcess.dist(test_dists, test_states) for i in range(len(test_dists)): - test_logp = test_dist.logp( - np.tile(test_dists[i].mode.eval(), test_states.shape) - ).eval() + obs = np.tile(test_dists[i].owner.inputs[3].eval(), test_states.shape) + test_logp = logpt(test_dist, obs).eval() assert test_logp[test_states != i].max() < test_logp[test_states == i].min() # Try a continuous mixture test_states = np.r_[2, 0, 1, 2, 0, 1] test_dists = [ - pm.Normal.dist(0.0, 1.0), - pm.Normal.dist(100.0, 1.0), - pm.Normal.dist(1000.0, 1.0), + pm.Normal.dist(0.0, 1.0, rng=rng), + pm.Normal.dist(100.0, 1.0, rng=rng), + pm.Normal.dist(1000.0, 1.0, rng=rng), ] test_dist = SwitchingProcess.dist(test_dists, test_states) - assert np.array_equal(test_dist.shape, test_states.shape) + assert np.array_equal(test_dist.shape.eval(), test_states.shape) - test_sample = test_dist.random() + test_sample = test_dist.eval() assert test_sample.shape == (test_states.shape[0],) assert np.all(test_sample[test_states == 0] < 10) assert np.all(50 < test_sample[test_states == 1]) @@ -512,79 +499,34 @@ def test_SwitchingProcess(): test_states = np.ones(50) test_dists = [pm.Constant.dist(i) for i in range(50)] test_dist = SwitchingProcess.dist(test_dists, test_states) - assert np.array_equal(test_dist.shape, test_states.shape) - - with pytest.raises(TypeError): - SwitchingProcess.dist([1], test_states) - - with aesara.change_flags(compute_test_value="off"): - # Test for the case when a default can't be computed - test_dist = pm.Poisson.dist(at.scalar()) + assert np.array_equal(test_dist.shape.eval(), test_states.shape) - # Confirm that there's no default - with pytest.raises(AttributeError): - test_dist.default() - - # Let it try to sample using `Distribution.random` and fail - with pytest.raises(ValueError): - SwitchingProcess.dist([test_dist], test_states) + # TODO: What should this return/do? 
+ # with pytest.raises(TypeError): + # SwitchingProcess.dist([1], test_states) # Evaluate multiple observed state sequences in an extreme case test_states = at.imatrix("states") test_states.tag.test_value = np.zeros((10, 4)).astype("int32") + test_dist = SwitchingProcess.dist( [pm.Constant.dist(0), pm.Constant.dist(1)], test_states ) test_obs = np.tile(np.arange(4), (10, 1)).astype("int32") - test_logp = test_dist.logp(test_obs) + test_logp = logpt(test_dist, test_obs) exp_logp = np.tile( np.array([0.0] + [-np.inf] * 3, dtype=aesara.config.floatX), (10, 1) ) - assert np.array_equal(test_logp.tag.test_value, exp_logp) - - -def test_subset_args(): - - test_dist = pm.Constant.dist(c=np.r_[0.1, 1.2, 2.3]) - test_idx = np.r_[0, 2] - res = distribution_subset_args(test_dist, shape=[3], idx=test_idx) - assert np.array_equal(res[0].eval(), np.r_[0.1, 2.3]) - - test_point = {"c": np.r_[2.0, 3.0, 4.0]} - test_idx = np.r_[0, 2] - res = distribution_subset_args(test_dist, shape=[3], idx=test_idx, point=test_point) - assert np.array_equal(res[0].eval(), np.r_[2.0, 4.0]) - - test_dist = pm.Normal.dist(mu=np.r_[0.1, 1.2, 2.3], sigma=np.r_[10.0]) - test_idx = np.r_[0, 2] - res = distribution_subset_args(test_dist, shape=[3], idx=test_idx) - assert np.array_equal(res[0].eval(), np.r_[0.1, 2.3]) - assert np.array_equal(res[1].eval(), np.r_[10.0, 10.0]) - - test_point = {"mu": np.r_[2.0, 3.0, 4.0], "sigma": np.r_[20.0, 30.0, 40.0]} - test_idx = np.r_[0, 2] - res = distribution_subset_args(test_dist, shape=[3], idx=test_idx, point=test_point) - assert np.array_equal(res[0].eval(), np.r_[2.0, 4.0]) - assert np.array_equal(res[1].eval(), np.r_[20.0, 40.0]) - - test_dist = pm.Poisson.dist(mu=np.r_[0.1, 1.2, 2.3]) - test_idx = np.r_[0, 2] - res = distribution_subset_args(test_dist, shape=[3], idx=test_idx) - assert np.array_equal(res[0].eval(), np.r_[0.1, 2.3]) - - test_point = {"mu": np.r_[2.0, 3.0, 4.0]} - test_idx = np.r_[0, 2] - res = distribution_subset_args(test_dist, shape=[3], idx=test_idx, point=test_point) - assert np.array_equal(res[0].eval(), np.r_[2.0, 4.0]) - - test_dist = pm.NegativeBinomial.dist(mu=np.r_[0.1, 1.2, 2.3], alpha=2) - test_idx = np.r_[0, 2] - res = distribution_subset_args(test_dist, shape=[3], idx=test_idx) - assert np.array_equal(res[0].eval(), np.r_[0.1, 2.3]) - assert np.array_equal(res[1].eval(), np.r_[2.0, 2.0]) - - test_point = {"mu": np.r_[2.0, 3.0, 4.0], "alpha": np.r_[10, 11, 12]} - test_idx = np.r_[0, 2] - res = distribution_subset_args(test_dist, shape=[3], idx=test_idx, point=test_point) - assert np.array_equal(res[0].eval(), np.r_[2.0, 4.0]) - assert np.array_equal(res[1].eval(), np.r_[10, 12]) + assert np.array_equal( + test_logp.eval({test_states: test_states.tag.test_value}), exp_logp + ) + + +def test_random_PoissonZeroProcess_DiscreteMarkovChain(): + poiszero_sim, test_model = simulate_poiszero_hmm(30, 5000) + + y_test = poiszero_sim["Y_t"].squeeze() + nonzeros_idx = poiszero_sim["S_t"] > 0 + + assert np.all(y_test[nonzeros_idx] > 0) + assert np.all(y_test[~nonzeros_idx] == 0)
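
As a reference for the converted API exercised by these tests, here is a minimal usage sketch; the parameter values are illustrative only and the snippet is not part of the patch:

    import numpy as np

    from pymc3.distributions.logp import logpt
    from pymc3_hmm.distributions import DiscreteMarkovChain, PoissonZeroProcess

    # Time-varying transition matrices (N=10 steps, M=2 states) and initial
    # state probabilities.
    Gammas = np.broadcast_to(np.array([[[0.9, 0.1], [0.1, 0.9]]]), (10, 2, 2))
    gamma_0 = np.r_[0.5, 0.5]

    # `dist` now returns an Aesara random variable; draws are produced by
    # evaluating the graph instead of calling `Distribution.random`.
    S_rv = DiscreteMarkovChain.dist(Gammas, gamma_0, size=3)
    states = S_rv.eval()  # shape (3, 10)

    # A zero-inflated Poisson emission process driven by one state sequence.
    Y_rv = PoissonZeroProcess.dist(100.0, states[0])
    y_obs = Y_rv.eval()

    # Log-likelihood graphs are built with `logpt` rather than a `logp` method.
    logp_vals = logpt(Y_rv, y_obs).eval()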