This repository has been archived by the owner on Oct 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevaluator.py
243 lines (203 loc) · 8.93 KB
/
evaluator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
from sklearn.decomposition import PCA
from itertools import combinations
from pathlib import Path
from typing import Any, Dict, List, Tuple, Union
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from ray.air.result import Result
from sklearn.model_selection import KFold
from torch.nn import CosineSimilarity, MSELoss, PairwiseDistance
from torch.optim import AdamW
from torch.optim.optimizer import Optimizer
from torch.utils.data import DataLoader, Subset
from data import CustomDataset
from models.probes import DoubleLinearProbe, SingleLinearProbe
from tqdm import tqdm
# Run on the first CUDA GPU when available, otherwise fall back to the CPU.
# All model/tensor placement in this module goes through this single constant.
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class CrossEvaluator:
    """Run k-fold cross-validation of a linear probe and persist the results.

    For every fold a fresh probe is trained with MSE loss on (x -> y)
    embedding pairs.  The trained probe is then evaluated by computing
    pairwise similarities (cosine similarity and euclidean distance) between
    its prediction and the remaining sentence embeddings of each sample, and
    by storing a 2-D PCA projection of the fold's embeddings.  All results
    are written as CSV files next to this module.
    """

    def __init__(
        self,
        tuned_params: Result,
        config: Dict[str, Any],
        dataset: CustomDataset,
        exp_name: str,
        state: int,
        data_name: str,
    ) -> None:
        """Store the experiment setup.

        Args:
            tuned_params: Ray result holding the tuned hyper-parameters;
                must expose a non-empty ``.config``.
            config: static experiment config (``cv_k``, ``id``,
                ``dim_input``, ``dim_output``, ``train_epochs``, ...).
            dataset: dataset yielding batches with keys
                ``x``, ``y``, ``d1``, ``d2`` and ``sample``.
            exp_name: experiment name used in output paths.
            state: random-state identifier, only used for file naming.
            data_name: dataset name used in output paths.
        """
        self._config = config
        self._dataset = dataset
        self._data_name = data_name
        # Fixed seed so the fold assignment is reproducible across runs.
        self._splits = KFold(
            n_splits=self._config["cv_k"], shuffle=True, random_state=42
        )
        self._tuned_params = self._get_config(tuned_params)
        self._cosine_sim = CosineSimilarity(dim=1, eps=1e-6)
        self._euclid_dis = PairwiseDistance(p=2.0, eps=1e-06, keepdim=False)
        self._state = state
        self._exp_name = exp_name

    def _get_config(self, tuned_results: Result) -> Dict[str, Any]:
        """Return the tuned hyper-parameter dict.

        Raises:
            ValueError: if the Ray result carries no config.
        """
        if tuned_results.config:
            return tuned_results.config
        raise ValueError("No tuned hyper-parameters available.")

    def _get_model(self, config: Dict[str, Any]) -> nn.Module:
        """Instantiate the probe selected by ``config['id']``.

        Raises:
            ValueError: if ``config['id']`` names an unknown probe.
        """
        if config["id"] == "single_linear_probe":
            return SingleLinearProbe(config["dim_input"])
        if config["id"] == "double_linear_probe":
            return DoubleLinearProbe(
                config["dim_input"],
                config["dim_output"],
            )
        # Bug fix: the message previously interpolated self._config['model'],
        # a key never set anywhere in this class — raising the ValueError
        # would itself have failed with a KeyError.
        raise ValueError(f"{config['id']} is not supported")

    def _get_optimizer(self, model: nn.Module) -> Optimizer:
        """Build an AdamW optimizer from the tuned hyper-parameters."""
        adamw = self._tuned_params["optimizer"]["AdamW"]
        return AdamW(
            params=model.parameters(),
            lr=adamw["lr"],
            betas=(adamw["beta1"], adamw["beta2"]),
            weight_decay=adamw["weight_decay"],
        )

    @torch.no_grad()
    def _compute_similarity(
        self, batch_embeddings: List[torch.Tensor], measure: str, model: nn.Module
    ) -> torch.Tensor:
        """Return pairwise similarities between the probe output and targets.

        Args:
            batch_embeddings: ``[x, y, d1, d2]`` embedding tensors; ``x`` is
                fed through the probe, the rest are compared as-is.
            measure: ``"cosine-similarity"`` or ``"euclidean-distance"``.
            model: trained probe.

        Raises:
            ValueError: if *measure* is not one of the two supported names.
        """
        model.eval()
        # retrieve X embeddings and run them through the model
        x = batch_embeddings[0].to(DEVICE)
        pred = model(x)
        if measure == "cosine-similarity":
            similarity = self._cosine_sim
        elif measure == "euclidean-distance":
            similarity = self._euclid_dis
        else:
            raise ValueError(f"{measure} is not supported")
        # Bug fix: move the remaining embeddings to DEVICE as well — `pred`
        # lives on DEVICE and mixing CPU/CUDA tensors fails on GPU machines.
        others = [emb.to(DEVICE) for emb in batch_embeddings[1:]]
        # each row stores the similarities for the sentences (a, b, c, d) in a
        # given sample: combinations([a, b, c, d], 2) -> ab, ac, ad, bc, bd, cd
        similarities = torch.stack(
            [similarity(e1, e2) for e1, e2 in combinations([pred, *others], 2)]
        ).T
        return similarities

    def _store_as_dataframe(
        self, sents: List[Tuple], sims: Union[torch.Tensor, None]
    ) -> pd.DataFrame:
        """Combine sentences and similarity scores into a single DataFrame.

        Raises:
            ValueError: if no similarities were accumulated (*sims* is None).
        """
        if sims is None:
            # Typo fix in the message ("similaritis" -> "similarities").
            raise ValueError("No similarities have been stored.")
        df_sent = pd.DataFrame(sents)
        df_sims = pd.DataFrame(sims.cpu().numpy())
        df = pd.concat([df_sent, df_sims], axis=1)
        headers_sent = [f"sent_{i}" for i in range(len(sents[0]))]
        # assumption: there are 6 similarity measures in the resulting dataframe
        headers_sims = ["pred-Y", "pred-d1", "pred-d2", "Y-d1", "Y-d2", "d1-d2"]
        df.columns = [*headers_sent, *headers_sims]
        return df

    def _train(
        self,
        train_loader: DataLoader,
        model: nn.Module,
        optimizer: Optimizer,
        criterion: MSELoss,
    ) -> None:
        """Run one training epoch of *model* over *train_loader*."""
        model.train()
        for batch in train_loader:
            input_embedding = batch.get("x").to(DEVICE)
            target_embedding = batch.get("y").to(DEVICE)
            out = model(input_embedding)
            batch_loss = criterion(out, target_embedding)
            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

    def _test(self, test_loader: DataLoader, fold: int, model: nn.Module) -> None:
        """Evaluate *model* on the test fold and write one CSV per measure."""
        # Bug fix: use the class name rather than str(model) in paths —
        # nn.Module's repr spans multiple lines and produced broken
        # directory and file names.
        model_name = type(model).__name__
        for measure in ["cosine-similarity", "euclidean-distance"]:
            all_sents = []
            all_sims = None
            for batch in test_loader:
                # retrieve and store sentences pertaining to a single sample
                all_sents.extend([sample for sample in zip(*batch["sample"])])
                sims = self._compute_similarity(
                    [batch["x"], batch["y"], batch["d1"], batch["d2"]], measure, model
                )
                all_sims = sims if all_sims is None else torch.cat([all_sims, sims])
            res_dir = (
                Path(__file__).parent
                / f"results_{self._exp_name}_{self._data_name}"
                / f"{model_name}_{self._state}_{measure}"
            )
            res_dir.mkdir(parents=True, exist_ok=True)
            df = self._store_as_dataframe(all_sents, all_sims)
            csv_name = (
                f"{self._exp_name}_{model_name}_{measure}"
                f"_state[{self._state}]_fold[{fold}]"
            )
            df.to_csv(res_dir / f"{csv_name}.csv", index=False)

    @torch.no_grad()
    def _get_low_dim_embeddings(
        self, test_loader: DataLoader, model: nn.Module
    ) -> pd.DataFrame:
        """Project all fold embeddings to 2-D with PCA.

        Returns a DataFrame with columns ``x``, ``y`` (PCA components) and
        ``labels`` encoding the source of each row:
        0 = probe prediction, 1 = Y, 2 = d1, 3 = d2.
        """
        model.eval()
        dim = self._config["dim_output"]
        fold_embeddings = np.array([]).reshape(0, dim)
        labels = []
        for batch in test_loader:
            # Bug fix: move the input to DEVICE before the forward pass; the
            # model sits on DEVICE and a CPU tensor would fail on CUDA.
            pred_x = model(batch["x"].to(DEVICE))
            pred_x = pred_x.view(-1, dim).cpu().numpy()
            fold_embeddings = np.vstack([fold_embeddings, pred_x])
            labels.extend([0] * len(pred_x))
            # Stack the target and the two distractor embeddings with their
            # numeric labels (1 = y, 2 = d1, 3 = d2).
            for label, key in enumerate(("y", "d1", "d2"), start=1):
                emb = batch[key].view(-1, dim).cpu().numpy()
                fold_embeddings = np.vstack([fold_embeddings, emb])
                labels.extend([label] * len(emb))
        fold_embeddings = PCA(n_components=2).fit_transform(fold_embeddings)
        labels = np.asarray(labels).reshape(-1, 1)
        return pd.DataFrame(
            np.hstack([fold_embeddings, labels]), columns=["x", "y", "labels"]
        )

    def cross_evaluate(self) -> None:
        """Train and evaluate the probe on every fold, persisting all outputs."""
        dataset_idx = np.arange(len(self._dataset))
        splits_indices = list(self._splits.split(dataset_idx))
        batch_size = self._tuned_params["batch_size"]
        for fold, (train_idx, test_idx) in enumerate(pbar := tqdm(splits_indices)):
            # get a new model, loss and optimizer for each fold
            model = self._get_model(self._config).to(DEVICE)
            criterion = MSELoss()
            optimizer = self._get_optimizer(model)
            # Use the class name — str(model) is nn.Module's multi-line repr.
            pbar.set_description(
                f"\n{self._exp_name} -- evaluate {type(model).__name__} on "
                f"{self._data_name} | state: {self._state} | fold: {fold}"
            )
            train_loader = DataLoader(
                dataset=Subset(indices=train_idx.tolist(), dataset=self._dataset),
                batch_size=batch_size,
            )
            test_loader = DataLoader(
                dataset=Subset(indices=test_idx.tolist(), dataset=self._dataset),
                batch_size=batch_size,
            )
            for _ in range(self._config["train_epochs"]):
                self._train(train_loader, model, optimizer, criterion)
            # evaluate on current test fold
            self._test(test_loader, fold, model)
            # store fold embeddings as a 2-D PCA projection
            fold_embeddings = self._get_low_dim_embeddings(test_loader, model)
            emb_dir = Path(__file__).parent / f"embeddings_{self._exp_name}"
            emb_dir.mkdir(parents=True, exist_ok=True)
            fold_embeddings.to_csv(
                emb_dir
                / (
                    f"{self._exp_name}_{self._config['id']}"
                    f"_state[{self._state}]_fold[{fold}].csv"
                ),
                # Consistency fix: _test also writes without the index column.
                index=False,
            )