Skip to content

Commit

Permalink
Merge pull request #16 from INM-6/extra/clean_up
Browse files Browse the repository at this point in the history
Extra/clean up
  • Loading branch information
rjurkus authored Jun 23, 2020
2 parents 052062b + 74cbba2 commit f1edefd
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 132 deletions.
171 changes: 67 additions & 104 deletions elephant/causality/granger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
from neo.core import AnalogSignal

Causality = namedtuple('causality',
'directional_causality_x_y directional_causality_y_x instantaneous_causality total_interdependence')
['directional_causality_x_y',
'directional_causality_y_x',
'instantaneous_causality',
'total_interdependence'])


# TODO: The result of pairwise granger outputs three arrays and one float.

def bic(cov, order, dimension, length):
'''
"""
Calculate Bayesian Information Criterion
Parameters
----------
Expand All @@ -47,16 +48,17 @@ def bic(cov, order, dimension, length):
Returns
-------
bic : float
information criterion
'''
bic = 2 * np.log(np.linalg.det(cov)) \
+ 2*(dimension**2)*order*np.log(length)/length
criterion : float
Bayesian Information Criterion
"""
criterion = 2 * np.log(np.linalg.det(cov)) \
+ 2*(dimension**2)*order*np.log(length)/length

return criterion

return bic

def aic(cov, order, dimension, length):
'''
"""
Calculate Akaike Information Criterion
Parameters
----------
Expand All @@ -71,16 +73,17 @@ def aic(cov, order, dimension, length):
Returns
-------
aic : float
information criterion
'''
aic = 2 * np.log(np.linalg.det(cov)) \
+ 2*(dimension**2)*order/length
criterion : float
Akaike Information Criterion
"""
criterion = 2 * np.log(np.linalg.det(cov)) \
+ 2*(dimension**2)*order/length

return criterion

return aic

def _lag_covariances(signals, dimension, max_lag):
"""
r"""
Determine covariances of time series and time shift of itself up to a
maximal lag
Parameters
Expand Down Expand Up @@ -110,20 +113,21 @@ def _lag_covariances(signals, dimension, max_lag):
assert (length >= max_lag), 'maximum lag larger than size of data'

# centralize time series
signals_mean = (signals - np.mean(signals, keepdims = True)).T
signals_mean = (signals - np.mean(signals, keepdims=True)).T

lag_covariances = np.zeros((max_lag+1, dimension, dimension))

# determine lagged covariance for different time lags
for lag in range(0,max_lag+1):
for lag in range(0, max_lag+1):
lag_covariances[lag] = \
np.mean(np.einsum('ij,ik -> ijk',signals_mean[:length-lag],
signals_mean[lag:]), axis = 0)
np.mean(np.einsum('ij,ik -> ijk', signals_mean[:length-lag],
signals_mean[lag:]), axis=0)

return lag_covariances


def _yule_walker_matrix(data, dimension, order):
"""
r"""
Generate matrix for Yule-Walker equation
Parameters
----------
Expand Down Expand Up @@ -163,16 +167,19 @@ def _yule_walker_matrix(data, dimension, order):
for block_column in range(block_row, order):
yule_walker_matrix[block_row*dimension: (block_row+1)*dimension,
block_column*dimension:
(block_column+1)*dimension] = lag_covariances[block_column-block_row].T
(block_column+1)*dimension] = \
lag_covariances[block_column-block_row].T

yule_walker_matrix[block_column*dimension: (block_column+1)*dimension,
yule_walker_matrix[block_column*dimension:
(block_column+1)*dimension,
block_row*dimension:
(block_row+1)*dimension] = lag_covariances[block_column-block_row]
(block_row+1)*dimension] = \
lag_covariances[block_column-block_row]
return yule_walker_matrix, lag_covariances


def _vector_arm(signals, dimension, order):
"""
r"""
Determine coefficients of autoregressive model from time series data
Parameters
----------
Expand Down Expand Up @@ -206,34 +213,36 @@ def _vector_arm(signals, dimension, order):
"""

yule_walker_matrix, lag_covariances = _yule_walker_matrix(signals, dimension, order)
yule_walker_matrix, lag_covariances = \
_yule_walker_matrix(signals, dimension, order)

positive_lag_covariances = np.reshape(lag_covariances[1:], (dimension*order, dimension))
positive_lag_covariances = np.reshape(lag_covariances[1:],
(dimension*order, dimension))

lstsq_coeffs = np.linalg.lstsq(yule_walker_matrix, positive_lag_covariances)[0]
lstsq_coeffs = \
np.linalg.lstsq(yule_walker_matrix, positive_lag_covariances)[0]

coeffs = []
for index in range(order):
coeffs.append(lstsq_coeffs[index*dimension:(index+1)*dimension, ].T)

coeffs = np.stack(coeffs)

#cov_matrix = np.zeros((dimension, dimension))

cov_matrix = np.copy(lag_covariances[0])
for i in range(order):
cov_matrix -= np.matmul(coeffs[i], lag_covariances[i+1])

return coeffs, cov_matrix


def _optimal_vector_arm(signals, dimension, max_order,
information_criterion = 'bic'):
'''
information_criterion='bic'):
"""
Determine optimal auto regressive model by choosing optimal order via
Information Criterion
Parameters
----------
signal : np.ndarray
signals : np.ndarray
time series data
dimension : int
dimensionality of the data
Expand All @@ -251,24 +260,22 @@ def _optimal_vector_arm(signals, dimension, max_order,
covariance matrix of
optimal_order : int
optimal order
'''

current_optimal_order = 1
"""

length = np.size(signals[0])

optimal_ic = np.infty
optimal_order = 1
optimal_coeffs = np.zeros((dimension,dimension, optimal_order))
optimal_coeffs = np.zeros((dimension, dimension, optimal_order))
optimal_cov_matrix = np.zeros((dimension, dimension))

if information_criterion == 'bic':
evaluate_ic = bic
elif information_criterion == 'aic':
evaluate_ic = aic
else:
raise ValueError(f"Information criterion {information_criterion} not valid")

raise ValueError(
f"Information criterion {information_criterion} not valid")

for order in range(1, max_order + 1):
coeffs, cov_matrix = _vector_arm(signals, dimension, order)
Expand All @@ -283,16 +290,17 @@ def _optimal_vector_arm(signals, dimension, max_order,

return optimal_coeffs, optimal_cov_matrix, optimal_order

def pairwise_granger(signals, max_order, information_criterion = 'bic'):
"""

def pairwise_granger(signals, max_order, information_criterion='bic'):
r"""
Determine Granger Causality of two time series
Note: order parameter should be removed
Parameters
----------
signals : np.ndarray or neo.AnalogSignal
time series data
order : int
order of autoregressive model (should be removed)
max_order : int
maximal order of autoregressive model
information_criterion : string
bic for Bayesian information_criterion,
aic for Akaike information criterion
Expand Down Expand Up @@ -336,30 +344,15 @@ def pairwise_granger(signals, max_order, information_criterion = 'bic'):
coeffs_y, var_y, p_2 = _optimal_vector_arm(signal_y, 1, max_order,
information_criterion)
coeffs_xy, cov_xy, p_3 = _optimal_vector_arm(signals, 2, max_order,
information_criterion)
print('########################################')
print(p_1)
print(p_2)
print(p_3)
print('########################################')
print(coeffs_xy)
print(cov_xy)
print('########################################')
print(coeffs_x)
print(var_x)
print('########################################')
print(coeffs_y)
print(var_y)
print('########################################')
information_criterion)

directional_causality_y_x = np.log(var_x[0]/cov_xy[0, 0])
# print(f'directional_y_x is {var_x[0]/cov_xy[0, 0]}. The variance of x is {var_x[0]}, the covariance_xy is {cov_xy[0, 0]}')
directional_causality_x_y = np.log(var_y[0]/cov_xy[1, 1])
# print(f'directional_x_y is {var_y[0]/cov_xy[0, 0]}. The variance of x is {var_y[0]}, the covariance_xy is {cov_xy[0, 0]}')

cov_determinant = np.linalg.det(cov_xy)

instantaneous_causality = np.log((cov_xy[0, 0]*cov_xy[1, 1])/cov_determinant)
instantaneous_causality = \
np.log((cov_xy[0, 0]*cov_xy[1, 1])/cov_determinant)
instantaneous_causality = np.asarray(instantaneous_causality)

total_interdependence = np.log(var_x[0]*var_y[0]/cov_determinant)
Expand All @@ -372,48 +365,18 @@ def pairwise_granger(signals, max_order, information_criterion = 'bic'):
length = np.size(signal_x)
asymptotic_std_error = 1/np.sqrt(length)
est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error)))
print(est_sig_figures)

directional_causality_x_y_round = np.around(directional_causality_x_y,
est_sig_figures)
est_sig_figures)
directional_causality_y_x_round = np.around(directional_causality_y_x,
est_sig_figures)
est_sig_figures)
instantaneous_causality_round = np.around(instantaneous_causality,
est_sig_figures)
total_interdependence_round = directional_causality_x_y_round \
+ directional_causality_y_x_round \
+ instantaneous_causality_round

return Causality(directional_causality_x_y=directional_causality_x_y_round.item(),
directional_causality_y_x=directional_causality_y_x_round.item(),
instantaneous_causality=instantaneous_causality_round.item(),
total_interdependence=total_interdependence_round.item())


if __name__ == "__main__":

np.random.seed(1)
length_2d = 300
signal = np.zeros((2, length_2d))

order = 2
weights_1 = np.array([[0.9, 0], [0.9, -0.8]])
weights_2 = np.array([[-0.5, 0], [-0.2, -0.5]])

weights = np.stack((weights_1, weights_2))

noise_covariance = np.array([[1., 0.2], [0.2, 1.]])

for i in range(length_2d):
for lag in range(order):
signal[:, i] += np.dot(weights[lag],
signal[:, i - lag - 1])
rnd_var = np.random.multivariate_normal([0, 0], noise_covariance)
signal[0, i] += rnd_var[0]
signal[1, i] += rnd_var[1]

np.save('/home/jurkus/granger_data', signal)
causality = pairwise_granger(signal, 10, 'bic')

print(causality)

est_sig_figures)
total_interdependence_round = np.around(total_interdependence,
est_sig_figures)

return Causality(
directional_causality_x_y=directional_causality_x_y_round.item(),
directional_causality_y_x=directional_causality_y_x_round.item(),
instantaneous_causality=instantaneous_causality_round.item(),
total_interdependence=total_interdependence_round.item())
Loading

0 comments on commit f1edefd

Please sign in to comment.