diff --git a/pymc_extras/statespace/core/statespace.py b/pymc_extras/statespace/core/statespace.py index 0f887291d..5a276d248 100644 --- a/pymc_extras/statespace/core/statespace.py +++ b/pymc_extras/statespace/core/statespace.py @@ -15,6 +15,7 @@ from pymc.model.transform.optimization import freeze_dims_and_data from pymc.util import RandomState from pytensor import Variable, graph_replace +from pytensor.graph.traversal import explicit_graph_inputs from rich.box import SIMPLE_HEAD from rich.console import Console from rich.table import Table @@ -266,6 +267,7 @@ def __init__( self._fit_dims: dict[str, Sequence[str]] | None = None self._fit_data: pt.TensorVariable | None = None self._fit_exog_data: dict[str, dict] = {} + self._shared_timestep: pt.TensorVariable | None = None self._needs_exog_data = None self._tensor_variable_info = SymbolicVariableInfo() @@ -279,6 +281,9 @@ def __init__( self._populate_properties() + # Placeholder for time-varying matrices that depend on data length + self._n_timesteps_placeholder = pt.iscalar("n_timesteps") + # All models contain a state space representation and a Kalman filter self.ssm = PytensorRepresentation(k_endog, k_states, k_posdef) @@ -494,6 +499,11 @@ def unpack_statespace(self) -> list[pt.TensorVariable]: return self.subbed_ssm + @property + def n_timesteps(self) -> Variable: + """Symbolic placeholder for the number of time steps in the data.""" + return self._n_timesteps_placeholder + @property def param_names(self) -> tuple[str, ...]: """ @@ -894,6 +904,56 @@ def _insert_data_variables(self): replacement_dict = {data: pymc_model[name] for name, data in self._name_to_data.items()} self.subbed_ssm = graph_replace(self.subbed_ssm, replace=replacement_dict, strict=True) + def _insert_data_shape_into_n_timesteps(self, data): + """ + Replace any occurrence of the n_timesteps symbolic variable with the length of the data. 
+ + n_timesteps is a special symbolic variable used by graphs with time-varying matrices, whose shapes won't be + known until the user provides data. We need to collect them and replace them with a single common shared + variable to define all shapes consistently. + """ + # This method should only be called after data has been ingested and transformed into a pytensor variable. + # Otherwise, we don't get symbolic linkage between time-varying matrix shapes and the data when the user calls + # pm.set_data + assert isinstance(data, pt.TensorVariable) + matrices = ( + self.subbed_ssm + if self.subbed_ssm is not None + else self._unpack_statespace_with_placeholders() + ) + + n_timestep_variables = tuple( + variable + for variable in explicit_graph_inputs(matrices) + if variable.name == "n_timesteps" + ) + + if n_timestep_variables: + self._shared_timestep = data.shape[0].astype("int32") + replacement_dict = {var: self._shared_timestep for var in n_timestep_variables} + + self.subbed_ssm = graph_replace(self.subbed_ssm, replace=replacement_dict, strict=False) + + def _insert_constant_timestep(self, matrices, step: int | pt.TensorVariable): + """ + Replace any occurrence of the n_timesteps symbolic variable with a constant integer. + + This is used for constructing graphs for prior predictive sampling, where no data is available. 
+ """ + step = pt.as_tensor_variable(step).astype("int32") + + n_timestep_variables = tuple( + variable + for variable in explicit_graph_inputs(matrices) + if variable.name == "n_timesteps" + ) + + if not n_timestep_variables: + return matrices + + replacement_dict = {var: step for var in n_timestep_variables} + return graph_replace(matrices, replace=replacement_dict, strict=False) + def _register_matrices_with_pymc_model(self) -> list[pt.TensorVariable]: """ Add all statespace matrices to the PyMC model currently on the context stack as pm.Deterministic nodes, and @@ -1048,6 +1108,9 @@ def build_statespace_graph( missing_fill_value=missing_fill_value, ) + # Order is important here: only call _insert_data_shape_into_n_timesteps after data has been registered. + self._insert_data_shape_into_n_timesteps(data) + filter_outputs = self.kalman_filter.build_graph( pt.as_tensor_variable(data), *self.unpack_statespace(), @@ -1217,11 +1280,16 @@ def _kalman_filter_outputs_from_dummy_graph( if name in scenario.keys(): pm.set_data({name: scenario[name]}) - x0, P0, c, d, T, Z, R, H, Q = self.unpack_statespace() + matrices = self.unpack_statespace() if data is None: data = self._fit_data + # Replace n_timesteps with data length for time-varying matrices + data_len = data.shape[0] if hasattr(data, "shape") else len(data) + matrices = self._insert_constant_timestep(matrices, data_len) + x0, P0, c, d, T, Z, R, H, Q = matrices + obs_coords = pm_mod.coords.get(OBS_STATE_DIM, None) data, nan_mask = register_data_with_pymc( @@ -1481,8 +1549,8 @@ def _sample_unconditional( pm.Data(**self._fit_exog_data[name]) self._insert_data_variables() - - matrices = [x0, P0, c, d, T, Z, R, H, Q] = self.unpack_statespace() + matrices = self._insert_constant_timestep(self.unpack_statespace(), step=steps) + x0, P0, c, d, T, Z, R, H, Q = matrices if not self.measurement_error: H_jittered = pm.Deterministic( @@ -1814,7 +1882,11 @@ def sample_statespace_matrices( return matrix_idata def 
sample_filter_outputs( - self, idata, filter_output_names: str | list[str] | None, group: str = "posterior", **kwargs + self, + idata, + filter_output_names: str | list[str] | None = None, + group: str = "posterior", + **kwargs, ): if isinstance(filter_output_names, str): filter_output_names = [filter_output_names] @@ -2302,11 +2374,15 @@ def _build_forecast_model( "P0_slice", cov_frozen[t0_idx], dims=cov_dims[1:] if cov_dims is not None else None ) + # Get matrices with n_timesteps set to forecast length for time-varying models + # Note: matrices already has x0, P0 skipped from _kalman_filter_outputs_from_dummy_graph + forecast_matrices = self._insert_constant_timestep(matrices, len(forecast_index)) + _ = LinearGaussianStateSpace( "forecast", x0, P0, - *matrices, + *forecast_matrices, steps=len(forecast_index), dims=dims, sequence_names=self.kalman_filter.seq_names, @@ -2603,7 +2679,8 @@ def impulse_response_function( self._build_dummy_graph() self._insert_random_variables() - P0, _, c, d, T, Z, R, H, post_Q = self.unpack_statespace() + matrices = self._insert_constant_timestep(self.unpack_statespace(), step=n_steps) + P0, _, c, d, T, Z, R, H, post_Q = matrices x0 = pm.Deterministic("x0_new", pt.zeros(self.k_states), dims=[ALL_STATE_DIM]) if use_posterior_cov: @@ -2632,15 +2709,25 @@ def impulse_response_function( else: shock_trajectory = pt.as_tensor_variable(shock_trajectory) - def irf_step(shock, x, c, T, R): + time_varying_T = T.ndim == 3 + + def irf_step(*args): + if time_varying_T: + shock, T, x, c, R = args + else: + shock, x, c, T, R = args + next_x = c + T @ x + R @ shock return next_x + sequences = [shock_trajectory, T] if time_varying_T else [shock_trajectory] + non_sequences = [c, R] if time_varying_T else [c, T, R] + irf = pytensor.scan( irf_step, - sequences=[shock_trajectory], + sequences=sequences, outputs_info=[x0], - non_sequences=[c, T, R], + non_sequences=non_sequences, n_steps=n_steps, strict=True, return_updates=False, diff --git 
a/pymc_extras/statespace/models/structural/components/seasonality.py b/pymc_extras/statespace/models/structural/components/seasonality.py index 547718acb..0c487673d 100644 --- a/pymc_extras/statespace/models/structural/components/seasonality.py +++ b/pymc_extras/statespace/models/structural/components/seasonality.py @@ -3,6 +3,7 @@ import numpy as np from pytensor import tensor as pt +from pytensor.tensor import TensorVariable from pymc_extras.statespace.core.properties import ( Coord, @@ -13,283 +14,255 @@ from pymc_extras.statespace.models.structural.core import Component from pymc_extras.statespace.models.structural.utils import _frequency_transition_block +__all__ = ["TimeSeasonality", "FrequencySeasonality"] -class TimeSeasonality(Component): - r""" - Seasonal component, modeled in the time domain - Parameters - ---------- - season_length: int - The number of periods in a single seasonal cycle, e.g. 12 for monthly data with annual seasonal pattern, 7 for - daily data with weekly seasonal pattern, etc. It must be greater than one. - - duration: int, default 1 - Number of time steps for each seasonal period. - This determines how long each seasonal period is held constant before moving to the next. - - innovations: bool, default True - Whether to include stochastic innovations in the strength of the seasonal effect - - name: str, default None - A name for this seasonal component. Used to label dimensions and coordinates. Useful when multiple seasonal - components are included in the same model. Default is ``f"Seasonal[s={season_length}, d={duration}]"`` - - state_names: list of str, default None - List of strings for seasonal effect labels. If provided, it must be of length ``season_length`` times ``duration``. - An example would be ``state_names = ['Mon', 'Tue', 'Wed', 'Thur', 'Fri', 'Sat', 'Sun']`` when data is daily with a weekly - seasonal pattern (``season_length = 7``). 
- - If None and ``duration = 1``, states will be named as ``[State_0, ..., State_s-1]`` (here s is ``season_length``). - If None and ``duration > 1``, states will be named as ``[State_0_0, ..., State_s-1_d-1]`` (here d is ``duration``). +class TimeSeasonality(Component): + """ + Create a TimeSeasonality component for a state space model. + """ - remove_first_state: bool, default True - If True, the first state will be removed from the model. This is done because there are only ``season_length-1`` degrees of - freedom in the seasonal component, and one state is not identified. If False, the first state will be - included in the model, but it will not be identified -- you will need to handle this in the priors (e.g. with - ZeroSumNormal). + def __init__( + self, + season_length: int, + duration: int = 1, + innovations: bool = True, + name: str | None = None, + state_names: Sequence[str] | None = None, + remove_first_state: bool = True, + observed_state_names: Sequence[str] | None = None, + share_states: bool = False, + start_state: str | int | None = None, + use_time_varying: bool = True, + ): + r""" + Deterministic seasonal pattern with optional stochastic drift. - observed_state_names: list[str] | None, default None - List of strings for observed state labels. If None, defaults to ["data"]. + Many time series exhibit regular patterns tied to the calendar: sales spike in December, electricity demand peaks + on weekday evenings, ice cream consumption rises in summer. This component captures such effects by estimating a + distinct effect for each period within a seasonal cycle, subject to the constraint that effects sum to zero over a + complete cycle. This ensures the seasonality captures deviations from the level, not the level itself. - share_states: bool, default False - Whether latent states are shared across the observed states. If True, there will be only one set of latent - states, which are observed by all observed states. 
If False, each observed state has its own set of - latent states. This argument has no effect if `k_endog` is 1. + Parameters + ---------- + season_length : int + Number of periods in one complete seasonal cycle. Must be at least 2. - Notes - ----- - A seasonal effect is any pattern that repeats at fixed intervals. There are several ways to model such effects; - here, we present two models that are straightforward extensions of those described in [1]. + duration : int, default 1 + Number of observations each seasonal effect spans. The default (1) means each observation gets its own seasonal + effect. Set ``duration > 1`` when your data frequency is finer than your seasonal pattern—for example, daily + observations with monthly seasonality (``season_length=12``, ``duration≈30``). - **First model** (``remove_first_state=True``) + innovations : bool, default True + If True, seasonal effects evolve stochastically over time, allowing the seasonal pattern to change gradually. + If False, the pattern is deterministic (constant across all cycles). - In this model, the state vector is defined as: + name : str, optional + Label for this component, used in parameter names and coordinates. Defaults to ``"Seasonal[s={season_length}, + d={duration}]"``. - .. math:: - \alpha_t :=(\gamma_t, \ldots, \gamma_{t-d(s-1)+1}), \quad t \ge 0. + state_names : sequence of str, optional + Labels for each seasonal period. Length must equal ``season_length``. These appear in output coordinates, + making results interpretable. For example, a weekly season might use the names of the days of the week. - This vector has length :math:`d(s-1)`, where: + Defaults to ``["{name}_0", "{name}_1", ...]``. - - :math:`s` is the ``seasonal_length`` parameter, and - - :math:`d` is the ``duration`` parameter. + remove_first_state : bool, default True + Controls how the sum-to-zero constraint is enforced. 
- The components of the initial vector :math:`\alpha_{0}` are given by + - **True** (recommended): Estimates ``s-1`` free parameters; the first seasonal effect is computed as the + negative sum of the others. This is the Durbin-Koopman [1]_ formulation. + - **False**: Estimates all ``s`` parameters. You must enforce the constraint yourself, typically via a + ``ZeroSumNormal`` prior. Use this when you want symmetric treatment of all seasons. - .. math:: - \gamma_{-l} := \tilde{\gamma}_{k_l}, \quad \text{where} \quad k_l := \left\lfloor \frac{l}{d} \right\rfloor \bmod s \quad \text{and} \quad l=0,\ldots, d(s-1)-1. + .. warning:: + With ``remove_first_state=True``, the first element of ``state_names`` does not + appear in the parameter coordinates (since it's not a free parameter). - Here, the values + observed_state_names : sequence of str, optional + Labels for observed series. Defaults to ``["data"]`` for univariate models. - .. math:: - \tilde{\gamma}_{0}, \ldots, \tilde{\gamma}_{s-2}, + share_states : bool, default False + For multivariate models: if True, all series share the same seasonal pattern; if False, + each series has independent seasonal effects. Ignored if ``k_endog=1``. - represent the initial seasonal states. The transition matrix of this model is the :math:`d(s-1) \times d(s-1)` matrix + start_state : str or int, optional + Which seasonal period corresponds to the first observation (t=0). Specify as either a name from + ``state_names`` or an integer index. Use this when your sample doesn't start at the beginning of a cycle— + for instance, if you have weekly seasonality but your data begins on a Wednesday, set ``start_state="Wed"`` + or ``start_state=3``. - .. 
math:: - \begin{bmatrix} - -\mathbf{1}_d & -\mathbf{1}_d & \cdots & -\mathbf{1}_d & -\mathbf{1}_d \\ - \mathbf{1}_d & \mathbf{0}_d & \cdots & \mathbf{0}_d & \mathbf{0}_d \\ - \mathbf{0}_d & \mathbf{1}_d & \cdots & \mathbf{0}_d & \mathbf{0}_d \\ - \vdots & \vdots & \ddots & \vdots \\ - \mathbf{0}_d & \mathbf{0}_d & \cdots & \mathbf{1}_d & \mathbf{0}_d - \end{bmatrix} + The index refers to positions in the original ``state_names`` (before any removal). - where :math:`\mathbf{1}_d` and :math:`\mathbf{0}_d` denote the :math:`d \times d` identity and null matrices, respectively. + use_time_varying : bool, default True + If True and duration > 1, the transition matrix will be time-varying to correctly handle the shifting + seasonal effects. If False, a single very large and sparse transition matrix will be used. Ignored if + duration = 1. The time-varying approach is suggested for now to keep the state space small. - **Second model** (``remove_first_state=False``) + Notes + ----- + **The Model** - In contrast, the state vector in the second model is defined as: + The observation at time :math:`t` is influenced by a seasonal effect :math:`\gamma_t`: - .. math:: - \alpha_t=(\gamma_t, \ldots, \gamma_{t-ds+1}), \quad t \ge 0. + .. math:: - This vector has length :math:`ds`. The components of the initial state vector :math:`\alpha_{0}` are defined similarly: + y_t = \ldots + \gamma_t + \varepsilon_t - .. math:: - \gamma_{-l} := \tilde{\gamma}_{k_l}, \quad \text{where} \quad k_l := \left\lfloor \frac{l}{d} \right\rfloor \bmod s \quad \text{and} \quad l=0,\ldots, ds-1. + where the seasonal effect cycles through :math:`s` values, repeating every :math:`s` observations (or every + :math:`s \times d` observations if ``duration > 1``). - In this case, the initial seasonal states :math:`\tilde{\gamma}_{0}, \ldots, \tilde{\gamma}_{s-1}` are required to satisfy the following condition: + To ensure identifiability—separating seasonality from the overall level—we impose: - .. 
math:: - \sum_{i=0}^{s-1} \tilde{\gamma}_{i} = 0. + .. math:: - The transition matrix of this model is the following :math:`ds \times ds` circulant matrix: + \sum_{j=0}^{s-1} \gamma_j = 0 - .. math:: - \begin{bmatrix} - 0 & 1 & 0 & \cdots & 0 \\ - 0 & 0 & 1 & \cdots & 0 \\ - \vdots & \vdots & \ddots & \ddots & \vdots \\ - 0 & 0 & \cdots & 0 & 1 \\ - 1 & 0 & \cdots & 0 & 0 - \end{bmatrix} - - To give interpretation to the :math:`\gamma` terms, it is helpful to work through the algebra for a simple - example. Let :math:`s=4`, :math:`d=1`, ``remove_first_state=True``, and omit the shock term. Then, we have - :math:`\gamma_{-i} = \tilde{\gamma}_{-i}`, for :math:`i=-2,\ldots, 0` and the value of the seasonal component - for the first 5 timesteps will be: + **Enforcing the Constraint: Two Approaches** - .. math:: - \begin{align} - \gamma_1 &= -\gamma_0 - \gamma_{-1} - \gamma_{-2} \\ - \gamma_2 &= -\gamma_1 - \gamma_0 - \gamma_{-1} \\ - &= -(-\gamma_0 - \gamma_{-1} - \gamma_{-2}) - \gamma_0 - \gamma_{-1} \\ - &= (\gamma_0 - \gamma_0 )+ (\gamma_{-1} - \gamma_{-1}) + \gamma_{-2} \\ - &= \gamma_{-2} \\ - \gamma_3 &= -\gamma_2 - \gamma_1 - \gamma_0 \\ - &= -\gamma_{-2} - (-\gamma_0 - \gamma_{-1} - \gamma_{-2}) - \gamma_0 \\ - &= (\gamma_{-2} - \gamma_{-2}) + \gamma_{-1} + (\gamma_0 - \gamma_0) \\ - &= \gamma_{-1} \\ - \gamma_4 &= -\gamma_3 - \gamma_2 - \gamma_1 \\ - &= -\gamma_{-1} - \gamma_{-2} -(-\gamma_0 - \gamma_{-1} - \gamma_{-2}) \\ - &= (\gamma_{-2} - \gamma_{-2}) + (\gamma_{-1} - \gamma_{-1}) + \gamma_0 \\ - &= \gamma_0 \\ - \gamma_5 &= -\gamma_4 - \gamma_3 - \gamma_2 \\ - &= -\gamma_0 - \gamma_{-1} - \gamma_{-2} \\ - &= \gamma_1 - \end{align} + 1. **Durbin-Koopman formulation** (``remove_first_state=True``) - This exercise shows that, given a list ``initial_conditions`` of length ``s-1``, the effects of this model will be: + Parameterize only :math:`\gamma_1, \ldots, \gamma_{s-1}` as free parameters, then define :math:`\gamma_0 = + -\sum_{j=1}^{s-1} \gamma_j`. 
The state vector tracks these :math:`s-1` values, and the transition matrix + rotates through the cycle while computing the implied :math:`\gamma_0` automatically. The state transition + follows: - - Period 1: ``-sum(initial_conditions)`` - - Period 2: ``initial_conditions[-1]`` - - Period 3: ``initial_conditions[-2]`` - - ... - - Period s: ``initial_conditions[0]`` - - Period s+1: ``-sum(initial_condition)`` + .. math:: - And so on. So for interpretation, the ``season_length - 1`` initial states are, when reversed, the coefficients - associated with ``state_names[1:]``. + T_\gamma = \begin{bmatrix} + -1 & -1 & \cdots & -1 \\ + 1 & 0 & \cdots & 0 \\ + 0 & 1 & \ddots & \vdots \\ + \vdots & & \ddots & 0 + \end{bmatrix} - In the next example, we set :math:`s=2`, :math:`d=2`, ``remove_first_state=True``, and omit the shock term. - By definition, the initial vector :math:`\alpha_{0}` is + This formulation is statistically efficient (minimal state dimension) and guarantees the constraint by + construction. - .. math:: - \alpha_0=(\tilde{\gamma}_{0}, \tilde{\gamma}_{0}, \tilde{\gamma}_{-1}, \tilde{\gamma}_{-1}) + 2. **Unconstrained formulation** (``remove_first_state=False``) - and the transition matrix is + All :math:`s` seasonal effects are free parameters. The state simply cycles via a permutation matrix. The + sum-to-zero constraint must be imposed through the prior (e.g., ``pm.ZeroSumNormal``). This formulation + treats all states symmetrically and can be more intuitive when you want to directly interpret each seasonal + effect, but it has a slightly larger state dimension. - .. math:: - \begin{bmatrix} - -1 & 0 & -1 & 0 \\ - 0 & -1 & 0 & -1 \\ - 1 & 0 & 0 & 0 \\ - 0 & 1 & 0 & 0 \\ - \end{bmatrix} + **Duration: Handling Mismatched Frequencies** - It is easy to verify that: + When ``duration > 1``, each seasonal effect is held constant for :math:`d` consecutive observations before + transitioning to the next. 
This produces a step-function pattern and is useful when data frequency exceeds + seasonal frequency (e.g., when observations are daily, but the seasonal pattern repeats monthly). - .. math:: - \begin{align} - \gamma_1 &= -\tilde{\gamma}_0 - \tilde{\gamma}_{-1}\\ - \gamma_2 &= -(-\tilde{\gamma}_0 - \tilde{\gamma}_{-1})-\tilde{\gamma}_0\\ - &= \tilde{\gamma}_{-1}\\ - \gamma_3 &= -\tilde{\gamma}_{-1} +(\tilde{\gamma}_0 + \tilde{\gamma}_{-1})\\ - &= \tilde{\gamma}_{0}\\ - \gamma_4 &= -\tilde{\gamma}_0 - \tilde{\gamma}_{-1}.\\ - \end{align} + The total cycle length becomes :math:`s \times d` observations. - .. warning:: - Although the ``state_names`` argument expects a list of length ``season_length`` times ``duration``, - only ``state_names[duration:]`` will be saved as model dimensions, since the first coefficient is not identified - (it is defined as :math:`-\sum_{i=1}^{s-1} \tilde{\gamma}_{-i}`). + **Stochastic Seasonality** - Examples - -------- - Estimate monthly with a model with a gaussian random walk trend and monthly seasonality: + With ``innovations=True``, seasonal effects evolve over time: - .. code:: python + .. math:: - from pymc_extras.statespace import structural as st - import pymc as pm - import pytensor.tensor as pt - import pandas as pd + \gamma_{j,t+1} = \gamma_{j,t} + \omega_{j,t}, \quad \omega_{j,t} \sim N(0, \sigma^2_\gamma) - # Get month names - state_names = pd.date_range('1900-01-01', '1900-12-31', freq='MS').month_name().tolist() + This allows the seasonal pattern to adapt—capturing phenomena like shifting holiday shopping patterns or + changing commuter behavior. The latent season effect evolves with a Gaussian random walk. The smoothness of + evolution is controlled by the prior on ``sigma_{name}``. 
- # Build the structural model - grw = st.LevelTrend(order=1, innovations_order=1) - annual_season = st.TimeSeasonality( - season_length=12, name="annual", state_names=state_names, innovations=False - ) - ss_mod = (grw + annual_season).build() + Examples + -------- + Weekly seasonality for daily data: - with pm.Model(coords=ss_mod.coords) as model: - P0 = pm.Deterministic('P0', pt.eye(ss_mod.k_states) * 10, dims=ss_mod.param_dims['P0']) + >>> mod = st.TimeSeasonality( + ... season_length=7, + ... state_names=['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'], + ... start_state='Mon', # Data starts on Monday + ... name='day_of_week', + ... ) - initial_level_trend = pm.Deterministic( - "initial_level_trend", pt.zeros(1), dims=ss_mod.param_dims["initial_level_trend"] - ) - sigma_level_trend = pm.HalfNormal( - "sigma_level_trend", sigma=1e-6, dims=ss_mod.param_dims["sigma_level_trend"] - ) - params_annual = pm.Normal("params_annual", sigma=1e-2, dims=ss_mod.param_dims["params_annual"]) + Monthly seasonality for daily data (each month held constant for ~30 days): - ss_mod.build_statespace_graph(data) - idata = pm.sample( - nuts_sampler="nutpie", nuts_sampler_kwargs={"backend": "JAX", "gradient_backend": "JAX"} - ) + >>> mod = st.TimeSeasonality( + ... season_length=12, + ... duration=30, + ... name='month', + ... ) - References - ---------- - .. [1] Durbin, James, and Siem Jan Koopman. 2012. - Time Series Analysis by State Space Methods: Second Edition. - Oxford University Press. - """ + See Also + -------- + FrequencySeasonality : + Alternative parameterization using Fourier basis functions. More compact for long seasonal periods but less + interpretable (effects do not map to specific calendar periods). Can handle non-integer season lengths. 
- def __init__( - self, - season_length: int, - duration: int = 1, - innovations: bool = True, - name: str | None = None, - state_names: list | None = None, - remove_first_state: bool = True, - observed_state_names: list[str] | None = None, - share_states: bool = False, - ): + References + ---------- + .. [1] Durbin, J., & Koopman, S. J. (2012). *Time Series Analysis by State Space Methods* (2nd ed.). Oxford + University Press. Section 3.2. + """ if observed_state_names is None: observed_state_names = ["data"] - if season_length <= 1 or not isinstance(season_length, int): + if not isinstance(season_length, int) or season_length <= 1: raise ValueError( f"season_length must be an integer greater than 1, got {season_length}" ) - if duration <= 0 or not isinstance(duration, int): + if not isinstance(duration, int) or duration <= 0: raise ValueError(f"duration must be a positive integer, got {duration}") if name is None: name = f"Seasonal[s={season_length}, d={duration}]" + + # The user only provides unique names. If duration > 1, the states will be repeated with suffixes _0, _1, ..., + # _{duration-1} to create unique state names for each position in the cycle. 
if state_names is None: - if duration > 1: - state_names = [ - f"{name}_{i}_{j}" for i in range(season_length) for j in range(duration) - ] - else: - state_names = [f"{name}_{i}" for i in range(season_length)] + state_names = [f"{name}_{i}" for i in range(season_length)] else: - if len(state_names) != season_length * duration: + if len(state_names) != season_length: raise ValueError( - f"state_names must be a list of length season_length*duration, got {len(state_names)}" + f"state_names must be a list of length season_length={season_length}, got {len(state_names)}" ) state_names = list(state_names) + # Validate and convert start_state to an index + if start_state is not None: + if isinstance(start_state, str): + if start_state not in state_names: + raise ValueError( + f"start_state '{start_state}' not found in state_names. " + f"Available names: {state_names}" + ) + start_idx = state_names.index(start_state) + elif isinstance(start_state, int): + if not (0 <= start_state < season_length): + raise ValueError( + f"start_state index must be in [0, {season_length}), got {start_state}" + ) + start_idx = start_state + else: + raise ValueError( + f"start_state must be a string (state name) or int (index), got {type(start_state)}" + ) + else: + start_idx = 0 + + self.start_idx = start_idx self.share_states = share_states self.innovations = innovations self.duration = duration self.remove_first_state = remove_first_state self.season_length = season_length + self.use_time_varying = use_time_varying if self.remove_first_state: - # In traditional models, the first state isn't identified, so we can help out the user by automatically - # discarding it. - # TODO: Can this be stashed and reconstructed automatically somehow? - state_names = state_names[duration:] + # TODO: Can we somehow use a transformation to preserve all of the user's states? 
+ state_names = state_names[1:] self.provided_state_names = state_names - k_states = (season_length - int(self.remove_first_state)) * duration + # When using time-varying transition matrices with duration > 1, we don't need + # to expand the state dimension. The time-varying T handles the duration logic. + use_tv = use_time_varying and duration > 1 + if use_tv: + k_states = season_length - int(remove_first_state) + else: + k_states = duration * (season_length - int(remove_first_state)) + k_endog = len(observed_state_names) k_posdef = int(innovations) @@ -307,18 +280,39 @@ def __init__( share_states=share_states, ) + @property + def n_seasons(self) -> int: + """Number of unique seasonal parameters (season_length - 1 if remove_first_state, else season_length).""" + return self.season_length - int(self.remove_first_state) + + @property + def _uses_time_varying_transition(self) -> bool: + """Whether this component uses time-varying transition matrices.""" + return self.use_time_varying and self.duration > 1 + def set_states(self) -> State | tuple[State, ...] 
| None: observed_state_names = self.base_observed_state_names + # Expand state names for duration > 1, but NOT when using time-varying T + # (time-varying T keeps the state compact) + if self.duration > 1 and not self._uses_time_varying_transition: + expanded_state_names = [ + f"{state_name}_{i}" + for state_name in self.provided_state_names + for i in range(self.duration) + ] + else: + expanded_state_names = self.provided_state_names + if self.share_states: state_names = [ - f"{state_name}[{self.name}_shared]" for state_name in self.provided_state_names + f"{state_name}[{self.name}_shared]" for state_name in expanded_state_names ] else: state_names = [ f"{state_name}[{endog_name}]" for endog_name in observed_state_names - for state_name in self.provided_state_names + for state_name in expanded_state_names ] hidden_states = [State(name=name, observed=False, shared=True) for name in state_names] @@ -330,11 +324,13 @@ def set_states(self) -> State | tuple[State, ...] | None: def set_parameters(self) -> Parameter | tuple[Parameter, ...] | None: k_endog = self.k_endog k_endog_effective = 1 if self.share_states else k_endog - k_states = self.k_states // k_endog_effective + k_unique_params = self.n_seasons seasonal_param = Parameter( name=f"params_{self.name}", - shape=(k_states,) if k_endog == 1 else (k_endog, k_states), + shape=(k_unique_params,) + if k_endog_effective == 1 + else (k_endog_effective, k_unique_params), dims=(f"state_{self.name}",) if k_endog_effective == 1 else (f"endog_{self.name}", f"state_{self.name}"), @@ -379,71 +375,245 @@ def set_coords(self) -> Coord | tuple[Coord, ...] 
| None: return tuple(coords_container) - def make_symbolic_graph(self) -> None: - k_endog = self.k_endog - k_endog_effective = 1 if self.share_states else k_endog - k_states = self.k_states // k_endog_effective - duration = self.duration - - k_unique_states = k_states // duration - k_posdef = self.k_posdef // k_endog_effective - + def _k_endog_effective(self) -> int: + return 1 if self.share_states else self.k_endog + + def _k_states_per_endog(self) -> int: + return self.k_states // self._k_endog_effective() + + def _k_posdef_per_endog(self) -> int: + return self.k_posdef // self._k_endog_effective() + + def _build_dk_seasonal_rotation(self) -> TensorVariable: + """Build the (s-1) x (s-1) Durbin-Koopman seasonal transition matrix.""" + n = self.season_length - 1 + T_gamma = pt.zeros((n, n)) + T_gamma = pt.set_subtensor(T_gamma[0, :], -1.0) + if n > 1: + T_gamma = pt.set_subtensor(T_gamma[1:, :-1], pt.eye(n - 1)) + return T_gamma + + def _build_circulant_rotation(self) -> TensorVariable: + """Build simple circulant permutation matrix of size season_length.""" + n = self.season_length + T = pt.eye(n, k=1) + return pt.set_subtensor(T[-1, 0], 1) + + def _build_static_transition(self) -> TensorVariable: + """Build static transition matrix (2D) for duration >= 1.""" + k_states = self._k_states_per_endog() + + if not self.remove_first_state: + T_rotate = self._build_circulant_rotation() + + if self.duration == 1: + return T_rotate + + # Duration > 1: block structure with circulant rotation at wrap + n = self.season_length + I_n = pt.eye(n) + T = pt.zeros((k_states, k_states)) + + for k in range(self.duration - 1): + row_slice = slice(k * n, (k + 1) * n) + col_slice = slice((k + 1) * n, (k + 2) * n) + T = pt.set_subtensor(T[row_slice, col_slice], I_n) + + last_row_slice = slice((self.duration - 1) * n, self.duration * n) + T = pt.set_subtensor(T[last_row_slice, :n], T_rotate) + return T + + if self.duration == 1: + return self._build_dk_seasonal_rotation() + + # Duration > 
1: block structure with D&K rotation at wrap + n = self.season_length - 1 + T_gamma = self._build_dk_seasonal_rotation() + I_n = pt.eye(n) + T = pt.zeros((k_states, k_states)) + + for k in range(self.duration - 1): + row_slice = slice(k * n, (k + 1) * n) + col_slice = slice((k + 1) * n, (k + 2) * n) + T = pt.set_subtensor(T[row_slice, col_slice], I_n) + + last_row_slice = slice((self.duration - 1) * n, self.duration * n) + T = pt.set_subtensor(T[last_row_slice, :n], T_gamma) + return T + + def _build_time_varying_transition(self) -> TensorVariable: + """Build time-varying transition matrix (3D) for duration > 1 with time-varying mode.""" if self.remove_first_state: - # In this case, parameters are normalized to sum to zero, so the current state is the negative sum of - # all previous states. - zero_d = pt.zeros((self.duration, self.duration)) - id_d = pt.eye(self.duration) - - row_blocks = [] - - # First row: all -1_d blocks - first_row = [-id_d for _ in range(self.season_length - 1)] - row_blocks.append(pt.concatenate(first_row, axis=1)) - - # Rows 2 to season_length-1: shifted identity blocks - for i in range(self.season_length - 2): - row = [] - for j in range(self.season_length - 1): - if j == i: - row.append(id_d) - else: - row.append(zero_d) - row_blocks.append(pt.concatenate(row, axis=1)) - - # Stack blocks - T = pt.concatenate(row_blocks, axis=0) + n = self.season_length - 1 + T_rotate = self._build_dk_seasonal_rotation() else: - # In this case we assume the user to be responsible for ensuring the states sum to zero, so T is just a - # circulant matrix that cycles between the states. 
- T = pt.eye(k_states, k=1) - T = pt.set_subtensor(T[-1, 0], 1) + n = self.season_length + T_rotate = self._build_circulant_rotation() - self.ssm["transition", :, :] = pt.linalg.block_diag(*[T for _ in range(k_endog_effective)]) + T_hold = pt.eye(n) + time_idx = pt.arange(self.n_timesteps) + is_rotation_step = pt.eq(time_idx % self.duration, self.duration - 1) + + return pt.where( + is_rotation_step[:, None, None], + pt.broadcast_to(T_rotate, (self.n_timesteps, n, n)), + pt.broadcast_to(T_hold, (self.n_timesteps, n, n)), + ) + + def _build_transition_matrix(self) -> TensorVariable: + """Build the full transition matrix, handling multivariate via block_diag.""" + k_endog_effective = self._k_endog_effective() + + if self._uses_time_varying_transition: + T_single = self._build_time_varying_transition() + if k_endog_effective == 1: + return T_single + # For multivariate: build block diagonal for each time step + # T_single is (n_timesteps, n, n), we need (n_timesteps, k_states, k_states) + blocks = [T_single for _ in range(k_endog_effective)] + # Stack along a new axis then reshape to block diagonal per timestep + return pt.linalg.block_diag(*blocks) + else: + T_single = self._build_static_transition() + return pt.linalg.block_diag(*[T_single for _ in range(k_endog_effective)]) + + def _build_design_matrix(self) -> TensorVariable: + """Build the design matrix Z that extracts the first state.""" + k_states = self._k_states_per_endog() + k_endog_effective = self._k_endog_effective() Z = pt.zeros((1, k_states))[0, 0].set(1) - self.ssm["design", :, :] = pt.linalg.block_diag(*[Z for _ in range(k_endog_effective)]) + return pt.linalg.block_diag(*[Z for _ in range(k_endog_effective)]) + + def _build_initial_state_dk(self, initial_params: TensorVariable) -> TensorVariable: + """Build initial state for Durbin-Koopman formulation (remove_first_state=True).""" + k_endog_effective = self._k_endog_effective() + k_unique_params = self.season_length - 1 + use_tv = 
self._uses_time_varying_transition + + if k_endog_effective == 1: + gamma_0 = -pt.sum(initial_params) + if k_unique_params > 1: + reordered = pt.concatenate([[gamma_0], initial_params[1:][::-1]]) + else: + reordered = pt.atleast_1d(gamma_0) + # Only tile when NOT using time-varying transition + if use_tv: + return reordered + else: + return pt.tile(reordered, self.duration) + else: + gamma_0 = -pt.sum(initial_params, axis=1, keepdims=True) + if k_unique_params > 1: + reordered = pt.concatenate([gamma_0, initial_params[:, 1:][:, ::-1]], axis=1) + else: + reordered = gamma_0 + if use_tv: + return reordered.ravel() + else: + return pt.tile(reordered, (1, self.duration)).ravel() + + def _build_initial_state_simple(self, initial_params: TensorVariable) -> TensorVariable: + """Build initial state for simple formulation (remove_first_state=False).""" + k_endog_effective = self._k_endog_effective() + use_tv = self._uses_time_varying_transition + + if k_endog_effective == 1: + if use_tv: + return initial_params + else: + return pt.extra_ops.repeat(initial_params, self.duration, axis=0) + else: + if use_tv: + return initial_params.ravel() + else: + return pt.extra_ops.repeat(initial_params, self.duration, axis=1).ravel() + + def _apply_start_state_shift( + self, initial_state: TensorVariable, T: TensorVariable + ) -> TensorVariable: + """Shift initial state to account for start_state offset.""" + if self.start_idx == 0: + return initial_state + + k_endog_effective = self._k_endog_effective() + + if self._uses_time_varying_transition: + # Time-varying case: shift by start_idx rotations + # Each rotation is one application of T_rotate, which happens every 'duration' steps + if self.remove_first_state: + T_rotate = self._build_dk_seasonal_rotation() + else: + T_rotate = self._build_circulant_rotation() + if k_endog_effective == 1: + return pt.linalg.matrix_power(T_rotate, self.start_idx) @ initial_state + else: + T_full = pt.linalg.block_diag(*[T_rotate for _ in 
range(k_endog_effective)]) + return pt.linalg.matrix_power(T_full, self.start_idx) @ initial_state + else: + # Static case: shift by start_idx * duration applications of T + shift_steps = self.start_idx * self.duration + if k_endog_effective == 1: + return pt.linalg.matrix_power(T, shift_steps) @ initial_state + else: + T_full = pt.linalg.block_diag(*[T for _ in range(k_endog_effective)]) + return pt.linalg.matrix_power(T_full, shift_steps) @ initial_state + + def _build_selection_and_state_cov(self) -> None: + """Build selection matrix R and state covariance Q for innovations.""" + if not self.innovations: + return + + k_endog_effective = self._k_endog_effective() + k_states = self._k_states_per_endog() + k_posdef = self._k_posdef_per_endog() + + R = pt.zeros((k_states, k_posdef))[0, 0].set(1.0) + self.ssm["selection", :, :] = pt.join(0, *[R for _ in range(k_endog_effective)]) - initial_states = self.make_and_register_variable( + sigma = self.make_and_register_variable( + f"sigma_{self.name}", + shape=() if k_endog_effective == 1 else (k_endog_effective,), + ) + cov_idx = ("state_cov", *np.diag_indices(k_posdef * k_endog_effective)) + self.ssm[cov_idx] = sigma**2 + + def make_symbolic_graph(self) -> None: + k_endog_effective = self._k_endog_effective() + k_unique_params = self.n_seasons + + # Transition matrix + T = self._build_transition_matrix() + if T.ndim == 3: + self.ssm["transition"] = T + else: + self.ssm["transition", :, :] = T + + # Design matrix + self.ssm["design", :, :] = self._build_design_matrix() + + # Initial state parameters + initial_params = self.make_and_register_variable( f"params_{self.name}", - shape=(k_unique_states,) + shape=(k_unique_params,) if k_endog_effective == 1 - else (k_endog_effective, k_unique_states), + else (k_endog_effective, k_unique_params), ) - if k_endog_effective == 1: - self.ssm["initial_state", :] = pt.extra_ops.repeat(initial_states, duration, axis=0) + + # Build initial state + if self.remove_first_state: + 
initial_state = self._build_initial_state_dk(initial_params) else: - self.ssm["initial_state", :] = pt.extra_ops.repeat( - initial_states, duration, axis=1 - ).ravel() + initial_state = self._build_initial_state_simple(initial_params) - if self.innovations: - R = pt.zeros((k_states, k_posdef))[0, 0].set(1.0) - self.ssm["selection", :, :] = pt.join(0, *[R for _ in range(k_endog_effective)]) - season_sigma = self.make_and_register_variable( - f"sigma_{self.name}", shape=() if k_endog_effective == 1 else (k_endog_effective,) - ) - cov_idx = ("state_cov", *np.diag_indices(k_posdef * k_endog_effective)) - self.ssm[cov_idx] = season_sigma**2 + # Apply start_state shift (handles time-varying vs static internally) + T_for_shift = self._build_static_transition() + initial_state = self._apply_start_state_shift(initial_state, T_for_shift) + + self.ssm["initial_state", :] = initial_state + + # Selection and state covariance + self._build_selection_and_state_cov() class FrequencySeasonality(Component): @@ -507,7 +677,7 @@ class FrequencySeasonality(Component): def __init__( self, - season_length: int, + season_length: int | float, n: int | None = None, name: str | None = None, innovations: bool = True, @@ -517,11 +687,16 @@ def __init__( if observed_state_names is None: observed_state_names = ["data"] + if not isinstance(season_length, int | float) or season_length <= 0: + raise ValueError(f"season_length must be a positive number, got {season_length}") + self.share_states = share_states k_endog = len(observed_state_names) if n is None: n = int(season_length / 2) + if not isinstance(n, int) or n <= 0: + raise ValueError(f"n must be a positive integer, got {n}") if name is None: name = f"Frequency[s={season_length}, n={n}]" diff --git a/pymc_extras/statespace/models/structural/core.py b/pymc_extras/statespace/models/structural/core.py index dda5269c0..c51265a0d 100644 --- a/pymc_extras/statespace/models/structural/core.py +++ b/pymc_extras/statespace/models/structural/core.py @@ 
-534,6 +534,8 @@ def __init__( self._k_posdef = k_posdef self._k_endog = len(base_observed_state_names) or k_endog self._k_states = k_states + self._n_timesteps_placeholder = pt.iscalar("n_timesteps") + self.base_state_names = base_state_names self.base_observed_state_names = base_observed_state_names @@ -720,6 +722,10 @@ def _name_to_variable(self): def _name_to_data(self): return self._tensor_data_info.to_dict() + @property + def n_timesteps(self) -> Variable: + return self._n_timesteps_placeholder + def make_and_register_variable(self, name, shape, dtype=floatX) -> Variable: r""" Helper function to create a pytensor symbolic variable and register it in the _name_to_variable dictionary diff --git a/tests/statespace/core/test_statespace.py b/tests/statespace/core/test_statespace.py index c8e343609..f83d35d4d 100644 --- a/tests/statespace/core/test_statespace.py +++ b/tests/statespace/core/test_statespace.py @@ -140,6 +140,44 @@ def ss_mod_no_exog_dt(rng): return ll.build() +@pytest.fixture(scope="session") +def ss_mod_time_varying(): + """A minimal model with time-varying observation intercept (uses n_timesteps).""" + + class TimeVaryingInterceptModel(PyMCStateSpace): + def __init__(self): + super().__init__(k_states=1, k_endog=1, k_posdef=1) + + def make_symbolic_graph(self) -> None: + self.ssm["transition", 0, 0] = 1.0 + self.ssm["design", 0, 0] = 1.0 + self.ssm["selection", 0, 0] = 1.0 + self.ssm["state_cov", 0, 0] = 1.0 + + # Time-varying observation intercept: slope * arange(n_timesteps) + slope = self.make_and_register_variable("slope", ()) + time_trend = slope * pt.arange(self.n_timesteps) + self.ssm["obs_intercept"] = time_trend[:, None] + + @property + def param_names(self) -> list[str]: + return ["slope"] + + @property + def state_names(self) -> list[str]: + return ["level"] + + @property + def observed_states(self) -> list[str]: + return ["level"] + + @property + def shock_names(self) -> list[str]: + return ["level"] + + return 
TimeVaryingInterceptModel() + + @pytest.fixture(scope="session") def exog_data(rng): # simulate data @@ -336,6 +374,23 @@ def pymc_mod_no_exog_dt(ss_mod_no_exog_dt, rng): return m +@pytest.fixture(scope="session") +def pymc_mod_time_varying(ss_mod_time_varying, rng): + """PyMC model with time-varying observation intercept.""" + n_obs = 40 + y = rng.normal(size=(n_obs, 1)).astype(floatX) + + with pm.Model(coords=ss_mod_time_varying.coords) as m: + slope = pm.Normal("slope", mu=0, sigma=1) + P0 = pm.Deterministic( + "P0", pt.eye(ss_mod_time_varying.k_states) * 1.0, dims=["state", "state_aux"] + ) + x0 = pm.Normal("x0", dims=["state"]) + ss_mod_time_varying.build_statespace_graph(y) + + return m + + @pytest.fixture(scope="session") def idata(pymc_mod, rng, mock_pymc_sample): with pymc_mod: @@ -400,6 +455,16 @@ def idata_no_exog_dt(pymc_mod_no_exog_dt, rng, mock_pymc_sample): return idata +@pytest.fixture(scope="session") +def idata_time_varying(pymc_mod_time_varying, rng, mock_pymc_sample): + """Inference data for time-varying model.""" + with pymc_mod_time_varying: + idata = pm.sample(draws=10, tune=0, chains=1, random_seed=rng) + idata_prior = pm.sample_prior_predictive(draws=10, random_seed=rng) + idata.extend(idata_prior) + return idata + + def test_invalid_filter_name_raises(): msg = "The following are valid filter types: " + ", ".join(list(FILTER_FACTORY.keys())) with pytest.raises(NotImplementedError, match=msg): @@ -1256,3 +1321,55 @@ def test_sample_filter_outputs(rng, exog_ss_mod, idata_exog): incorrect_outputs = ["filter_states", "filter_covariances"] with pytest.raises(ValueError, match=re.escape(msg)): exog_ss_mod.sample_filter_outputs(idata_exog, filter_output_names=incorrect_outputs) + + +class TestTimeVaryingTransition: + """Tests for models with time-varying transition matrices (n_timesteps placeholder).""" + + @pytest.mark.filterwarnings("ignore:No time index found on the supplied data.") + def test_sample_conditional_prior(self, ss_mod_time_varying, 
idata_time_varying): + result = ss_mod_time_varying.sample_conditional_prior(idata_time_varying) + assert "filtered_prior" in result + assert "smoothed_prior" in result + assert "predicted_prior" in result + assert not np.any(np.isnan(result["filtered_prior"].values)) + + @pytest.mark.filterwarnings("ignore:No time index found on the supplied data.") + def test_sample_conditional_posterior(self, ss_mod_time_varying, idata_time_varying): + result = ss_mod_time_varying.sample_conditional_posterior(idata_time_varying) + assert "filtered_posterior" in result + assert "smoothed_posterior" in result + assert "predicted_posterior" in result + assert not np.any(np.isnan(result["filtered_posterior"].values)) + + @pytest.mark.filterwarnings("ignore:No time index found on the supplied data.") + def test_sample_unconditional_prior(self, ss_mod_time_varying, idata_time_varying): + result = ss_mod_time_varying.sample_unconditional_prior(idata_time_varying) + assert "prior_latent" in result + assert "prior_observed" in result + assert not np.any(np.isnan(result["prior_latent"].values)) + + @pytest.mark.filterwarnings("ignore:No time index found on the supplied data.") + def test_sample_unconditional_posterior(self, ss_mod_time_varying, idata_time_varying): + result = ss_mod_time_varying.sample_unconditional_posterior(idata_time_varying) + assert "posterior_latent" in result + assert "posterior_observed" in result + assert not np.any(np.isnan(result["posterior_latent"].values)) + + @pytest.mark.filterwarnings("ignore:No time index found on the supplied data.") + @pytest.mark.filterwarnings("ignore:No start date provided") + def test_forecast(self, ss_mod_time_varying, idata_time_varying): + result = ss_mod_time_varying.forecast(idata_time_varying, periods=10) + assert "forecast_latent" in result + assert "forecast_observed" in result + assert result["forecast_latent"].shape[2] == 10 + assert not np.any(np.isnan(result["forecast_latent"].values)) + + 
@pytest.mark.filterwarnings("ignore:No time index found on the supplied data.") + def test_impulse_response_function(self, ss_mod_time_varying, idata_time_varying): + result = ss_mod_time_varying.impulse_response_function( + idata_time_varying, n_steps=20, shock_size=1.0 + ) + assert "irf" in result + assert result["irf"].shape[2] == 20 + assert not np.any(np.isnan(result["irf"].values)) diff --git a/tests/statespace/models/structural/components/test_seasonality.py b/tests/statespace/models/structural/components/test_seasonality.py index c72041ed3..4aafb5561 100644 --- a/tests/statespace/models/structural/components/test_seasonality.py +++ b/tests/statespace/models/structural/components/test_seasonality.py @@ -1,5 +1,7 @@ import numpy as np +import pymc as pm import pytensor +import pytensor.tensor as pt import pytest from pytensor import config @@ -14,7 +16,7 @@ RTOL = 0 if config.floatX.endswith("64") else 1e-6 -@pytest.mark.parametrize("s", [10, 25, 50]) +@pytest.mark.parametrize("s", [2, 10, 25]) @pytest.mark.parametrize("d", [1, 3]) @pytest.mark.parametrize("innovations", [True, False]) @pytest.mark.parametrize("remove_first_state", [True, False]) @@ -24,19 +26,14 @@ "ignore:invalid value encountered in matmul:RuntimeWarning", ) def test_time_seasonality(s, d, innovations, remove_first_state, rng): - def random_word(rng): - return "".join(rng.choice(list("abcdefghijklmnopqrstuvwxyz")) for _ in range(5)) - - state_names = tuple(random_word(rng) for _ in range(s * d)) mod = st.TimeSeasonality( season_length=s, duration=d, innovations=innovations, name="season", - state_names=state_names, remove_first_state=remove_first_state, ) - x0 = np.zeros(mod.k_states // mod.duration, dtype=config.floatX) + x0 = np.zeros(mod.n_seasons, dtype=config.floatX) x0[0] = 1 params = {"params_season": x0} @@ -48,21 +45,84 @@ def random_word(rng): if not innovations: assert_pattern_repeats(y, s * d, atol=ATOL, rtol=RTOL) - # Check coords mod = mod.build(verbose=False) 
_assert_basic_coords_correct(mod) - test_slice = slice(d, None) if remove_first_state else slice(None) - assert mod.coords["state_season"] == state_names[test_slice] + test_slice = slice(1, None) if remove_first_state else slice(None) + expected_states = tuple(f"season_{i}" for i in range(s)) + assert mod.coords["state_season"] == expected_states[test_slice] @pytest.mark.parametrize("d", [1, 3]) -@pytest.mark.parametrize( - "remove_first_state", [True, False], ids=["remove_first_state", "keep_first_state"] -) +@pytest.mark.parametrize("start_state", [0, 2, "state_2"]) +def test_time_seasonality_start_state(d, start_state, rng): + s = 4 + state_names = [f"state_{i}" for i in range(s)] + + mod = st.TimeSeasonality( + season_length=s, + duration=d, + innovations=False, + name="season", + state_names=state_names, + remove_first_state=True, + start_state=start_state, + ) + + params = np.array([1.0, 2.0, 3.0], dtype=config.floatX) + implied_gamma0 = -params.sum() + default_seasons = [implied_gamma0, params[0], params[1], params[2]] + + start_idx = state_names.index(start_state) if isinstance(start_state, str) else start_state + expected_seasons = default_seasons[start_idx:] + default_seasons[:start_idx] + + param_dict = {"params_season": params} + x, y = simulate_from_numpy_model(mod, rng, param_dict, steps=s * d * 2) + y = y.ravel() + + for season_idx, expected_val in enumerate(expected_seasons): + for duration_step in range(d): + t = season_idx * d + duration_step + np.testing.assert_allclose(y[t], expected_val, atol=ATOL, rtol=RTOL) + + +@pytest.mark.filterwarnings("ignore:No time index found:UserWarning") +@pytest.mark.parametrize("use_time_varying", [True, False], ids=["time_varying", "time_invariant"]) +def test_time_seasonality_prior_sampling(use_time_varying): + s, d = 4, 2 + state_names = ["Q1", "Q2", "Q3", "Q4"] + + mod = st.TimeSeasonality( + season_length=s, + duration=d, + innovations=False, + name="quarterly", + state_names=state_names, + 
remove_first_state=True, + use_time_varying=use_time_varying, + ) + ss_mod = mod.build(verbose=False) + + expected_n_states = (s - 1) if use_time_varying else d * (s - 1) + assert len(ss_mod.coords["state"]) == expected_n_states + assert len(ss_mod.coords["state_quarterly"]) == s - 1 + + with pm.Model(coords=ss_mod.coords) as model: + _P0 = pm.Deterministic("P0", pt.eye(ss_mod.k_states) * 10, dims=ss_mod.param_dims["P0"]) + _params = pm.Normal( + "params_quarterly", mu=0, sigma=1, dims=ss_mod.param_dims["params_quarterly"] + ) + ss_mod.build_statespace_graph(np.zeros((20, 1))) + prior = pm.sample_prior_predictive(draws=5) + + assert prior.prior["params_quarterly"].shape == (1, 5, s - 1) + + +@pytest.mark.parametrize("d", [1, 3]) +@pytest.mark.parametrize("remove_first_state", [True, False]) def test_time_seasonality_multiple_observed(rng, d, remove_first_state): s = 3 - state_names = [f"state_{i}_{j}" for i in range(s) for j in range(d)] + state_names = [f"state_{i}" for i in range(s)] mod = st.TimeSeasonality( season_length=s, duration=d, @@ -71,81 +131,78 @@ def test_time_seasonality_multiple_observed(rng, d, remove_first_state): state_names=state_names, observed_state_names=["data_1", "data_2"], remove_first_state=remove_first_state, + use_time_varying=False, ) - x0 = np.zeros((mod.k_endog, mod.k_states // mod.k_endog // mod.duration), dtype=config.floatX) - - expected_states = [ - f"state_{i}_{j}[data_{k}]" - for k in range(1, 3) - for i in range(int(remove_first_state), s) - for j in range(d) - ] - assert mod.state_names == tuple(expected_states) - assert mod.shock_names == ("season[data_1]", "season[data_2]") - + x0 = np.zeros((mod.k_endog, mod.n_seasons), dtype=config.floatX) x0[0, 0] = 1 x0[1, 0] = 2.0 params = {"params_season": x0, "sigma_season": np.array([0.0, 0.0], dtype=config.floatX)} - x, y = simulate_from_numpy_model(mod, rng, params, steps=123 * d) assert_pattern_repeats(y[:, 0], s * d, atol=ATOL, rtol=RTOL) assert_pattern_repeats(y[:, 1], s * d, 
atol=ATOL, rtol=RTOL) mod = mod.build(verbose=False) - x0, *_, T, Z, R, _, Q = mod._unpack_statespace_with_placeholders() - - input_vars = explicit_graph_inputs([x0, T, Z, R, Q]) + x0_sym, *_, T, Z, R, _, Q = mod._unpack_statespace_with_placeholders() + input_vars = explicit_graph_inputs([x0_sym, T, Z, R, Q]) fn = pytensor.function( - inputs=list(input_vars), - outputs=[x0, T, Z, R, Q], - mode="FAST_COMPILE", + inputs=list(input_vars), outputs=[x0_sym, T, Z, R, Q], mode="FAST_COMPILE" ) params["sigma_season"] = np.array([0.1, 0.8], dtype=config.floatX) - x0, T, Z, R, Q = fn(**params) + x0_v, T_v, Z_v, R_v, Q_v = fn(**params) - # Because the dimension of the observed states is 2, - # the expected T is the diagonal block matrix [[T0, 0], [0, T0]] - # where T0 is the transition matrix we would have if the - # seasonality were not multiple observed. - mod0 = st.TimeSeasonality(season_length=s, duration=d, remove_first_state=remove_first_state) + mod0 = st.TimeSeasonality( + season_length=s, + duration=d, + remove_first_state=remove_first_state, + use_time_varying=False, + ) T0 = mod0.ssm["transition"].eval() if remove_first_state: - expected_x0 = np.repeat(np.array([1.0, 0.0, 2.0, 0.0]), d) + k_states_per_endog = d * (s - 1) expected_T = np.block( - [[T0, np.zeros((d * (s - 1), d * (s - 1)))], [np.zeros((d * (s - 1), d * (s - 1))), T0]] - ) - expected_R = np.array( - [[1.0, 1.0]] + [[0.0, 0.0]] * (2 * d - 1) + [[1.0, 1.0]] + [[0.0, 0.0]] * (2 * d - 1) + [ + [T0, np.zeros((k_states_per_endog, k_states_per_endog))], + [np.zeros((k_states_per_endog, k_states_per_endog)), T0], + ] ) - Z0 = np.zeros((2, d * (s - 1))) - Z0[0, 0] = 1 - Z1 = np.zeros((2, d * (s - 1))) - Z1[1, 0] = 1 - expected_Z = np.block([[Z0, Z1]]) - else: - expected_x0 = np.repeat(np.array([1.0, 0.0, 0.0, 2.0, 0.0, 0.0]), d) expected_T = np.block([[T0, np.zeros((s * d, s * d))], [np.zeros((s * d, s * d)), T0]]) - expected_R = np.array( - [[1.0, 1.0]] + [[0.0, 0.0]] * (s * d - 1) + [[1.0, 1.0]] + [[0.0, 
0.0]] * (s * d - 1) - ) - Z0 = np.zeros((2, s * d)) - Z0[0, 0] = 1 - Z1 = np.zeros((2, s * d)) - Z1[1, 0] = 1 - expected_Z = np.block([[Z0, Z1]]) - expected_Q = np.array([[0.1**2, 0.0], [0.0, 0.8**2]]) + np.testing.assert_allclose(T_v, expected_T, atol=ATOL, rtol=RTOL) + np.testing.assert_allclose(Q_v, np.array([[0.1**2, 0.0], [0.0, 0.8**2]]), atol=ATOL, rtol=RTOL) + + +@pytest.mark.parametrize("d", [2, 3]) +@pytest.mark.parametrize("remove_first_state", [True, False]) +def test_time_seasonality_multiple_observed_time_varying(rng, d, remove_first_state): + s = 3 + mod = st.TimeSeasonality( + season_length=s, + duration=d, + innovations=True, + name="season", + observed_state_names=["data_1", "data_2"], + remove_first_state=remove_first_state, + use_time_varying=True, + ) + # Time-varying: state dimension is s or s-1, not multiplied by d + expected_k_states = (s - 1 if remove_first_state else s) * 2 + assert mod.k_states == expected_k_states + + x0 = np.zeros((mod.k_endog, mod.n_seasons), dtype=config.floatX) + x0[0, 0], x0[1, 0] = 1.0, 2.0 + params = {"params_season": x0, "sigma_season": np.array([0.0, 0.0], dtype=config.floatX)} - for matrix, expected in zip( - [x0, T, Z, R, Q], - [expected_x0, expected_T, expected_Z, expected_R, expected_Q], - ): - np.testing.assert_allclose(matrix, expected) + steps = ( + s * d * 10 + ) # assert_pattern_repeats requires this to be divisible by pattern period s * d + x, y = simulate_from_numpy_model(mod, rng, params, steps=steps) + assert_pattern_repeats(y[:, 0], s * d, atol=ATOL, rtol=RTOL) + assert_pattern_repeats(y[:, 1], s * d, atol=ATOL, rtol=RTOL) def test_time_seasonality_shared_states(): @@ -163,29 +220,19 @@ def test_time_seasonality_shared_states(): assert mod.k_endog == 2 assert mod.k_states == 3 assert mod.k_posdef == 1 - assert mod.coords["state_season"] == ("season_1", "season_2", "season_3") - assert mod.state_names == ( - "season_1[season_shared]", - "season_2[season_shared]", - "season_3[season_shared]", - ) - 
assert mod.shock_names == ("season[shared]",) - Z, T, R = pytensor.function( [], [mod.ssm["design"], mod.ssm["transition"], mod.ssm["selection"]], mode="FAST_COMPILE" )() - np.testing.assert_allclose(np.array([[1.0, 0.0, 0.0], [1.0, 0.0, 0.0]]), Z) - - np.testing.assert_allclose(np.array([[0.0, 1.0, 0.0], [0.0, 0.0, 1.0], [1.0, 0.0, 0.0]]), T) - - np.testing.assert_allclose(np.array([[1.0], [0.0], [0.0]]), R) + np.testing.assert_allclose(Z, np.array([[1.0, 0.0, 0.0], [1.0, 0.0, 0.0]])) + np.testing.assert_allclose(T, np.array([[0.0, 1.0, 0.0], [0.0, 0.0, 1.0], [1.0, 0.0, 0.0]])) + np.testing.assert_allclose(R, np.array([[1.0], [0.0], [0.0]])) def test_add_mixed_shared_not_shared_time_seasonality(): - shared_season = st.TimeSeasonality( + shared = st.TimeSeasonality( season_length=3, duration=1, innovations=True, @@ -195,7 +242,7 @@ def test_add_mixed_shared_not_shared_time_seasonality(): remove_first_state=False, share_states=True, ) - individual_season = st.TimeSeasonality( + individual = st.TimeSeasonality( season_length=3, duration=1, innovations=False, @@ -205,50 +252,14 @@ def test_add_mixed_shared_not_shared_time_seasonality(): remove_first_state=True, share_states=False, ) - mod = (shared_season + individual_season).build(verbose=False) + mod = (shared + individual).build(verbose=False) assert mod.k_endog == 2 assert mod.k_states == 7 assert mod.k_posdef == 1 - assert mod.coords["state_shared"] == ("season_1", "season_2", "season_3") assert mod.coords["state_individual"] == ("season_2", "season_3") - assert mod.state_names == ( - "season_1[shared_shared]", - "season_2[shared_shared]", - "season_3[shared_shared]", - "season_2[data_1]", - "season_3[data_1]", - "season_2[data_2]", - "season_3[data_2]", - ) - - Z, T, R = pytensor.function( - [], [mod.ssm["design"], mod.ssm["transition"], mod.ssm["selection"]], mode="FAST_COMPILE" - )() - - np.testing.assert_allclose( - np.array([[1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], [1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0]]), Z - ) - 
- np.testing.assert_allclose( - np.array( - [ - [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0], - [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0], - [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], - [0.0, 0.0, 0.0, -1.0, -1.0, 0.0, 0.0], - [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], - [0.0, 0.0, 0.0, 0.0, 0.0, -1.0, -1.0], - [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0], - ] - ), - T, - ) - - np.testing.assert_allclose(np.array([[1.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]]), R) - @pytest.mark.parametrize("d1, d2", [(1, 1), (1, 3), (3, 1), (3, 3)]) def test_add_two_time_seasonality_different_observed(rng, d1, d2): @@ -257,17 +268,19 @@ def test_add_two_time_seasonality_different_observed(rng, d1, d2): duration=d1, innovations=True, name="season1", - state_names=[f"state_{i}_{j}" for i in range(3) for j in range(d1)], + state_names=[f"state_{i}" for i in range(3)], observed_state_names=["data_1"], remove_first_state=False, + use_time_varying=False, ) mod2 = st.TimeSeasonality( season_length=5, duration=d2, innovations=True, name="season2", - state_names=[f"state_{i}_{j}" for i in range(5) for j in range(d2)], + state_names=[f"state_{i}" for i in range(5)], observed_state_names=["data_2"], + use_time_varying=False, ) mod = (mod1 + mod2).build(verbose=False) @@ -284,104 +297,132 @@ def test_add_two_time_seasonality_different_observed(rng, d1, d2): assert_pattern_repeats(y[:, 0], 3 * d1, atol=ATOL, rtol=RTOL) assert_pattern_repeats(y[:, 1], 5 * d2, atol=ATOL, rtol=RTOL) - assert mod.state_names == tuple( - [ - item - for sublist in [ - [f"state_0_{j}[data_1]" for j in range(d1)], - [f"state_1_{j}[data_1]" for j in range(d1)], - [f"state_2_{j}[data_1]" for j in range(d1)], - [f"state_1_{j}[data_2]" for j in range(d2)], - [f"state_2_{j}[data_2]" for j in range(d2)], - [f"state_3_{j}[data_2]" for j in range(d2)], - [f"state_4_{j}[data_2]" for j in range(d2)], - ] - for item in sublist - ] - ) - - assert mod.shock_names == ( - "season1[data_1]", - "season2[data_2]", - ) + T1 = mod1.ssm["transition"].eval() + T2 = 
mod2.ssm["transition"].eval() x0, *_, T = mod._unpack_statespace_with_placeholders()[:5] input_vars = explicit_graph_inputs([x0, T]) - fn = pytensor.function( - inputs=list(input_vars), - outputs=[x0, T], - mode="FAST_COMPILE", - ) - - x0, T = fn( + fn = pytensor.function(inputs=list(input_vars), outputs=[x0, T], mode="FAST_COMPILE") + x0_v, T_v = fn( params_season1=np.array([1.0, 0.0, 0.0], dtype=config.floatX), params_season2=np.array([3.0, 0.0, 0.0, 1.2], dtype=config.floatX), ) - np.testing.assert_allclose( - np.repeat(np.array([1.0, 0.0, 0.0, 3.0, 0.0, 0.0, 1.2]), [d1, d1, d1, d2, d2, d2, d2]), - x0, - atol=ATOL, - rtol=RTOL, + expected_T = np.block( + [[T1, np.zeros((T1.shape[0], T2.shape[1]))], [np.zeros((T2.shape[0], T1.shape[1])), T2]] ) + np.testing.assert_allclose(T_v, expected_T, atol=ATOL, rtol=RTOL) - # The transition matrix T of mod is expected to be [[T1, 0], [0, T2]], - # where T1 and T2 are the transition matrices of mod1 and mod2, respectively. - T1 = mod1.ssm["transition"].eval() - T2 = mod2.ssm["transition"].eval() - np.testing.assert_allclose( - np.block( - [[T1, np.zeros((T1.shape[0], T2.shape[1]))], [np.zeros((T2.shape[0], T1.shape[1])), T2]] - ), - T, - atol=ATOL, - rtol=RTOL, + +@pytest.mark.parametrize("d1, d2", [(2, 2), (2, 3), (3, 2)]) +def test_add_two_time_seasonality_different_observed_time_varying(rng, d1, d2): + s1, s2 = 3, 5 + mod1 = st.TimeSeasonality( + season_length=s1, + duration=d1, + innovations=True, + name="season1", + observed_state_names=["data_1"], + remove_first_state=True, + use_time_varying=True, + ) + mod2 = st.TimeSeasonality( + season_length=s2, + duration=d2, + innovations=True, + name="season2", + observed_state_names=["data_2"], + remove_first_state=True, + use_time_varying=True, + ) + # Time-varying: compact state dimensions + assert mod1.k_states == s1 - 1 + assert mod2.k_states == s2 - 1 + + mod = (mod1 + mod2).build(verbose=False) + assert mod.k_states == (s1 - 1) + (s2 - 1) + + params = { + 
"params_season1": np.array([1.0, 0.0], dtype=config.floatX), + "params_season2": np.array([3.0, 0.0, 0.0, 0.0], dtype=config.floatX), + "sigma_season1": np.array(0.0, dtype=config.floatX), + "sigma_season2": np.array(0.0, dtype=config.floatX), + "initial_state_cov": np.eye(mod.k_states, dtype=config.floatX), + } + x, y = simulate_from_numpy_model(mod, rng, params, steps=s1 * s2 * max(d1, d2) * 2) + assert_pattern_repeats(y[:, 0], s1 * d1, atol=ATOL, rtol=RTOL) + assert_pattern_repeats(y[:, 1], s2 * d2, atol=ATOL, rtol=RTOL) + + +def test_add_time_varying_and_static_seasonality(rng): + s1, d1 = 4, 3 # time-varying: compact state dim = s1 - 1 = 3 + s2, d2 = 5, 2 # static: expanded state dim = d2 * (s2 - 1) = 8 + + mod_tv = st.TimeSeasonality( + season_length=s1, + duration=d1, + innovations=True, + name="weekly", + observed_state_names=["data"], + remove_first_state=True, + use_time_varying=True, + ) + mod_static = st.TimeSeasonality( + season_length=s2, + duration=d2, + innovations=True, + name="monthly", + observed_state_names=["data"], + remove_first_state=True, + use_time_varying=False, ) + assert mod_tv.k_states == s1 - 1 + assert mod_static.k_states == d2 * (s2 - 1) + mod = (mod_tv + mod_static).build(verbose=False) + assert mod.k_states == (s1 - 1) + d2 * (s2 - 1) -def get_shift_factor(s): - s_str = str(s) - if "." 
not in s_str: - return 1 - _, decimal = s_str.split(".") - return 10 ** len(decimal) + params = { + "params_weekly": np.array([1.0, 0.0, 0.0], dtype=config.floatX), + "params_monthly": np.array([2.0, 0.0, 0.0, 0.0], dtype=config.floatX), + "sigma_weekly": np.array(0.0, dtype=config.floatX), + "sigma_monthly": np.array(0.0, dtype=config.floatX), + "initial_state_cov": np.eye(mod.k_states, dtype=config.floatX), + } + # Pattern repeats every LCM(s1*d1, s2*d2) = LCM(12, 10) = 60 + steps = 60 * 3 + x, y = simulate_from_numpy_model(mod, rng, params, steps=steps) + # Combined output should have period = LCM of individual periods + # For testing, verify the model runs without error and output shape is correct + assert y.shape == (steps,) -@pytest.mark.parametrize("n", [*np.arange(1, 6, dtype="int").tolist(), None]) -@pytest.mark.parametrize("s", [5, 10, 25, 25.2]) + +@pytest.mark.parametrize("n", [1, 2, 3, None]) +@pytest.mark.parametrize("s", [5, 10, 25.2]) def test_frequency_seasonality(n, s, rng): mod = st.FrequencySeasonality(season_length=s, n=n, name="season") - assert mod.param_info["sigma_season"].shape == () # scalar for univariate + assert mod.param_info["sigma_season"].shape == () assert mod.param_info["sigma_season"].dims is None assert len(mod.coords["state_season"]) == mod.n_coefs x0 = rng.normal(size=mod.n_coefs).astype(config.floatX) params = {"params_season": x0, "sigma_season": 0.0} - k = get_shift_factor(s) - T = int(s * k) + + decimal = s_str.split(".")[1] if "." 
in (s_str := str(s)) else "" + T = int(s * 10 ** len(decimal)) x, y = simulate_from_numpy_model(mod, rng, params, steps=2 * T) assert_pattern_repeats(y, T, atol=ATOL, rtol=RTOL) - # check coords mod = mod.build(verbose=False) _assert_basic_coords_correct(mod) - if n is None: - n = int(s // 2) - states = [f"{f}_{i}_season" for i in range(n) for f in ["Cos", "Sin"]] - - # remove last state when model is completely saturated - if s / n == 2.0: - states.pop() - assert mod.coords["state_season"] == tuple(states) - def test_frequency_seasonality_multiple_observed(rng): observed_state_names = ["data_1", "data_2"] - season_length = 4 mod = st.FrequencySeasonality( - season_length=season_length, + season_length=4, n=None, name="season", innovations=True, @@ -391,44 +432,16 @@ def test_frequency_seasonality_multiple_observed(rng): assert mod.param_info["params_season"].dims == ("endog_season", "state_season") assert mod.param_info["sigma_season"].dims == ("endog_season",)
"Cos_1_season", - ) x0_sym, *_, T_sym, Z_sym, R_sym, _, Q_sym = mod._unpack_statespace_with_placeholders() input_vars = explicit_graph_inputs([x0_sym, T_sym, Z_sym, R_sym, Q_sym]) @@ -440,40 +453,9 @@ def test_frequency_seasonality_multiple_observed(rng): params["sigma_season"] = np.array([0.1, 0.8], dtype=config.floatX) x0_v, T_v, Z_v, R_v, Q_v = fn(**params) - # x0 should be raveled into a single vector, with data_1 states first, then data_2 states np.testing.assert_allclose( x0_v, np.array([1.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0]), atol=ATOL, rtol=RTOL ) - - # T_v shape: (8, 8) (k_endog * k_states) - # The transition matrix is block diagonal, each block is: - # For n=2, season_length=4: - # lambda_1 = 2*pi*1/4 = pi/2, cos(pi/2)=0, sin(pi/2)=1 - # lambda_2 = 2*pi*2/4 = pi, cos(pi)=-1, sin(pi)=0 - # Block 1 (Cos_0, Sin_0): - # [[cos(pi/2), sin(pi/2)], - # [-sin(pi/2), cos(pi/2)]] = [[0, 1], [-1, 0]] - # Block 2 (Cos_1, Sin_1): - # [[-1, 0], [0, -1]] - expected_T_block1 = np.array([[0.0, 1.0], [-1.0, 0.0]]) - expected_T_block2 = np.array([[-1.0, 0.0], [0.0, -1.0]]) - expected_T = np.zeros((8, 8)) - # data_1 - expected_T[0:2, 0:2] = expected_T_block1 - expected_T[2:4, 2:4] = expected_T_block2 - # data_2 - expected_T[4:6, 4:6] = expected_T_block1 - expected_T[6:8, 6:8] = expected_T_block2 - np.testing.assert_allclose(T_v, expected_T, atol=ATOL, rtol=RTOL) - - # Only the first two states (one sin and one cos component) of each observed series are observed - expected_Z = np.zeros((2, 8)) - expected_Z[0, 0] = 1.0 - expected_Z[0, 2] = 1.0 - expected_Z[1, 4] = 1.0 - expected_Z[1, 6] = 1.0 - np.testing.assert_allclose(Z_v, expected_Z, atol=ATOL, rtol=RTOL) - np.testing.assert_allclose(R_v, np.eye(8), atol=ATOL, rtol=RTOL) Q_diag = np.diag(Q_v) @@ -494,43 +476,32 @@ def test_frequency_seasonality_multivariate_shared_states(): assert mod.k_endog == 2 assert mod.k_states == 2 assert mod.k_posdef == 2 - - assert mod.state_names == ( - "Cos_0_season[shared]", - 
"Sin_0_season[shared]", - ) - assert mod.shock_names == ( - "Cos_0_season[shared]", - "Sin_0_season[shared]", - ) - assert mod.coords["state_season"] == ("Cos_0_season", "Sin_0_season") Z, T, R = pytensor.function( [], [mod.ssm["design"], mod.ssm["transition"], mod.ssm["selection"]], mode="FAST_COMPILE" )() - np.testing.assert_allclose(np.array([[1.0, 0.0], [1.0, 0.0]]), Z) - - np.testing.assert_allclose(np.array([[1.0, 0.0], [0.0, 1.0]]), R) + np.testing.assert_allclose(Z, np.array([[1.0, 0.0], [1.0, 0.0]])) + np.testing.assert_allclose(R, np.array([[1.0, 0.0], [0.0, 1.0]])) lam = 2 * np.pi * 1 / 4 np.testing.assert_allclose( - np.array([[np.cos(lam), np.sin(lam)], [-np.sin(lam), np.cos(lam)]]), T + T, np.array([[np.cos(lam), np.sin(lam)], [-np.sin(lam), np.cos(lam)]]) ) def test_add_two_frequency_seasonality_different_observed(rng): mod1 = st.FrequencySeasonality( season_length=4, - n=2, # saturated + n=2, name="freq1", innovations=True, observed_state_names=["data_1"], ) mod2 = st.FrequencySeasonality( season_length=6, - n=1, # unsaturated + n=1, name="freq2", innovations=True, observed_state_names=["data_2"], @@ -551,64 +522,9 @@ def test_add_two_frequency_seasonality_different_observed(rng): assert_pattern_repeats(y[:, 0], 4, atol=ATOL, rtol=RTOL) assert_pattern_repeats(y[:, 1], 6, atol=ATOL, rtol=RTOL) - assert mod.state_names == ( - "Cos_0_freq1[data_1]", - "Sin_0_freq1[data_1]", - "Cos_1_freq1[data_1]", - "Sin_1_freq1[data_1]", - "Cos_0_freq2[data_2]", - "Sin_0_freq2[data_2]", - ) - - assert mod.shock_names == ( - "Cos_0_freq1[data_1]", - "Sin_0_freq1[data_1]", - "Cos_1_freq1[data_1]", - "Sin_1_freq1[data_1]", - "Cos_0_freq2[data_2]", - "Sin_0_freq2[data_2]", - ) - - x0, *_, T = mod._unpack_statespace_with_placeholders()[:5] - input_vars = explicit_graph_inputs([x0, T]) - fn = pytensor.function( - inputs=list(input_vars), - outputs=[x0, T], - mode="FAST_COMPILE", - ) - - x0_v, T_v = fn( - params_freq1=np.array([1.0, 0.0, 1.2], dtype=config.floatX), - 
params_freq2=np.array([3.0, 0.0], dtype=config.floatX), - ) - - # Make sure the extra 0 in from the first component (the saturated state) is there! - np.testing.assert_allclose(np.array([1.0, 0.0, 1.2, 0.0, 3.0, 0.0]), x0_v, atol=ATOL, rtol=RTOL) - - # Transition matrix is block diagonal: 4x4 for freq1, 2x2 for freq2 - # freq1: n=4, lambdas = 2*pi*1/6, 2*pi*2/6 - lam1 = 2 * np.pi * 1 / 4 - lam2 = 2 * np.pi * 2 / 4 - freq1_T1 = np.array([[np.cos(lam1), np.sin(lam1)], [-np.sin(lam1), np.cos(lam1)]]) - freq1_T2 = np.array([[np.cos(lam2), np.sin(lam2)], [-np.sin(lam2), np.cos(lam2)]]) - freq1_T = np.zeros((4, 4)) - - # freq2: n=4, lambdas = 2*pi*1/6 - lam3 = 2 * np.pi * 1 / 6 - freq2_T = np.array([[np.cos(lam3), np.sin(lam3)], [-np.sin(lam3), np.cos(lam3)]]) - - freq1_T[0:2, 0:2] = freq1_T1 - freq1_T[2:4, 2:4] = freq1_T2 - - expected_T = np.zeros((6, 6)) - expected_T[0:4, 0:4] = freq1_T - expected_T[4:6, 4:6] = freq2_T - - np.testing.assert_allclose(expected_T, T_v, atol=ATOL, rtol=RTOL) - def test_add_frequency_seasonality_shared_and_not_shared(): - shared_season = st.FrequencySeasonality( + shared = st.FrequencySeasonality( season_length=4, n=1, name="shared_season", @@ -616,8 +532,7 @@ def test_add_frequency_seasonality_shared_and_not_shared(): observed_state_names=["data_1", "data_2"], share_states=True, ) - - individual_season = st.FrequencySeasonality( + individual = st.FrequencySeasonality( season_length=4, n=2, name="individual_season", @@ -626,16 +541,12 @@ def test_add_frequency_seasonality_shared_and_not_shared(): share_states=False, ) - mod = (shared_season + individual_season).build(verbose=False) + mod = (shared + individual).build(verbose=False) assert mod.k_endog == 2 assert mod.k_states == 10 assert mod.k_posdef == 10 - - assert mod.coords["state_shared_season"] == ( - "Cos_0_shared_season", - "Sin_0_shared_season", - ) + assert mod.coords["state_shared_season"] == ("Cos_0_shared_season", "Sin_0_shared_season") assert 
mod.coords["state_individual_season"] == ( "Cos_0_individual_season", "Sin_0_individual_season", @@ -644,94 +555,30 @@ def test_add_frequency_seasonality_shared_and_not_shared(): @pytest.mark.parametrize( - "test_case", + "n,observed,expected_shape", [ - { - "name": "single_endog_non_saturated", - "season_length": 12, - "n": 2, - "observed_state_names": ("data1",), - "expected_shape": (4,), - }, - { - "name": "single_endog_saturated", - "season_length": 12, - "n": 6, - "observed_state_names": ("data1",), - "expected_shape": (11,), - }, - { - "name": "multiple_endog_non_saturated", - "season_length": 12, - "n": 2, - "observed_state_names": ( - "data1", - "data2", - ), - "expected_shape": (2, 4), - }, - { - "name": "multiple_endog_saturated", - "season_length": 12, - "n": 6, - "observed_state_names": ( - "data1", - "data2", - ), - "expected_shape": (2, 11), - }, - { - "name": "small_n", - "season_length": 12, - "n": 1, - "observed_state_names": ("data1",), - "expected_shape": (2,), - }, - { - "name": "many_endog", - "season_length": 12, - "n": 2, - "observed_state_names": ("data1", "data2", "data3", "data4"), - "expected_shape": (4, 4), - }, + (2, ("data1",), (4,)), + (6, ("data1",), (11,)), + (2, ("data1", "data2"), (2, 4)), + (6, ("data1", "data2"), (2, 11)), + (1, ("data1",), (2,)), + (2, ("data1", "data2", "data3", "data4"), (4, 4)), ], - ids=lambda x: x["name"], ) -def test_frequency_seasonality_coordinates(test_case): - model_name = f"season_{test_case['name'].split('_')[0]}" - +def test_frequency_seasonality_coordinates(n, observed, expected_shape): season = FrequencySeasonality( - season_length=test_case["season_length"], - n=test_case["n"], - name=model_name, - observed_state_names=test_case["observed_state_names"], + season_length=12, + n=n, + name="season", + observed_state_names=observed, ) season.populate_component_properties() - # assert parameter shape - assert season.param_info[f"params_{model_name}"].shape == test_case["expected_shape"] + assert 
season.param_info["params_season"].shape == expected_shape - # generate expected state names based on actual model name - expected_state_names = tuple( - [f"{f}_{i}_{model_name}" for i in range(test_case["n"]) for f in ["Cos", "Sin"]][ - : test_case["expected_shape"][-1] - ] - ) + state_coords = season.coords["state_season"] + assert len(state_coords) == expected_shape[-1] - # assert coordinate structure - if len(test_case["observed_state_names"]) == 1: - assert len(season.coords[f"state_{model_name}"]) == test_case["expected_shape"][0] - assert season.coords[f"state_{model_name}"] == expected_state_names - else: - assert len(season.coords[f"endog_{model_name}"]) == test_case["expected_shape"][0] - assert len(season.coords[f"state_{model_name}"]) == test_case["expected_shape"][1] - assert season.coords[f"state_{model_name}"] == expected_state_names - - # Check coords match the expected shape - param_shape = season.param_info[f"params_{model_name}"].shape - state_coords = season.coords[f"state_{model_name}"] - endog_coords = season.coords.get(f"endog_{model_name}") - - assert len(state_coords) == param_shape[-1] - if endog_coords: - assert len(endog_coords) == param_shape[0] + if len(observed) > 1: + endog_coords = season.coords["endog_season"] + assert len(endog_coords) == expected_shape[0] diff --git a/tests/statespace/models/structural/test_against_statsmodels.py b/tests/statespace/models/structural/test_against_statsmodels.py index 95214b65c..c4009742b 100644 --- a/tests/statespace/models/structural/test_against_statsmodels.py +++ b/tests/statespace/models/structural/test_against_statsmodels.py @@ -303,8 +303,23 @@ def create_structural_model_and_equivalent_statsmodel( expected_coords[ALL_STATE_DIM] += state_names expected_coords[ALL_STATE_AUX_DIM] += state_names + # Our implementation reorders the initial state to produce output order [gamma_0, gamma_1, gamma_2, + # ..., gamma_{s-1}] + # Internal state is: [gamma_0, gamma_{s-1}, gamma_{s-2}, ..., gamma_2] + # 
where gamma_0 = -sum(seasonal_coefs) and seasonal_coefs = [gamma_1, gamma_2, ..., gamma_{s-1}] + + # Statsmodels expects state: [gamma_t, gamma_{t-1}, ..., gamma_{t-s+2}] + # which with the same labeling is [current, lag1, lag2, ...] + + # To match, we set sm_init to our reordered state: [gamma_0, gamma_{s-1}, ..., gamma_2] + gamma_0 = -seasonal_coefs.sum() + if len(seasonal_coefs) > 1: + reordered_state = np.concatenate([[gamma_0], seasonal_coefs[1:][::-1]]) + else: + reordered_state = np.array([gamma_0]) + seasonal_dict = { - "seasonal" if i == 0 else f"seasonal.L{i}": c for i, c in enumerate(seasonal_coefs) + "seasonal" if i == 0 else f"seasonal.L{i}": c for i, c in enumerate(reordered_state) } sm_init.update(seasonal_dict) diff --git a/tests/statespace/test_utilities.py b/tests/statespace/test_utilities.py index e4775e8d1..3abf65bd2 100644 --- a/tests/statespace/test_utilities.py +++ b/tests/statespace/test_utilities.py @@ -13,6 +13,8 @@ from numpy.testing import assert_allclose from pymc import modelcontext +from pytensor.graph.replace import graph_replace +from pytensor.graph.traversal import explicit_graph_inputs from pymc_extras.statespace.filters.kalman_smoother import KalmanSmoother from pymc_extras.statespace.utils.constants import ( @@ -219,16 +221,29 @@ def unpack_statespace(ssm): return [ssm[SHORT_NAME_TO_LONG[x]] for x in MATRIX_NAMES] -def unpack_symbolic_matrices_with_params(mod, param_dict, data_dict=None, mode="FAST_COMPILE"): +def unpack_symbolic_matrices_with_params( + mod, param_dict, steps=None, data_dict=None, mode="FAST_COMPILE" +): inputs = list(mod._name_to_variable.values()) if data_dict is not None: inputs += list(mod._name_to_data.values()) else: data_dict = {} + matrices = unpack_statespace(mod.ssm) + n_timestep_variables = tuple( + variable for variable in explicit_graph_inputs(matrices) if variable.name == "n_timesteps" + ) + if n_timestep_variables: + if steps is None: + raise ValueError("steps must be provided when the model 
contains time-varying matrices") + steps_pt = pt.constant(steps, dtype="int32") + replacement_dict = {var: steps_pt for var in n_timestep_variables} + matrices = graph_replace(matrices, replacement_dict, strict=False) + f_matrices = pytensor.function( inputs, - unpack_statespace(mod.ssm), + matrices, on_unused_input="raise", mode=mode, ) @@ -242,7 +257,9 @@ def simulate_from_numpy_model(mod, rng, param_dict, data_dict=None, steps=100): """ Helper function to visualize the components outside of a PyMC model context """ - x0, P0, c, d, T, Z, R, H, Q = unpack_symbolic_matrices_with_params(mod, param_dict, data_dict) + x0, P0, c, d, T, Z, R, H, Q = unpack_symbolic_matrices_with_params( + mod, param_dict, steps, data_dict + ) k_endog = mod.k_endog k_states = mod.k_states k_posdef = mod.k_posdef @@ -268,7 +285,10 @@ def simulate_from_numpy_model(mod, rng, param_dict, data_dict=None, steps=100): else: error = 0 - x[t] = c + T @ x[t - 1] + innov + if T.ndim == 2: + x[t] = c + T @ x[t - 1] + innov + else: + x[t] = c + T[t - 1] @ x[t - 1] + innov if Z.ndim == 2: y[t] = (d + Z @ x[t] + error).squeeze() else: