CB: added PerfMetrics support #1524


Merged (14 commits) on Jan 14, 2025
@@ -48,6 +48,11 @@ struct PipelineMetrics {
* Running average of the KV cache usage during the lifetime of the pipeline, with max window size of 1000 steps
*/
float avg_cache_usage = 0.0;

/**
* Number of tokens scheduled for processing at the previous step of the pipeline.
*/
size_t total_num_scheduled_tokens;
};

class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
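
The new counter is refreshed on every scheduler step (see continuous_batching_impl.cpp below), so a caller that drives the pipeline manually can poll it between steps. A minimal sketch, assuming the public ContinuousBatchingPipeline exposes step(), has_non_finished_requests() and get_metrics() the same way the internal implementation in this PR does; the reporting format is illustrative:

```cpp
// Sketch only: step()/has_non_finished_requests()/get_metrics() are assumed to be
// part of the public ContinuousBatchingPipeline API; the output format is illustrative.
#include <iostream>
#include "openvino/genai/continuous_batching_pipeline.hpp"

void drive_and_report(ov::genai::ContinuousBatchingPipeline& pipe) {
    while (pipe.has_non_finished_requests()) {
        pipe.step();
        ov::genai::PipelineMetrics metrics = pipe.get_metrics();
        // total_num_scheduled_tokens describes the step that has just completed.
        std::cout << "scheduled tokens: " << metrics.total_num_scheduled_tokens
                  << ", avg KV cache usage: " << metrics.avg_cache_usage << "%"
                  << ", max KV cache usage: " << metrics.max_cache_usage << "%\n";
    }
}
```
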
7 changes: 7 additions & 0 deletions src/cpp/include/openvino/genai/generation_handle.hpp
@@ -8,6 +8,7 @@

#include "openvino/genai/generation_config.hpp"
#include "openvino/genai/visibility.hpp"
#include "openvino/genai/perf_metrics.hpp"

namespace ov::genai {
enum class GenerationStatus {
@@ -30,6 +31,9 @@ struct EncodedGenerationResult {

// Status of generation
GenerationStatus m_status = GenerationStatus::RUNNING;

// PerfMetrics but with empty tokenization/detokenization durations.
PerfMetrics perf_metrics;
};

enum class GenerationFinishReason {
Expand All @@ -50,6 +54,9 @@ struct GenerationResult {

// Status of generation
GenerationStatus m_status = GenerationStatus::RUNNING;

    // PerfMetrics with tokenization/detokenization durations filled in.
PerfMetrics perf_metrics;
};

struct GenerationOutput {
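
In other words, a token-level result is expected to carry timings for the generation loop only, while the string-level result additionally carries the tokenizer timings. A small sketch of that contract, assuming both results come from the overloads touched in this PR (variable names are placeholders):

```cpp
// Sketch only: illustrates the intended contract between the two new fields;
// `encoded` and `decoded` are assumed to come from the token-level and
// string-level generate() overloads respectively.
#include <cassert>
#include "openvino/genai/generation_handle.hpp"

void check_metrics(const ov::genai::EncodedGenerationResult& encoded,
                   const ov::genai::GenerationResult& decoded) {
    // Token-level results carry no tokenization/detokenization timings...
    assert(encoded.perf_metrics.raw_metrics.tokenization_durations.empty());
    assert(encoded.perf_metrics.raw_metrics.detokenization_durations.empty());
    // ...while string-level results have them filled in by the pipeline.
    assert(!decoded.perf_metrics.raw_metrics.tokenization_durations.empty());
    assert(!decoded.perf_metrics.raw_metrics.detokenization_durations.empty());
}
```
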
59 changes: 54 additions & 5 deletions src/cpp/src/continuous_batching_adapter.hpp
@@ -77,6 +77,9 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
OptionalGenerationConfig generation_config,
StreamerVariant streamer
) override {
        // Get the current timestamp in order to evaluate total generate duration.
auto start_time = std::chrono::steady_clock::now();

std::vector<std::string> prompts = std::visit(overloaded{
[](const std::string& prompt) {
return std::vector{prompt};
@@ -87,8 +90,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
}, inputs);
const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
// -1 == config.eos_token_id and config.validate() are handled in m_impl.
std::vector<GenerationResult> generated = m_impl.generate(prompts,
std::vector<GenerationConfig>{prompts.size(), config},
streamer
);
@@ -99,14 +101,44 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies));
std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
}
PerfMetrics perf_metrics;
// For GenerationResults, all perf_metrics are the same except tokenization and detokenization durations.
// Since we return here only one perf_metrics, we should accumulate all tokenization and detokenization times.
if (generated.size() > 0) {
perf_metrics = generated[0].perf_metrics;
}

        // Tokenization and detokenization times are dispersed across the GenerationResult vector.
        // Need to collect them into a single perf_metrics for the DecodedResults.
auto& raw_metrics = perf_metrics.raw_metrics;
for (size_t i = 1; i < generated.size(); ++i){
auto tok_durations = generated[i].perf_metrics.raw_metrics.tokenization_durations;
auto detok_durations = generated[i].perf_metrics.raw_metrics.detokenization_durations;
for (size_t j = 0; j < tok_durations.size(); ++j) {
raw_metrics.tokenization_durations.emplace_back(tok_durations[j]);
}
for (size_t j = 0; j < detok_durations.size(); ++j) {
raw_metrics.detokenization_durations.emplace_back(detok_durations[j]);
}
}

raw_metrics.generate_durations.clear();
raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
        // Reevaluate statistics: the generate duration measured from start_time now also covers tokenization/detokenization.
perf_metrics.m_evaluated = false;
perf_metrics.evaluate_statistics(start_time);

return {std::move(plain_replies), std::move(plain_scores), std::move(perf_metrics)};
}

EncodedResults generate(
const EncodedInputs& inputs,
OptionalGenerationConfig generation_config,
StreamerVariant streamer
) override {
        // Get the current timestamp in order to evaluate total generate duration.
auto start_time = std::chrono::steady_clock::now();

std::vector<ov::Tensor> input_ids = std::visit(overloaded{
[](const ov::Tensor& inp) {
size_t batch_size = inp.get_shape().at(0);
@@ -148,15 +180,32 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {

const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
// -1 == config.eos_token_id and config.validate() are handled in m_impl.
std::vector<EncodedGenerationResult> generated = m_impl.generate(input_ids,
std::vector<GenerationConfig>{input_ids.size(), config},
streamer
);

std::vector<std::vector<int64_t>> plain_tokens;
std::vector<float> plain_scores;
for (EncodedGenerationResult& res : generated) {
OPENVINO_ASSERT(res.m_status == GenerationStatus::FINISHED || res.m_status == GenerationStatus::DROPPED_BY_HANDLE, "Got unfinished GenerationStatus");
std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens));
std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
}

PerfMetrics perf_metrics;
// For EncodedGenerationResults, all perf_metrics are the same.
if (generated.size() > 0) {
perf_metrics = generated[0].perf_metrics;
}
auto& raw_counters = perf_metrics.raw_metrics;
raw_counters.generate_durations.clear();
raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
// Updated generate duration, need to reevaluate statistics.
perf_metrics.m_evaluated = false;
perf_metrics.evaluate_statistics(start_time);

return {std::move(plain_tokens), std::move(plain_scores), std::move(perf_metrics)};
}

void start_chat(const std::string& system_message) override {
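
With the adapter changes in place, an LLMPipeline backed by continuous batching should now return populated perf metrics in DecodedResults, the same way the stateful pipelines already do. A usage sketch, assuming the documented PerfMetrics getters (get_generate_duration()/get_ttft()/get_tpot(), values in milliseconds); the model path and prompt are placeholders, and how the continuous-batching backend gets selected is outside this diff:

```cpp
// Sketch only: model path and prompt are placeholders; the getters below are
// assumed to match the documented PerfMetrics API (values in milliseconds).
#include <iostream>
#include "openvino/genai/llm_pipeline.hpp"

int main() {
    ov::genai::LLMPipeline pipe("model_dir", "CPU");
    ov::genai::DecodedResults res = pipe.generate("Why is the sky blue?",
                                                  ov::genai::max_new_tokens(64));
    const auto& pm = res.perf_metrics;
    std::cout << "generate:  " << pm.get_generate_duration().mean << " ms\n"
              << "TTFT:      " << pm.get_ttft().mean << " ms\n"
              << "TPOT:      " << pm.get_tpot().mean << " ms\n";
    return 0;
}
```
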
25 changes: 22 additions & 3 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -151,13 +151,13 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
m_pipeline_metrics.max_cache_usage = std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage);
_register_step_cache_usage(scheduler_output.m_cache_usage);
m_pipeline_metrics.avg_cache_usage = _get_current_running_average_cache_usage();
m_pipeline_metrics.total_num_scheduled_tokens = scheduler_output.m_total_num_scheduled_tokens;

static ManualTimer copy_blocks_timer("scheduling");
copy_blocks_timer.start();
m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map);
copy_blocks_timer.end();
}

// if no tokens were scheduled, we are out of memory => free all requests and return
if (scheduler_output.m_total_num_scheduled_tokens == 0) {
for (size_t i = 0; i < m_requests.size(); ++i) {
@@ -170,7 +170,6 @@
_free_non_running_requests();
return;
}

ov::Tensor logits;
{
static ManualTimer timer("forward");
@@ -257,6 +256,11 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
const StreamerVariant& streamer) {
OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
OPENVINO_ASSERT(input_ids.size() == sampling_params.size());

auto start_time = std::chrono::steady_clock::now();
PerfMetrics perf_metrics;
auto& raw_perf_counters = perf_metrics.raw_metrics;
raw_perf_counters.m_inference_durations = {{ MicroSeconds(0.0f) }};

    // checks that all requests have the same LoRA adapters property value
for (size_t i = 1; i < sampling_params.size(); ++i) {
@@ -303,13 +307,21 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
bool continue_generation = true;
while (has_non_finished_requests() && continue_generation) {
try {
const auto infer_start = std::chrono::steady_clock::now();
step();
auto num_generated_tokens = get_metrics().total_num_scheduled_tokens;
const auto infer_end = std::chrono::steady_clock::now();
const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start);
raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
raw_perf_counters.m_new_token_times.emplace_back(infer_end);
raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
} catch (...) {
drop_requests(); // remove all requests from pipeline state in case of exception
throw;
}

GenerationHandle & generation = generations.at(0);
if (streamer_ptr && generation->can_read()) {
std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
for (const auto& gen_token : token.begin()->second.generated_ids) {
@@ -358,6 +370,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
}

result.m_status = generations[request_id]->get_status();

// The same perf metrics for each sequence, only tokenization/detokenization will differ.
perf_metrics.raw_metrics.generate_durations.clear();
perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
perf_metrics.evaluate_statistics(start_time);

result.perf_metrics = perf_metrics;
results.push_back(std::move(result));
}

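
The raw counters filled in above (m_new_token_times, m_batch_sizes, m_token_infer_durations, m_inference_durations) are the inputs that evaluate_statistics() later reduces to TTFT/TPOT and the other reported statistics. The actual reduction lives in perf_metrics.cpp and is not part of this diff; the snippet below is only an illustration of the idea, under the assumption that TTFT is the gap between start_time and the first recorded token time and TPOT averages the per-token gaps of the later steps:

```cpp
// Illustration only: NOT the evaluate_statistics() implementation from this repo.
// Shows how per-step timestamps and batch sizes could be reduced to TTFT/TPOT,
// assuming both vectors have the same length (they are filled in lockstep above).
#include <chrono>
#include <vector>

using Clock = std::chrono::steady_clock;

struct DerivedStats {
    float ttft_ms = 0.0f;  // time to first token
    float tpot_ms = 0.0f;  // mean time per output token after the first step
};

DerivedStats derive(Clock::time_point start,
                    const std::vector<Clock::time_point>& new_token_times,
                    const std::vector<size_t>& batch_sizes) {
    DerivedStats out;
    if (new_token_times.empty())
        return out;
    auto to_ms = [](Clock::duration d) {
        return std::chrono::duration<float, std::milli>(d).count();
    };
    // First recorded step ends the time-to-first-token interval.
    out.ttft_ms = to_ms(new_token_times.front() - start);
    float total_ms = 0.0f;
    size_t total_tokens = 0;
    for (size_t i = 1; i < new_token_times.size(); ++i) {
        total_ms += to_ms(new_token_times[i] - new_token_times[i - 1]);
        total_tokens += batch_sizes[i];
    }
    if (total_tokens > 0)
        out.tpot_ms = total_ms / total_tokens;
    return out;
}
```
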
27 changes: 24 additions & 3 deletions src/cpp/src/icontinuous_batching.cpp
@@ -35,45 +35,66 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
std::vector<ov::genai::GenerationConfig> sampling_params,
const StreamerVariant& streamer) {
std::vector<ov::Tensor> input_ids;
auto start_time = std::chrono::steady_clock::now();

std::vector<MicroSeconds> tokenization_durations;
static ManualTimer timer("tokenize");
if (m_is_chat_conversation) {
OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts");
m_history.push_back({{"role", "user"}, {"content", prompts.at(0)}});
constexpr bool add_generation_prompt = true;
std::string history = m_tokenizer.apply_chat_template(m_history, add_generation_prompt);
timer.start();
const auto encode_start = std::chrono::steady_clock::now();
// ov::genai::add_special_tokens(false) is aligned with stateful pipeline
input_ids.push_back(m_tokenizer.encode(history, ov::genai::add_special_tokens(false)).input_ids);
tokenization_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - encode_start));
timer.end();
} else {
input_ids.reserve(prompts.size());
timer.start();
for (const std::string& prompt : prompts) {
const auto encode_start = std::chrono::steady_clock::now();
input_ids.push_back(m_tokenizer.encode(prompt).input_ids);
tokenization_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - encode_start));
}
timer.end();
}

std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);

std::vector<GenerationResult> decoded;
decoded.reserve(encoded.size());
for (size_t i = 0; i < encoded.size(); ++i) {
EncodedGenerationResult res = encoded[i];
auto& perf_metrics = res.perf_metrics;
auto& raw_counters = perf_metrics.raw_metrics;
raw_counters.tokenization_durations.emplace_back(tokenization_durations[i]);

std::vector<std::string> generated;
generated.reserve(res.m_generation_ids.size());
for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) {
const auto decode_start = std::chrono::steady_clock::now();
generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx)));
raw_counters.detokenization_durations.emplace_back(std::chrono::steady_clock::now() - decode_start);
if (m_is_chat_conversation && 0 == idx) {
m_history.push_back({{"role", "assistant"}, {"content", generated.back()}});
}
}

// The same perf metrics for each sequence, only tokenization/detokenization will differ.
perf_metrics.raw_metrics.generate_durations.clear();
perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
        // Reevaluate taking into account tokenization/detokenization times.
perf_metrics.m_evaluated = false;
perf_metrics.evaluate_statistics(start_time);

decoded.push_back(GenerationResult{
res.m_request_id,
std::move(generated),
std::move(res.m_scores),
res.m_status,
perf_metrics,
});
}

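
Putting it together, each GenerationResult returned by the prompt-based generate() now carries its own PerfMetrics with tokenization and detokenization included. A usage sketch, assuming the public ContinuousBatchingPipeline constructor and generate() overload used in the existing samples; the model path, device, prompts and getter names are assumptions taken from the documented PerfMetrics API rather than from this diff:

```cpp
// Sketch only: constructor signature, greedy() helper and PerfMetrics getters are
// assumed from the existing samples/docs; model path and prompts are placeholders.
#include <iostream>
#include <string>
#include <vector>
#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    ov::genai::SchedulerConfig scheduler_config;
    ov::genai::ContinuousBatchingPipeline pipe("model_dir", scheduler_config, "CPU");

    std::vector<std::string> prompts = {"What is OpenVINO?", "Why is the sky blue?"};
    std::vector<ov::genai::GenerationConfig> configs(prompts.size(), ov::genai::greedy());

    for (const auto& result : pipe.generate(prompts, configs)) {
        const auto& pm = result.perf_metrics;
        // Tokenization/detokenization durations are now part of each GenerationResult.
        std::cout << "generate:       " << pm.get_generate_duration().mean << " ms\n"
                  << "tokenization:   " << pm.get_tokenization_duration().mean << " ms\n"
                  << "detokenization: " << pm.get_detokenization_duration().mean << " ms\n";
    }
    return 0;
}
```
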
2 changes: 1 addition & 1 deletion src/cpp/src/llm_pipeline_stateful.cpp
@@ -181,7 +181,7 @@ DecodedResults StatefulLLMPipeline::generate(

auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
auto stop_time = std::chrono::steady_clock::now();
raw_counters.generate_durations = std::vector<MicroSeconds>();
raw_counters.generate_durations.clear();
raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));
4 changes: 2 additions & 2 deletions src/cpp/src/llm_pipeline_static.cpp
@@ -790,7 +790,7 @@ DecodedResults StatefulLLMPipeline::generate(
decoded_results.perf_metrics = encoded_results.perf_metrics;
auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
auto stop_time = std::chrono::steady_clock::now();
raw_counters.generate_durations = std::vector<MicroSeconds>();
raw_counters.generate_durations.clear();
raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));
@@ -1221,7 +1221,7 @@ DecodedResults StatelessLLMPipeline::generate(
decoded_results.perf_metrics = encoded_results.perf_metrics;
auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
auto stop_time = std::chrono::steady_clock::now();
raw_counters.generate_durations = std::vector<MicroSeconds>();
raw_counters.generate_durations.clear();
raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));