Skip to content

Commit

Permalink
Add GP Wrapped Periodic Kernel (#6742)
Browse files Browse the repository at this point in the history
Co-authored-by: Joseph Hall <joseph.hall@bp.com>
  • Loading branch information
jahall and Joseph Hall authored Jul 19, 2023
1 parent 4a65148 commit 82c6318
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 21 deletions.
113 changes: 92 additions & 21 deletions pymc/gp/cov.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"Cosine",
"Periodic",
"WarpedInput",
"WrappedPeriodic",
"Gibbs",
"Coregion",
"ScaledCov",
Expand Down Expand Up @@ -551,6 +552,11 @@ def diag(self, X: TensorLike) -> TensorVariable:
return self._alloc(1.0, X.shape[0])

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r2 = self.square_dist(X, Xs)
return self.full_from_distance(r2, squared=True)

def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
raise NotImplementedError

def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
Expand All @@ -568,9 +574,8 @@ class ExpQuad(Stationary):
"""

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r2 = self.square_dist(X, Xs)
def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r2 = dist if squared else dist**2
return pt.exp(-0.5 * r2)

def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
Expand Down Expand Up @@ -609,9 +614,8 @@ def __init__(
super().__init__(input_dim, ls, ls_inv, active_dims)
self.alpha = alpha

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r2 = self.square_dist(X, Xs)
def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r2 = dist if squared else dist**2
return pt.power(
(1.0 + 0.5 * r2 * (1.0 / self.alpha)),
-1.0 * self.alpha,
Expand All @@ -629,9 +633,8 @@ class Matern52(Stationary):
\mathrm{exp}\left[ - \frac{\sqrt{5(x - x')^2}}{\ell} \right]
"""

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r = self.euclidean_dist(X, Xs)
def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r = self._sqrt(dist) if squared else dist
return (1.0 + np.sqrt(5.0) * r + 5.0 / 3.0 * pt.square(r)) * pt.exp(-1.0 * np.sqrt(5.0) * r)

def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
Expand Down Expand Up @@ -669,9 +672,8 @@ class Matern32(Stationary):
\mathrm{exp}\left[ - \frac{\sqrt{3(x - x')^2}}{\ell} \right]
"""

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r = self.euclidean_dist(X, Xs)
def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r = self._sqrt(dist) if squared else dist
return (1.0 + np.sqrt(3.0) * r) * pt.exp(-np.sqrt(3.0) * r)

def power_spectral_density(self, omega: TensorLike) -> TensorVariable:
Expand Down Expand Up @@ -708,9 +710,8 @@ class Matern12(Stationary):
k(x, x') = \mathrm{exp}\left[ -\frac{(x - x')^2}{\ell} \right]
"""

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r = self.euclidean_dist(X, Xs)
def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r = self._sqrt(dist) if squared else dist
return pt.exp(-r)


Expand All @@ -723,9 +724,8 @@ class Exponential(Stationary):
k(x, x') = \mathrm{exp}\left[ -\frac{||x - x'||}{2\ell} \right]
"""

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r = self.euclidean_dist(X, Xs)
def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r = self._sqrt(dist) if squared else dist
return pt.exp(-0.5 * r)


Expand All @@ -737,9 +737,8 @@ class Cosine(Stationary):
k(x, x') = \mathrm{cos}\left( 2 \pi \frac{||x - x'||}{ \ell^2} \right)
"""

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
r = self.euclidean_dist(X, Xs)
def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
r = self._sqrt(dist) if squared else dist
return pt.cos(2.0 * np.pi * r)


Expand Down Expand Up @@ -781,6 +780,12 @@ def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable
f2 = pt.expand_dims(Xs, axis=(1,))
r = np.pi * (f1 - f2) / self.period
r2 = pt.sum(pt.square(pt.sin(r) / self.ls), 2)
return self.full_from_distance(r2, squared=True)

def full_from_distance(self, dist: TensorLike, squared: bool = False) -> TensorVariable:
# NOTE: This is the same as the ExpQuad as we assume the periodicity
# has already been accounted for in the distance
r2 = dist if squared else dist**2
return pt.exp(-0.5 * r2)


Expand Down Expand Up @@ -882,6 +887,72 @@ def diag(self, X: TensorLike) -> TensorVariable:
return self.cov_func(self.w(X, self.args), diag=True)


class WrappedPeriodic(Covariance):
r"""
The `WrappedPeriodic` kernel constructs periodic kernels from any `Stationary` kernel.
This is done by warping the input with the function
.. math::
\mathbf{u}(x) = \left(
\mathrm{sin} \left( \frac{2\pi x}{T} \right),
\mathrm{cos} \left( \frac{2\pi x}{T} \right)
\right)
The original derivation as applied to the squared exponential kernel can
be found in [1] or referenced in Chapter 4, page 92 of [2].
Notes
-----
Note that we drop the resulting scaling by 4 found in the original derivation
so that the interpretation of the length scales is consistent between the
underlying kernel and the periodic version.
Examples
--------
In order to construct a kernel equivalent to the `Periodic` kernel you
can do the following (though using `Periodic` will likely be a bit faster):
.. code:: python
exp_quad = pm.gp.cov.ExpQuad(1, ls=0.5)
cov = pm.gp.cov.WrappedPeriodic(exp_quad, period=5)
References
----------
.. [1] MacKay, D. J. C. (1998). Introduction to Gaussian Processes.
In Bishop, C. M., editor, Neural Networks and Machine Learning. Springer-Verlag
.. [2] Rasmussen, C. E., & Williams, C. K. I. (2006). Gaussian processes for machine learning. MIT Press.
http://gaussianprocess.org/gpml/chapters/
Parameters
----------
cov_func: Stationary
Base kernel or covariance function
period: Period
"""

def __init__(self, cov_func: Stationary, period):
super().__init__(cov_func.input_dim, cov_func.active_dims)
if not isinstance(cov_func, Stationary):
raise TypeError("Must inherit from the Stationary class")
self.cov_func = cov_func
self.period = period

def full(self, X: TensorLike, Xs: Optional[TensorLike] = None) -> TensorVariable:
X, Xs = self._slice(X, Xs)
if Xs is None:
Xs = X
f1 = pt.expand_dims(X, axis=(0,))
f2 = pt.expand_dims(Xs, axis=(1,))
r = np.pi * (f1 - f2) / self.period
r2 = pt.sum(pt.square(pt.sin(r) / self.cov_func.ls), 2)
return self.cov_func.full_from_distance(r2, squared=True)

def diag(self, X: TensorLike) -> TensorVariable:
return self._alloc(1.0, X.shape[0])


class Gibbs(Covariance):
r"""
The Gibbs kernel. Use an arbitrary lengthscale function defined
Expand Down
38 changes: 38 additions & 0 deletions tests/gp/test_cov.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,22 @@ def test_psd(self):
)
npt.assert_allclose(true_1d_psd, test_1d_psd, atol=1e-5)

def test_euclidean_dist(self):
X = np.arange(0, 3)[:, None]
Xs = np.arange(1, 4)[:, None]
with pm.Model():
cov = pm.gp.cov.ExpQuad(1, ls=1)
result = cov.euclidean_dist(X, Xs).eval()
expected = np.array(
[
[1, 2, 3],
[0, 1, 2],
[1, 0, 1],
]
)
print(result, expected)
npt.assert_allclose(result, expected, atol=1e-5)


class TestWhiteNoise:
def test_1d(self):
Expand Down Expand Up @@ -678,6 +694,28 @@ def test_raises(self):
pm.gp.cov.WarpedInput(1, "str is not Covariance object", lambda x: x)


class TestWrappedPeriodic:
def test_1d(self):
X = np.linspace(0, 1, 10)[:, None]
with pm.Model():
cov1 = pm.gp.cov.Periodic(1, ls=0.2, period=1)
cov2 = pm.gp.cov.WrappedPeriodic(
cov_func=pm.gp.cov.ExpQuad(1, ls=0.2),
period=1,
)
K1 = cov1(X).eval()
K2 = cov2(X).eval()
npt.assert_allclose(K1, K2, atol=1e-3)
K1d = cov1(X, diag=True).eval()
K2d = cov2(X, diag=True).eval()
npt.assert_allclose(K1d, K2d, atol=1e-3)

def test_raises(self):
lin_cov = pm.gp.cov.Linear(1, c=1)
with pytest.raises(TypeError, match="Must inherit from the Stationary class"):
pm.gp.cov.WrappedPeriodic(lin_cov, period=1)


class TestGibbs:
def test_1d(self):
X = np.linspace(0, 2, 10)[:, None]
Expand Down

0 comments on commit 82c6318

Please sign in to comment.