CB: added PerfMetrics support #1524

Merged: 14 commits, Jan 14, 2025
@@ -48,6 +48,11 @@ struct PipelineMetrics {
* Running average of the KV cache usage during the lifetime of the pipeline, with max window size of 1000 steps
*/
float avg_cache_usage = 0.0;

/**
* Number of tokens scheduled for processing at the previous step of the pipeline.
*/
size_t total_num_scheduled_tokens;
};

class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
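For illustration only (not part of the diff), a minimal sketch of how the new counter could be read from a running pipeline; it assumes the public step() and get_metrics() accessors of ContinuousBatchingPipeline, matching how the implementation further down in this PR uses them:

#include "openvino/genai/continuous_batching_pipeline.hpp"
#include <iostream>

// Illustrative sketch: report how many tokens were scheduled at the step that just ran.
void report_scheduled_tokens(ov::genai::ContinuousBatchingPipeline& pipe) {
    pipe.step();
    const ov::genai::PipelineMetrics metrics = pipe.get_metrics();
    std::cout << "tokens scheduled at previous step: " << metrics.total_num_scheduled_tokens << "\n";
}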
7 changes: 7 additions & 0 deletions src/cpp/include/openvino/genai/generation_handle.hpp
@@ -8,6 +8,7 @@

#include "openvino/genai/generation_config.hpp"
#include "openvino/genai/visibility.hpp"
#include "openvino/genai/perf_metrics.hpp"

namespace ov::genai {
enum class GenerationStatus {
@@ -30,6 +31,9 @@ struct EncodedGenerationResult {

// Status of generation
GenerationStatus m_status = GenerationStatus::RUNNING;

// PerfMetrics but with empty tokenization/detokenization durations.
PerfMetrics perf_metrics;
};

enum class GenerationFinishReason {
@@ -50,6 +54,9 @@ struct GenerationResult {

// Status of generation
GenerationStatus m_status = GenerationStatus::RUNNING;

// PerfMetrics
PerfMetrics perf_metrics;
};

struct GenerationOutput {
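As a hedged usage sketch (not taken from this PR), the new perf_metrics field could be consumed as below; ContinuousBatchingPipeline::generate() appears elsewhere in the diff, while the get_ttft()/get_throughput() getters and their mean/std shape are assumptions based on the existing PerfMetrics API:

#include "openvino/genai/continuous_batching_pipeline.hpp"
#include <iostream>
#include <string>
#include <variant>
#include <vector>

// Sketch: per-request perf metrics from a batched string generate() call.
void print_request_metrics(ov::genai::ContinuousBatchingPipeline& pipe,
                           const std::vector<std::string>& prompts,
                           const ov::genai::GenerationConfig& config) {
    std::vector<ov::genai::GenerationResult> results = pipe.generate(
        prompts,
        std::vector<ov::genai::GenerationConfig>(prompts.size(), config),
        std::monostate{});  // no streamer
    for (auto& res : results) {
        auto& pm = res.perf_metrics;  // getters below are assumed, not defined in this PR
        std::cout << "TTFT ms: " << pm.get_ttft().mean
                  << ", throughput tok/s: " << pm.get_throughput().mean << std::endl;
    }
}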
59 changes: 54 additions & 5 deletions src/cpp/src/continuous_batching_adapter.hpp
@@ -77,6 +77,9 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
OptionalGenerationConfig generation_config,
StreamerVariant streamer
) override {
// Get the current timestamp in order to evaluate the total generate duration.
auto start_time = std::chrono::steady_clock::now();

std::vector<std::string> prompts = std::visit(overloaded{
[](const std::string& prompt) {
return std::vector{prompt};
@@ -87,8 +90,7 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
}, inputs);
const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
// -1 == config.eos_token_id and config.validate() are handled in m_impl.
std::vector<GenerationResult> generated = m_impl.generate(
prompts,
std::vector<GenerationResult> generated = m_impl.generate(prompts,
std::vector<GenerationConfig>{prompts.size(), config},
streamer
);
@@ -99,14 +101,44 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies));
std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
}
return {std::move(plain_replies), std::move(plain_scores)};
PerfMetrics perf_metrics;
// For GenerationResults, all perf_metrics are the same except the tokenization and detokenization durations.
// Since only one perf_metrics is returned here, accumulate all tokenization and detokenization times.
if (generated.size() > 0) {
perf_metrics = generated[0].perf_metrics;
}

// Tokenization and detokenization times are dispersed across the GenerationResult vector.
// Collect them into a single perf_metrics for the DecodedResults.
auto& raw_metrics = perf_metrics.raw_metrics;
for (size_t i = 1; i < generated.size(); ++i){
auto tok_durations = generated[i].perf_metrics.raw_metrics.tokenization_durations;
auto detok_durations = generated[i].perf_metrics.raw_metrics.detokenization_durations;
for (size_t j = 0; j < tok_durations.size(); ++j) {
raw_metrics.tokenization_durations.emplace_back(tok_durations[j]);
}
for (size_t j = 0; j < detok_durations.size(); ++j) {
raw_metrics.detokenization_durations.emplace_back(detok_durations[j]);
}
}

raw_metrics.generate_durations.clear();
raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
// Reevaluate statistics with this method's start_time so that they include tokenization/detokenization durations.
perf_metrics.m_evaluated = false;
perf_metrics.evaluate_statistics(start_time);

return {std::move(plain_replies), std::move(plain_scores), std::move(perf_metrics)};
}

EncodedResults generate(
const EncodedInputs& inputs,
OptionalGenerationConfig generation_config,
StreamerVariant streamer
) override {
// Get the current timestamp in order to evaluate the total generate duration.
auto start_time = std::chrono::steady_clock::now();

std::vector<ov::Tensor> input_ids = std::visit(overloaded{
[](const ov::Tensor& inp) {
size_t batch_size = inp.get_shape().at(0);
@@ -148,15 +180,32 @@ class ContinuousBatchingAdapter final : public LLMPipelineImplBase {

const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
// -1 == config.eos_token_id and config.validate() are handled in m_impl.
std::vector<EncodedGenerationResult> generated = m_impl.generate(input_ids, std::vector<GenerationConfig>{input_ids.size(), config}, streamer);
std::vector<EncodedGenerationResult> generated = m_impl.generate(input_ids,
std::vector<GenerationConfig>{input_ids.size(), config},
streamer
);

std::vector<std::vector<int64_t>> plain_tokens;
std::vector<float> plain_scores;
for (EncodedGenerationResult& res : generated) {
OPENVINO_ASSERT(res.m_status == GenerationStatus::FINISHED || res.m_status == GenerationStatus::DROPPED_BY_HANDLE, "Got unfinished GenerationStatus");
std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens));
std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
}
return {std::move(plain_tokens), std::move(plain_scores)};

PerfMetrics perf_metrics;
// For EncodedGenerationResults, all perf_metrics are the same.
if (generated.size() > 0) {
perf_metrics = generated[0].perf_metrics;
}
auto& raw_counters = perf_metrics.raw_metrics;
raw_counters.generate_durations.clear();
raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
// Reevaluate statistics with the new start_time, which includes the time spent preparing inputs.
perf_metrics.m_evaluated = false;
perf_metrics.evaluate_statistics(start_time);

return {std::move(plain_tokens), std::move(plain_scores), std::move(perf_metrics)};
}

void start_chat(const std::string& system_message) override {
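To make the adapter-level aggregation above concrete, a hedged sketch (not from this PR) of what a caller sees when LLMPipeline is backed by continuous batching: one merged perf_metrics per DecodedResults, whose generate duration is rebuilt from the adapter's start_time while tokenization/detokenization times are summed across the batch. The getter names and the ov::genai::greedy() preset are assumptions based on the existing GenAI API:

#include "openvino/genai/llm_pipeline.hpp"
#include <iostream>
#include <string>
#include <vector>

// Sketch: inspecting the merged metrics returned through the continuous-batching adapter path.
void print_merged_metrics(ov::genai::LLMPipeline& llm_pipe, const std::vector<std::string>& prompts) {
    ov::genai::DecodedResults res = llm_pipe.generate(prompts, ov::genai::greedy());
    auto& pm = res.perf_metrics;
    // The generate duration now covers tokenization, scheduling, decoding and detokenization,
    // while the tokenization/detokenization durations are the accumulated per-prompt sums.
    std::cout << "generate ms: " << pm.get_generate_duration().mean
              << ", tokenization ms: " << pm.get_tokenization_duration().mean << std::endl;
}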
27 changes: 24 additions & 3 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -151,13 +151,13 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
m_pipeline_metrics.max_cache_usage = std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage);
_register_step_cache_usage(scheduler_output.m_cache_usage);
m_pipeline_metrics.avg_cache_usage = _get_current_running_average_cache_usage();
m_pipeline_metrics.total_num_scheduled_tokens = scheduler_output.m_total_num_scheduled_tokens;

static ManualTimer copy_blocks_timer("scheduling");
copy_blocks_timer.start();
m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map);
copy_blocks_timer.end();
}

// if no tokens were scheduled, we are out of memory => free all requests and return
if (scheduler_output.m_total_num_scheduled_tokens == 0) {
for (size_t i = 0; i < m_requests.size(); ++i) {
@@ -170,7 +170,6 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
_free_non_running_requests();
return;
}

ov::Tensor logits;
{
static ManualTimer timer("forward");
@@ -257,6 +256,11 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
const StreamerVariant& streamer) {
OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
OPENVINO_ASSERT(input_ids.size() == sampling_params.size());

auto start_time = std::chrono::steady_clock::now();
PerfMetrics perf_metrics;
auto& raw_perf_counters = perf_metrics.raw_metrics;
raw_perf_counters.m_inference_durations = {{ MicroSeconds(0.0f) }};

// check that all requests have the same LoRA adapters property value
for (size_t i = 1; i < sampling_params.size(); ++i) {
@@ -303,13 +307,23 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
bool continue_generation = true;
while (has_non_finished_requests() && continue_generation) {
try {
const auto infer_start = std::chrono::steady_clock::now();
step();
auto num_generated_tokens = get_metrics().total_num_scheduled_tokens;
if (num_generated_tokens > 0) {
const auto infer_end = std::chrono::steady_clock::now();
const auto infer_ms = PerfMetrics::get_microsec(infer_end - infer_start);
raw_perf_counters.m_token_infer_durations.emplace_back(infer_ms);
raw_perf_counters.m_inference_durations[0] += MicroSeconds(infer_ms);
raw_perf_counters.m_new_token_times.emplace_back(infer_end);
raw_perf_counters.m_batch_sizes.emplace_back(num_generated_tokens);
}
} catch (...) {
drop_requests(); // remove all requests from pipeline state in case of exception
throw;
}

auto & generation = generations.at(0);
GenerationHandle & generation = generations.at(0);
if (streamer_ptr && generation->can_read()) {
std::unordered_map<uint64_t, GenerationOutput> token = generation->back();
for (const auto& gen_token : token.begin()->second.generated_ids) {
@@ -358,6 +372,13 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<o
}

result.m_status = generations[request_id]->get_status();

// Perf metrics are the same for each sequence; only tokenization/detokenization durations will differ.
perf_metrics.raw_metrics.generate_durations.clear();
perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
perf_metrics.evaluate_statistics(start_time);

result.perf_metrics = perf_metrics;
results.push_back(std::move(result));
}

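The per-step counters filled in above (m_token_infer_durations, m_batch_sizes, m_new_token_times) are what evaluate_statistics() later folds into aggregate numbers. A simplified, illustrative sketch of that relationship follows; the library's own aggregation may weight steps differently:

#include "openvino/genai/perf_metrics.hpp"
#include <cstddef>

// Illustrative only: a rough mean time-per-output-token from the raw step counters.
// Each entry of m_token_infer_durations is the duration of one step(); the matching
// m_batch_sizes entry is the number of tokens scheduled at that step.
float rough_mean_tpot_us(const ov::genai::RawPerfMetrics& raw) {
    float total_us = 0.0f;
    std::size_t total_tokens = 0;
    for (std::size_t i = 0; i < raw.m_token_infer_durations.size(); ++i) {
        total_us += raw.m_token_infer_durations[i].count();
        total_tokens += raw.m_batch_sizes[i];
    }
    return total_tokens ? total_us / static_cast<float>(total_tokens) : 0.0f;
}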
26 changes: 23 additions & 3 deletions src/cpp/src/icontinuous_batching.cpp
@@ -35,45 +35,65 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
std::vector<ov::genai::GenerationConfig> sampling_params,
const StreamerVariant& streamer) {
std::vector<ov::Tensor> input_ids;
auto start_time = std::chrono::steady_clock::now();

std::vector<MicroSeconds> tokenization_durations;
static ManualTimer timer("tokenize");
if (m_is_chat_conversation) {
OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts");
m_history.push_back({{"role", "user"}, {"content", prompts.at(0)}});
constexpr bool add_generation_prompt = true;
std::string history = m_tokenizer.apply_chat_template(m_history, add_generation_prompt);
timer.start();
const auto encode_start = std::chrono::steady_clock::now();
// ov::genai::add_special_tokens(false) is aligned with stateful pipeline
input_ids.push_back(m_tokenizer.encode(history, ov::genai::add_special_tokens(false)).input_ids);
tokenization_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - encode_start));
timer.end();
} else {
input_ids.reserve(prompts.size());
timer.start();
for (const std::string& prompt : prompts) {
const auto encode_start = std::chrono::steady_clock::now();
input_ids.push_back(m_tokenizer.encode(prompt).input_ids);
tokenization_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - encode_start));
}
timer.end();
}

std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);

std::vector<GenerationResult> decoded;
decoded.reserve(encoded.size());
for (EncodedGenerationResult& res : encoded) {
for (size_t i = 0; i < encoded.size(); ++i) {
EncodedGenerationResult res = encoded[i];
auto& perf_metrics = res.perf_metrics;
auto& raw_counters = perf_metrics.raw_metrics;
raw_counters.tokenization_durations.emplace_back(tokenization_durations[i]);

std::vector<std::string> generated;
generated.reserve(res.m_generation_ids.size());
for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) {
const auto decode_start = std::chrono::steady_clock::now();
generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx)));
raw_counters.detokenization_durations.emplace_back(std::chrono::steady_clock::now() - decode_start);
if (m_is_chat_conversation && 0 == idx) {
m_history.push_back({{"role", "assistant"}, {"content", generated.back()}});
}
}

// Perf metrics are the same for each sequence; only tokenization/detokenization durations will differ.
perf_metrics.raw_metrics.generate_durations.clear();
perf_metrics.raw_metrics.generate_durations.emplace_back(PerfMetrics::get_microsec(std::chrono::steady_clock::now() - start_time));
// Reevaluate taking into account tokenization/detokenization times.
perf_metrics.m_evaluated = false;
perf_metrics.evaluate_statistics(start_time);

decoded.push_back(GenerationResult{
res.m_request_id,
std::move(generated),
std::move(res.m_scores),
res.m_status
res.m_status,
perf_metrics,
});
}

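After the re-evaluation above, each GenerationResult reports a generate duration that also covers tokenization and detokenization, while the inference duration does not. A hedged sketch of that distinction; get_generate_duration() and get_inference_duration() are assumed from the existing PerfMetrics API:

#include "openvino/genai/generation_handle.hpp"
#include <iostream>

// Sketch: the gap between the two durations is the tokenization/detokenization
// and scheduling overhead captured by this change.
void print_overhead(ov::genai::GenerationResult& res) {
    auto& pm = res.perf_metrics;
    const float generate_ms = pm.get_generate_duration().mean;
    const float inference_ms = pm.get_inference_duration().mean;
    std::cout << "non-inference overhead, ms: " << (generate_ms - inference_ms) << std::endl;
}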
2 changes: 1 addition & 1 deletion src/cpp/src/llm_pipeline_stateful.cpp
@@ -181,7 +181,7 @@ DecodedResults StatefulLLMPipeline::generate(

auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
auto stop_time = std::chrono::steady_clock::now();
raw_counters.generate_durations = std::vector<MicroSeconds>();
raw_counters.generate_durations.clear();
raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));
4 changes: 2 additions & 2 deletions src/cpp/src/llm_pipeline_static.cpp
@@ -789,7 +789,7 @@ DecodedResults StatefulLLMPipeline::generate(
decoded_results.perf_metrics = encoded_results.perf_metrics;
auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
auto stop_time = std::chrono::steady_clock::now();
raw_counters.generate_durations = std::vector<MicroSeconds>();
raw_counters.generate_durations.clear();
raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));
@@ -1220,7 +1220,7 @@ DecodedResults StatelessLLMPipeline::generate(
decoded_results.perf_metrics = encoded_results.perf_metrics;
auto& raw_counters = decoded_results.perf_metrics.raw_metrics;
auto stop_time = std::chrono::steady_clock::now();
raw_counters.generate_durations = std::vector<MicroSeconds>();
raw_counters.generate_durations.clear();
raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(stop_time - start_time));
raw_counters.tokenization_durations.emplace_back(PerfMetrics::get_microsec(encode_stop_time - start_time));
raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_stop_time - decode_start_time));
2 changes: 1 addition & 1 deletion src/cpp/src/perf_metrics.cpp
@@ -93,7 +93,7 @@ void PerfMetrics::evaluate_statistics(std::optional<TimePoint> start_time) {
return;
}
// If start_time is specified then recalculate durations according to start times and calculate statistics only after that.
if (start_time.has_value()) {
if (start_time.has_value() && raw_metrics.m_new_token_times.size() > 0 && raw_metrics.m_batch_sizes.size() > 0) {
auto start_time_val = *start_time;
auto& tok_times = raw_metrics.m_new_token_times;
auto& batch_sizes = raw_metrics.m_batch_sizes;
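For context on the extra guard: a hedged sketch of the case it protects against, namely re-evaluating with a start_time when no token timestamps or batch sizes were ever recorded (e.g. a request that produced no tokens), which would otherwise recompute durations from empty vectors:

#include "openvino/genai/perf_metrics.hpp"
#include <chrono>

// Illustrative only: with the added size() checks, the start_time-based recalculation
// is skipped when m_new_token_times / m_batch_sizes are empty.
void evaluate_without_tokens() {
    ov::genai::PerfMetrics pm;  // raw_metrics vectors are empty: nothing was generated yet
    pm.evaluate_statistics(std::chrono::steady_clock::now());
}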
16 changes: 16 additions & 0 deletions src/python/openvino_genai/py_openvino_genai.pyi
@@ -440,6 +440,8 @@ class EncodedGenerationResult:
IGNORED = 2 - Status set when generation ran into an out-of-memory condition and could not be continued.
DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality.
DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped.
perf_metrics:
Performance metrics for each generation result.

"""
m_generation_ids: list[list[int]]
@@ -449,6 +451,9 @@
@property
def m_request_id(self) -> int:
...
@property
def perf_metrics(self) -> PerfMetrics:
...
class EncodedResults:
"""

@@ -693,6 +698,8 @@ class GenerationResult:
IGNORED = 2 - Status set when generation ran into an out-of-memory condition and could not be continued.
DROPPED_BY_PIPELINE = 3 - Currently not used, TODO: implement abort functionality.
DROPPED_BY_HANDLE = 4 - Status set when generation handle is dropped.
perf_metrics:
Performance metrics for each generation result.

"""
m_generation_ids: list[str]
@@ -707,6 +714,9 @@
@property
def m_request_id(self) -> int:
...
@property
def perf_metrics(self) -> PerfMetrics:
...
class GenerationStatus:
"""
Members:
@@ -1211,6 +1221,9 @@ class PipelineMetrics:

:param avg_cache_usage: Running average of the KV cache usage (in %) during the lifetime of the pipeline, with max window size of 1000 steps
:type avg_cache_usage: float

:param total_num_scheduled_tokens: Number of tokens scheduled for processing at the previous step of the pipeline.
:type total_num_scheduled_tokens: int
"""
def __init__(self) -> None:
...
@@ -1229,6 +1242,9 @@
@property
def scheduled_requests(self) -> int:
...
@property
def total_num_scheduled_tokens(self) -> int:
...
class RawPerfMetrics:
"""
