-
Notifications
You must be signed in to change notification settings - Fork 0
/
rakuda.py
330 lines (273 loc) · 12 KB
/
rakuda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""Main module for shin-rakuda evaluation."""
import datetime
import gc
import json
import os
import logging
from typing import List
import ray
import torch
from omegaconf import DictConfig, OmegaConf
from tqdm.asyncio import tqdm
from rakuda.helper import (
bootstrap_percentiles,
create_judge_prompt,
generate_full_prompt,
generate_llm_response,
generate_models_bar_chat,
generate_radar_chart,
init_local_model,
load_datasets_jsonl,
parse_score_from_llm_response,
save_datasets_jsonl,
process_dataset_model_pair,
read_jsonl,
write_jsonl,
)
from rakuda.constants import DEFAULT_BOOTSTRAP_ITERATIONS
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
async def shin_rakuda_eval(config: DictConfig):
"""Main function for shin-rakuda evaluation."""
# Loading config YAML file
try:
config_dict = OmegaConf.to_container(config, resolve=True)
assert isinstance(config_dict, dict)
except AssertionError as exc:
logging.error("Config is not a valid dictionary")
raise ValueError("Invalid configuration") from exc
inference_library = config.get("inference_library", "huggingface")
# Check existing_eval directory
existing_eval_dir = config.get("existing_eval_dir", None)
if existing_eval_dir is not None:
result_dir = f"{config.result_dir}/{existing_eval_dir}"
else:
result_dir = f"{config.result_dir}/{datetime.datetime.now()}"
try:
os.makedirs(result_dir, exist_ok=True)
except OSError as e:
logging.error("Failed to create result directory: %s", e)
raise
# Loop through each dataset and model
for dataset in tqdm(
config_dict["eval_datasets"], desc="Processing datasets", unit="dataset"
):
print(f"Evaluating {dataset['dataset_name']}...")
dataset_path = os.path.join(
config.eval_datasets_dir, f"{dataset['dataset_name']}.jsonl"
)
if not os.path.exists(dataset_path):
print(f"{dataset_path} does not exist.")
continue
input_column = dataset.get("input_column", None)
num_questions = dataset.get("num_questions", 0)
random_questions = dataset.get("random_questions", None)
queries = load_datasets_jsonl(
random_questions, num_questions, dataset_path, input_column
)
# save selected queries as dataset jsonl for future reference
save_datasets_jsonl(result_dir, queries, dataset["dataset_name"])
# loop through each model for evaluation
for model in tqdm(
config_dict["models"], desc="Processing models", unit="model"
):
print("Evaluating model: ", model["model_name"])
eval_result_path, file_exists = process_dataset_model_pair(
dataset, model, config, result_dir, ""
)
if file_exists:
continue
# save the full prompt for evaluation purpose
full_prompt_path = os.path.join(
result_dir,
f"{dataset['dataset_name']}++{model['model_name']}_full_prompt.jsonl",
)
# Initialize the local model first
provider = model.get("provider", "huggingface").strip().lower()
if provider == "huggingface" or provider == "hf":
local_pipeline = init_local_model(model, inference_library)
else:
local_pipeline = None
# Make sure all the turns are being considered
MAX_CONVERSATION_TURNS = max(len(query["turns"]) for query in queries)
chats = [[] for _ in queries]
full_prompts = [[] for _ in queries] # store the full prompt for each query
for turn_idx in range(MAX_CONVERSATION_TURNS):
query_indices = []
prompt_indices = []
llm_coroutines = []
for query_idx, query in enumerate(queries):
if turn_idx < len(query["turns"]):
# Get the user content for this turn
content = query["turns"][turn_idx]
chats[query_idx].append({"role": "user", "content": content})
query_indices.append(query_idx)
prompt_indices.append(
generate_full_prompt(
local_pipeline=local_pipeline,
model_info=model,
chat_prompts=chats[query_idx],
inference_library=inference_library,
)
)
llm_coroutines.append(
# passing local model to generate_llm_response
generate_llm_response(
model=model,
local_model_pipeline=local_pipeline,
chat_prompts=chats[query_idx],
config=config_dict,
)
)
full_prompts[query_idx] = prompt_indices
print(f"Processing {len(llm_coroutines)} queries...")
responses = await tqdm.gather(*llm_coroutines)
for query_idx, llm_response in zip(query_indices, responses):
chats[query_idx].append(
{"role": "assistant", "content": llm_response}
)
write_jsonl(full_prompt_path, full_prompts)
write_jsonl(eval_result_path, chats)
# destroy the local model and free up memory (vllm 0.4.0.post1)
# this method only works for vllm ver <= 0.4.0.post1
if inference_library == "vllm":
del local_pipeline.llm_engine.model_executor
del local_pipeline
gc.collect()
torch.cuda.reset_peak_memory_stats()
torch.cuda.empty_cache()
ray.shutdown()
# pass the result directory to the next function
return result_dir
async def shin_rakuda_judgement(config: DictConfig, result_dir):
"""Load results and judge the responses"""
try:
config_dict = OmegaConf.to_container(config, resolve=True)
assert isinstance(config_dict, dict)
except AssertionError as exc:
logging.error("Config is not a valid dictionary")
raise ValueError("Invalid configuration") from exc
# load the dataset jsonl file
# based on the dataset and models, load the corresponding jsonl file
dataset_result = []
for dataset in tqdm(
config_dict["eval_datasets"], desc="Processing datasets", unit="dataset"
):
# check to see if dataset jsonl file exists in result dir
result_dataset_path = os.path.join(
result_dir, f"{dataset['dataset_name']}.jsonl"
)
if not os.path.exists(
os.path.join(result_dir, f"{dataset['dataset_name']}.jsonl")
):
# load the dataset jsonl file
result_dataset_path = os.path.join(
config.eval_datasets_dir, f"{dataset['dataset_name']}.jsonl"
)
if not os.path.exists(result_dataset_path):
print(f"{result_dataset_path} does not exist.")
continue
input_column = dataset.get("input_column", None)
queries = load_datasets_jsonl(None, 0, result_dataset_path, input_column)
for model in tqdm(
config_dict["models"], desc="Processing models", unit="model"
):
# load the jsonl file
result_path = os.path.join(
result_dir,
f"{dataset['dataset_name']}++{model['model_name']}.jsonl",
)
try:
results = read_jsonl(result_path)
except FileNotFoundError: # if the file does not exist
print(f"{result_path} does not exist.")
continue
# create a new file for judged results
judged_result_path, file_exists = process_dataset_model_pair(
dataset, model, config, result_dir, "_judged"
)
# check if the judge file already exists
if config.get("existing_eval", False) and file_exists:
print(f"{judged_result_path} already exists.")
judged_result = True
else:
judged_result = False
if not judged_result:
scores: List = []
judge_coroutines = []
for i, result in tqdm(
enumerate(results), desc="Judging results", unit="result"
):
# result has all the interaction between the user and the model
# create a judge prompt and have the judge model score the response
judge_prompt = create_judge_prompt(
query=queries[i],
messages=result,
judge_prompt=dataset["judge_prompt_template"],
use_jinja=dataset.get("use_jinja", False),
)
# the judge model doesn't work with local model yet
judge_coroutines.append(
generate_llm_response(
model=config.judge_models[0],
local_model_pipeline=None,
chat_prompts=judge_prompt,
config=config,
)
)
query_category = queries[i].get("category", None)
scores.append(
{
"query": judge_prompt,
"category": query_category,
}
)
responses = await tqdm.gather(*judge_coroutines)
for i, judge_response in enumerate(responses):
scores[i]["score"] = parse_score_from_llm_response(
judge_response, dataset["score_keyword"]
)
scores[i]["response"] = str(judge_response)
# save this model's results to a json file
write_jsonl(judged_result_path, scores)
else:
# load the judged results
scores = read_jsonl(judged_result_path)
mean_score = sum([score["score"] for score in scores]) / len(scores)
n_bootstrap = DEFAULT_BOOTSTRAP_ITERATIONS
print(
f"Running {n_bootstrap} bootstrap samples for {dataset['dataset_name']} {model['model_name']}"
)
confidence_region = bootstrap_percentiles(
scores,
lambda x: sum([score["score"] for score in x]) / len(x),
percentiles=[2.5, 97.5],
n_bootstrap=n_bootstrap,
)
# generate radar graph if multiple categories
generate_radar_chart(
scores=scores,
model_name=model["model_name"],
dataset_name=dataset["dataset_name"],
judge_model=config.judge_models[0],
result_dir=result_dir,
)
dataset_result.append(
{
"dataset": dataset,
"model": model["model_name"],
"result_path": result_path,
"score": mean_score,
"95_confidence_lower": confidence_region[0],
"95_confidence_upper": confidence_region[1],
}
)
# Generate a overall graph with all the scores, based on each dataset with models
generate_models_bar_chat(all_scores=dataset_result, result_dir=result_dir)
# save all the result
final_result_path = os.path.join(
config.result_dir,
f"results_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
)
write_jsonl(final_result_path, dataset_result)