Skip to content

Commit

Permalink
Feature/repeated maps (#41)
Browse files Browse the repository at this point in the history
* default data_cov for updated distribution

* more efficient for repeated measurements

* type handling for std_list

* whitespace / indent

* Update src/mud/util.py

* add unit tests for transform_linear_map

* Update src/mud/util.py

* testing

* more coverage

* readd test deleted by accident
  • Loading branch information
mathematicalmichael authored Apr 25, 2021
1 parent 13946c8 commit 2daea51
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 14 deletions.
27 changes: 25 additions & 2 deletions src/mud/funs.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def mud_sol(A, b, y=None,
return mud_point


def updated_cov(X, init_cov, data_cov):
def updated_cov(X, init_cov=None, data_cov=None):
"""
We start with the posterior covariance from ridge regression
Our matrix R = init_cov^(-1) - X.T @ pred_cov^(-1) @ X
Expand All @@ -186,8 +186,31 @@ def updated_cov(X, init_cov, data_cov):
np.linalg.inv(init_cov) )
We return the updated covariance using a form of it derived
which applies Hua's identity in order to use Woodbury's identity
which applies Hua's identity in order to use Woodbury's identity.
>>> updated_cov(np.eye(2))
array([[1., 0.],
[0., 1.]])
>>> updated_cov(np.eye(2)*2)
array([[0.25, 0. ],
[0. , 0.25]])
>>> updated_cov(np.eye(3)[:, :2]*2, data_cov=np.eye(3))
array([[0.25, 0. ],
[0. , 0.25]])
>>> updated_cov(np.eye(3)[:, :2]*2, init_cov=np.eye(2))
array([[0.25, 0. ],
[0. , 0.25]])
"""
if init_cov is None:
init_cov = np.eye(X.shape[1])
else:
assert X.shape[1] == init_cov.shape[1]

if data_cov is None:
data_cov = np.eye(X.shape[0])
else:
assert X.shape[0] == data_cov.shape[1]

pred_cov = X @ init_cov @ X.T
inv_pred_cov = np.linalg.pinv(pred_cov)
# pinv b/c inv unstable for rank-deficient A
Expand Down
61 changes: 49 additions & 12 deletions src/mud/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,61 @@ def std_from_equipment(tolerance=0.1, probability=0.95):
def transform_linear_map(operator, data, std):
"""
Takes a linear map `operator` of size (len(data), dim_input)
or (1, dim_input) for repeated observations, along with
a vector `data` representing observations. It is assumed
that `data` is formed with `M@truth + sigma` where `sigma ~ N(0, std)`
This then transforms it to the MWE form expected by the DCI framework.
It returns a matrix `A` of shape (1, dim_input) and np.float `b`
and transforms it to the MWE form expected by the DCI framework.
>>> X = np.ones((10, 2))
>>> x = np.array([0.5, 0.5]).reshape(-1, 1)
>>> std = 1
>>> d = X @ x
>>> A, b = transform_linear_map(X, d, std)
>>> np.linalg.norm(A @ x + b)
0.0
>>> A, b = transform_linear_map(X, d, [std]*10)
>>> np.linalg.norm(A @ x + b)
0.0
>>> A, b = transform_linear_map(np.array([[1, 1]]), d, std)
>>> np.linalg.norm(A @ x + b)
0.0
>>> A, b = transform_linear_map(np.array([[1, 1]]), d, [std]*10)
Traceback (most recent call last):
...
ValueError: For repeated measurements, pass a float for std
"""
num_observations = len(data)
assert operator.shape[0] == num_observations, "Operator shape mismatch"
if isinstance(std, int) or isinstance(std, float):
std = np.array([std] * num_observations)
if isinstance(std, list) or isinstance(std, tuple):
std = np.array(std)
if isinstance(data, np.ndarray):
data = list(data.ravel())
assert len(std) == num_observations, "Standard deviation shape mismatch"
D = np.diag(1.0 / (std * np.sqrt(num_observations)))
A = np.sum(D @ operator, axis=0)
b = np.sum(np.divide(data, std))
return A, (-1.0 / np.sqrt(num_observations)) * b.reshape(-1, 1)
data = data.ravel()

num_observations = len(data)

if operator.shape[0] > 1: # if not repeated observations
assert operator.shape[0] == num_observations, \
f"Operator shape mismatch, op={operator.shape}, obs={num_observations}"
if isinstance(std, (float, int)):
std = np.array([std] * num_observations)
if isinstance(std, (list, tuple)):
std = np.array(std)
assert len(std) == num_observations, "Standard deviation shape mismatch"
assert 0 not in np.round(std, 14), "Std must be > 1E-14"
D = np.diag(1.0 / (std * np.sqrt(num_observations)))
A = np.sum(D @ operator, axis=0)
else:
if isinstance(std, (list, tuple, np.ndarray)):
raise ValueError("For repeated measurements, pass a float for std")
assert std > 1E-14, "Std must be > 1E-14"
A = np.sqrt(num_observations) / std * operator

b = -1.0 / np.sqrt(num_observations) * np.sum(np.divide(data, std))
return A, b


def transform_linear_setup(operator_list, data_list, std_list):
if isinstance(std_list, (float, int)):
std_list = [std_list] * len(data_list)
# repeat process for multiple quantities of interest
results = [transform_linear_map(o, d, s) for
o, d, s in zip(operator_list, data_list, std_list)]
Expand Down

0 comments on commit 2daea51

Please sign in to comment.