-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevaluate.py
More file actions
308 lines (238 loc) · 11.5 KB
/
evaluate.py
File metadata and controls
308 lines (238 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import pandas as pd
import numpy as np
from scipy.spatial.distance import cdist
from sentence_transformers import SentenceTransformer
import os, json
import argparse
import matplotlib.pyplot as plt
import seaborn as sns
class Metrics():
    """Binary-classification metrics derived from a 2x2 confusion matrix.

    Labels are assumed to be 0/1. The confusion matrix convention used
    throughout is: rows = actual labels, columns = predicted labels
    (i.e. conf_matrix[actual, predicted]).
    """
    @staticmethod
    def get_confusion_matrix(predicted, actual):
        """Build the 2x2 confusion matrix for 0/1 labels.

        @param predicted: iterable of predicted labels (0 or 1).
        @param actual: iterable of ground-truth labels (0 or 1).
        @return: 2x2 numpy array with rows = actual, cols = predicted.
        """
        conf_matrix = np.zeros((2, 2))
        for pred, act in zip(predicted, actual):
            conf_matrix[act, pred] += 1
        return conf_matrix
    @staticmethod
    def get_TP(confusion_matrix, label):
        """True positives for `label`: the diagonal entry."""
        return confusion_matrix[label, label]
    @staticmethod
    def get_FN(confusion_matrix, label):
        """False negatives for `label`: its row sum minus the true positives."""
        row = confusion_matrix[label, :]
        return row.sum() - row[label]
    @staticmethod
    def get_FP(confusion_matrix, tag):
        """False positives for `tag`: its column sum minus the true positives."""
        col = confusion_matrix[:, tag]
        return col.sum() - col[tag]
    @staticmethod
    def macro_precision(conf_matrix):
        """Unweighted mean of per-class precision over labels {0, 1}.

        A class with no predicted samples (TP + FP == 0) contributes 0.
        """
        precision = 0.0
        for label in [0, 1]:
            denom = Metrics.get_TP(conf_matrix, label) + \
                    Metrics.get_FP(conf_matrix, label)
            if denom != 0.0:
                precision += Metrics.get_TP(conf_matrix, label) / denom
        return precision / 2
    @staticmethod
    def macro_recall(conf_matrix):
        """Unweighted mean of per-class recall over labels {0, 1}.

        A class with no actual samples (TP + FN == 0) contributes 0.
        """
        recall = 0.0
        for label in [0, 1]:
            denom = Metrics.get_TP(conf_matrix, label) + \
                    Metrics.get_FN(conf_matrix, label)
            if denom != 0.0:
                recall += Metrics.get_TP(conf_matrix, label) / denom
        return recall / 2
    @staticmethod
    def F1(precision, recall):
        """Harmonic mean of precision and recall.

        Returns 0.0 when both inputs are 0 (standard zero-division
        convention) instead of raising ZeroDivisionError.
        """
        if precision + recall == 0:
            return 0.0
        return (2 * precision * recall) / (precision + recall)
    @staticmethod
    def get_macro_metrics(predictions, test_labels):
        """Return (macro_precision, macro_recall, macro_f1) for the given labels."""
        conf_matrix = Metrics.get_confusion_matrix(predictions, test_labels)
        precision = Metrics.macro_precision(conf_matrix)
        recall = Metrics.macro_recall(conf_matrix)
        f1_score = Metrics.F1(precision, recall)
        return (precision, recall, f1_score)
    @staticmethod
    def weighted_precision(conf_matrix, test_samples):
        """Support-weighted precision: per-class precision weighted by class frequency."""
        accum = 0
        for label in [0, 1]:
            true_sample = [sample for sample in test_samples if sample == label]
            denom = Metrics.get_TP(conf_matrix, label) + Metrics.get_FP(conf_matrix, label)
            if denom != 0:
                accum += float(len(true_sample)) * (Metrics.get_TP(conf_matrix, label) / denom)
        precision = accum / len(test_samples)
        return precision
    @staticmethod
    def weighted_recall(conf_matrix, test_samples):
        """Support-weighted recall: per-class recall weighted by class frequency."""
        accum = 0
        for label in [0, 1]:
            true_sample = [sample for sample in test_samples if sample == label]
            denom = Metrics.get_TP(conf_matrix, label) + Metrics.get_FN(conf_matrix, label)
            if denom != 0:
                accum += float(len(true_sample)) * (Metrics.get_TP(conf_matrix, label) / denom)
        recall = accum / len(test_samples)
        return recall
    @staticmethod
    def get_weighted_metrics(predictions, test_labels):
        """Return (weighted_precision, weighted_recall, weighted_f1) for the given labels."""
        conf_matrix = Metrics.get_confusion_matrix(predictions, test_labels)
        precision = Metrics.weighted_precision(conf_matrix, test_labels)
        recall = Metrics.weighted_recall(conf_matrix, test_labels)
        f1_score = Metrics.F1(precision, recall)
        return (precision, recall, f1_score)
    @staticmethod
    def compute(df, column, thresh=0.5):
        """Compute all metrics for score column `column` against df['label'].

        Scores strictly greater than `thresh` are predicted as 1.
        @param df: DataFrame with a numeric score column and a 0/1 'label' column.
        @param column: name of the score column.
        @param thresh: decision threshold (default 0.5).
        @return: (metrics dict, confusion matrix).
        """
        metrics = {}
        preds = list(df[column].apply(lambda x: 1 if x > thresh else 0).values)
        true = list(df['label'].values)
        # Confusion matrix
        conf_matrix = Metrics.get_confusion_matrix(preds, true)
        # Calculating the macro metrics
        macro_precision, macro_recall, macro_f1 = Metrics.get_macro_metrics(preds, true)
        metrics["macro_precision"] = macro_precision; metrics["macro_recall"] = macro_recall
        metrics["macro_f1"] = macro_f1
        # Calculating the weighted metrics
        weighted_precision, weighted_recall, weighted_f1 = Metrics.get_weighted_metrics(preds, true)
        metrics["weighted_precision"] = weighted_precision; metrics["weighted_recall"] = weighted_recall
        metrics["weighted_f1"] = weighted_f1
        # Calculating the mean difference between positive- and negative-pair scores
        pos_mean = df[df["label"] == 1][column].mean()
        neg_mean = df[df["label"] == 0][column].mean()
        diff = pos_mean - neg_mean
        metrics["mean_difference"] = diff
        metrics["pos_mean"] = pos_mean; metrics["neg_mean"] = neg_mean
        return metrics, conf_matrix
class Evaluator():
    """Scores sentence pairs with a bi-encoder and writes evaluation metrics.

    Cosine similarity between the embeddings of the two configured columns
    is stored under the method name, thresholded into 0/1 predictions, and
    summarized with Metrics; results go to a .json or .csv file and the
    confusion matrix to a heatmap PDF.
    """
    def __init__(self, model_path, dataset_path, out_path, method, column_1="question",
                    column_2="paraphrase", verbose=False, **kwargs):
        """
        Initialize the evaluator.
        @param model_path: Path to the sentence embedding model.
        @param dataset_path: Path to the dataset (CSV with column_1, column_2 and a 'label' column).
        @param out_path: Path to the output file (.json or .csv).
        @param method: Name of the current method (also used as the score column name).
        @param column_1: Name of the column containing the first sentence.
        @param column_2: Name of the column containing the second sentence.
        @param verbose: Whether to print the progress.
        Optional kwargs: 'threshold' (decision threshold, default 0.5) and
        'dataset_name' (subdirectory for the confusion-matrix PDF, default "unknown").
        """
        self.verbose = verbose
        self.data = pd.read_csv(dataset_path)
        self.out_path = out_path
        self.method = method
        self.column_1 = column_1; self.column_2 = column_2
        if verbose:
            print("[INFO] Loading model...")
        self.model = SentenceTransformer(model_path)
        self.threshold = kwargs.get("threshold", 0.5)
        self.dataset_name = kwargs.get("dataset_name", "unknown")
    def __score_biencoder(self, sentence_1, sentence_2):
        """
        Helper function to find the cosine similarity between two sentences
        (1 - cosine distance, so higher means more similar).
        """
        emb_1 = self.model.encode(sentence_1, convert_to_numpy=True)
        emb_2 = self.model.encode(sentence_2, convert_to_numpy=True)
        return (1 - cdist(emb_1.reshape(1, -1), emb_2.reshape(1, -1), 'cosine'))[0][0]
    def __get_score(self):
        """
        Adds a column in data (named after the method) with the cosine scores
        for all pairs.
        """
        if self.verbose: print("[INFO] Calculating the scores...")
        self.data[self.method] = self.data.apply(lambda x: self.__score_biencoder( \
                                    x[self.column_1], x[self.column_2]), axis=1)
    def __round_metrics(self, metrics):
        """
        Rounds all the metric values to 3 decimal places (all values are
        numeric at this point).
        """
        for key in metrics:
            metrics[key] = round(metrics[key], 3)
        return metrics
    def get_metrics(self):
        """Score the dataset and return (metrics dict, confusion matrix)."""
        self.__get_score()
        if self.verbose: print("[INFO] Computing metrics...")
        metrics, conf_matrix = Metrics.compute(self.data, self.method, self.threshold)
        metrics = self.__round_metrics(metrics)
        # Record the configuration alongside the numbers (after rounding,
        # since 'method' is a string).
        metrics["method"] = self.method
        metrics["threshold"] = self.threshold
        return metrics, conf_matrix
    def __save_conf_matrix(self, conf_matrix):
        """
        Saves the confusion matrix as a heatmap PDF under
        <out_dir>/<dataset_name>/<method>.pdf.
        """
        if self.verbose: print("[INFO] Saving the confusion matrix...")
        # Convert numpy array of floats into a list of integers so the
        # heatmap annotations render as counts.
        conf_matrix = conf_matrix.astype(int).tolist()
        ax = sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="YlGnBu")
        ax.set_title(f'{self.method}')
        ax.set_xlabel('Predicted Values')
        ax.set_ylabel('Actual Values')
        # Tick labels: index 0 = Invalid, index 1 = Valid.
        ax.xaxis.set_ticklabels(['Invalid','Valid'])
        ax.yaxis.set_ticklabels(['Invalid','Valid'])
        directory = os.path.join(os.path.dirname(self.out_path), self.dataset_name)
        # exist_ok avoids the check-then-create race of the exists()+makedirs pattern.
        os.makedirs(directory, exist_ok=True)
        out_path = os.path.join(directory, f"{self.method}.pdf")
        plt.savefig(out_path)
        # Close the figure so a subsequent save does not draw on top of this one.
        plt.close()
    def __write_metrics_json(self):
        """Append the metrics dict to the JSON list at out_path (creating it if absent)."""
        info = []
        metrics, conf_matrix = self.get_metrics()
        self.__save_conf_matrix(conf_matrix)
        if os.path.exists(self.out_path):
            with open(self.out_path, "r") as f:
                info = json.load(f)
            # Normalize a single previously-saved dict into a list.
            if not isinstance(info, list):
                info = [info]
        info.append(metrics)
        if self.verbose: print("[INFO] Writing metrics to file...")
        with open(self.out_path, "w") as f:
            json.dump(info, f)
    def __write_metrics_csv(self):
        """Append the metrics as a row to the CSV at out_path (creating it if absent)."""
        metrics, conf_matrix = self.get_metrics()
        self.__save_conf_matrix(conf_matrix)
        if os.path.exists(self.out_path):
            df = pd.read_csv(self.out_path)
            # Refuse to append if the existing file tracks different metrics.
            columns = set(df.columns); metrics_keys = set(metrics.keys())
            if columns != metrics_keys:
                raise Exception("[ERROR] Metrics columns do not match")
            metrics_df = pd.DataFrame(metrics, index=[0])
            df = pd.concat([df, metrics_df], ignore_index=True).reset_index(drop=True)
        else:
            df = pd.DataFrame(metrics, index=[0])
        if self.verbose: print("[INFO] Writing metrics to file...")
        df.to_csv(self.out_path, index=False)
    def save_metrics(self):
        """Compute and persist the metrics; format is chosen by out_path's extension."""
        if self.out_path.endswith(".json"):
            self.__write_metrics_json()
        elif self.out_path.endswith(".csv"):
            self.__write_metrics_csv()
        else:
            raise Exception("[ERROR] Output file must be a .json or .csv file.")
if __name__ == "__main__":
    # Command-line entry point: build the argument parser from a declarative
    # spec, then run one evaluation and persist the metrics.
    parser = argparse.ArgumentParser()
    arg_specs = [
        (("--model_path", "-m"),
         dict(type=str, required=True, help="Path to the sentence embedding model.")),
        (("--dataset_path", "-d"),
         dict(type=str, required=True, help="Path to the dataset.")),
        (("--out_path", "-o"),
         dict(type=str, required=True, help="Path to the output file.")),
        (("--method", "-mth"),
         dict(type=str, default="ParaQD", help="Name of the current method.")),
        (("--column_1", "-c1"),
         dict(type=str, default="question",
              help="Name of the column containing the first sentence.")),
        (("--column_2", "-c2"),
         dict(type=str, default="paraphrase",
              help="Name of the column containing the second sentence.")),
        (("--verbose", "-v"),
         dict(action="store_true", help="Whether to print the progress.")),
        (("--threshold", "-t"),
         dict(type=float, default=0.5, help="Threshold to use for the evaluation.")),
        (("--dataset_name", "-n"),
         dict(type=str, default="unknown", help="Name of the dataset.")),
    ]
    for flags, options in arg_specs:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()
    evaluator = Evaluator(args.model_path, args.dataset_path, args.out_path, args.method,
                          args.column_1, args.column_2, args.verbose,
                          threshold=args.threshold, dataset_name=args.dataset_name)
    evaluator.save_metrics()