From 3daf0deaa4b3ce6bbbf4027362e3ef8399912627 Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Fri, 13 Dec 2024 11:42:09 +0400 Subject: [PATCH 1/6] Update streaming in LM Encoding --- src/cpp/src/lm_encoding.cpp | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/cpp/src/lm_encoding.cpp b/src/cpp/src/lm_encoding.cpp index c76d9f7edf..46ca219547 100644 --- a/src/cpp/src/lm_encoding.cpp +++ b/src/cpp/src/lm_encoding.cpp @@ -202,12 +202,22 @@ std::pair get_lm_encoded_results( raw_perf_counters.m_new_token_times.emplace_back(infer_end); raw_perf_counters.m_batch_sizes.emplace_back(batch_size); - if (streamer_ptr) { - // stream data from first sequence - int64_t out_token = sequence_groups.at(0).get()->operator[](0)->get_generated_ids().back(); - if (streamer_ptr->put(out_token)) { - break; + bool continue_generation = true; + if (streamer_ptr && continue_generation) { + // not generated tokens like several prompt phase + if (!generations.at(0).get()->can_read()) { + continue; } + std::unordered_map token = generations.at(0).get()->back(); + OPENVINO_ASSERT(1 <= token.size()); + OPENVINO_ASSERT(1 <= token.begin()->second.generated_ids.size()); + for (const auto& gen_token : token.begin()->second.generated_ids) { + continue_generation = !streamer_ptr->put(gen_token); + if (!continue_generation) { + break; + } + } + } sampler_output = sampler.sample(active_sequence_groups, m_llm.get_tensor("logits")); From 310461e5f2609c0e38f145dd6a818a3307515598 Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Fri, 13 Dec 2024 11:46:28 +0400 Subject: [PATCH 2/6] Improve code style --- src/cpp/src/lm_encoding.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/cpp/src/lm_encoding.cpp b/src/cpp/src/lm_encoding.cpp index 46ca219547..54d6955f73 100644 --- a/src/cpp/src/lm_encoding.cpp +++ b/src/cpp/src/lm_encoding.cpp @@ -202,8 +202,7 @@ std::pair get_lm_encoded_results( raw_perf_counters.m_new_token_times.emplace_back(infer_end); raw_perf_counters.m_batch_sizes.emplace_back(batch_size); - bool continue_generation = true; - if (streamer_ptr && continue_generation) { + if (streamer_ptr) { // not generated tokens like several prompt phase if (!generations.at(0).get()->can_read()) { continue; @@ -212,8 +211,7 @@ std::pair get_lm_encoded_results( OPENVINO_ASSERT(1 <= token.size()); OPENVINO_ASSERT(1 <= token.begin()->second.generated_ids.size()); for (const auto& gen_token : token.begin()->second.generated_ids) { - continue_generation = !streamer_ptr->put(gen_token); - if (!continue_generation) { + if (!streamer_ptr->put(gen_token)) { break; } } From be74340067eacefa528da49ccd3eb10f234abf0e Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Fri, 13 Dec 2024 12:02:15 +0400 Subject: [PATCH 3/6] CB --- src/cpp/src/continuous_batching_impl.cpp | 10 +++++++--- src/cpp/src/lm_encoding.cpp | 13 +++---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index d27e8934dc..337ec0e92f 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -285,9 +285,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorcan_read()) { std::unordered_map token = generations.at(0).get()->back(); - OPENVINO_ASSERT(1 == token.size()); - OPENVINO_ASSERT(1 == token.begin()->second.generated_ids.size()); - continue_generation = !streamer_ptr->put(token.begin()->second.generated_ids.at(0)); + OPENVINO_ASSERT(1 <= token.size()); + OPENVINO_ASSERT(1 <= token.begin()->second.generated_ids.size()); + for (const auto& gen_token : token.begin()->second.generated_ids) { + if (!streamer_ptr->put(gen_token)) { + break; + } + } } } diff --git a/src/cpp/src/lm_encoding.cpp b/src/cpp/src/lm_encoding.cpp index 54d6955f73..f1cec6d751 100644 --- a/src/cpp/src/lm_encoding.cpp +++ b/src/cpp/src/lm_encoding.cpp @@ -126,7 +126,7 @@ std::pair get_lm_encoded_results( get_active_sequence_groups), active_sequence_groups.end()); - while (active_sequence_groups.size() > 0) { + do { size_t total_num_tokens = 0; for (auto& sequence_group : active_sequence_groups) { @@ -214,8 +214,7 @@ std::pair get_lm_encoded_results( if (!streamer_ptr->put(gen_token)) { break; } - } - + } } sampler_output = sampler.sample(active_sequence_groups, m_llm.get_tensor("logits")); @@ -224,13 +223,7 @@ std::pair get_lm_encoded_results( active_sequence_groups.end(), get_active_sequence_groups), active_sequence_groups.end()); - } - - if (streamer_ptr) { - int64_t out_token = sequence_groups.at(0).get()->operator[](0)->get_generated_ids().back(); - streamer_ptr->put(out_token); - streamer_ptr->end(); - } + } while (active_sequence_groups.size() > 0); size_t next_selected_beam = 0; for (size_t i = 0; i < sequence_groups.size(); i++) { From 61e55a67f9777d67e89a627cb7d829e4d2feb406 Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Sun, 15 Dec 2024 15:53:31 +0400 Subject: [PATCH 4/6] Update continuous_batching_impl.cpp --- src/cpp/src/continuous_batching_impl.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index 337ec0e92f..1e42f5b2d9 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -285,8 +285,6 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vectorcan_read()) { std::unordered_map token = generations.at(0).get()->back(); - OPENVINO_ASSERT(1 <= token.size()); - OPENVINO_ASSERT(1 <= token.begin()->second.generated_ids.size()); for (const auto& gen_token : token.begin()->second.generated_ids) { if (!streamer_ptr->put(gen_token)) { break; From ad44b3206d2561aa128746a549a1cda7c6df1a1b Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Sun, 15 Dec 2024 15:54:07 +0400 Subject: [PATCH 5/6] Revert --- src/cpp/src/lm_encoding.cpp | 35 ++++++++++--------- .../speculative_decoding_impl.cpp | 2 -- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/cpp/src/lm_encoding.cpp b/src/cpp/src/lm_encoding.cpp index f1cec6d751..4a681b7999 100644 --- a/src/cpp/src/lm_encoding.cpp +++ b/src/cpp/src/lm_encoding.cpp @@ -126,7 +126,7 @@ std::pair get_lm_encoded_results( get_active_sequence_groups), active_sequence_groups.end()); - do { + while (active_sequence_groups.size() > 0) { size_t total_num_tokens = 0; for (auto& sequence_group : active_sequence_groups) { @@ -202,20 +202,14 @@ std::pair get_lm_encoded_results( raw_perf_counters.m_new_token_times.emplace_back(infer_end); raw_perf_counters.m_batch_sizes.emplace_back(batch_size); - if (streamer_ptr) { - // not generated tokens like several prompt phase - if (!generations.at(0).get()->can_read()) { - continue; + if (streamer_ptr && generations.at(0).get()->can_read()) { + std::unordered_map token = generations.at(0).get()->back(); + for (const auto& gen_token : token.begin()->second.generated_ids) { + if (!streamer_ptr->put(gen_token)) { + break; } - std::unordered_map token = generations.at(0).get()->back(); - OPENVINO_ASSERT(1 <= token.size()); - OPENVINO_ASSERT(1 <= token.begin()->second.generated_ids.size()); - for (const auto& gen_token : token.begin()->second.generated_ids) { - if (!streamer_ptr->put(gen_token)) { - break; - } - } - } + } + } sampler_output = sampler.sample(active_sequence_groups, m_llm.get_tensor("logits")); @@ -223,7 +217,16 @@ std::pair get_lm_encoded_results( active_sequence_groups.end(), get_active_sequence_groups), active_sequence_groups.end()); - } while (active_sequence_groups.size() > 0); + } + + if (streamer_ptr && generations.at(0).get()->can_read()) { + std::unordered_map token = generations.at(0).get()->back(); + for (const auto& gen_token : token.begin()->second.generated_ids) { + if (!streamer_ptr->put(gen_token)) { + break; + } + } + } size_t next_selected_beam = 0; for (size_t i = 0; i < sequence_groups.size(); i++) { @@ -247,4 +250,4 @@ std::pair get_lm_encoded_results( } } // namespace genai -} // namespace ov +} // namespace ov \ No newline at end of file diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp index 2be67320a9..e4f3b1ad1f 100644 --- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp @@ -232,8 +232,6 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector< continue; } std::unordered_map token = main_generations.at(0).get()->back(); - OPENVINO_ASSERT(1 <= token.size()); - OPENVINO_ASSERT(1 <= token.begin()->second.generated_ids.size()); for (const auto& gen_token : token.begin()->second.generated_ids) { continue_generation = !streamer_ptr->put(gen_token); if (!continue_generation) { From 02622c9476b14e49479dc67d673d3a6d2c050641 Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Mon, 16 Dec 2024 13:24:12 +0400 Subject: [PATCH 6/6] end --- src/cpp/src/lm_encoding.cpp | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/cpp/src/lm_encoding.cpp b/src/cpp/src/lm_encoding.cpp index 4a681b7999..3ab041fa58 100644 --- a/src/cpp/src/lm_encoding.cpp +++ b/src/cpp/src/lm_encoding.cpp @@ -125,6 +125,17 @@ std::pair get_lm_encoded_results( active_sequence_groups.end(), get_active_sequence_groups), active_sequence_groups.end()); + + auto stream_generated_tokens = [&streamer_ptr, &generations]() { + if (streamer_ptr && generations.at(0).get()->can_read()) { + std::unordered_map token = generations.at(0).get()->back(); + for (const auto& gen_token : token.begin()->second.generated_ids) { + if (!streamer_ptr->put(gen_token)) { + break; + } + } + } + }; while (active_sequence_groups.size() > 0) { size_t total_num_tokens = 0; @@ -202,14 +213,7 @@ std::pair get_lm_encoded_results( raw_perf_counters.m_new_token_times.emplace_back(infer_end); raw_perf_counters.m_batch_sizes.emplace_back(batch_size); - if (streamer_ptr && generations.at(0).get()->can_read()) { - std::unordered_map token = generations.at(0).get()->back(); - for (const auto& gen_token : token.begin()->second.generated_ids) { - if (!streamer_ptr->put(gen_token)) { - break; - } - } - } + stream_generated_tokens(); sampler_output = sampler.sample(active_sequence_groups, m_llm.get_tensor("logits")); @@ -219,13 +223,10 @@ std::pair get_lm_encoded_results( active_sequence_groups.end()); } - if (streamer_ptr && generations.at(0).get()->can_read()) { - std::unordered_map token = generations.at(0).get()->back(); - for (const auto& gen_token : token.begin()->second.generated_ids) { - if (!streamer_ptr->put(gen_token)) { - break; - } - } + // to stream last token + stream_generated_tokens(); + if (streamer_ptr) { + streamer_ptr->end(); } size_t next_selected_beam = 0;