Skip to content

Add Spline Interpolation using basis functions #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ jobs:
# The ">-" in the next line replaces newlines with spaces (see https://stackoverflow.com/a/66809682).
run: >-
conda activate pymc-test-py37 &&
python -m pytest -vv --cov=pymc_experimental --cov-append --cov-report=xml --cov-report term --durations=50 %TEST_SUBSET%
python -m pytest -vv --cov=pymc_experimental --doctest-modules pymc_experimental --cov-append --cov-report=xml --cov-report term --durations=50 %TEST_SUBSET%
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
Expand Down
7 changes: 7 additions & 0 deletions docs/api_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ methods in the current release of PyMC experimental.
.. automodule:: pymc_experimental.distributions.histogram_utils
:members: histogram_approximation


:mod:`pymc_experimental.utils`
=============================

.. automodule:: pymc_experimental.utils.spline
:members: bspline_interpolation

2 changes: 2 additions & 0 deletions pymc_experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@

from pymc_experimental.bart import *
from pymc_experimental import distributions
from pymc_experimental import gp
from pymc_experimental import utils
2 changes: 1 addition & 1 deletion pymc_experimental/gp/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from pymc_experimental.gp.latent_approx import HSGP, ProjectedProcess, KarhunenLoeveExpansion
from pymc_experimental.gp.latent_approx import HSGP, ProjectedProcess, KarhunenLoeveExpansion
60 changes: 60 additions & 0 deletions pymc_experimental/tests/test_splines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import aesara
import pymc_experimental as pmx
import aesara.tensor as at
import numpy as np
import pytest


@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("sparse", [True, False])
def test_spline_construction(dtype, sparse):
x = np.linspace(0, 1, 20, dtype=dtype)
np_out = pmx.utils.spline.numpy_bspline_basis(x, 10, 3)
assert np_out.shape == (20, 10)
assert np_out.dtype == dtype
spline_op = pmx.utils.spline.BSplineBasis(sparse=sparse)
out = spline_op(x, at.constant(10), at.constant(3))
if not sparse:
assert isinstance(out.type, at.TensorType)
else:
assert isinstance(out.type, aesara.sparse.SparseTensorType)
B = out.eval()
if not sparse:
np.testing.assert_allclose(B, np_out)
else:
np.testing.assert_allclose(B.todense(), np_out)
assert B.shape == (20, 10)


@pytest.mark.parametrize("shape", [(100,), (100, 5)])
@pytest.mark.parametrize("sparse", [True, False])
@pytest.mark.parametrize("points", [dict(n=1001), dict(eval_points=np.linspace(0, 1, 1001))])
def test_interpolation_api(shape, sparse, points):
x = np.random.randn(*shape)
yt = pmx.utils.spline.bspline_interpolation(x, **points, sparse=sparse)
y = yt.eval()
assert y.shape == (1001, *shape[1:])


@pytest.mark.parametrize(
"params",
[
(dict(sparse="foo", n=100, degree=1), TypeError, "sparse should be True or False"),
(dict(n=100, degree=0.5), TypeError, "degree should be integer"),
(
dict(n=100, eval_points=np.linspace(0, 1), degree=1),
ValueError,
"Please provide one of n or eval_points",
),
(
dict(degree=1),
ValueError,
"Please provide one of n or eval_points",
),
],
)
def test_bad_calls(params):
kw, E, err = params
x = np.random.randn(10)
with pytest.raises(E, match=err):
pmx.utils.spline.bspline_interpolation(x, **kw)
1 change: 1 addition & 0 deletions pymc_experimental/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from pymc_experimental.utils import spline
115 changes: 115 additions & 0 deletions pymc_experimental/utils/spline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import aesara
import numpy as np
import scipy.interpolate
from aesara.graph.op import Op, Apply
import aesara.tensor as at
import aesara.sparse


def numpy_bspline_basis(eval_points: np.ndarray, k: int, degree=3):
k_knots = k + degree + 1
knots = np.linspace(0, 1, k_knots - 2 * degree)
knots = np.r_[[0] * degree, knots, [1] * degree]
basis_funcs = scipy.interpolate.BSpline(knots, np.eye(k), k=degree)
Bx = basis_funcs(eval_points).astype(eval_points.dtype)
return Bx


class BSplineBasis(Op):
__props__ = ("sparse",)

def __init__(self, sparse=True) -> None:
super().__init__()
if not isinstance(sparse, bool):
raise TypeError("sparse should be True or False")
self.sparse = sparse

def make_node(self, *inputs) -> Apply:
eval_points, k, d = map(at.as_tensor, inputs)
if not (eval_points.ndim == 1 and np.issubdtype(eval_points.dtype, np.floating)):
raise TypeError("eval_points should be a vector of floats")
if not k.type in at.int_types:
raise TypeError("k should be integer")
if not d.type in at.int_types:
raise TypeError("degree should be integer")
if self.sparse:
out_type = aesara.sparse.SparseTensorType("csr", eval_points.dtype)()
else:
out_type = aesara.tensor.matrix(dtype=eval_points.dtype)
return Apply(self, [eval_points, k, d], [out_type])

def perform(self, node, inputs, output_storage, params=None) -> None:
eval_points, k, d = inputs
Bx = numpy_bspline_basis(eval_points, int(k), int(d))
if self.sparse:
Bx = scipy.sparse.csr_matrix(Bx, dtype=eval_points.dtype)
output_storage[0][0] = Bx

def infer_shape(self, fgraph, node, ins_shapes):
return [(node.inputs[0].shape[0], node.inputs[1])]


def bspline_basis(n, k, degree=3, dtype=None, sparse=True):
dtype = dtype or aesara.config.floatX
eval_points = np.linspace(0, 1, n, dtype=dtype)
return BSplineBasis(sparse=sparse)(eval_points, k, degree)


def bspline_interpolation(x, *, n=None, eval_points=None, degree=3, sparse=True):
"""Interpolate sparse grid to dense grid using bsplines.

Parameters
----------
x : Variable
Input Variable to interpolate.
0th coordinate assumed to be mapped regularly on [0, 1] interval
n : int (optional)
Resolution of interpolation
eval_points : vector (optional)
Custom eval points in [0, 1] interval (or scaled properly using min/max scaling)
degree : int, optional
BSpline degree, by default 3
sparse : bool, optional
Use sparse operation, by default True

Returns
-------
Variable
The interpolated variable, interpolation is across 0th axis

Examples
--------
>>> import pymc as pm
>>> import numpy as np
>>> half_months = np.linspace(0, 365, 12*2)
>>> with pm.Model(coords=dict(knots_time=half_months, time=np.arange(365))) as model:
... kernel = pm.gp.cov.ExpQuad(1, ls=365/12)
... # ready to define gp (a latent process over parameters)
... gp = pm.gp.gp.Latent(
... cov_func=kernel
... )
... y_knots = gp.prior("y_knots", half_months[:, None], dims="knots_time")
... y = pm.Deterministic(
... "y",
... bspline_interpolation(y_knots, n=365, degree=3),
... dims="time"
... )
... trace = pm.sample_prior_predictive(1)

Notes
-----
Adopted from `BayesAlpha <https://github.com/quantopian/bayesalpha/blob/676f4f194ad20211fd040d3b0c6e82969aafb87e/bayesalpha/dists.py#L97>`_
where it was written by @aseyboldt
"""
x = at.as_tensor(x)
if n is not None and eval_points is not None:
raise ValueError("Please provide one of n or eval_points")
elif n is not None:
eval_points = np.linspace(0, 1, n, dtype=x.dtype)
elif eval_points is None:
raise ValueError("Please provide one of n or eval_points")
basis = BSplineBasis(sparse=sparse)(eval_points, x.shape[0], degree)
if sparse:
return aesara.sparse.dot(basis, x)
else:
return aesara.tensor.dot(basis, x)
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
[tool.pytest.ini_options]
minversion = "6.0"
xfail_strict=true
addopts = "--doctest-modules pymc_experimental"

[tool.black]
line-length = 100
Expand Down