forked from rtaormina/aeed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaeed.py
181 lines (147 loc) · 6.31 KB
/
aeed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# numpy stack
import numpy as np
import pandas as pd
# keras
from keras.layers import Input, Dense
from keras.models import Model, load_model
from keras import optimizers
# sklearn
from sklearn.metrics import mean_squared_error
# json
import json
# classes
class AutoEncoder(object):
""" Keras-based AutoEncoder (AE) class used for event detection.
Attributes:
params: dictionary with parameters defining the AE structure,
"""
def __init__(self, **kwargs):
""" Class constructor, stores parameters and initialize AE Keras model. """
# Default parameters values. If nI is not given, the code will crash later.
params = {
'nI': None,
'nH': 3,
'cf': 1,
'activation' : 'tanh',
'optimizer' : None,
'verbose' : 0
}
for key,item in kwargs.items():
params[key] = item
self.params = params
def create_model(self):
""" Creates Keras AE model.
The model has nI inputs, nH hidden layers in the encoder (and decoder)
and cf compression factor. The compression factor is the ratio between
the number of inputs and the innermost hidden layer which stands between
the encoder and the decoder. The size of the hidden layers between the
input (output) layer and the innermost layer decreases (increase) linearly
according to the cg.
"""
# retrieve params
nI = self.params['nI'] # number of inputs
nH = self.params['nH'] # number of hidden layers in encoder (decoder)
cf = self.params['cf'] # compression factor
activation = self.params['activation'] # autoencoder activation function
optimizer = self.params['optimizer'] # Keras optimizer
verbose = self.params['verbose'] # echo on screen
# get number/size of hidden layers for encoder and decoder
temp = np.linspace(nI,nI/cf,nH + 1).astype(int)
nH_enc = temp[1:]
nH_dec = temp[:-1][::-1]
# input layer placeholder
input_layer = Input(shape=(nI,))
# build encoder
for i, layer_size in enumerate(nH_enc):
if i == 0:
# first hidden layer
encoder = Dense(layer_size, activation=activation)(input_layer)
else:
# other hidden layers
encoder = Dense(layer_size, activation=activation)(encoder)
# build decoder
for i, layer_size in enumerate(nH_dec):
if i == 0:
# first hidden layer
decoder = Dense(layer_size, activation=activation)(encoder)
else:
# other hidden layers
decoder = Dense(layer_size, activation=activation)(decoder)
# create autoencoder
autoencoder = Model(input_layer, decoder)
if optimizer == None:
optimizer = optimizers.Adam(lr = 0.001)
# print autoencoder specs
if verbose > 0:
print('Created autoencoder with structure:');
print(', '.join('layer_{}: {}'.format(v, i) for v, i in enumerate(np.hstack([nI,nH_enc,nH_dec]))))
# compile and return model
autoencoder.compile(optimizer=optimizer, loss='mean_squared_error')
return autoencoder
def train(self, x, **train_params):
""" Train autoencoder,
x: inputs (inputs == targets, AE are self-supervised ANN).
"""
if self.params['verbose']:
if self.ann == None:
print('Creating model.')
self.create_model()
self.ann.fit(x, x, **train_params)
def predict(self, x, test_params={}):
""" Yields reconstruction error for all inputs,
x: inputs.
"""
return self.ann.predict(x, **test_params)
class AEED(AutoEncoder):
""" This class extends the AutoEncoder class to include event detection
functionalities.
"""
def initialize(self):
""" Create the underlying Keras model. """
self.ann = self.create_model()
def predict(self, x, **keras_params):
""" Predict with autoencoder. """
preds = pd.DataFrame(index=x.index,columns=x.columns,
data=super(AEED, self).predict(x.values,keras_params))
errors = (x-preds)**2
return preds, errors
def detect(self, x, theta, window = 1, average=False, sys_theta = 0, **keras_params):
""" Detection performed based on (smoothed) reconstruction errors.
x = inputs,
theta = threshold, attack flagged if reconstruction error > threshold,
window = length of the smoothing window (default = 1 timestep, i.e. no smoothing),
average = boolean (default = False), if True the detection is performed
on the average reconstruction error across all outputs,
keras_params = parameters for the Keras-based AE prediction.
"""
# preds = super(AEED, self).predict(x,keras_params)
preds, temp = self.predict(x, **keras_params)
temp = (x-preds)**2
if average:
errors = temp.mean(axis=1).rolling(window=window).mean()
detection = errors > theta
else:
errors = temp.rolling(window=window).mean()
detection = errors.apply(lambda x: x>np.max(theta.name, sys_theta))
return detection, errors
def save(self, filename):
""" Save AEED model.
AEED parameters saved in a .json, while Keras model is stored in .h5 .
"""
# parameters
with open(filename+'.json', 'w') as fp:
json.dump(self.params, fp)
# keras model
self.ann.save(filename+'.h5')
# echo
print('Saved AEED parameters to {0}.\nKeras model saved to {1}'.format(filename+'.json', filename+'.h5'))
# functions
def load_AEED(params_filename, model_filename):
""" Load stored AEED. """
# load params and create AEED
with open(params_filename) as fd:
params = json.load(fd)
aeed = AEED(**params)
# load keras model
aeed.ann = load_model(model_filename)
return aeed