-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgenerator.py
147 lines (123 loc) · 4.94 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
'''
This module contains the data generator class
'''
# from future.utils import implements_iterator
from .rv_gen import mv_rejective
from ica import ica
import numpy as np
#import rpy2.robjects as robjects
#from rpy2.robjects.packages import importr
#from scipy.stats import ttest_ind
from sklearn.decomposition import PCA, SparsePCA
from sklearn.preprocessing import LabelEncoder
import logging
# Configure the root logger's output format as "[module:LEVEL]:message".
# NOTE: this runs at import time, so importing this module calls
# logging.basicConfig on behalf of the whole process.
logging.basicConfig(format="[%(module)s:%(levelname)s]:%(message)s")
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Let this module's logger emit every record from DEBUG upwards.
logger.setLevel(logging.DEBUG)
def empirical_mn(mean, covar, size):
    '''
    Draw samples from a multivariate normal distribution.

    Thin wrapper around ``np.random.multivariate_normal``; it uses NumPy's
    global random state, so seed with ``np.random.seed`` for reproducibility.

    NOTE(review): despite the name, the samples are ordinary random draws;
    their sample mean/covariance are not forced to match ``mean``/``covar``
    exactly.

    Parameters
    ----------
    mean : 1-D array-like
        Mean vector of the distribution.
    covar : 2-D array-like
        Covariance matrix of the distribution.
    size : int or tuple of ints
        Number (or shape) of samples to draw.

    Returns
    -------
    numpy.ndarray
        The drawn samples, as returned by
        ``np.random.multivariate_normal(mean, covar, size)``.
    '''
    return np.random.multivariate_normal(mean, covar, size)
#def empirical_mnr(mean, covar, size):
# MASS = importr('MASS')
# sample_mean = robjects.FloatVector(np.array(mean))
# temp = robjects.FloatVector(covar.ravel())
# sample_cov = robjects.r['matrix'](temp, nrow=covar.shape[0])
# new_mixing = np.array(MASS.mvrnorm(n=size, mu=sample_mean,
# Sigma=sample_cov,
# empirical=True))
# return new_mixing
# @implements_iterator
class DataGeneratorByGroup(object):
    '''
    Iterator producing synthetic, labelled batches of data, class by class.

    ``X`` is decomposed into a mixing matrix (one row per sample) and a
    source matrix. The mixing rows are partitioned by class label and, per
    class, summarized either by their mean/covariance (``method='normal'``)
    or by one histogram per column (``method='rejective'``). Every
    iteration step draws ``n_samples`` new mixing rows for each class and
    projects them through the sources with ``np.dot``.

    Parameters
    ----------
    X : array-like
        Data to decompose; ``len(X)`` must equal ``len(y)``.
    y : array-like
        One class label per entry of ``X``.
    n_components : int
        Number of components passed to the decomposition model.
    n_samples : int
        Number of samples generated per class in every batch.
    n_batches : int
        Number of batches produced before iteration stops.
    method : {'normal', 'rejective'}
        Random-variable generator used to draw new mixing rows.
    decomposition_method : {'ica', 'pca', 'sparsePCA'}
        Decomposition used to obtain the mixing and source matrices.

    Raises
    ------
    ValueError
        If ``X`` and ``y`` differ in length, or if ``method`` or
        ``decomposition_method`` is not one of the supported values.
    '''

    def __init__(self, X, y, n_components=20, n_samples=100,
                 n_batches=1000, method='normal',
                 decomposition_method='ica'):
        self.decomposition_method = decomposition_method
        self.n_components = n_components
        self.n_samples = n_samples
        self.method = method
        self.n_batches = n_batches

        # Validate eagerly (and not with `assert`, which is stripped under
        # -O) so a bad argument fails here instead of surfacing later as an
        # AttributeError / UnboundLocalError.
        if len(X) != len(y):
            raise ValueError(
                'X and y must have the same length, got {} and {}'.format(
                    len(X), len(y)))
        if method not in ('normal', 'rejective'):
            raise ValueError('Method {}, not implemented'.format(method))

        if decomposition_method == 'ica':
            # The project's ica model returns (mixing, sources) from fit().
            model = ica(n_components)
            self.mixing, self.sources = model.fit(X)
        elif decomposition_method == 'pca':
            # NOTE(review): PCA centers X before projecting and the mean is
            # not added back when generating data - confirm this is intended.
            model = PCA(n_components)
            self.mixing = model.fit_transform(X)
            self.sources = model.components_
        elif decomposition_method == 'sparsePCA':
            model = SparsePCA(n_components, alpha=0.01)
            self.mixing = model.fit_transform(X)
            self.sources = model.components_
        else:
            raise ValueError('Method: {}, not implemented'.format(
                decomposition_method))

        # Encode labels as integers 0..n_classes-1
        self.le = LabelEncoder()
        self.le.fit(y)
        logger.info('Classes: {}'.format(self.le.classes_))
        self.y = self.le.transform(y)
        self.n_classes = len(self.le.classes_)

        # Partition the mixing matrix by label and store, per class, the
        # parameters required by the selected generator.
        self.params = {'mean': [], 'cov': [], 'hist': []}
        for label in range(self.n_classes):
            keep = np.where(self.y == label)
            mix = self.mixing[keep]
            if method == 'normal':
                self.params['mean'].append(mix.mean(axis=0))
                self.params['cov'].append(np.cov(mix, rowvar=0))
            else:  # 'rejective' (validated above)
                self.params['hist'].append(
                    [np.histogram(column, density=True, bins=20)
                     for column in mix.T])
        self.batch = 0

    @property
    def batch_label(self):
        '''
        Labels matching the rows of one generated batch.

        Returns
        -------
        list
            Each original class label repeated ``n_samples`` times, in
            encoded-class order (``n_classes * n_samples`` entries).
        '''
        labels = []
        for label in range(self.n_classes):
            # Direct lookup in classes_ is what inverse_transform does for a
            # single encoded label; calling inverse_transform with a scalar
            # is rejected by current scikit-learn (it requires a 1-D array).
            true_label = self.le.classes_[label]
            labels.extend([true_label] * self.n_samples)
        return labels

    def __iter__(self):
        '''Reset the batch counter and return the iterator itself.'''
        self.batch = 0
        return self

    def __next__(self):
        '''
        Generate the next batch.

        Returns
        -------
        tuple
            ``(data, labels)`` where ``data`` vertically stacks one block of
            generated rows per class and ``labels`` is ``batch_label``.

        Raises
        ------
        StopIteration
            Once ``n_batches`` batches have been produced.
        ValueError
            If ``self.method`` is not a supported generator.
        '''
        self.batch += 1
        if self.batch > self.n_batches:
            raise StopIteration
        new_data = []
        for aclass in range(self.n_classes):
            if self.method == 'normal':
                new_mix = empirical_mn(self.params['mean'][aclass],
                                       self.params['cov'][aclass],
                                       self.n_samples)
            elif self.method == 'rejective':
                new_mix = mv_rejective(self.params['hist'][aclass],
                                       self.n_samples)
            else:
                # Only reachable if `method` was reassigned after __init__.
                raise ValueError(
                    'Method {}, not implemented'.format(self.method))
            # Project the new mixing rows through the sources.
            new_data.append(np.dot(new_mix, self.sources))
        return (np.vstack(new_data), self.batch_label)

    # Python 2 looks up `next` instead of `__next__` in the iterator protocol.
    next = __next__
# @implements_iterator
#class DataGenerator(object):
# '''
# Class that generates data using ICA and a RV generator method
# '''
# def __init__(self, data,
# n_components=20,
# n_samples=100,
# n_batches=1000,#
#
# def __next__(self):
# self.batch += 1
# if self.batch > self.n_batches:
# raise StopIteration
# if self.method == 'normal':
# new_mixing = empirical_mn(self.parameters['sample_mean'],
# self.parameters['sample_cov'],
# self.parameters['n_samples'])#
# if self.method == 'rejective':
# new_mixing = mv_rejective(self.parameters['sample_hist'],
# self.parameters['n_samples'])#
# new_data = np.dot(new_mixing, self.sources) # + self.data_mean
# return new_data