From fc25c8ed897970378e540c62e1cc6fced4ba37bd Mon Sep 17 00:00:00 2001
From: Pavel Esir
Date: Fri, 10 Jan 2025 13:03:07 +0100
Subject: [PATCH] Add ContinuousBatching PerfMetrics

---
 .../genai/continuous_batching_pipeline.hpp    |  1 +
 src/cpp/src/continuous_batching_adapter.hpp   |  9 ++++---
 src/cpp/src/continuous_batching_impl.cpp      | 27 +++++++++++++------
 src/cpp/src/continuous_batching_impl.hpp      |  4 ++--
 src/cpp/src/continuous_batching_pipeline.cpp  |  4 ++--
 src/cpp/src/icontinuous_batching.cpp          | 19 ++++++++++---
 src/cpp/src/icontinuous_batching.hpp          |  6 +++---
 .../src/prompt_lookup/prompt_lookup_impl.cpp  | 11 +++++---
 .../src/prompt_lookup/prompt_lookup_impl.hpp  |  4 ++--
 .../speculative_decoding_impl.cpp             | 11 +++++---
 .../speculative_decoding_impl.hpp             |  4 ++--
 11 files changed, 67 insertions(+), 33 deletions(-)

diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
index ed9fc3a30d..c68479bb21 100644
--- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
+++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
@@ -64,6 +64,7 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
     friend class ContinuousBatchingForPromptLookupImpl;
     friend class SpeculativeDecodingImpl;
     friend class PromptLookupImpl;
+    friend class ContinuousBatchingAdapter;
 
     std::shared_ptr<IContinuousBatchingPipeline> m_impl;
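The new friend declaration is what lets the LLMPipeline-facing adapter bypass the public ContinuousBatchingPipeline::generate() overloads, which (as changed in continuous_batching_pipeline.cpp below) now discard the PerfMetrics half of the returned pair via .first. A minimal self-contained sketch of that access pattern, with hypothetical names standing in for the real classes:

    #include <memory>
    #include <utility>
    #include <vector>

    struct Metrics { float ttft_ms = 0.0f; };    // stands in for ov::genai::PerfMetrics

    class Pipeline {
        struct Impl {                            // stands in for IContinuousBatchingPipeline
            std::pair<std::vector<int>, Metrics> generate() { return {{1, 2, 3}, {12.5f}}; }
        };
        std::shared_ptr<Impl> m_impl = std::make_shared<Impl>();
        friend class Adapter;                    // mirrors `friend class ContinuousBatchingAdapter;`
    public:
        std::vector<int> generate() { return m_impl->generate().first; }  // public API drops metrics
    };

    class Adapter {
    public:
        static std::pair<std::vector<int>, Metrics> generate(Pipeline& p) {
            return p.m_impl->generate();         // friendship preserves the metrics
        }
    };

    int main() {
        Pipeline p;
        auto [tokens, metrics] = Adapter::generate(p);
        return tokens.size() == 3 && metrics.ttft_ms > 0.0f ? 0 : 1;
    }

Reaching through the private m_impl keeps the public pipeline API unchanged while still giving the adapter the timing data it needs.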
diff --git a/src/cpp/src/continuous_batching_adapter.hpp b/src/cpp/src/continuous_batching_adapter.hpp
index 0b0065aa1f..4e1636908d 100644
--- a/src/cpp/src/continuous_batching_adapter.hpp
+++ b/src/cpp/src/continuous_batching_adapter.hpp
@@ -4,6 +4,7 @@
 
 #include "llm_pipeline_base.hpp"
 
+#include "icontinuous_batching.hpp"
 #include "openvino/genai/continuous_batching_pipeline.hpp"
 
 namespace ov::genai {
@@ -87,7 +88,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
         }, inputs);
         const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
         // -1 == config.eos_token_id and config.validate() are handled in m_impl.
-        std::vector<GenerationResult> generated = m_impl.generate(
+        auto [generated, perf_metrics] = m_impl.m_impl->generate(
             prompts,
             std::vector<GenerationConfig>{prompts.size(), config},
             streamer
         );
@@ -99,7 +100,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
             std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies));
             std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
         }
-        return {std::move(plain_replies), std::move(plain_scores)};
+        return {std::move(plain_replies), std::move(plain_scores), std::move(perf_metrics)};
     }
 
     EncodedResults generate(
@@ -148,7 +149,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
         const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
         // -1 == config.eos_token_id and config.validate() are handled in m_impl.
-        std::vector<EncodedGenerationResult> generated = m_impl.generate(input_ids, std::vector<GenerationConfig>{input_ids.size(), config}, streamer);
+        auto [generated, perf_metrics] = m_impl.m_impl->generate(input_ids, std::vector<GenerationConfig>{input_ids.size(), config}, streamer);
         std::vector<std::vector<int64_t>> plain_tokens;
         std::vector<float> plain_scores;
         for (EncodedGenerationResult& res : generated) {
@@ -156,7 +157,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
             std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens));
             std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
         }
-        return {std::move(plain_tokens), std::move(plain_scores)};
+        return {std::move(plain_tokens), std::move(plain_scores), std::move(perf_metrics)};
     }
 
     void start_chat(const std::string& system_message) override {
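Both adapter overloads now return a third member. This assumes the DecodedResults/EncodedResults structs expose a perf_metrics field (as ov::genai's result types do), so the brace-initialized return binds it in declaration order. A reduced sketch of that shape, with placeholder types:

    #include <string>
    #include <utility>
    #include <vector>

    struct PerfMetricsLike { /* raw timings plus derived statistics */ };

    // reduced stand-in for ov::genai::DecodedResults
    struct DecodedResultsLike {
        std::vector<std::string> texts;
        std::vector<float> scores;
        PerfMetricsLike perf_metrics;   // the member targeted by the new third initializer
    };

    DecodedResultsLike pack(std::vector<std::string> replies,
                            std::vector<float> scores,
                            PerfMetricsLike metrics) {
        // aggregate initialization in member order, as in the adapter's return statements
        return {std::move(replies), std::move(scores), std::move(metrics)};
    }

    int main() { auto r = pack({"hi"}, {0.0f}, {}); return r.texts.size() == 1 ? 0 : 1; }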
diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp
index 44bfaf7f21..05e9d3a9eb 100644
--- a/src/cpp/src/continuous_batching_impl.cpp
+++ b/src/cpp/src/continuous_batching_impl.cpp
@@ -133,7 +133,7 @@ bool ContinuousBatchingPipeline::ContinuousBatchingImpl::has_non_finished_requests() {
     return !m_awaiting_requests.empty() || !m_requests.empty();
 }
 
-void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
+size_t ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
     static ManualTimer step_timer("step()");
     step_timer.start();
 
@@ -157,7 +157,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map);
         copy_blocks_timer.end();
     }
-
     // if no tokens were scheduled, we are out of memory => free all requests and return
     if (scheduler_output.m_total_num_scheduled_tokens == 0) {
         for (size_t i = 0; i < m_requests.size(); ++i) {
@@ -168,9 +167,10 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         }
         }
         _free_non_running_requests();
-        return;
+        return 0;
     }
-
+    size_t num_generated_tokens = scheduler_output.m_total_num_scheduled_tokens;
+
     ov::Tensor logits;
     {
         static ManualTimer timer("forward");
@@ -243,6 +243,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
     }
 
     step_timer.end();
+    return num_generated_tokens;
 }
 
 void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std::optional<AdapterConfig>& adapters) {
@@ -251,13 +252,16 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std::optional<AdapterConfig>& adapters) {
     if (m_adapter_controller) {
         m_adapter_controller->apply(m_model_runner->get_infer_request(), adapters);
     }
 }
 
-std::vector<EncodedGenerationResult>
+std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
 ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                              const std::vector<GenerationConfig>& sampling_params,
                                                              const StreamerVariant& streamer) {
     OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
     OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
 
+    PerfMetrics perf_metrics;
+    auto& raw_perf_counters = perf_metrics.raw_metrics;
+
     // checks that all requests have the same LoRA adapters property value
     for (size_t i = 1; i < sampling_params.size(); ++i) {
         OPENVINO_ASSERT(sampling_params[i - 1].adapters == sampling_params[i].adapters,
@@ -303,13 +307,20 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
    bool continue_generation = true;
     while (has_non_finished_requests() && continue_generation) {
         try {
-            step();
+            const auto infer_start = std::chrono::steady_clock::now();
+            auto num_generated_tokens = step();
+            const auto infer_end = std::chrono::steady_clock::now();
+            const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start);
+            raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
+            raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
+            raw_perf_counters.m_new_token_times.emplace_back(infer_end);
+            raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
         } catch (...) {
             drop_requests(); // remove all requests from pipeline state in case of exception
             throw;
         }
 
         auto& generation = generations.at(0);
         if (streamer_ptr && generation->can_read()) {
             std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
             for (const auto& gen_token : token.begin()->second.generated_ids) {
@@ -362,7 +373,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
     }
 
     OPENVINO_ASSERT(results.size() == input_ids.size());
-    return results;
+    return {results, perf_metrics};
 }
 
diff --git a/src/cpp/src/continuous_batching_impl.hpp b/src/cpp/src/continuous_batching_impl.hpp
--- a/src/cpp/src/continuous_batching_impl.hpp
+++ b/src/cpp/src/continuous_batching_impl.hpp
@@ -58,9 +58,9 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 
     bool has_non_finished_requests() override;
 
-    void step() override;
+    size_t step() override;
 
-    std::vector<EncodedGenerationResult>
+    std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) override;

diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp
index c1c0677ff3..b68f7cad05 100644
--- a/src/cpp/src/continuous_batching_pipeline.cpp
+++ b/src/cpp/src/continuous_batching_pipeline.cpp
@@ -140,11 +140,11 @@ bool ContinuousBatchingPipeline::has_non_finished_requests() {
 }
 
 std::vector<EncodedGenerationResult> ContinuousBatchingPipeline::generate(const std::vector<ov::Tensor>& input_ids, const std::vector<GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
-    return m_impl->generate(input_ids, sampling_params, streamer);
+    return m_impl->generate(input_ids, sampling_params, streamer).first;
 }
 
 std::vector<GenerationResult> ContinuousBatchingPipeline::generate(const std::vector<std::string>& prompts, const std::vector<GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
-    return m_impl->generate(prompts, sampling_params, streamer);
+    return m_impl->generate(prompts, sampling_params, streamer).first;
 }
 
 void ContinuousBatchingPipeline::start_chat(const std::string& system_message) {

diff --git a/src/cpp/src/icontinuous_batching.cpp b/src/cpp/src/icontinuous_batching.cpp
index e32616b0aa..5fb626a453 100644
--- a/src/cpp/src/icontinuous_batching.cpp
+++ b/src/cpp/src/icontinuous_batching.cpp
@@ -29,12 +29,13 @@ void ContinuousBatchingPipeline::IContinuousBatchingPipeline::finish_chat() {
     m_history.clear();
 };
 
-std::vector<GenerationResult>
+std::pair<std::vector<GenerationResult>, PerfMetrics>
 ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
     const std::vector<std::string>& prompts,
     std::vector<GenerationConfig> sampling_params,
     const StreamerVariant& streamer) {
     std::vector<ov::Tensor> input_ids;
+    auto start_time = std::chrono::steady_clock::now();
 
     static ManualTimer timer("tokenize");
     if (m_is_chat_conversation) {
@@ -55,9 +56,9 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
         timer.end();
     }
 
-    std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);
-
+    auto [encoded, perf_metrics] = generate(input_ids, sampling_params, streamer);
     std::vector<GenerationResult> decoded;
+    auto decode_start_time = std::chrono::steady_clock::now();
     decoded.reserve(encoded.size());
     for (EncodedGenerationResult& res : encoded) {
         std::vector<std::string> generated;
@@ -76,7 +77,16 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
             res.m_status
         });
     }
+    auto stop_time = std::chrono::steady_clock::now();
+
+    auto& raw_counters = perf_metrics.raw_metrics;
+    raw_counters.generate_durations = std::vector<MicroSeconds>();
+    raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
+    raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_start_time - start_time));
+    raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(stop_time - decode_start_time));
+    perf_metrics.m_evaluated = false;
+    perf_metrics.evaluate_statistics(start_time);
 
-    return decoded;
+    return {decoded, perf_metrics};
 }
 }
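The hunk above stamps three coarse spans into the raw counters (whole call, everything up to detokenization, and detokenization itself) and then calls evaluate_statistics(), while the per-token data comes from the m_new_token_times/m_batch_sizes pairs recorded around step(). The sketch below shows one plausible way a mean time-per-output-token could be derived from such counters; field names are shortened and this is not the library's exact code:

    #include <chrono>
    #include <cstddef>
    #include <vector>

    using Clock = std::chrono::steady_clock;
    using MicroSeconds = std::chrono::duration<float, std::micro>;

    struct RawCounters {
        std::vector<Clock::time_point> new_token_times;  // one entry per step()
        std::vector<size_t> batch_sizes;                 // tokens produced in that step
    };

    // mean time-per-output-token: duration between steps divided by tokens per step
    float mean_tpot_us(const RawCounters& raw, Clock::time_point start_time) {
        float total_us = 0.0f;
        size_t samples = 0;
        auto prev = start_time;
        for (size_t i = 0; i < raw.new_token_times.size(); ++i) {
            if (raw.batch_sizes[i] == 0) continue;  // prefill-only step, no new tokens
            total_us += MicroSeconds(raw.new_token_times[i] - prev).count() / raw.batch_sizes[i];
            prev = raw.new_token_times[i];
            ++samples;
        }
        return samples ? total_us / samples : 0.0f;
    }

    int main() {
        RawCounters raw;
        const auto t0 = Clock::now();
        raw.new_token_times = {t0 + std::chrono::microseconds(250), t0 + std::chrono::microseconds(500)};
        raw.batch_sizes = {1, 1};
        return mean_tpot_us(raw, t0) > 0.0f ? 0 : 1;
    }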
diff --git a/src/cpp/src/icontinuous_batching.hpp b/src/cpp/src/icontinuous_batching.hpp
index 12030f06f7..d59384765b 100644
--- a/src/cpp/src/icontinuous_batching.hpp
+++ b/src/cpp/src/icontinuous_batching.hpp
@@ -70,12 +70,12 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {
     /**
      * Performs a single inference step of all running (and pulls awaiting) requests
      */
-    virtual void step() = 0;
+    virtual size_t step() = 0;
 
     /**
      * Performs monolithic generation based on encoded prompts
      */
-    virtual std::vector<EncodedGenerationResult>
+    virtual std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) = 0;
@@ -83,7 +83,7 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {
     /**
      * Performs monolithic generation based on text prompts
      */
-    std::vector<GenerationResult>
+    std::pair<std::vector<GenerationResult>, PerfMetrics>
     generate(const std::vector<std::string>& prompts,
              std::vector<GenerationConfig> sampling_params,
              const StreamerVariant& streamer);

diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
index 7a893a2603..57794fd51e 100644
--- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
+++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -28,7 +28,7 @@ bool ContinuousBatchingPipeline::PromptLookupImpl::has_non_finished_requests() {
     return m_pipeline->has_non_finished_requests();
 }
 
-void ContinuousBatchingPipeline::PromptLookupImpl::step() {
+size_t ContinuousBatchingPipeline::PromptLookupImpl::step() {
     ManualTimer candidates_timer("prompt_lookup_decoding: generate_candidates()");
     candidates_timer.start();
     m_pipeline->generate_candidates();
@@ -67,9 +67,12 @@ void ContinuousBatchingPipeline::PromptLookupImpl::step() {
         m_sd_metrics.print(true);
         m_sd_metrics.clean_up();
     }
+
+    // TODO: add valid number of output tokens
+    return 0;
 }
 
-std::vector<EncodedGenerationResult>
+std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
 ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                        const std::vector<GenerationConfig>& sampling_params,
                                                        const StreamerVariant& streamer) {
@@ -110,6 +113,8 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Tensor>& input_ids,
     std::vector<EncodedGenerationResult> results;
     results.reserve(input_ids.size());
 
+    PerfMetrics perf_metrics;
+    // TODO: add collecting statistics
     bool continue_generation = true;
     while (has_non_finished_requests() && continue_generation) {
@@ -158,7 +163,7 @@ ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Tensor>& input_ids,
     }
 
     OPENVINO_ASSERT(results.size() == input_ids.size());
     generate_timer.end();
-    return results;
+    return {results, perf_metrics};
 }

diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
--- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
+++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
@@ -35,9 +35,9 @@ class ContinuousBatchingPipeline::PromptLookupImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 
     bool has_non_finished_requests() override;
 
-    void step() override;
+    size_t step() override;
 
-    std::vector<EncodedGenerationResult>
+    std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) override;
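With step() reporting how many tokens a single iteration produced (and the prompt-lookup and speculative pipelines returning 0 until their TODOs are resolved), a driver loop can derive throughput without touching PerfMetrics internals. A hypothetical caller sketch; PipelineLike is a stand-in, not the real API:

    #include <chrono>
    #include <cstddef>
    #include <cstdio>

    struct PipelineLike {
        bool has_non_finished_requests() const { return m_steps_left > 0; }
        size_t step() { --m_steps_left; return 8; }  // stands in for the real step()
        int m_steps_left = 4;
    };

    int main() {
        PipelineLike pipe;
        size_t total_tokens = 0;
        const auto start = std::chrono::steady_clock::now();
        while (pipe.has_non_finished_requests()) {
            total_tokens += pipe.step();  // 0 for prompt-lookup/speculative until the TODOs are done
        }
        const std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
        std::printf("%zu tokens, %.1f tok/s\n", total_tokens, total_tokens / elapsed.count());
    }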
diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
index 526c5df2d4..144712f6f5 100644
--- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
+++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
@@ -128,7 +128,7 @@ void print_generated_request(const ov::genai::GeneratedRequests& requests) {
     }
 }
 
-void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
+size_t ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
     // this blocks adding new requests during step as it may break coherence between main and draft models
     std::lock_guard<std::mutex> lock{m_draft_generations_mutex};
     m_draft_pipeline->pull_awaiting_requests(true);
@@ -186,9 +186,12 @@ void ContinuousBatchingPipeline::SpeculativeDecodingImpl::step() {
         m_sd_metrics.print(true);
         m_sd_metrics.clean_up();
     }
+
+    // TODO: return valid number of generated tokens
+    return 0;
 }
 
-std::vector<EncodedGenerationResult>
+std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
 ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                               const std::vector<GenerationConfig>& sampling_params,
                                                               const StreamerVariant& streamer) {
@@ -236,6 +239,8 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<ov::Tensor>& input_ids,
     std::vector<EncodedGenerationResult> results;
     results.reserve(input_ids.size());
 
+    PerfMetrics perf_metrics;
+    // TODO: add collecting perf statistics
     bool continue_generation = true;
     while (has_non_finished_requests() && continue_generation) {
@@ -280,7 +285,7 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<ov::Tensor>& input_ids,
 
     OPENVINO_ASSERT(results.size() == input_ids.size());
     generate_timer.end();
-    return results;
+    return {results, perf_metrics};
 }
 
 SpeculativeDecodingMetrics

diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
index 2f8067cbab..dfc3e76545 100644
--- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
+++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
@@ -54,9 +54,9 @@ class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 
     bool has_non_finished_requests() override;
 
-    void step() override;
+    size_t step() override;
 
-    std::vector<EncodedGenerationResult>
+    std::pair<std::vector<EncodedGenerationResult>, PerfMetrics>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) override;
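End to end, the metrics collected by this patch are meant to surface through the user-facing result structs. A hedged usage sketch against the public openvino_genai API; the model path is a placeholder, and whether a given LLMPipeline instance routes through the continuous-batching adapter depends on how it is constructed and configured:

    #include <iostream>
    #include "openvino/genai/llm_pipeline.hpp"

    int main() {
        // assumes a converted model in ./model_dir; path and device are placeholders
        ov::genai::LLMPipeline pipe("./model_dir", "CPU");

        ov::genai::GenerationConfig config;
        config.max_new_tokens = 64;

        ov::genai::DecodedResults res = pipe.generate("What is OpenVINO?", config);

        auto& pm = res.perf_metrics;  // filled from the pair returned by the impls above
        std::cout << "TTFT: " << pm.get_ttft().mean << " ms\n"
                  << "TPOT: " << pm.get_tpot().mean << " ms/token\n"
                  << "Throughput: " << pm.get_throughput().mean << " tok/s\n";
    }

The getters return mean/std pairs computed by evaluate_statistics(), so callers do not need to touch the raw counters populated in icontinuous_batching.cpp.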