From 2137d6728af56a9f0d24f46114e9092797fd8a7f Mon Sep 17 00:00:00 2001 From: Ekaterina Aidova Date: Tue, 17 Sep 2024 19:58:27 +0400 Subject: [PATCH] use streamer for metrics calculation (#874) Co-authored-by: Andrei Kochin --- llm_bench/python/benchmark.py | 113 ++++++++++++++++++- llm_bench/python/llm_bench_utils/ov_utils.py | 33 +++++- 2 files changed, 143 insertions(+), 3 deletions(-) diff --git a/llm_bench/python/benchmark.py b/llm_bench/python/benchmark.py index 321441364d..ea0fa8f0a4 100644 --- a/llm_bench/python/benchmark.py +++ b/llm_bench/python/benchmark.py @@ -216,6 +216,112 @@ def run_text_generation(input_text, num, model, tokenizer, args, iter_data_list, bench_hook.clear_time_infer_list() +def run_text_generation_genai_with_stream(input_text, num, model, tokenizer, args, iter_data_list, md5_list, prompt_index, streamer, model_precision, proc_id): + set_seed(args['seed']) + input_text_list = [input_text] * args['batch_size'] + if args["output_dir"] is not None and num == 0: + for bs_index, in_text in enumerate(input_text_list): + llm_bench_utils.output_file.output_input_text(in_text, args, model_precision, prompt_index, bs_index, proc_id) + pt_inputs = tokenizer(input_text_list, return_tensors="pt") + input_token_size = pt_inputs.input_ids.shape[1] + pipe_tokenizer = model.get_tokenizer() + tok_encode_start = time.perf_counter() + input_data = pipe_tokenizer.encode(input_text_list) + tok_encode_end = time.perf_counter() + tok_encode_time = (tok_encode_end - tok_encode_start) * 1000 + if args['batch_size'] > 1: + out_str = '[warm-up]' if num == 0 else '[{}]'.format(num) + out_str += " Batch_size={}, ".format(args['batch_size']) + out_str += 'all input token size after padding: {} * {}, '.format(input_token_size, args['batch_size']) + if args['infer_count'] is not None: + out_str += 'all max_output_token_size: {} * {}'.format(args['infer_count'], args['batch_size']) + log.info(out_str) + + max_rss_mem_consumption = '' + max_uss_mem_consumption = '' + max_shared_mem_consumption = '' + if (args['mem_consumption'] == 1 and num == 0) or args['mem_consumption'] == 2: + mem_consumption.start_collect_memory_consumption() + max_gen_tokens = DEFAULT_OUTPUT_TOKEN_SIZE if args['infer_count'] is None else args['infer_count'] + streamer.reset() + start = time.perf_counter() + generated_tokens = model.generate(input_data, max_new_tokens=max_gen_tokens, num_beams=args["num_beams"], streamer=streamer).tokens + end = time.perf_counter() + if (args['mem_consumption'] == 1 and num == 0) or args['mem_consumption'] == 2: + mem_consumption.end_collect_momory_consumption() + max_rss_mem_consumption, max_shared_mem_consumption, max_uss_mem_consumption = mem_consumption.get_max_memory_consumption() + mem_consumption.clear_max_memory_consumption() + + generation_time = end - start + tok_decode_start = time.perf_counter() + generated_text = pipe_tokenizer.decode(generated_tokens) + tok_decode_end = time.perf_counter() + tok_decode_time = (tok_decode_end - tok_decode_start) * 1000 + # Only text_gen need to minus length of input_data, because generated_text may include input_text + num_tokens = 0 + result_md5_list = [] + for bs_idx in range(args['batch_size']): + generated_text_len = len(generated_tokens[bs_idx]) + num_tokens += generated_text_len + if generated_text_len > max_gen_tokens: + log.error('Output token size is over max output token size!') + result_text = generated_text[bs_idx] + if args["output_dir"] is not None: + llm_bench_utils.output_file.output_gen_text(result_text, args, model_precision, 
prompt_index, num, bs_idx, proc_id) + result_md5_list.append(hashlib.new("md5", result_text.encode(), usedforsecurity=False).hexdigest()) + if len(md5_list[num]) == 0: + md5_list[num] = {prompt_index : result_md5_list} + else: + md5_list[num][prompt_index] = result_md5_list + per_token_time = generation_time * 1000 / (num_tokens / args['batch_size']) + tm_list = streamer.get_time_list() + log.debug('latency of all tokens:') + [log.debug('[{}]{:.4f}'.format(idx, tm)) for idx, tm in enumerate(tm_list)] + iter_data = gen_iterate_data( + num, + input_token_size * args['batch_size'], + len(tm_list), + num_tokens, + generation_time, + per_token_time, + result_md5_list, + max_rss_mem=max_rss_mem_consumption, + max_shared_mem=max_shared_mem_consumption, + max_uss_mem=max_uss_mem_consumption, + prompt_idx=prompt_index, + tokenization_time=(tok_encode_time, tok_decode_time) + ) + iter_data_list.append(iter_data) + llm_bench_utils.metrics_print.print_metrics( + num, + iter_data, + tm_list, + [], + warm_up=(num == 0), + max_rss_mem=max_rss_mem_consumption, + max_shared_mem=max_shared_mem_consumption, + max_uss_mem=max_uss_mem_consumption, + tokenization_time=(tok_encode_time, tok_decode_time), + batch_size=args['batch_size'] + ) + if num > 0: + prev_md5 = md5_list[num - 1][prompt_index] + if result_md5_list != prev_md5: + log.warning(f"[{num}] Prompt[{prompt_index}]'s md5 {result_md5_list} " + f"is different from md5 of the {num - 1} iteration {prev_md5}") + llm_bench_utils.metrics_print.print_generated(num, warm_up=(num == 0), generated=generated_text[0]) + if num == 1: + # if the device is CPU, throw exception + if args['devices'].lower().startswith('cpu') is True: + assert (result_md5_list == prev_md5) + else: + # throw exception + assert (result_md5_list == prev_md5) + else: + llm_bench_utils.metrics_print.print_generated(num, warm_up=(num == 0), generated=generated_text[0]) + streamer.reset() + + def run_text_generation_genai(input_text, num, model, tokenizer, args, iter_data_list, md5_list, prompt_index, streamer, model_precision, proc_id): set_seed(args['seed']) input_text_list = [input_text] * args['batch_size'] @@ -341,7 +447,12 @@ def run_text_generation_benchmark(model_path, framework, device, args, num_iters f'prompt nums: {len(text_list)}, prompt idx: {prompt_idx_list}') # if num_iters == 0, just output warm-up data - text_gen_fn = run_text_generation if not use_genai else run_text_generation_genai + if not use_genai: + text_gen_fn = run_text_generation + elif bench_hook is not None: + text_gen_fn = run_text_generation_genai_with_stream + else: + text_gen_fn = run_text_generation_genai proc_id = os.getpid() if args['subsequent'] is False: for num in range(num_iters + 1): diff --git a/llm_bench/python/llm_bench_utils/ov_utils.py b/llm_bench/python/llm_bench_utils/ov_utils.py index 1f9c6b6a31..d379275880 100644 --- a/llm_bench/python/llm_bench_utils/ov_utils.py +++ b/llm_bench/python/llm_bench_utils/ov_utils.py @@ -86,7 +86,7 @@ def build_ov_tokenizer(hf_tokenizer): try: from openvino_tokenizers import convert_tokenizer except ImportError: - log.warn("OV Tokenizer is unavailable, tokenizer conversion will be skipped") + log.warning("OV Tokenizer is unavailable, tokenizer conversion will be skipped") return hf_tokenizer ov_tokenizer, ov_detokenizer = convert_tokenizer(hf_tokenizer, with_detokenizer=True) @@ -191,7 +191,36 @@ def create_genai_text_gen_model(model_path, device, ov_config, **kwargs): end = time.perf_counter() log.info(f'Pipeline initialization time: {end - start:.2f}s') - 
return llm_pipe, tokenizer, end - start, None, True + class TokenStreamer(openvino_genai.StreamerBase): + def __init__(self, tokenizer): + openvino_genai.StreamerBase.__init__(self) + self.tokenizer = tokenizer + self.token_generation_time = [] + self.generated_tokens = [] + self.start_time = time.perf_counter() + + def put(self, token_id): + self.token_generation_time.append(time.perf_counter() - self.start_time) + self.generated_tokens.append(token_id) + self.start_time = time.perf_counter() + return False + + def reset(self): + self.token_generation_time = [] + self.generated_tokens = [] + self.start_time = time.perf_counter() + + def end(self): + pass + + def get_tokens(self): + return self.generated_tokens + + def get_time_list(self): + return self.token_generation_time + streamer = TokenStreamer(llm_pipe.get_tokenizer()) if "NPU" in device.upper() else None + + return llm_pipe, tokenizer, end - start, streamer, True def convert_ov_tokenizer(tokenizer_path):
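
Usage note (not part of the patch): the sketch below shows how the streamer pattern introduced in ov_utils.py above can be used on its own to collect per-token latencies with openvino_genai. It is a minimal, hedged example: the model path, device, prompt, and the LatencyStreamer name are placeholders rather than code from this PR, and it assumes an openvino_genai build that exposes LLMPipeline and StreamerBase the same way the patch does.

    # Standalone sketch, mirroring the TokenStreamer added in ov_utils.py.
    # Placeholder model path/device/prompt; not code from this PR.
    import time

    import openvino_genai


    class LatencyStreamer(openvino_genai.StreamerBase):
        def __init__(self):
            openvino_genai.StreamerBase.__init__(self)
            self.token_times_ms = []  # wall-clock gap before each generated token, ms
            self.start_time = time.perf_counter()

        def put(self, token_id):
            now = time.perf_counter()
            self.token_times_ms.append((now - self.start_time) * 1000)
            self.start_time = now
            return False  # False tells the pipeline to keep generating

        def end(self):
            pass


    pipe = openvino_genai.LLMPipeline("/path/to/exported/model", "CPU")  # placeholder path
    streamer = LatencyStreamer()
    pipe.generate("What is OpenVINO?", max_new_tokens=32, streamer=streamer)

    # Split the timings into first token vs. the rest; this is the kind of
    # per-token time list the patch retrieves via streamer.get_time_list()
    # and passes to metrics_print.print_metrics.
    first_token_ms = streamer.token_times_ms[0]
    other_tokens_ms = streamer.token_times_ms[1:]
    mean_other_ms = sum(other_tokens_ms) / len(other_tokens_ms) if other_tokens_ms else 0.0
    print(f"first token: {first_token_ms:.2f} ms, other tokens (mean): {mean_other_ms:.2f} ms")

The list gathered by put() plays the same role as the tm_list that run_text_generation_genai_with_stream obtains from streamer.get_time_list() before reporting metrics; resetting the streamer between iterations, as the benchmark does, keeps warm-up timings from leaking into later runs.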