Use circular buffer of infer requests in VLM components #1833
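Summary of the changes below: the VLM text-embeddings model now draws from a circular buffer of infer requests (the same pattern the tokenizer already uses), EmbeddingsModel becomes a shared_ptr-managed component with a create() factory, ContinuousBatchingPipeline gains a separate vision_encoder_properties argument, and the mutex that previously serialized InputsEmbedder calls in the continuous-batching path is removed.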

Merged · 3 commits · Mar 12, 2025
@@ -74,7 +74,8 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
const SchedulerConfig& scheduler_config,
const std::string& device,
const ov::AnyMap& properties = {},
const ov::AnyMap& tokenizer_properties = {});
const ov::AnyMap& tokenizer_properties = {},
const ov::AnyMap& vision_encoder_properties = {});

/**
* @brief Constructs a ContinuousBatchingPipeline when ov::genai::Tokenizer is initialized manually using file from the different dirs.
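For orientation, a minimal sketch of a call site that exercises the new overload; the model directory, device, and property values are assumptions, and the vision_encoder_properties map is the one forwarded to the VLM inputs embedder in the .cpp change below:

```cpp
#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    ov::genai::SchedulerConfig scheduler_config;  // default scheduler settings

    // Hypothetical construction: the last argument is the new vision-encoder/embedder property map.
    ov::genai::ContinuousBatchingPipeline pipeline(
        "./MiniCPM-V-2_6",                                   // model directory (assumed)
        scheduler_config,
        "CPU",                                               // device for the language model
        /*properties=*/{},
        /*tokenizer_properties=*/{},
        /*vision_encoder_properties=*/{{"PERFORMANCE_HINT", "THROUGHPUT"}});
    return 0;
}
```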
5 changes: 3 additions & 2 deletions src/cpp/src/continuous_batching_pipeline.cpp
@@ -48,7 +48,8 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::p
const SchedulerConfig& scheduler_config,
const std::string& device,
const ov::AnyMap& properties,
const ov::AnyMap& tokenizer_properties) {
const ov::AnyMap& tokenizer_properties,
const ov::AnyMap& vision_encoder_properties) {
auto start_time = std::chrono::steady_clock::now();

auto properties_without_draft_model = properties;
@@ -73,7 +74,7 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::p

std::shared_ptr<InputsEmbedder> embedder;
if (std::filesystem::exists(directory / "openvino_text_embeddings_model.xml")) {
embedder = std::make_shared<InputsEmbedder>(directory, device, properties);
embedder = std::make_shared<InputsEmbedder>(directory, device, vision_encoder_properties);
}

if (is_prompt_lookup_enabled) {
8 changes: 2 additions & 6 deletions src/cpp/src/icontinuous_batching.cpp
@@ -214,12 +214,8 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::add_request(uint64_t re
GenerationConfig sampling_params) {
OPENVINO_ASSERT(m_model_input_type == ModelInputType::EMBEDDINGS, "Model doesn't support embeddings.");
ov::genai::VLMPerfMetrics metrics;
ov::Tensor inputs;
{
const std::lock_guard<std::mutex> lock(m_inputs_embedder_mutex);
m_inputs_embedder->set_apply_chat_template_status(sampling_params.apply_chat_template);
inputs = m_inputs_embedder->get_inputs_embeds(prompt, rgbs, metrics);
}
m_inputs_embedder->set_apply_chat_template_status(sampling_params.apply_chat_template);
ov::Tensor inputs = m_inputs_embedder->get_inputs_embeds(prompt, rgbs, metrics);
return add_request(request_id, inputs, sampling_params);
}
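Presumably the lock (and the m_inputs_embedder_mutex member removed in the next file) is no longer needed because the embedder's underlying models now pull per-call infer requests from circular buffer queues instead of sharing a single ov::InferRequest.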

1 change: 0 additions & 1 deletion src/cpp/src/icontinuous_batching.hpp
@@ -57,7 +57,6 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {

ModelInputType m_model_input_type = ModelInputType::TOKENS;
std::shared_ptr<InputsEmbedder> m_inputs_embedder;
std::mutex m_inputs_embedder_mutex;

void stream_tokens(const std::shared_ptr<ThreadedStreamerWrapper>& streamer_ptr, const GenerationHandle& handle);
public:
2 changes: 1 addition & 1 deletion src/cpp/src/llm_pipeline_stateful.cpp
@@ -351,7 +351,7 @@ EncodedResults StatefulLLMPipeline::generate(
}

ov::genai::utils::GenerationFinishInfo finish_info = get_lm_encoded_results(m_model_runner, input_ids, concatenated_attention_mask, streamer_ptr, m_sampler,
requests, position_ids, m_kv_cache_state, std::nullopt, std::nullopt, m_max_kv_cache_size);
requests, position_ids, m_kv_cache_state, nullptr, std::nullopt, m_max_kv_cache_size);
ov::genai::EncodedResults& result = finish_info.results;
m_chat_generation_finish_status = finish_info.streaming_finish_status;

8 changes: 4 additions & 4 deletions src/cpp/src/lm_encoding.cpp
@@ -83,7 +83,7 @@ ov::genai::utils::GenerationFinishInfo get_lm_encoded_results(
std::vector<SequenceGroup::Ptr> sequence_groups,
std::optional<ov::Tensor> position_ids,
utils::KVCacheState& kv_cache_state,
std::optional<EmbeddingsModel> m_embedding,
EmbeddingsModel::Ptr m_embedding,
std::optional<int64_t> rope_delta,
const size_t max_kv_cache_size
) {
@@ -134,7 +134,7 @@ ov::genai::utils::GenerationFinishInfo get_lm_encoded_results(

// Initialize inputs

if (m_embedding.has_value()) {
if (m_embedding) {
m_llm.set_tensor("inputs_embeds", input_ids);
} else {
kv_cache_state.add_inputs(input_ids);
@@ -224,9 +224,9 @@ ov::genai::utils::GenerationFinishInfo get_lm_encoded_results(
beam_offets[active_sequence_groups.at(i)->get_request_id()] = i == 0 ? 0 : (active_sequence_groups.at(i - 1)->num_running_seqs() + beam_offets[i - 1]);
}

if (m_embedding.has_value()) {
if (m_embedding) {
constexpr bool return_remote_tensor = true;
const ov::Tensor& embed_prompt_tensor = (*m_embedding).infer(new_input_ids, return_remote_tensor);
const ov::Tensor& embed_prompt_tensor = m_embedding->infer(new_input_ids, return_remote_tensor);
m_llm.set_tensor("inputs_embeds", embed_prompt_tensor);
} else {
m_llm.set_tensor("input_ids", new_input_ids);
2 changes: 1 addition & 1 deletion src/cpp/src/lm_encoding.hpp
@@ -10,7 +10,7 @@ namespace genai {

ov::genai::utils::GenerationFinishInfo get_lm_encoded_results(ov::InferRequest& m_llm, const ov::Tensor& input_ids, const ov::Tensor& attention_mask,
const std::shared_ptr<StreamerBase>& streamer_ptr, Sampler& sampler, std::vector<SequenceGroup::Ptr> sequence_groups,
std::optional<ov::Tensor> position_ids, utils::KVCacheState& m_kv_cache_state, std::optional<EmbeddingsModel> m_embedding,
std::optional<ov::Tensor> position_ids, utils::KVCacheState& m_kv_cache_state, EmbeddingsModel::Ptr m_embedding,
std::optional<int64_t> rope_delta = std::nullopt, const size_t max_kv_cache_size = std::numeric_limits<size_t>::max());
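Note the matching call-site update in llm_pipeline_stateful.cpp above: with EmbeddingsModel::Ptr (a std::shared_ptr) replacing std::optional<EmbeddingsModel>, an absent embedding model is passed as nullptr rather than std::nullopt, and presence is checked with `if (m_embedding)` instead of `has_value()`.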


9 changes: 3 additions & 6 deletions src/cpp/src/model_runner.hpp
@@ -44,7 +44,7 @@ class ModelRunner {
// A model to compute token embeddings.
// Input shape: [N, conversation length].
// Output shape: [1, conversation length, hidden_size].
EmbeddingsModel m_embedding;
EmbeddingsModel::Ptr m_embedding;

public:
/**
@@ -81,7 +81,7 @@ class ModelRunner {
return m_request;
}

void set_embedding_model(const EmbeddingsModel& embedder) {
void set_embedding_model(const EmbeddingsModel::Ptr& embedder) {
m_embedding = embedder;
}

@@ -157,7 +157,6 @@ class ModelRunner {
int64_t *input_ids_data = nullptr;

if (sequence_group_type == SequenceGroupType::EMBEDDINGS) {
OPENVINO_ASSERT(m_embedding.get_request(), "Got sequence group with embeddings, but embeddings model wasn't set.");
inputs_embeds_data = inputs_embeds.data<float>();
} else if (sequence_group_type == SequenceGroupType::TOKENS) {
input_ids_data = input_ids.data<int64_t>();
@@ -330,8 +329,6 @@

ov::Tensor generated_ids_embeds;
float *generated_ids_embeds_data = nullptr;

OPENVINO_ASSERT(m_embedding.get_request(), "Got sequence group with embeddings, but embeddings model wasn't set.");

ov::Tensor generated_ids = ov::Tensor(ov::element::i64, {1, num_generated_ids_without_embeddings});
int64_t *generated_ids_data = generated_ids.data<int64_t>();
@@ -348,7 +345,7 @@
}
}
if (pos > 0) {
generated_ids_embeds = m_embedding.infer(generated_ids);
generated_ids_embeds = m_embedding->infer(generated_ids);
generated_ids_embeds_data = generated_ids_embeds.data<float>();

for (size_t i = 0; i < num_sequence_groups; ++i) {
15 changes: 5 additions & 10 deletions src/cpp/src/tokenizer.cpp
@@ -438,8 +438,7 @@ class Tokenizer::TokenizerImpl {
set_state_if_necessary(infer_request_guard, tokenization_params);
size_t batch_size = 1;
infer_request_guard.get().set_input_tensor(ov::Tensor{ov::element::string, {batch_size}, &prompt});
infer_request_guard.get().start_async();
infer_request_guard.get().wait();
infer_request_guard.get().infer();

return get_copied_results(
infer_request_guard.get().get_tensor("input_ids"),
@@ -457,8 +456,7 @@ class Tokenizer::TokenizerImpl {
set_state_if_necessary(infer_request_guard, tokenization_params);
infer_request_guard.get().set_input_tensor(ov::Tensor{ov::element::string, {prompts.size()}, prompts.data()});
auto size_ = infer_request_guard.get().get_input_tensor().get_shape();
infer_request_guard.get().start_async();
infer_request_guard.get().wait();
infer_request_guard.get().infer();

unpadded = get_copied_results(
infer_request_guard.get().get_tensor("input_ids"),
@@ -485,8 +483,7 @@ class Tokenizer::TokenizerImpl {
set_state_if_necessary(infer_request_guard, detokenization_params);
size_t batch_size = 1;
infer_request_guard.get().set_input_tensor(ov::Tensor{ov::element::i64, {batch_size, tokens.size()}, tokens.data()});
infer_request_guard.get().start_async();
infer_request_guard.get().wait();
infer_request_guard.get().infer();
return infer_request_guard.get().get_output_tensor().data<std::string>()[0];
}

@@ -498,8 +495,7 @@ class Tokenizer::TokenizerImpl {
CircularBufferQueueElementGuard<ov::InferRequest> infer_request_guard(this->m_ireq_queue_detokenizer.get());
set_state_if_necessary(infer_request_guard, detokenization_params);
infer_request_guard.get().set_input_tensor(tokens);
infer_request_guard.get().start_async();
infer_request_guard.get().wait();
infer_request_guard.get().infer();

auto res = infer_request_guard.get().get_output_tensor();
auto res_data = res.data<std::string>();
@@ -527,8 +523,7 @@ class Tokenizer::TokenizerImpl {
CircularBufferQueueElementGuard<ov::InferRequest> infer_request_guard(this->m_ireq_queue_detokenizer.get());
set_state_if_necessary(infer_request_guard, detokenization_params);
infer_request_guard.get().set_input_tensor(tokens);
infer_request_guard.get().start_async();
infer_request_guard.get().wait();
infer_request_guard.get().infer();
auto res = infer_request_guard.get().get_output_tensor();
auto res_data = res.data<std::string>();
return std::vector<std::string>(res_data, res_data + res.get_shape()[0]);
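The tokenizer/detokenizer edits above are a pure simplification: on an ov::InferRequest, a blocking start_async()/wait() pair behaves the same as the synchronous infer() call. Roughly (request stands in for infer_request_guard.get()):

```cpp
// Before: submit asynchronously, then block until completion.
request.start_async();
request.wait();

// After: single synchronous call with the same effect.
request.infer();
```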
48 changes: 28 additions & 20 deletions src/cpp/src/visual_language/embedding_model.cpp
@@ -15,17 +15,24 @@

namespace {

std::tuple<ov::InferRequest, ov::Tensor, ov::Tensor> init(ov::CompiledModel& compiled) {
ov::InferRequest ireq = compiled.create_infer_request();
ov::Tensor cpu_tensor = ireq.get_output_tensor();
ov::RemoteContext context;
try {
context = compiled.get_context();
} catch (const ov::Exception&) {
return {std::move(ireq), cpu_tensor, cpu_tensor};
}
ov::RemoteTensor remote = context.create_tensor(ov::element::f32, cpu_tensor.get_shape());
return {std::move(ireq), std::move(cpu_tensor), std::move(remote)};
std::unique_ptr<ov::genai::CircularBufferQueue<ov::genai::EmbeddingsRequest>> init(ov::CompiledModel& compiled) {
auto embeddings_requests_queue = std::make_unique<ov::genai::CircularBufferQueue<ov::genai::EmbeddingsRequest>>(
compiled.get_property(ov::optimal_number_of_infer_requests),
[&compiled]() -> ov::genai::EmbeddingsRequest {
ov::genai::EmbeddingsRequest req;
req.ireq = compiled.create_infer_request();
req.cpu_tensor = req.ireq.get_output_tensor();
ov::RemoteContext context;
try {
context = compiled.get_context();
} catch (const ov::Exception&) {
req.remote_tensor = req.cpu_tensor;
return req;
}
req.remote_tensor = context.create_tensor(ov::element::f32, req.cpu_tensor.get_shape());
return req;
});
return embeddings_requests_queue;
}

} // namespace
@@ -44,7 +51,7 @@ EmbeddingsModel::EmbeddingsModel(const std::filesystem::path& model_dir,

ov::CompiledModel compiled_model = core.compile_model(m_model, device, properties);
ov::genai::utils::print_compiled_model_properties(compiled_model, "text embeddings model");
std::tie(m_request, m_cpu_tensor, m_remote_tensor) = init(compiled_model);
m_embeddings_requests_queue = init(compiled_model);
}

EmbeddingsModel::EmbeddingsModel(const std::string& model,
@@ -58,20 +65,21 @@ EmbeddingsModel::EmbeddingsModel(const std::string& model,
merge_postprocess(m_model, scale_emb);

ov::CompiledModel compiled_model = core.compile_model(m_model, device, properties);
std::tie(m_request, m_cpu_tensor, m_remote_tensor) = init(compiled_model);
m_embeddings_requests_queue = init(compiled_model);
}

ov::Tensor EmbeddingsModel::infer(const ov::Tensor& input_idx, bool return_remote_tensor) {
OPENVINO_ASSERT(m_request, "Text embeddings decoder model must be compiled first. Cannot infer non-compiled model");

m_request.set_input_tensor(input_idx);
CircularBufferQueueElementGuard<EmbeddingsRequest> embeddings_request_guard(this->m_embeddings_requests_queue.get());
EmbeddingsRequest& req = embeddings_request_guard.get();
OPENVINO_ASSERT(req.ireq, "Text embeddings decoder model must be compiled first. Cannot infer non-compiled model");
req.ireq.set_input_tensor(input_idx);
if (return_remote_tensor) {
m_request.set_output_tensor(m_remote_tensor);
req.ireq.set_output_tensor(req.remote_tensor);
} else {
m_request.set_output_tensor(m_cpu_tensor);
req.ireq.set_output_tensor(req.cpu_tensor);
}
m_request.infer();
return m_request.get_output_tensor();
req.ireq.infer();
return req.ireq.get_output_tensor();
}

void EmbeddingsModel::merge_postprocess(std::shared_ptr<ov::Model> model, float scale_emb) const {
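The guard usage in infer() above is the core of this PR; sketched in isolation (the compiled model, token_ids tensor, and the queue capacity are assumptions), a CircularBufferQueueElementGuard checks an EmbeddingsRequest out of the queue for the current scope, so concurrent callers each work with their own ov::InferRequest:

```cpp
// Sketch only: assumes the CircularBufferQueue types from src/cpp/src/circular_buffer_queue.hpp
// and an already compiled embeddings model `compiled` plus an input tensor `token_ids`.
ov::genai::CircularBufferQueue<ov::genai::EmbeddingsRequest> queue(
    /*capacity=*/4,
    [&compiled]() {
        ov::genai::EmbeddingsRequest req;
        req.ireq = compiled.create_infer_request();
        req.cpu_tensor = req.ireq.get_output_tensor();
        req.remote_tensor = req.cpu_tensor;  // no remote context in this simplified sketch
        return req;
    });

{
    // Presumably blocks until a request is free; the guard returns it to the queue on scope exit.
    ov::genai::CircularBufferQueueElementGuard<ov::genai::EmbeddingsRequest> guard(&queue);
    ov::genai::EmbeddingsRequest& req = guard.get();
    req.ireq.set_input_tensor(token_ids);
    req.ireq.set_output_tensor(req.cpu_tensor);
    req.ireq.infer();
    ov::Tensor embeddings = req.ireq.get_output_tensor();
}
```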
31 changes: 25 additions & 6 deletions src/cpp/src/visual_language/embedding_model.hpp
@@ -6,6 +6,7 @@
#include <filesystem>
#include <vector>
#include <string>
#include <memory>

#include "openvino/core/any.hpp"
#include "openvino/runtime/core.hpp"
@@ -17,12 +18,21 @@

#include <openvino/openvino.hpp>
#include "visual_language/processor_config.hpp"
#include "circular_buffer_queue.hpp"

namespace ov {
namespace genai {

struct EmbeddingsRequest {
ov::InferRequest ireq;
ov::Tensor cpu_tensor;
ov::Tensor remote_tensor;
};

class EmbeddingsModel {
public:
using Ptr = std::shared_ptr<EmbeddingsModel>;

EmbeddingsModel(const std::filesystem::path& model_dir,
const float scale_emb,
const std::string& device,
@@ -36,17 +46,26 @@ class EmbeddingsModel {

EmbeddingsModel() = default;

ov::Tensor infer(const ov::Tensor& input_idx, bool return_remote_tensor=false);
static Ptr create(const std::filesystem::path& model_dir,
const float scale_emb,
const std::string& device,
const ov::AnyMap& properties) {
return std::make_shared<EmbeddingsModel>(model_dir, scale_emb, device, properties);
}

ov::InferRequest get_request() {
return m_request;
static Ptr create(const std::string& model,
const ov::Tensor& weights,
const float scale_emb,
const std::string& device,
const ov::AnyMap& properties) {
return std::make_shared<EmbeddingsModel>(model, weights, scale_emb, device, properties);
}

ov::Tensor infer(const ov::Tensor& input_idx, bool return_remote_tensor=false);
private:
void merge_postprocess(std::shared_ptr<ov::Model> model, float scale_emb) const;

ov::InferRequest m_request;
ov::Tensor m_cpu_tensor;
ov::Tensor m_remote_tensor;
std::unique_ptr<CircularBufferQueue<EmbeddingsRequest>> m_embeddings_requests_queue;
};

} // namespace genai
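With the Ptr alias and the static create() factories, one EmbeddingsModel instance can be shared by InputsEmbedder and ModelRunner instead of being copied around. A hypothetical usage sketch (the directory, scale, device, and the model_runner/token_ids variables are assumptions):

```cpp
ov::genai::EmbeddingsModel::Ptr embedder = ov::genai::EmbeddingsModel::create(
    "/path/to/model_dir",   // directory containing openvino_text_embeddings_model.xml (assumed)
    /*scale_emb=*/1.0f,
    "CPU",
    ov::AnyMap{});

model_runner.set_embedding_model(embedder);       // ModelRunner stores the same shared instance
ov::Tensor embeds = embedder->infer(token_ids);   // each call checks out its own infer request internally
```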
8 changes: 4 additions & 4 deletions src/cpp/src/visual_language/inputs_embedder.cpp
@@ -83,7 +83,7 @@ InputsEmbedder::IInputsEmbedder::IInputsEmbedder(
const ov::AnyMap device_config) :
m_vlm_config{vlm_config},
m_vision_encoder(VisionEncoder::create(model_dir, m_vlm_config.model_type, device, device_config)),
m_embedding(model_dir, m_vlm_config.scale_emb, device, device_config),
m_embedding(EmbeddingsModel::create(model_dir, m_vlm_config.scale_emb, device, device_config)),
m_tokenizer{model_dir, device_config} { }

InputsEmbedder::IInputsEmbedder::IInputsEmbedder(
@@ -102,13 +102,13 @@ InputsEmbedder::IInputsEmbedder::IInputsEmbedder(
device,
device_config
)),
m_embedding(
m_embedding(EmbeddingsModel::create(
utils::get_model_weights_pair(models_map, "text_embeddings").first,
utils::get_model_weights_pair(models_map, "text_embeddings").second,
m_vlm_config.scale_emb,
device,
device_config
),
)),
m_tokenizer(tokenizer) { }

ov::Tensor InputsEmbedder::IInputsEmbedder::apply_chat_template_tokenize(const std::string& prompt, ov::genai::VLMPerfMetrics& metrics) {
@@ -243,7 +243,7 @@ std::pair<ov::Tensor, std::optional<int64_t>> InputsEmbedder::get_position_ids(c
return m_impl->get_position_ids(inputs_embeds_size, history_size);
}

EmbeddingsModel InputsEmbedder::get_embedding_model() const {
EmbeddingsModel::Ptr InputsEmbedder::get_embedding_model() const {
return m_impl->get_embedding_model();
}

6 changes: 3 additions & 3 deletions src/cpp/src/visual_language/inputs_embedder.hpp
@@ -39,7 +39,7 @@ class InputsEmbedder {
std::pair<ov::Tensor, std::optional<int64_t>> get_position_ids(const size_t inputs_embeds_size, const size_t history_size);

// returns embedding model which converts token_id(s) to embedding vectors
EmbeddingsModel get_embedding_model() const;
EmbeddingsModel::Ptr get_embedding_model() const;

// returns tokenizer
Tokenizer get_tokenizer() const;
@@ -71,7 +71,7 @@ class InputsEmbedder {
// A model to compute token embeddings.
// Input shape: [N, conversation length].
// Output shape: [1, conversation length, hidden_size].
EmbeddingsModel m_embedding;
EmbeddingsModel::Ptr m_embedding;
// A tokenizer encoding a prompt.
Tokenizer m_tokenizer;
// True if chat mode is activated to save conversation
@@ -95,7 +95,7 @@

virtual std::pair<ov::Tensor, std::optional<int64_t>> get_position_ids(const size_t inputs_embeds_size, const size_t history_size);

EmbeddingsModel get_embedding_model() const {
EmbeddingsModel::Ptr get_embedding_model() const {
return m_embedding;
}
