From fc25c8ed897970378e540c62e1cc6fced4ba37bd Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Fri, 10 Jan 2025 13:03:07 +0100 Subject: [PATCH 01/13] Add ContinuousBatching PerfMetrics --- .../genai/continuous_batching_pipeline.hpp | 1 + src/cpp/src/continuous_batching_adapter.hpp | 9 ++++--- src/cpp/src/continuous_batching_impl.cpp | 27 +++++++++++++------ src/cpp/src/continuous_batching_impl.hpp | 4 +-- src/cpp/src/continuous_batching_pipeline.cpp | 4 +-- src/cpp/src/icontinuous_batching.cpp | 19 ++++++++++--- src/cpp/src/icontinuous_batching.hpp | 6 ++--- .../src/prompt_lookup/prompt_lookup_impl.cpp | 11 +++++--- .../src/prompt_lookup/prompt_lookup_impl.hpp | 4 +-- .../speculative_decoding_impl.cpp | 11 +++++--- .../speculative_decoding_impl.hpp | 4 +-- 11 files changed, 67 insertions(+), 33 deletions(-) diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp index ed9fc3a30d..c68479bb21 100644 --- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp +++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp @@ -64,6 +64,7 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { friend class ContinuousBatchingForPromptLookupImpl; friend class SpeculativeDecodingImpl; friend class PromptLookupImpl; + friend class ContinuousBatchingAdapter; std::shared_ptr m_impl; diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp index 0b0065aa1f..4e1636908d 100644 --- a/src/cpp/src/continuous_batching_adapter.hpp +++ b/src/cpp/src/continuous_batching_adapter.hpp @@ -4,6 +4,7 @@ #include "llm_pipeline_base.hpp" +#include "icontinuous_batching.hpp" #include "openvino/genai/continuous_batching_pipeline.hpp" namespace ov::genai { @@ -87,7 +88,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { }, inputs); const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config; // -1 == config.eos_token_id and config.validate() are handled in m_impl. - std::vector generated = m_impl.generate( + auto [generated, perf_metrics] = m_impl.m_impl->generate( prompts, std::vector{prompts.size(), config}, streamer @@ -99,7 +100,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies)); std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores)); } - return {std::move(plain_replies), std::move(plain_scores)}; + return {std::move(plain_replies), std::move(plain_scores), std::move(perf_metrics)}; } EncodedResults generate( @@ -148,7 +149,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config; // -1 == config.eos_token_id and config.validate() are handled in m_impl. 
- std::vector generated = m_impl.generate(input_ids, std::vector{input_ids.size(), config}, streamer); + auto [generated, perf_metrics] = m_impl.m_impl->generate(input_ids, std::vector{input_ids.size(), config}, streamer); std::vector> plain_tokens; std::vector plain_scores; for (EncodedGenerationResult& res : generated) { @@ -156,7 +157,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens)); std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores)); } - return {std::move(plain_tokens), std::move(plain_scores)}; + return {std::move(plain_tokens), std::move(plain_scores), std::move(perf_metrics)}; } void start_chat(const std::string& system_message) override { diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index 44bfaf7f21..05e9d3a9eb 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -133,7 +133,7 @@ bool ContinuousBatchingPipeline::ContinuousBatchingImpl::has_non_finished_reques return !m_awaiting_requests.empty() || !m_requests.empty(); } -void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { +size_t ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { static ManualTimer step_timer("step()"); step_timer.start(); @@ -157,7 +157,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map); copy_blocks_timer.end(); } - // if no tokens were scheduled, we are out of memory => free all requests and return if (scheduler_output.m_total_num_scheduled_tokens == 0) { for (size_t i = 0; i < m_requests.size(); ++i) { @@ -168,9 +167,10 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { } } _free_non_running_requests(); - return; + return 0; } - + size_t num_generated_tokens = scheduler_output.m_total_num_scheduled_tokens; + ov::Tensor logits; { static ManualTimer timer("forward"); @@ -243,6 +243,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { } step_timer.end(); + return num_generated_tokens; } void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std::optional& adapters) { @@ -251,13 +252,16 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std: } } -std::vector +std::pair, PerfMetrics> ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. 
Use ContinuousBatchingPipeline::add_request"); OPENVINO_ASSERT(input_ids.size() == sampling_params.size()); + PerfMetrics perf_metrics; + auto& raw_perf_counters = perf_metrics.raw_metrics; + // checks that all requests has the same LoRA adapters property value for (size_t i = 1; i < sampling_params.size(); ++i) { OPENVINO_ASSERT(sampling_params[i - 1].adapters == sampling_params[i].adapters, @@ -303,13 +307,20 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorcan_read()) { std::unordered_map token = generation->back(); for (const auto& gen_token : token.begin()->second.generated_ids) { @@ -362,7 +373,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector + std::pair, PerfMetrics> generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) override; diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index c1c0677ff3..b68f7cad05 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -140,11 +140,11 @@ bool ContinuousBatchingPipeline::has_non_finished_requests() { } std::vector ContinuousBatchingPipeline::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { - return m_impl->generate(input_ids, sampling_params, streamer); + return m_impl->generate(input_ids, sampling_params, streamer).first; } std::vector ContinuousBatchingPipeline::generate(const std::vector& prompts, const std::vector& sampling_params, const StreamerVariant& streamer) { - return m_impl->generate(prompts, sampling_params, streamer); + return m_impl->generate(prompts, sampling_params, streamer).first; } void ContinuousBatchingPipeline::start_chat(const std::string& system_message) { diff --git a/src/cpp/src/icontinuous_batching.cpp b/src/cpp/src/icontinuous_batching.cpp index e32616b0aa..5fb626a453 100644 --- a/src/cpp/src/icontinuous_batching.cpp +++ b/src/cpp/src/icontinuous_batching.cpp @@ -29,12 +29,13 @@ void ContinuousBatchingPipeline::IContinuousBatchingPipeline::finish_chat() { m_history.clear(); }; -std::vector +std::pair, PerfMetrics> ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( const std::vector& prompts, std::vector sampling_params, const StreamerVariant& streamer) { std::vector input_ids; + auto start_time = std::chrono::steady_clock::now(); static ManualTimer timer("tokenize"); if (m_is_chat_conversation) { @@ -55,9 +56,10 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( timer.end(); } - std::vector encoded = generate(input_ids, sampling_params, streamer); - + // std::vector encoded = generate(input_ids, sampling_params, streamer); + auto [encoded, perf_metrics] = generate(input_ids, sampling_params, streamer); std::vector decoded; + auto decode_start_time = std::chrono::steady_clock::now(); decoded.reserve(encoded.size()); for (EncodedGenerationResult& res : encoded) { std::vector generated; @@ -76,7 +78,16 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( res.m_status }); } + auto stop_time = std::chrono::steady_clock::now(); + + auto& raw_counters = perf_metrics.raw_metrics; + raw_counters.generate_durations = std::vector(); + raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time)); + raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_start_time - start_time)); + 
raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(stop_time - decode_start_time)); + perf_metrics.m_evaluated = false; + perf_metrics.evaluate_statistics(start_time); - return decoded; + return {decoded, perf_metrics}; } } diff --git a/src/cpp/src/icontinuous_batching.hpp b/src/cpp/src/icontinuous_batching.hpp index 12030f06f7..d59384765b 100644 --- a/src/cpp/src/icontinuous_batching.hpp +++ b/src/cpp/src/icontinuous_batching.hpp @@ -70,12 +70,12 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline { /** * Performs a single inference step of all running (and pulls awaiting) requests */ - virtual void step() = 0; + virtual size_t step() = 0; /** * Performs monolitic generation based on encoded prompts */ - virtual std::vector + virtual std::pair, PerfMetrics> generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) = 0; @@ -83,7 +83,7 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline { /** * Performs monolitic generation based on text prompts */ - std::vector + std::pair, PerfMetrics> generate(const std::vector& prompts, std::vector sampling_params, const StreamerVariant& streamer); diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp index 7a893a2603..57794fd51e 100644 --- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp +++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp @@ -28,7 +28,7 @@ bool ContinuousBatchingPipeline::PromptLookupImpl::has_non_finished_requests() { return m_pipeline->has_non_finished_requests(); } -void ContinuousBatchingPipeline::PromptLookupImpl::step() { +size_t ContinuousBatchingPipeline::PromptLookupImpl::step() { ManualTimer candidates_timer("prompt_lookup_decoding: generate_candidates()"); candidates_timer.start(); m_pipeline->generate_candidates(); @@ -67,9 +67,12 @@ void ContinuousBatchingPipeline::PromptLookupImpl::step() { m_sd_metrics.print(true); m_sd_metrics.clean_up(); } + + // TODO: add valid number of output tokens + return 0; } -std::vector +std::pair, PerfMetrics> ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { @@ -110,6 +113,8 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector results; results.reserve(input_ids.size()); + PerfMetrics perf_metrics; + // TODO: add collecting statistics bool continue_generation = true; while (has_non_finished_requests() && continue_generation) { @@ -158,7 +163,7 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector + std::pair, PerfMetrics> generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) override; diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp index 526c5df2d4..144712f6f5 100644 --- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp @@ -128,7 +128,7 @@ void print_generated_request(const ov::genai::GeneratedRequests& requests) { } } -void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() { +size_t ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() { // this blocks adding new requests during step as it may break coherence between main and draft models std::lock_guard lock{m_draft_generations_mutex}; m_draft_pipeline->pull_awaiting_requests(true); @@ 
-186,9 +186,12 @@ void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() { m_sd_metrics.print(true); m_sd_metrics.clean_up(); } + + // TODO: return valid number of generated tokens + return 0; } -std::vector +std::pair, PerfMetrics> ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { @@ -236,6 +239,8 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector< std::vector results; results.reserve(input_ids.size()); + PerfMetrics perf_metrics; + // TODO: add collecting perf statistics bool continue_generation = true; while (has_non_finished_requests() && continue_generation) { @@ -280,7 +285,7 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector< OPENVINO_ASSERT(results.size() == input_ids.size()); generate_timer.end(); - return results; + return {results, perf_metrics}; } SpeculativeDecodingMetrics diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp index 2f8067cbab..dfc3e76545 100644 --- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp @@ -54,9 +54,9 @@ class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBat bool has_non_finished_requests() override; - void step() override; + size_t step() override; - std::vector + std::pair, PerfMetrics> generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) override; From aa8330482037018a7efa67ba3829033a3bbf503b Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 12:09:22 +0100 Subject: [PATCH 02/13] improve solution; make it clearer --- .../genai/continuous_batching_pipeline.hpp | 6 +- .../openvino/genai/generation_handle.hpp | 7 +++ src/cpp/src/continuous_batching_adapter.hpp | 55 ++++++++++++++++++- src/cpp/src/continuous_batching_impl.cpp | 17 +++--- src/cpp/src/continuous_batching_impl.hpp | 4 +- src/cpp/src/continuous_batching_pipeline.cpp | 4 +- src/cpp/src/icontinuous_batching.cpp | 42 +++++++++----- src/cpp/src/icontinuous_batching.hpp | 6 +- .../src/prompt_lookup/prompt_lookup_impl.cpp | 11 +--- .../src/prompt_lookup/prompt_lookup_impl.hpp | 4 +- .../speculative_decoding_impl.cpp | 11 +--- .../speculative_decoding_impl.hpp | 4 +- 12 files changed, 119 insertions(+), 52 deletions(-) diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp index c68479bb21..706986deff 100644 --- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp +++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp @@ -48,6 +48,11 @@ struct PipelineMetrics { * Running average of the KV cache usage during the lifetime of the pipeline, with max window size of 1000 steps */ float avg_cache_usage = 0.0; + + /** + * Number of tokens scheduled for processing at the previous step of the pipeline. 
+ */ + size_t total_num_scheduled_tokens; }; class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { @@ -64,7 +69,6 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { friend class ContinuousBatchingForPromptLookupImpl; friend class SpeculativeDecodingImpl; friend class PromptLookupImpl; - friend class ContinuousBatchingAdapter; std::shared_ptr m_impl; diff --git a/src/cpp/include/openvino/genai/generation_handle.hpp b/src/cpp/include/openvino/genai/generation_handle.hpp index 7ff172e645..953e573edd 100644 --- a/src/cpp/include/openvino/genai/generation_handle.hpp +++ b/src/cpp/include/openvino/genai/generation_handle.hpp @@ -8,6 +8,7 @@ #include "openvino/genai/generation_config.hpp" #include "openvino/genai/visibility.hpp" +#include "openvino/genai/perf_metrics.hpp" namespace ov::genai { enum class GenerationStatus { @@ -30,6 +31,9 @@ struct EncodedGenerationResult { // Status of generation GenerationStatus m_status = GenerationStatus::RUNNING; + + // PerfMetrics but with empty tokenization/detokenization durations. + PerfMetrics perf_metrics; }; enum class GenerationFinishReason { @@ -50,6 +54,9 @@ struct GenerationResult { // Status of generation GenerationStatus m_status = GenerationStatus::RUNNING; + + // PerfMetrics + PerfMetrics perf_metrics; }; struct GenerationOutput { diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp index 4e1636908d..25e89a4b49 100644 --- a/src/cpp/src/continuous_batching_adapter.hpp +++ b/src/cpp/src/continuous_batching_adapter.hpp @@ -78,6 +78,9 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { OptionalGenerationConfig generation_config, StreamerVariant streamer ) override { + // Get the currrent timestamp in order to evaluate total generate duration. + auto start_time = std::chrono::steady_clock::now(); + std::vector prompts = std::visit(overloaded{ [](const std::string& prompt) { return std::vector{prompt}; @@ -88,8 +91,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { }, inputs); const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config; // -1 == config.eos_token_id and config.validate() are handled in m_impl. - auto [generated, perf_metrics] = m_impl.m_impl->generate( - prompts, + std::vector generated = m_impl.generate(prompts, std::vector{prompts.size(), config}, streamer ); @@ -100,6 +102,33 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies)); std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores)); } + PerfMetrics perf_metrics; + // For GenerationResults, all perf_metrics are the same except tokenization and detokenization durations. + // Since we return here only one perf_metrics, we should accumulate all tokenization and detokenization times. + if (generated.size() > 0) { + perf_metrics = generated[0].perf_metrics; + } + + // Tokenizations and detokenization times are dispersed across GenerationResult vector. + // Need to collect them into a single perf_metric for DecodedResult. 
+ auto& raw_metrics = perf_metrics.raw_metrics; + for (size_t i = 1; i < generated.size(); ++i){ + auto tok_durations = generated[i].perf_metrics.raw_metrics.tokenization_durations; + auto detok_durations = generated[i].perf_metrics.raw_metrics.detokenization_durations; + for (size_t j = 0; j < tok_durations.size(); ++j) { + raw_metrics.tokenization_durations.emplace_back(tok_durations[j]); + } + for (size_t j = 0; j < detok_durations.size(); ++j) { + raw_metrics.detokenization_durations.emplace_back(detok_durations[j]); + } + } + + raw_metrics.generate_durations = std::vector(); + raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time)); + // Updated generate duration, need to reevaluate statistics. + perf_metrics.m_evaluated = false; + perf_metrics.evaluate_statistics(start_time); + return {std::move(plain_replies), std::move(plain_scores), std::move(perf_metrics)}; } @@ -108,6 +137,9 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { OptionalGenerationConfig generation_config, StreamerVariant streamer ) override { + // Get the currrent timestamp in order to evaluate total generate duration. + auto start_time = std::chrono::steady_clock::now(); + std::vector input_ids = std::visit(overloaded{ [](const ov::Tensor& inp) { size_t batch_size = inp.get_shape().at(0); @@ -149,7 +181,11 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config; // -1 == config.eos_token_id and config.validate() are handled in m_impl. - auto [generated, perf_metrics] = m_impl.m_impl->generate(input_ids, std::vector{input_ids.size(), config}, streamer); + std::vector generated = m_impl.generate(input_ids, + std::vector{input_ids.size(), config}, + streamer + ); + std::vector> plain_tokens; std::vector plain_scores; for (EncodedGenerationResult& res : generated) { @@ -157,6 +193,19 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens)); std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores)); } + + PerfMetrics perf_metrics; + // For EncodedGenerationResults, all perf_metrics are the same. + if (generated.size() > 0) { + perf_metrics = generated[0].perf_metrics; + } + auto& raw_counters = perf_metrics.raw_metrics; + raw_counters.generate_durations = std::vector(); + raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time)); + // Updated generate duration, need to reevaluate statistics. 
+ perf_metrics.m_evaluated = false; + perf_metrics.evaluate_statistics(start_time); + return {std::move(plain_tokens), std::move(plain_scores), std::move(perf_metrics)}; } diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index 05e9d3a9eb..741d29bdec 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -133,7 +133,7 @@ bool ContinuousBatchingPipeline::ContinuousBatchingImpl::has_non_finished_reques return !m_awaiting_requests.empty() || !m_requests.empty(); } -size_t ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { +void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { static ManualTimer step_timer("step()"); step_timer.start(); @@ -167,10 +167,8 @@ size_t ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { } } _free_non_running_requests(); - return 0; } - size_t num_generated_tokens = scheduler_output.m_total_num_scheduled_tokens; - + m_pipeline_metrics.total_num_scheduled_tokens = scheduler_output.m_total_num_scheduled_tokens; ov::Tensor logits; { static ManualTimer timer("forward"); @@ -243,7 +241,6 @@ size_t ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { } step_timer.end(); - return num_generated_tokens; } void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std::optional& adapters) { @@ -252,7 +249,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std: } } -std::pair, PerfMetrics> +std::vector ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { @@ -308,7 +305,8 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorget_status(); + + // The same perf metrics for each sequence, only tokenization/detokenization will differ. 
+ result.perf_metrics = perf_metrics; results.push_back(std::move(result)); } OPENVINO_ASSERT(results.size() == input_ids.size()); - return {results, perf_metrics}; + return results; } void ContinuousBatchingPipeline::ContinuousBatchingImpl::_free_non_running_requests() { diff --git a/src/cpp/src/continuous_batching_impl.hpp b/src/cpp/src/continuous_batching_impl.hpp index 95eb7cccb8..d319147f2c 100644 --- a/src/cpp/src/continuous_batching_impl.hpp +++ b/src/cpp/src/continuous_batching_impl.hpp @@ -94,9 +94,9 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc bool has_non_finished_requests() override; - size_t step() override; + void step() override; - std::pair, PerfMetrics> + std::vector generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) override; diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index b68f7cad05..c1c0677ff3 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -140,11 +140,11 @@ bool ContinuousBatchingPipeline::has_non_finished_requests() { } std::vector ContinuousBatchingPipeline::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { - return m_impl->generate(input_ids, sampling_params, streamer).first; + return m_impl->generate(input_ids, sampling_params, streamer); } std::vector ContinuousBatchingPipeline::generate(const std::vector& prompts, const std::vector& sampling_params, const StreamerVariant& streamer) { - return m_impl->generate(prompts, sampling_params, streamer).first; + return m_impl->generate(prompts, sampling_params, streamer); } void ContinuousBatchingPipeline::start_chat(const std::string& system_message) { diff --git a/src/cpp/src/icontinuous_batching.cpp b/src/cpp/src/icontinuous_batching.cpp index 5fb626a453..83562bd567 100644 --- a/src/cpp/src/icontinuous_batching.cpp +++ b/src/cpp/src/icontinuous_batching.cpp @@ -29,7 +29,7 @@ void ContinuousBatchingPipeline::IContinuousBatchingPipeline::finish_chat() { m_history.clear(); }; -std::pair, PerfMetrics> +std::vector ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( const std::vector& prompts, std::vector sampling_params, @@ -37,6 +37,7 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( std::vector input_ids; auto start_time = std::chrono::steady_clock::now(); + std::vector tokenization_durations; static ManualTimer timer("tokenize"); if (m_is_chat_conversation) { OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts"); @@ -44,50 +45,65 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( constexpr bool add_generation_prompt = true; std::string history = m_tokenizer.apply_chat_template(m_history, add_generation_prompt); timer.start(); + const auto encode_start = std::chrono::steady_clock::now(); // ov::genai::add_special_tokens(false) is aligned with stateful pipeline input_ids.push_back(m_tokenizer.encode(history, ov::genai::add_special_tokens(false)).input_ids); + tokenization_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - encode_start)); timer.end(); } else { input_ids.reserve(prompts.size()); timer.start(); for (const std::string& prompt : prompts) { + const auto encode_start = std::chrono::steady_clock::now(); input_ids.push_back(m_tokenizer.encode(prompt).input_ids); + 
tokenization_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - encode_start)); } timer.end(); } - // std::vector encoded = generate(input_ids, sampling_params, streamer); - auto [encoded, perf_metrics] = generate(input_ids, sampling_params, streamer); + std::vector encoded = generate(input_ids, sampling_params, streamer); + + // auto& raw_counters = perf_metrics.raw_metrics; + // For all encoded results perf_metrics are the same except for tokenization time. + // raw_counters.generate_durations = std::vector(); + // raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time)); + // raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_start_time - start_time)); + // raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(stop_time - decode_start_time)); + // perf_metrics.m_evaluated = false; + // perf_metrics.evaluate_statistics(start_time); + std::vector decoded; auto decode_start_time = std::chrono::steady_clock::now(); decoded.reserve(encoded.size()); - for (EncodedGenerationResult& res : encoded) { + for (size_t i = 0; i < encoded.size(); ++i) { + EncodedGenerationResult res = encoded[i]; + auto& perf_metrics = res.perf_metrics; + auto& raw_counters = perf_metrics.raw_metrics; + raw_counters.tokenization_durations.emplace_back(tokenization_durations[i]); + std::vector generated; generated.reserve(res.m_generation_ids.size()); for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) { + const auto decode_start = std::chrono::steady_clock::now(); generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx))); + raw_counters.detokenization_durations.emplace_back(std::chrono::steady_clock::now() - decode_start); if (m_is_chat_conversation && 0 == idx) { m_history.push_back({{"role", "assistant"}, {"content", generated.back()}}); } } + decoded.push_back(GenerationResult{ res.m_request_id, std::move(generated), std::move(res.m_scores), - res.m_status + res.m_status, + perf_metrics, }); } auto stop_time = std::chrono::steady_clock::now(); - auto& raw_counters = perf_metrics.raw_metrics; - raw_counters.generate_durations = std::vector(); - raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time)); - raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_start_time - start_time)); - raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(stop_time - decode_start_time)); - perf_metrics.m_evaluated = false; - perf_metrics.evaluate_statistics(start_time); - return {decoded, perf_metrics}; + return decoded; } } diff --git a/src/cpp/src/icontinuous_batching.hpp b/src/cpp/src/icontinuous_batching.hpp index d59384765b..12030f06f7 100644 --- a/src/cpp/src/icontinuous_batching.hpp +++ b/src/cpp/src/icontinuous_batching.hpp @@ -70,12 +70,12 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline { /** * Performs a single inference step of all running (and pulls awaiting) requests */ - virtual size_t step() = 0; + virtual void step() = 0; /** * Performs monolitic generation based on encoded prompts */ - virtual std::pair, PerfMetrics> + virtual std::vector generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) = 0; @@ -83,7 +83,7 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline { /** * Performs monolitic generation based on text prompts */ - std::pair, PerfMetrics> + std::vector generate(const std::vector& prompts, 
std::vector sampling_params, const StreamerVariant& streamer); diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp index 57794fd51e..7a893a2603 100644 --- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp +++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp @@ -28,7 +28,7 @@ bool ContinuousBatchingPipeline::PromptLookupImpl::has_non_finished_requests() { return m_pipeline->has_non_finished_requests(); } -size_t ContinuousBatchingPipeline::PromptLookupImpl::step() { +void ContinuousBatchingPipeline::PromptLookupImpl::step() { ManualTimer candidates_timer("prompt_lookup_decoding: generate_candidates()"); candidates_timer.start(); m_pipeline->generate_candidates(); @@ -67,12 +67,9 @@ size_t ContinuousBatchingPipeline::PromptLookupImpl::step() { m_sd_metrics.print(true); m_sd_metrics.clean_up(); } - - // TODO: add valid number of output tokens - return 0; } -std::pair, PerfMetrics> +std::vector ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { @@ -113,8 +110,6 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector results; results.reserve(input_ids.size()); - PerfMetrics perf_metrics; - // TODO: add collecting statistics bool continue_generation = true; while (has_non_finished_requests() && continue_generation) { @@ -163,7 +158,7 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector, PerfMetrics> + std::vector generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) override; diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp index 144712f6f5..526c5df2d4 100644 --- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp @@ -128,7 +128,7 @@ void print_generated_request(const ov::genai::GeneratedRequests& requests) { } } -size_t ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() { +void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() { // this blocks adding new requests during step as it may break coherence between main and draft models std::lock_guard lock{m_draft_generations_mutex}; m_draft_pipeline->pull_awaiting_requests(true); @@ -186,12 +186,9 @@ size_t ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() { m_sd_metrics.print(true); m_sd_metrics.clean_up(); } - - // TODO: return valid number of generated tokens - return 0; } -std::pair, PerfMetrics> +std::vector ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { @@ -239,8 +236,6 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector< std::vector results; results.reserve(input_ids.size()); - PerfMetrics perf_metrics; - // TODO: add collecting perf statistics bool continue_generation = true; while (has_non_finished_requests() && continue_generation) { @@ -285,7 +280,7 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector< OPENVINO_ASSERT(results.size() == input_ids.size()); generate_timer.end(); - return {results, perf_metrics}; + return results; } SpeculativeDecodingMetrics diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp index 
dfc3e76545..2f8067cbab 100644 --- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp @@ -54,9 +54,9 @@ class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBat bool has_non_finished_requests() override; - size_t step() override; + void step() override; - std::pair, PerfMetrics> + std::vector generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) override; From 447bb0e099ed7865353c5848ab9d81c3748f5bcb Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 12:17:35 +0100 Subject: [PATCH 03/13] calcualte inference durations --- src/cpp/src/continuous_batching_impl.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index 741d29bdec..da3efadc01 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -258,6 +258,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector Date: Mon, 13 Jan 2025 13:49:20 +0100 Subject: [PATCH 04/13] corrections --- src/cpp/src/continuous_batching_adapter.hpp | 2 +- src/cpp/src/continuous_batching_impl.cpp | 10 ++++++++-- src/cpp/src/icontinuous_batching.cpp | 22 ++++++++------------- src/cpp/src/llm_pipeline_stateful.cpp | 2 +- src/cpp/src/llm_pipeline_static.cpp | 4 ++-- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp index 25e89a4b49..e9b671942e 100644 --- a/src/cpp/src/continuous_batching_adapter.hpp +++ b/src/cpp/src/continuous_batching_adapter.hpp @@ -200,7 +200,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { perf_metrics = generated[0].perf_metrics; } auto& raw_counters = perf_metrics.raw_metrics; - raw_counters.generate_durations = std::vector(); + raw_counters.generate_durations.clear(); raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time)); // Updated generate duration, need to reevaluate statistics. perf_metrics.m_evaluated = false; diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index da3efadc01..b1075d051f 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -151,6 +151,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { m_pipeline_metrics.max_cache_usage = std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage); _register_step_cache_usage(scheduler_output.m_cache_usage); m_pipeline_metrics.avg_cache_usage = _get_current_running_average_cache_usage(); + m_pipeline_metrics.total_num_scheduled_tokens = scheduler_output.m_total_num_scheduled_tokens; static ManualTimer copy_blocks_timer("scheduling"); copy_blocks_timer.start(); @@ -167,8 +168,8 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() { } } _free_non_running_requests(); + return; } - m_pipeline_metrics.total_num_scheduled_tokens = scheduler_output.m_total_num_scheduled_tokens; ov::Tensor logits; { static ManualTimer timer("forward"); @@ -255,7 +256,8 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorget_status(); // The same perf metrics for each sequence, only tokenization/detokenization will differ. 
+ perf_metrics.raw_metrics.generate_durations.clear(); + perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time)); + perf_metrics.evaluate_statistics(start_time); + result.perf_metrics = perf_metrics; results.push_back(std::move(result)); } diff --git a/src/cpp/src/icontinuous_batching.cpp b/src/cpp/src/icontinuous_batching.cpp index 83562bd567..39a026aec9 100644 --- a/src/cpp/src/icontinuous_batching.cpp +++ b/src/cpp/src/icontinuous_batching.cpp @@ -35,7 +35,7 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( std::vector sampling_params, const StreamerVariant& streamer) { std::vector input_ids; - auto start_time = std::chrono::steady_clock::now(); + auto start_time = std::chrono::steady_clock::now(); std::vector tokenization_durations; static ManualTimer timer("tokenize"); @@ -62,18 +62,7 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( } std::vector encoded = generate(input_ids, sampling_params, streamer); - - // auto& raw_counters = perf_metrics.raw_metrics; - // For all encoded results perf_metrics are the same except for tokenization time. - // raw_counters.generate_durations = std::vector(); - // raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time)); - // raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_start_time - start_time)); - // raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(stop_time - decode_start_time)); - // perf_metrics.m_evaluated = false; - // perf_metrics.evaluate_statistics(start_time); - std::vector decoded; - auto decode_start_time = std::chrono::steady_clock::now(); decoded.reserve(encoded.size()); for (size_t i = 0; i < encoded.size(); ++i) { EncodedGenerationResult res = encoded[i]; @@ -92,6 +81,13 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( } } + // The same perf metrics for each sequence, only tokenization/detokenization will differ. + // The same perf metrics for each sequence, only tokenization/detokenization will differ. + perf_metrics.raw_metrics.generate_durations.clear(); + perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time)); + // Reevaluate taking into accound tokenization/detokenization times. 
+ perf_metrics.m_evaluated = false; + perf_metrics.evaluate_statistics(start_time); decoded.push_back(GenerationResult{ res.m_request_id, @@ -101,8 +97,6 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( perf_metrics, }); } - auto stop_time = std::chrono::steady_clock::now(); - return decoded; } diff --git a/src/cpp/src/llm_pipeline_stateful.cpp b/src/cpp/src/llm_pipeline_stateful.cpp index 153fcc6fce..3cec7d74a8 100644 --- a/src/cpp/src/llm_pipeline_stateful.cpp +++ b/src/cpp/src/llm_pipeline_stateful.cpp @@ -181,7 +181,7 @@ DecodedResults StatefulLLMPipeline::generate( auto& raw_counters = decoded_results.perf_metrics.raw_metrics; auto stop_time = std::chrono::steady_clock::now(); - raw_counters.generate_durations = std::vector(); + raw_counters.generate_durations.clear(); raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time)); raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time)); raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time)); diff --git a/src/cpp/src/llm_pipeline_static.cpp b/src/cpp/src/llm_pipeline_static.cpp index c98b571179..ef144f673e 100644 --- a/src/cpp/src/llm_pipeline_static.cpp +++ b/src/cpp/src/llm_pipeline_static.cpp @@ -790,7 +790,7 @@ DecodedResults StatefulLLMPipeline::generate( decoded_results.perf_metrics = encoded_results.perf_metrics; auto& raw_counters = decoded_results.perf_metrics.raw_metrics; auto stop_time = std::chrono::steady_clock::now(); - raw_counters.generate_durations = std::vector(); + raw_counters.generate_durations.clear(); raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time)); raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time)); raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time)); @@ -1221,7 +1221,7 @@ DecodedResults StatelessLLMPipeline::generate( decoded_results.perf_metrics = encoded_results.perf_metrics; auto& raw_counters = decoded_results.perf_metrics.raw_metrics; auto stop_time = std::chrono::steady_clock::now(); - raw_counters.generate_durations = std::vector(); + raw_counters.generate_durations.clear(); raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time)); raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time)); raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time)); From faef43dc12080c99f3888c8a5bb79edced4259d9 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 13:51:23 +0100 Subject: [PATCH 05/13] Apply suggestions from code review Co-authored-by: Ilya Lavrenov --- src/cpp/src/continuous_batching_adapter.hpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp index e9b671942e..5100f4a81b 100644 --- a/src/cpp/src/continuous_batching_adapter.hpp +++ b/src/cpp/src/continuous_batching_adapter.hpp @@ -4,7 +4,6 @@ #include "llm_pipeline_base.hpp" -#include "icontinuous_batching.hpp" #include "openvino/genai/continuous_batching_pipeline.hpp" namespace ov::genai { @@ -123,9 +122,9 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { } } - raw_metrics.generate_durations = std::vector(); + 
raw_metrics.generate_durations.clear(); raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time)); - // Updated generate duration, need to reevaluate statistics. + // Need to reevaluate statistics with the updated start_time which includes tokenization/detokenization durations. perf_metrics.m_evaluated = false; perf_metrics.evaluate_statistics(start_time); From 9ab27b10e48e2361bbe5bc91a69b7d6b5f9669e5 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 13:57:13 +0100 Subject: [PATCH 06/13] Apply suggestions from code review --- src/cpp/src/continuous_batching_adapter.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp index 5100f4a81b..a7c657eecb 100644 --- a/src/cpp/src/continuous_batching_adapter.hpp +++ b/src/cpp/src/continuous_batching_adapter.hpp @@ -201,7 +201,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase { auto& raw_counters = perf_metrics.raw_metrics; raw_counters.generate_durations.clear(); raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time)); - // Updated generate duration, need to reevaluate statistics. + // Reevaluate statistics with the new start_time which includes time for preparing inputs perf_metrics.m_evaluated = false; perf_metrics.evaluate_statistics(start_time); From b159e6412f5749bb8a708130afab20ce13724c27 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 14:29:59 +0100 Subject: [PATCH 07/13] add PyAPI --- src/python/py_continuous_batching_pipeline.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/python/py_continuous_batching_pipeline.cpp b/src/python/py_continuous_batching_pipeline.cpp index 975100cb11..6fac725bda 100644 --- a/src/python/py_continuous_batching_pipeline.cpp +++ b/src/python/py_continuous_batching_pipeline.cpp @@ -80,6 +80,8 @@ auto generation_result_docstring = R"( IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued. DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality. DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped. + perf_metrics: + Performance metrics for each generation result. )"; @@ -102,6 +104,9 @@ auto pipeline_metrics_docstring = R"( :param avg_cache_usage: Running average of the KV cache usage (in %) during the lifetime of the pipeline, with max window size of 1000 steps :type avg_cache_usage: float + + :param total_num_scheduled_tokens: Number of tokens scheduled for processing at the previous step of the pipeline. 
+ :type total_num_scheduled_tokens: int )"; std::ostream& operator << (std::ostream& stream, const GenerationResult& generation_result) { @@ -138,6 +143,7 @@ void init_continuous_batching_pipeline(py::module_& m) { }) .def_readwrite("m_scores", &GenerationResult::m_scores) .def_readwrite("m_status", &GenerationResult::m_status) + .def_readwrite("perf_metrics", &GenerationResult::perf_metrics) .def("__repr__", [](const GenerationResult &r) -> py::str { std::stringstream stream; @@ -149,12 +155,13 @@ void init_continuous_batching_pipeline(py::module_& m) { [](GenerationResult &r) -> py::typing::List { return pyutils::handle_utf8(r.m_generation_ids); }); - + py::class_(m, "EncodedGenerationResult", generation_result_docstring) .def(py::init<>()) .def_readonly("m_request_id", &EncodedGenerationResult::m_request_id) .def_readwrite("m_generation_ids", &EncodedGenerationResult::m_generation_ids) - .def_readwrite("m_scores", &EncodedGenerationResult::m_scores); + .def_readwrite("m_scores", &EncodedGenerationResult::m_scores) + .def_readwrite("perf_metrics", &EncodedGenerationResult::perf_metrics); py::enum_(m, "GenerationFinishReason") .value("NONE", ov::genai::GenerationFinishReason::NONE) @@ -210,7 +217,8 @@ void init_continuous_batching_pipeline(py::module_& m) { .def_readonly("scheduled_requests", &PipelineMetrics::scheduled_requests) .def_readonly("cache_usage", &PipelineMetrics::cache_usage) .def_readonly("avg_cache_usage", &PipelineMetrics::avg_cache_usage) - .def_readonly("max_cache_usage", &PipelineMetrics::max_cache_usage); + .def_readonly("max_cache_usage", &PipelineMetrics::max_cache_usage) + .def_readonly("total_num_scheduled_tokens", &PipelineMetrics::total_num_scheduled_tokens); py::class_(m, "ContinuousBatchingPipeline", "This class is used for generation with LLMs with continuous batchig") .def(py::init([](const std::filesystem::path& models_path, const SchedulerConfig& scheduler_config, const std::string& device, const std::map& llm_plugin_config, const std::map& tokenizer_plugin_config) { From 1f6540a0eadda8252330ef985a356a5bba073843 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 14:18:15 +0100 Subject: [PATCH 08/13] Update src/cpp/src/icontinuous_batching.cpp Co-authored-by: Ilya Lavrenov --- src/cpp/src/icontinuous_batching.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cpp/src/icontinuous_batching.cpp b/src/cpp/src/icontinuous_batching.cpp index 39a026aec9..0c0d62886b 100644 --- a/src/cpp/src/icontinuous_batching.cpp +++ b/src/cpp/src/icontinuous_batching.cpp @@ -81,7 +81,6 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( } } - // The same perf metrics for each sequence, only tokenization/detokenization will differ. // The same perf metrics for each sequence, only tokenization/detokenization will differ. 
perf_metrics.raw_metrics.generate_durations.clear(); perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time)); From 44eb13ffb1f5b2bb4fdff4a8ca38d2e665fc1dba Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 14:25:25 +0100 Subject: [PATCH 09/13] Apply suggestions from code review Co-authored-by: Ilya Lavrenov --- src/python/py_continuous_batching_pipeline.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/python/py_continuous_batching_pipeline.cpp b/src/python/py_continuous_batching_pipeline.cpp index 6fac725bda..b2fb81fb5b 100644 --- a/src/python/py_continuous_batching_pipeline.cpp +++ b/src/python/py_continuous_batching_pipeline.cpp @@ -143,7 +143,7 @@ void init_continuous_batching_pipeline(py::module_& m) { }) .def_readwrite("m_scores", &GenerationResult::m_scores) .def_readwrite("m_status", &GenerationResult::m_status) - .def_readwrite("perf_metrics", &GenerationResult::perf_metrics) + .def_readonly("perf_metrics", &GenerationResult::perf_metrics) .def("__repr__", [](const GenerationResult &r) -> py::str { std::stringstream stream; @@ -161,7 +161,7 @@ void init_continuous_batching_pipeline(py::module_& m) { .def_readonly("m_request_id", &EncodedGenerationResult::m_request_id) .def_readwrite("m_generation_ids", &EncodedGenerationResult::m_generation_ids) .def_readwrite("m_scores", &EncodedGenerationResult::m_scores) - .def_readwrite("perf_metrics", &EncodedGenerationResult::perf_metrics); + .def_readonly("perf_metrics", &EncodedGenerationResult::perf_metrics); py::enum_(m, "GenerationFinishReason") .value("NONE", ov::genai::GenerationFinishReason::NONE) From ac96880b695a263a94bb90f0ce9b9f4766b92288 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 14:44:16 +0100 Subject: [PATCH 10/13] updated .pyi signatures with stubgen --- src/python/openvino_genai/py_openvino_genai.pyi | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/python/openvino_genai/py_openvino_genai.pyi b/src/python/openvino_genai/py_openvino_genai.pyi index 5adde32db4..15e3d5c71b 100644 --- a/src/python/openvino_genai/py_openvino_genai.pyi +++ b/src/python/openvino_genai/py_openvino_genai.pyi @@ -440,10 +440,13 @@ class EncodedGenerationResult: IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued. DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality. DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped. + perf_metrics: + Performance metrics for each generation result. """ m_generation_ids: list[list[int]] m_scores: list[float] + perf_metrics: PerfMetrics def __init__(self) -> None: ... @property @@ -693,11 +696,14 @@ class GenerationResult: IGNORED = 2 - Status set when generation run into out-of-memory condition and could not be continued. DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality. DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped. + perf_metrics: + Performance metrics for each generation result. """ m_generation_ids: list[str] m_scores: list[float] m_status: GenerationStatus + perf_metrics: PerfMetrics def __init__(self) -> None: ... 
def __repr__(self) -> str: @@ -1211,6 +1217,9 @@ class PipelineMetrics: :param avg_cache_usage: Running average of the KV cache usage (in %) during the lifetime of the pipeline, with max window size of 1000 steps :type avg_cache_usage: float + + :param total_num_scheduled_tokens: Number of tokens scheduled for processing at the previous step of the pipeline. + :type total_num_scheduled_tokens: int """ def __init__(self) -> None: ... @@ -1229,6 +1238,9 @@ class PipelineMetrics: @property def scheduled_requests(self) -> int: ... + @property + def total_num_scheduled_tokens(self) -> int: + ... class RawPerfMetrics: """ From 24508d151034cc5a918a0a39d083305a1427b84e Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 15:49:27 +0100 Subject: [PATCH 11/13] update .pyi --- src/python/openvino_genai/py_openvino_genai.pyi | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/python/openvino_genai/py_openvino_genai.pyi b/src/python/openvino_genai/py_openvino_genai.pyi index 15e3d5c71b..49348d0bb1 100644 --- a/src/python/openvino_genai/py_openvino_genai.pyi +++ b/src/python/openvino_genai/py_openvino_genai.pyi @@ -446,12 +446,14 @@ class EncodedGenerationResult: """ m_generation_ids: list[list[int]] m_scores: list[float] - perf_metrics: PerfMetrics def __init__(self) -> None: ... @property def m_request_id(self) -> int: ... + @property + def perf_metrics(self) -> PerfMetrics: + ... class EncodedResults: """ @@ -703,7 +705,6 @@ class GenerationResult: m_generation_ids: list[str] m_scores: list[float] m_status: GenerationStatus - perf_metrics: PerfMetrics def __init__(self) -> None: ... def __repr__(self) -> str: @@ -713,6 +714,9 @@ class GenerationResult: @property def m_request_id(self) -> int: ... + @property + def perf_metrics(self) -> PerfMetrics: + ... class GenerationStatus: """ Members: From d62d5f20929b780242cf5825ec1a097b274eede1 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 20:06:58 +0100 Subject: [PATCH 12/13] fix division by zero --- src/cpp/src/continuous_batching_impl.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index b1075d051f..2ea567cd60 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -310,12 +310,14 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector 0) { + const auto infer_end = std::chrono::steady_clock::now(); + const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start); + raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms); + raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms); + raw_perf_counters.m_new_token_times.emplace_back(infer_end); + raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens); + } } catch (...) 
{ drop_requests(); // remove all requests from pipeline state in case of exception throw; From 516c6949b9af79498570e8db552312ec14962264 Mon Sep 17 00:00:00 2001 From: Pavel Esir Date: Mon, 13 Jan 2025 23:34:02 +0100 Subject: [PATCH 13/13] fix segfault --- src/cpp/src/perf_metrics.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpp/src/perf_metrics.cpp b/src/cpp/src/perf_metrics.cpp index 3725dc0cfc..c1833c8b9f 100644 --- a/src/cpp/src/perf_metrics.cpp +++ b/src/cpp/src/perf_metrics.cpp @@ -93,7 +93,7 @@ void PerfMetrics::evaluate_statistics(std::optional start_time) { return; } // If start_item is specified then recalculate durations according to start times and calculate statistics only after that. - if (start_time.has_value()) { + if (start_time.has_value() && raw_metrics.m_new_token_times.size() > 0 && raw_metrics.m_batch_sizes.size() > 0) { auto start_time_val = *start_time; auto& tok_times = raw_metrics.m_new_token_times; auto& batch_sizes = raw_metrics.m_batch_sizes;
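A minimal usage sketch of the metrics this series introduces, seen from the Python API. Only the perf_metrics fields on GenerationResult/EncodedGenerationResult and PipelineMetrics.total_num_scheduled_tokens come from these patches; the model directory, the default SchedulerConfig, the PerfMetrics accessors (get_generate_duration, get_throughput, get_ttft) and ContinuousBatchingPipeline.get_metrics() are assumptions taken from the wider openvino_genai API, so treat this as an illustration rather than a tested example.

import openvino_genai as ov_genai

# Build a continuous batching pipeline (model path and device are placeholders).
scheduler_config = ov_genai.SchedulerConfig()
pipe = ov_genai.ContinuousBatchingPipeline("./llm_model_dir", scheduler_config, "CPU")

config = ov_genai.GenerationConfig()
config.max_new_tokens = 32

# generate() returns one GenerationResult per prompt; after this series each
# result carries a read-only perf_metrics object (identical across results
# except for the per-prompt tokenization/detokenization durations).
results = pipe.generate(["What is OpenVINO?"], [config])
for res in results:
    pm = res.perf_metrics
    print(res.m_generation_ids[0])
    print("generate duration, ms:", pm.get_generate_duration().mean)   # assumed accessor
    print("throughput, tokens/s:", pm.get_throughput().mean)           # assumed accessor
    print("time to first token, ms:", pm.get_ttft().mean)              # assumed accessor

# PipelineMetrics now also reports how many tokens the scheduler processed at
# the previous step (assuming get_metrics() is exposed in the Python API).
print(pipe.get_metrics().total_num_scheduled_tokens)

The per-result perf_metrics mirrors what the adapter path returns for LLMPipeline: tokenization/detokenization times are collected per prompt, while inference and generate durations are shared across results from the same call.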