diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
index ed9fc3a30d..706986deff 100644
--- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
+++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
@@ -48,6 +48,11 @@ struct PipelineMetrics {
      * Running average of the KV cache usage during the lifetime of the pipeline, with max window size of 1000 steps
      */
     float avg_cache_usage = 0.0;
+
+    /**
+     * Number of tokens scheduled for processing at the previous step of the pipeline.
+     */
+    size_t total_num_scheduled_tokens;
 };
 
 class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
diff --git a/src/cpp/include/openvino/genai/generation_handle.hpp b/src/cpp/include/openvino/genai/generation_handle.hpp
index 7ff172e645..953e573edd 100644
--- a/src/cpp/include/openvino/genai/generation_handle.hpp
+++ b/src/cpp/include/openvino/genai/generation_handle.hpp
@@ -8,6 +8,7 @@
 #include "openvino/genai/generation_config.hpp"
 #include "openvino/genai/visibility.hpp"
+#include "openvino/genai/perf_metrics.hpp"
 
 namespace ov::genai {
 
 enum class GenerationStatus {
@@ -30,6 +31,9 @@ struct EncodedGenerationResult {
 
     // Status of generation
     GenerationStatus m_status = GenerationStatus::RUNNING;
+
+    // PerfMetrics but with empty tokenization/detokenization durations.
+    PerfMetrics perf_metrics;
 };
 
 enum class GenerationFinishReason {
@@ -50,6 +54,9 @@ struct GenerationResult {
 
     // Status of generation
    GenerationStatus m_status = GenerationStatus::RUNNING;
+
+    // PerfMetrics
+    PerfMetrics perf_metrics;
 };
 
 struct GenerationOutput {
diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp
index 0b0065aa1f..a7c657eecb 100644
--- a/src/cpp/src/continuous_batching_adapter.hpp
+++ b/src/cpp/src/continuous_batching_adapter.hpp
@@ -77,6 +77,9 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
         OptionalGenerationConfig generation_config,
         StreamerVariant streamer
     ) override {
+        // Get the current timestamp in order to evaluate total generate duration.
+        auto start_time = std::chrono::steady_clock::now();
+
         std::vector<std::string> prompts = std::visit(overloaded{
             [](const std::string& prompt) {
                 return std::vector{prompt};
@@ -87,8 +90,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
         }, inputs);
         const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
         // -1 == config.eos_token_id and config.validate() are handled in m_impl.
-        std::vector<GenerationResult> generated = m_impl.generate(
-            prompts,
+        std::vector<GenerationResult> generated = m_impl.generate(prompts,
             std::vector{prompts.size(), config},
             streamer
         );
@@ -99,7 +101,34 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
             std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies));
             std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
         }
-        return {std::move(plain_replies), std::move(plain_scores)};
+        PerfMetrics perf_metrics;
+        // For GenerationResults, all perf_metrics are the same except tokenization and detokenization durations.
+        // Since we return here only one perf_metrics, we should accumulate all tokenization and detokenization times.
+        if (generated.size() > 0) {
+            perf_metrics = generated[0].perf_metrics;
+        }
+
+        // Tokenization and detokenization times are dispersed across the GenerationResult vector.
+        // Need to collect them into a single perf_metrics for DecodedResults.
+        auto& raw_metrics = perf_metrics.raw_metrics;
+        for (size_t i = 1; i < generated.size(); ++i){
+            auto tok_durations = generated[i].perf_metrics.raw_metrics.tokenization_durations;
+            auto detok_durations = generated[i].perf_metrics.raw_metrics.detokenization_durations;
+            for (size_t j = 0; j < tok_durations.size(); ++j) {
+                raw_metrics.tokenization_durations.emplace_back(tok_durations[j]);
+            }
+            for (size_t j = 0; j < detok_durations.size(); ++j) {
+                raw_metrics.detokenization_durations.emplace_back(detok_durations[j]);
+            }
+        }
+
+        raw_metrics.generate_durations.clear();
+        raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
+        // Need to reevaluate statistics with the updated start_time which includes tokenization/detokenization durations.
+        perf_metrics.m_evaluated = false;
+        perf_metrics.evaluate_statistics(start_time);
+
+        return {std::move(plain_replies), std::move(plain_scores), std::move(perf_metrics)};
     }
 
     EncodedResults generate(
@@ -107,6 +136,9 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
         OptionalGenerationConfig generation_config,
         StreamerVariant streamer
     ) override {
+        // Get the current timestamp in order to evaluate total generate duration.
+        auto start_time = std::chrono::steady_clock::now();
+
         std::vector<ov::Tensor> input_ids = std::visit(overloaded{
             [](const ov::Tensor& inp) {
                 size_t batch_size = inp.get_shape().at(0);
@@ -148,7 +180,11 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
 
         const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
         // -1 == config.eos_token_id and config.validate() are handled in m_impl.
-        std::vector<EncodedGenerationResult> generated = m_impl.generate(input_ids, std::vector{input_ids.size(), config}, streamer);
+        std::vector<EncodedGenerationResult> generated = m_impl.generate(input_ids,
+            std::vector{input_ids.size(), config},
+            streamer
+        );
+
         std::vector<std::vector<int64_t>> plain_tokens;
         std::vector<float> plain_scores;
         for (EncodedGenerationResult& res : generated) {
@@ -156,7 +192,20 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
             std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens));
             std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
         }
-        return {std::move(plain_tokens), std::move(plain_scores)};
+
+        PerfMetrics perf_metrics;
+        // For EncodedGenerationResults, all perf_metrics are the same.
+        if (generated.size() > 0) {
+            perf_metrics = generated[0].perf_metrics;
+        }
+        auto& raw_counters = perf_metrics.raw_metrics;
+        raw_counters.generate_durations.clear();
+        raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
+        // Reevaluate statistics with the new start_time which includes time for preparing inputs
+        perf_metrics.m_evaluated = false;
+        perf_metrics.evaluate_statistics(start_time);
+
+        return {std::move(plain_tokens), std::move(plain_scores), std::move(perf_metrics)};
     }
 
     void start_chat(const std::string& system_message) override {
diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp
index 44bfaf7f21..2ea567cd60 100644
--- a/src/cpp/src/continuous_batching_impl.cpp
+++ b/src/cpp/src/continuous_batching_impl.cpp
@@ -151,13 +151,13 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         m_pipeline_metrics.max_cache_usage = std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage);
         _register_step_cache_usage(scheduler_output.m_cache_usage);
         m_pipeline_metrics.avg_cache_usage = _get_current_running_average_cache_usage();
+        m_pipeline_metrics.total_num_scheduled_tokens = scheduler_output.m_total_num_scheduled_tokens;
 
         static ManualTimer copy_blocks_timer("scheduling");
         copy_blocks_timer.start();
         m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map);
         copy_blocks_timer.end();
     }
-
     // if no tokens were scheduled, we are out of memory => free all requests and return
     if (scheduler_output.m_total_num_scheduled_tokens == 0) {
         for (size_t i = 0; i < m_requests.size(); ++i) {
@@ -170,7 +170,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         _free_non_running_requests();
         return;
     }
-
     ov::Tensor logits;
     {
         static ManualTimer timer("forward");
@@ -257,6 +256,11 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector 0) {
+                const auto infer_end = std::chrono::steady_clock::now();
+                const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start);
+                raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
+                raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
+                raw_perf_counters.m_new_token_times.emplace_back(infer_end);
+                raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
+            }
         } catch (...) {
             drop_requests(); // remove all requests from pipeline state in case of exception
             throw;
         }
-        auto & generation = generations.at(0);
+        GenerationHandle & generation = generations.at(0);
         if (streamer_ptr && generation->can_read()) {
             std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
             for (const auto& gen_token : token.begin()->second.generated_ids) {
@@ -358,6 +372,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorget_status();
+
+        // The same perf metrics for each sequence, only tokenization/detokenization will differ.
+        perf_metrics.raw_metrics.generate_durations.clear();
+        perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
+        perf_metrics.evaluate_statistics(start_time);
+
+        result.perf_metrics = perf_metrics;
         results.push_back(std::move(result));
     }
 
diff --git a/src/cpp/src/icontinuous_batching.cpp b/src/cpp/src/icontinuous_batching.cpp
index e32616b0aa..0c0d62886b 100644
--- a/src/cpp/src/icontinuous_batching.cpp
+++ b/src/cpp/src/icontinuous_batching.cpp
@@ -35,7 +35,9 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
     std::vector<GenerationConfig> sampling_params,
     const StreamerVariant& streamer) {
     std::vector<ov::Tensor> input_ids;
+    auto start_time = std::chrono::steady_clock::now();
+    std::vector tokenization_durations;
 
     static ManualTimer timer("tokenize");
     if (m_is_chat_conversation) {
         OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts");
@@ -43,37 +45,55 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
         constexpr bool add_generation_prompt = true;
         std::string history = m_tokenizer.apply_chat_template(m_history, add_generation_prompt);
         timer.start();
+        const auto encode_start = std::chrono::steady_clock::now();
         // ov::genai::add_special_tokens(false) is aligned with stateful pipeline
         input_ids.push_back(m_tokenizer.encode(history, ov::genai::add_special_tokens(false)).input_ids);
+        tokenization_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - encode_start));
         timer.end();
     } else {
         input_ids.reserve(prompts.size());
         timer.start();
         for (const std::string& prompt : prompts) {
+            const auto encode_start = std::chrono::steady_clock::now();
             input_ids.push_back(m_tokenizer.encode(prompt).input_ids);
+            tokenization_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - encode_start));
         }
         timer.end();
     }
 
     std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);
-
     std::vector<GenerationResult> decoded;
     decoded.reserve(encoded.size());
-    for (EncodedGenerationResult& res : encoded) {
+    for (size_t i = 0; i < encoded.size(); ++i) {
+        EncodedGenerationResult res = encoded[i];
+        auto& perf_metrics = res.perf_metrics;
+        auto& raw_counters = perf_metrics.raw_metrics;
+        raw_counters.tokenization_durations.emplace_back(tokenization_durations[i]);
+
         std::vector<std::string> generated;
         generated.reserve(res.m_generation_ids.size());
         for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) {
+            const auto decode_start = std::chrono::steady_clock::now();
             generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx)));
+            raw_counters.detokenization_durations.emplace_back(std::chrono::steady_clock::now() - decode_start);
             if (m_is_chat_conversation && 0 == idx) {
                 m_history.push_back({{"role", "assistant"}, {"content", generated.back()}});
             }
         }
+        // The same perf metrics for each sequence, only tokenization/detokenization will differ.
+        perf_metrics.raw_metrics.generate_durations.clear();
+        perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
+        // Reevaluate taking into account tokenization/detokenization times.
+        perf_metrics.m_evaluated = false;
+        perf_metrics.evaluate_statistics(start_time);
+
         decoded.push_back(GenerationResult{
             res.m_request_id,
             std::move(generated),
             std::move(res.m_scores),
-            res.m_status
+            res.m_status,
+            perf_metrics,
         });
     }
 
diff --git a/src/cpp/src/llm_pipeline_stateful.cpp b/src/cpp/src/llm_pipeline_stateful.cpp
index 153fcc6fce..3cec7d74a8 100644
--- a/src/cpp/src/llm_pipeline_stateful.cpp
+++ b/src/cpp/src/llm_pipeline_stateful.cpp
@@ -181,7 +181,7 @@ DecodedResults StatefulLLMPipeline::generate(
 
     auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
     auto stop_time = std::chrono::steady_clock::now();
-    raw_counters.generate_durations = std::vector();
+    raw_counters.generate_durations.clear();
     raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
     raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
     raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));
diff --git a/src/cpp/src/llm_pipeline_static.cpp b/src/cpp/src/llm_pipeline_static.cpp
index f0e6f82545..87a4965d6d 100644
--- a/src/cpp/src/llm_pipeline_static.cpp
+++ b/src/cpp/src/llm_pipeline_static.cpp
@@ -789,7 +789,7 @@ DecodedResults StatefulLLMPipeline::generate(
     decoded_results.perf_metrics = encoded_results.perf_metrics;
     auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
     auto stop_time = std::chrono::steady_clock::now();
-    raw_counters.generate_durations = std::vector();
+    raw_counters.generate_durations.clear();
     raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
     raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
     raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));
@@ -1220,7 +1220,7 @@ DecodedResults StatelessLLMPipeline::generate(
     decoded_results.perf_metrics = encoded_results.perf_metrics;
     auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
     auto stop_time = std::chrono::steady_clock::now();
-    raw_counters.generate_durations = std::vector();
+    raw_counters.generate_durations.clear();
     raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
     raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
     raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));
diff --git a/src/cpp/src/perf_metrics.cpp b/src/cpp/src/perf_metrics.cpp
index 3725dc0cfc..c1833c8b9f 100644
--- a/src/cpp/src/perf_metrics.cpp
+++ b/src/cpp/src/perf_metrics.cpp
@@ -93,7 +93,7 @@ void PerfMetrics::evaluate_statistics(std::optional start_time) {
         return;
     }
     // If start_item is specified then recalculate durations according to start times and calculate statistics only after that.
-    if (start_time.has_value()) {
+    if (start_time.has_value() && raw_metrics.m_new_token_times.size() > 0 && raw_metrics.m_batch_sizes.size() > 0) {
         auto start_time_val = *start_time;
         auto& tok_times = raw_metrics.m_new_token_times;
         auto& batch_sizes = raw_metrics.m_batch_sizes;
diff --git a/src/python/openvino_genai/py_openvino_genai.pyi b/src/python/openvino_genai/py_openvino_genai.pyi
index 4145aa7620..03707deb6e 100644
--- a/src/python/openvino_genai/py_openvino_genai.pyi
+++ b/src/python/openvino_genai/py_openvino_genai.pyi
@@ -440,6 +440,8 @@ class EncodedGenerationResult:
         IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued.
         DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality.
         DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped.
+        perf_metrics:
+            Performance metrics for each generation result.
     """
     m_generation_ids: list[list[int]]
@@ -449,6 +451,9 @@ class EncodedGenerationResult:
     @property
     def m_request_id(self) -> int:
         ...
+    @property
+    def perf_metrics(self) -> PerfMetrics:
+        ...
 class EncodedResults:
     """
@@ -693,6 +698,8 @@ class GenerationResult:
         IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued.
         DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality.
         DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped.
+        perf_metrics:
+            Performance metrics for each generation result.
     """
     m_generation_ids: list[str]
@@ -707,6 +714,9 @@ class GenerationResult:
     @property
     def m_request_id(self) -> int:
         ...
+    @property
+    def perf_metrics(self) -> PerfMetrics:
+        ...
 class GenerationStatus:
     """
     Members:
@@ -1211,6 +1221,9 @@ class PipelineMetrics:
     :param avg_cache_usage: Running average of the KV cache usage (in %) during the lifetime of the pipeline, with max window size of 1000 steps
     :type avg_cache_usage: float
+
+    :param total_num_scheduled_tokens: Number of tokens scheduled for processing at the previous step of the pipeline.
+    :type total_num_scheduled_tokens: int
     """
     def __init__(self) -> None:
         ...
@@ -1229,6 +1242,9 @@ class PipelineMetrics:
     @property
     def scheduled_requests(self) -> int:
         ...
+    @property
+    def total_num_scheduled_tokens(self) -> int:
+        ...
 class RawPerfMetrics:
     """
diff --git a/src/python/py_continuous_batching_pipeline.cpp b/src/python/py_continuous_batching_pipeline.cpp
index 975100cb11..b2fb81fb5b 100644
--- a/src/python/py_continuous_batching_pipeline.cpp
+++ b/src/python/py_continuous_batching_pipeline.cpp
@@ -80,6 +80,8 @@ auto generation_result_docstring = R"(
     IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued.
     DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality.
     DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped.
+    perf_metrics:
+        Performance metrics for each generation result.
 )";
 
 
@@ -102,6 +104,9 @@ auto pipeline_metrics_docstring = R"(
 
     :param avg_cache_usage: Running average of the KV cache usage (in %) during the lifetime of the pipeline, with max window size of 1000 steps
     :type avg_cache_usage: float
+
+    :param total_num_scheduled_tokens: Number of tokens scheduled for processing at the previous step of the pipeline.
+    :type total_num_scheduled_tokens: int
 )";
 
 std::ostream& operator << (std::ostream& stream, const GenerationResult& generation_result) {
@@ -138,6 +143,7 @@ void init_continuous_batching_pipeline(py::module_& m) {
         })
         .def_readwrite("m_scores", &GenerationResult::m_scores)
         .def_readwrite("m_status", &GenerationResult::m_status)
+        .def_readonly("perf_metrics", &GenerationResult::perf_metrics)
        .def("__repr__",
             [](const GenerationResult &r) -> py::str {
                 std::stringstream stream;
@@ -149,12 +155,13 @@ void init_continuous_batching_pipeline(py::module_& m) {
             [](GenerationResult &r) -> py::typing::List {
                 return pyutils::handle_utf8(r.m_generation_ids);
             });
-    
+
     py::class_<EncodedGenerationResult>(m, "EncodedGenerationResult", generation_result_docstring)
         .def(py::init<>())
         .def_readonly("m_request_id", &EncodedGenerationResult::m_request_id)
         .def_readwrite("m_generation_ids", &EncodedGenerationResult::m_generation_ids)
-        .def_readwrite("m_scores", &EncodedGenerationResult::m_scores);
+        .def_readwrite("m_scores", &EncodedGenerationResult::m_scores)
+        .def_readonly("perf_metrics", &EncodedGenerationResult::perf_metrics);
 
     py::enum_<ov::genai::GenerationFinishReason>(m, "GenerationFinishReason")
         .value("NONE", ov::genai::GenerationFinishReason::NONE)
@@ -210,7 +217,8 @@ void init_continuous_batching_pipeline(py::module_& m) {
         .def_readonly("scheduled_requests", &PipelineMetrics::scheduled_requests)
         .def_readonly("cache_usage", &PipelineMetrics::cache_usage)
         .def_readonly("avg_cache_usage", &PipelineMetrics::avg_cache_usage)
-        .def_readonly("max_cache_usage", &PipelineMetrics::max_cache_usage);
+        .def_readonly("max_cache_usage", &PipelineMetrics::max_cache_usage)
+        .def_readonly("total_num_scheduled_tokens", &PipelineMetrics::total_num_scheduled_tokens);
 
     py::class_<ContinuousBatchingPipeline>(m, "ContinuousBatchingPipeline", "This class is used for generation with LLMs with continuous batchig")
         .def(py::init([](const std::filesystem::path& models_path, const SchedulerConfig& scheduler_config, const std::string& device, const std::map& llm_plugin_config, const std::map& tokenizer_plugin_config) {
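Usage sketch (not part of the patch): a minimal C++ example of how the new per-result `perf_metrics` field and `PipelineMetrics::total_num_scheduled_tokens` could be read through the existing public API, assuming the current `ContinuousBatchingPipeline` constructor (model directory, `SchedulerConfig`, device), the `generate()` overload over prompt/config vectors, `get_metrics()`, and the `PerfMetrics` accessors returning `MeanStdPair` values in milliseconds. The model path and device below are placeholders.

```cpp
// Hypothetical usage sketch for the fields added in this patch; model path and device are placeholders.
#include <iostream>
#include <string>
#include <variant>
#include <vector>

#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    ov::genai::SchedulerConfig scheduler_config;  // default scheduler settings
    ov::genai::ContinuousBatchingPipeline pipe("./TinyLlama-1.1B-Chat-v1.0", scheduler_config, "CPU");

    ov::genai::GenerationConfig config;
    config.max_new_tokens = 32;

    std::vector<std::string> prompts = {"What is OpenVINO?", "Write a haiku about batching."};
    std::vector<ov::genai::GenerationConfig> configs(prompts.size(), config);

    // No streamer: pass std::monostate as the StreamerVariant.
    auto results = pipe.generate(prompts, configs, std::monostate{});

    for (const auto& res : results) {
        // perf_metrics is the new per-result field introduced by this patch.
        const auto& pm = res.perf_metrics;
        std::cout << "request " << res.m_request_id
                  << ": generate " << pm.get_generate_duration().mean << " ms"
                  << ", TTFT " << pm.get_ttft().mean << " ms"
                  << ", TPOT " << pm.get_tpot().mean << " ms/token\n";
    }

    // New pipeline-level counter: number of tokens scheduled at the previous step.
    std::cout << "tokens scheduled at last step: "
              << pipe.get_metrics().total_num_scheduled_tokens << std::endl;
    return 0;
}
```

As the patch's comments note, all results of a single `generate()` call share the same step-level counters; only the tokenization/detokenization durations differ per request, and the adapter accumulates those into the single `PerfMetrics` returned with `DecodedResults`.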