Merged
3 changes: 3 additions & 0 deletions README.md
@@ -53,6 +53,9 @@ Features
| **[Fixed Schedule](docs/tutorials/fixed-schedule.md)** | Precise timestamp-based request execution | Traffic replay, temporal analysis, burst testing |
| **[Time-based Benchmarking](docs/tutorials/time-based-benchmarking.md)** | Duration-based testing with grace period control | Stability testing, sustained performance |

### Working with Benchmark Data
- **[Profile Exports](docs/tutorials/working-with-profile-exports.md)** - Parse and analyze `profile_export.jsonl` with Pydantic models, custom metrics, and async processing
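
A minimal sketch of the workflow the new tutorial covers: read one JSON object per line and validate it with Pydantic. The `ExportedRecord` model, its fields, and the artifact path below are illustrative placeholders; the tutorial defines the actual export schema.

```python
import json
from pathlib import Path

from pydantic import BaseModel


class ExportedRecord(BaseModel):
    """Placeholder record model; the tutorial defines the real export schema."""

    model_config = {"extra": "allow"}  # keep fields this sketch does not declare


def iter_records(path: Path):
    """Yield one validated record per line of a profile_export.jsonl file."""
    with path.open() as f:
        for line in f:
            if line.strip():
                yield ExportedRecord.model_validate(json.loads(line))


for record in iter_records(Path("artifacts/profile_export.jsonl")):
    print(record.model_dump())
```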

### Quick Navigation
```bash
# Basic profiling
2 changes: 1 addition & 1 deletion aiperf/common/config/__init__.py
@@ -171,11 +171,11 @@
"load_user_config",
"parse_file",
"parse_service_types",
"parse_str_as_numeric_dict",
"parse_str_or_csv_list",
"parse_str_or_dict_as_tuple_list",
"parse_str_or_list",
"parse_str_or_list_of_positive_values",
"parse_str_as_numeric_dict",
"print_developer_mode_warning",
"print_str_or_list",
]
4 changes: 3 additions & 1 deletion aiperf/common/config/config_defaults.py
@@ -11,6 +11,7 @@
AudioFormat,
CommunicationBackend,
EndpointType,
ExportLevel,
ImageFormat,
ModelSelectionStrategy,
RequestRateMode,
@@ -114,12 +115,13 @@ class TurnDelayDefaults:
@dataclass(frozen=True)
class OutputDefaults:
ARTIFACT_DIRECTORY = Path("./artifacts")
PROFILE_EXPORT_FILE = Path("profile_export.json")
PROFILE_EXPORT_FILE = Path("profile_export.jsonl")
LOG_FOLDER = Path("logs")
LOG_FILE = Path("aiperf.log")
INPUTS_JSON_FILE = Path("inputs.json")
PROFILE_EXPORT_AIPERF_CSV_FILE = Path("profile_export_aiperf.csv")
PROFILE_EXPORT_AIPERF_JSON_FILE = Path("profile_export_aiperf.json")
EXPORT_LEVEL = ExportLevel.RECORDS


@dataclass(frozen=True)
16 changes: 16 additions & 0 deletions aiperf/common/config/output_config.py
@@ -10,6 +10,7 @@
from aiperf.common.config.cli_parameter import CLIParameter
from aiperf.common.config.config_defaults import OutputDefaults
from aiperf.common.config.groups import Groups
from aiperf.common.enums import ExportLevel


class OutputConfig(BaseConfig):
@@ -32,3 +33,18 @@ class OutputConfig(BaseConfig):
group=_CLI_GROUP,
),
] = OutputDefaults.ARTIFACT_DIRECTORY

profile_export_file: Annotated[
Path,
Field(
description="The file to store the profile export in JSONL format.",
),
CLIParameter(
name=("--profile-export-file",),
group=_CLI_GROUP,
),
] = OutputDefaults.PROFILE_EXPORT_FILE

@property
def export_level(self) -> ExportLevel:
return ExportLevel.RECORDS
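
A rough sketch of the new output surface, assuming `OutputConfig` can be constructed with defaults like the other Pydantic-based configs in the project:

```python
from pathlib import Path

from aiperf.common.config.output_config import OutputConfig
from aiperf.common.enums import ExportLevel

# With this change the profile export defaults to JSONL and per-record export.
config = OutputConfig()
assert config.profile_export_file == Path("profile_export.jsonl")
assert config.export_level is ExportLevel.RECORDS

# Equivalent to passing --profile-export-file on the command line.
config = OutputConfig(profile_export_file=Path("my_run.jsonl"))
```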
3 changes: 3 additions & 0 deletions aiperf/common/constants.py
@@ -108,3 +108,6 @@

GOOD_REQUEST_COUNT_TAG = "good_request_count"
"""GoodRequestCount metric tag"""

DEFAULT_RECORD_EXPORT_BATCH_SIZE = 100
"""Default batch size for record export results processor."""
4 changes: 2 additions & 2 deletions aiperf/common/enums/__init__.py
@@ -26,6 +26,7 @@
from aiperf.common.enums.data_exporter_enums import (
ConsoleExporterType,
DataExporterType,
ExportLevel,
)
from aiperf.common.enums.dataset_enums import (
AudioFormat,
@@ -53,7 +54,6 @@
BaseMetricUnit,
BaseMetricUnitInfo,
GenericMetricUnit,
MetricDateTimeUnit,
MetricFlags,
MetricOverTimeUnit,
MetricOverTimeUnitInfo,
@@ -122,12 +122,12 @@
"EndpointServiceKind",
"EndpointType",
"EndpointTypeInfo",
"ExportLevel",
"GenericMetricUnit",
"ImageFormat",
"LifecycleState",
"MediaType",
"MessageType",
"MetricDateTimeUnit",
"MetricFlags",
"MetricOverTimeUnit",
"MetricOverTimeUnitInfo",
13 changes: 13 additions & 0 deletions aiperf/common/enums/data_exporter_enums.py
@@ -14,3 +14,16 @@ class ConsoleExporterType(CaseInsensitiveStrEnum):
class DataExporterType(CaseInsensitiveStrEnum):
JSON = "json"
CSV = "csv"


class ExportLevel(CaseInsensitiveStrEnum):
"""Export level for benchmark data."""

SUMMARY = "summary"
"""Export only aggregated/summarized metrics (default, most compact)"""

RECORDS = "records"
"""Export per-record metrics after aggregation with display unit conversion"""

RAW = "raw"
"""Export raw parsed records with full request/response data (most detailed)"""
20 changes: 5 additions & 15 deletions aiperf/common/enums/metric_enums.py
@@ -2,7 +2,6 @@
# SPDX-License-Identifier: Apache-2.0

from collections.abc import Callable
from datetime import datetime
from enum import Flag
from functools import cached_property
from typing import TYPE_CHECKING, Any, TypeAlias, TypeVar
@@ -169,16 +168,9 @@ def long_name(self) -> str:

def convert_to(self, other_unit: "MetricUnitT", value: int | float) -> float:
"""Convert a value from this unit to another unit."""
if not isinstance(
other_unit, MetricTimeUnit | MetricTimeUnitInfo | MetricDateTimeUnit
):
if not isinstance(other_unit, MetricTimeUnit | MetricTimeUnitInfo):
return super().convert_to(other_unit, value)

if isinstance(other_unit, MetricDateTimeUnit):
return datetime.fromtimestamp(
self.convert_to(MetricTimeUnit.SECONDS, value)
)

return value * (other_unit.per_second / self.per_second)


@@ -197,12 +189,6 @@ class GenericMetricUnit(BaseMetricUnit):
USER = _unit("user")


class MetricDateTimeUnit(BaseMetricUnit):
"""Defines the various date time units that can be used for metrics."""

DATE_TIME = _unit("datetime")


class MetricOverTimeUnitInfo(BaseMetricUnitInfo):
"""Information about a metric over time unit."""

@@ -445,6 +431,10 @@ class MetricFlags(Flag):
GOODPUT = 1 << 10
"""Metrics that are only applicable when goodput feature is enabled"""

NO_INDIVIDUAL_RECORDS = 1 << 11
"""Metrics that should not be exported for individual records. These are typically aggregate metrics.
This is used to filter out metrics such as request count or min/max timestamps that are not relevant to individual records."""

def has_flags(self, flags: "MetricFlags") -> bool:
"""Return True if the metric has ALL of the given flag(s) (regardless of other flags)."""
# Bitwise AND will return the input flags only if all of the given flags are present.
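
To make the new flag concrete, here is a minimal sketch of the per-record filtering it enables; the flag assignments for the two example metrics are hypothetical:

```python
from aiperf.common.enums import MetricFlags

# Hypothetical flag sets: a per-request latency metric vs. an aggregate-only counter.
latency_flags = MetricFlags(0)
request_count_flags = MetricFlags.NO_INDIVIDUAL_RECORDS


def exported_per_record(flags: MetricFlags) -> bool:
    """A record exporter would skip metrics carrying NO_INDIVIDUAL_RECORDS."""
    return not flags.has_flags(MetricFlags.NO_INDIVIDUAL_RECORDS)


assert exported_per_record(latency_flags)
assert not exported_per_record(request_count_flags)
```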
17 changes: 15 additions & 2 deletions aiperf/common/enums/post_processor_enums.py
@@ -5,16 +5,29 @@


class RecordProcessorType(CaseInsensitiveStrEnum):
"""Type of streaming record processor."""
"""Type of streaming record processor.

Record processors are responsible for streaming records and computing metrics from MetricType.RECORD and MetricType.AGGREGATE.
This is the first stage of the processing pipeline, and is done in a distributed manner across multiple service instances.
"""

METRIC_RECORD = "metric_record"
"""Streamer that streams records and computes metrics from MetricType.RECORD and MetricType.AGGREGATE.
This is the first stage of the metrics processing pipeline, and is done in a distributed manner across multiple service instances."""


class ResultsProcessorType(CaseInsensitiveStrEnum):
"""Type of streaming results processor."""
"""Type of streaming results processor.

Results processors are responsible for processing results from RecordProcessors and computing metrics from MetricType.DERIVED,
as well as aggregating the results.
This is the last stage of the processing pipeline, and is done from the single instance of the RecordsManager.
"""

METRIC_RESULTS = "metric_results"
"""Processor that processes the metric results from METRIC_RECORD and computes metrics from MetricType.DERIVED. as well as aggregates the results.
This is the last stage of the metrics processing pipeline, and is done from the RecordsManager after all the service instances have completed their processing."""

RECORD_EXPORT = "record_export"
"""Processor that exports per-record metrics to JSONL files with display unit conversion and filtering.
Only enabled when export_level is set to RECORDS."""
4 changes: 4 additions & 0 deletions aiperf/common/exceptions.py
@@ -143,6 +143,10 @@ class NoMetricValue(AIPerfError):
"""Raised when a metric value is not available."""


class PostProcessorDisabled(AIPerfError):
"""Raised when initializing a post processor to indicate to the caller that it is disabled and should not be used."""


class ProxyError(AIPerfError):
"""Exception raised when a proxy encounters an error."""

2 changes: 2 additions & 0 deletions aiperf/common/messages/__init__.py
@@ -53,6 +53,7 @@
)
from aiperf.common.messages.inference_messages import (
InferenceResultsMessage,
MetricRecordsData,
MetricRecordsMessage,
RealtimeMetricsMessage,
)
@@ -107,6 +108,7 @@
"HeartbeatMessage",
"InferenceResultsMessage",
"Message",
"MetricRecordsData",
"MetricRecordsMessage",
"ProcessRecordsCommand",
"ProcessRecordsResponse",
10 changes: 9 additions & 1 deletion aiperf/common/messages/credit_messages.py
@@ -22,7 +22,15 @@ class CreditDropMessage(BaseServiceMessage):
default_factory=lambda: str(uuid.uuid4()),
description="The ID of the credit drop, that will be used as the X-Correlation-ID header.",
)
phase: CreditPhase = Field(..., description="The type of credit phase")
phase: CreditPhase = Field(
..., description="The type of credit phase, such as warmup or profiling."
)
credit_num: int = Field(
...,
ge=0,
description="The sequential number of the credit in the credit phase. This is used to track the progress of the credit phase,"
" as well as the order that requests are sent in.",
)
conversation_id: str | None = Field(
default=None, description="The ID of the conversation, if applicable."
)
73 changes: 48 additions & 25 deletions aiperf/common/messages/inference_messages.py
@@ -1,21 +1,19 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from pydantic import (
Field,
SerializeAsAny,
)

from aiperf.common.enums import (
CreditPhase,
MessageType,
)
from pydantic import Field, SerializeAsAny

from aiperf.common.aiperf_logger import AIPerfLogger
from aiperf.common.enums import MessageType
from aiperf.common.enums.metric_enums import MetricValueTypeT
from aiperf.common.messages.service_messages import BaseServiceMessage
from aiperf.common.models import ErrorDetails, RequestRecord
from aiperf.common.models.record_models import MetricResult
from aiperf.common.models.base_models import AIPerfBaseModel
from aiperf.common.models.record_models import MetricRecordMetadata, MetricResult
from aiperf.common.types import MessageTypeT, MetricTagT

_logger = AIPerfLogger(__name__)


class InferenceResultsMessage(BaseServiceMessage):
"""Message for a inference results."""
@@ -27,29 +25,36 @@ class InferenceResultsMessage(BaseServiceMessage):
)


class MetricRecordsData(AIPerfBaseModel):
"""Incoming data from the record processor service to combine metric records for the profile run."""

metadata: MetricRecordMetadata = Field(
..., description="The metadata of the request record."
)
metrics: dict[MetricTagT, MetricValueTypeT] = Field(
..., description="The combined metric records for this inference request."
)
error: ErrorDetails | None = Field(
default=None, description="The error details if the request failed."
)

@property
def valid(self) -> bool:
"""Whether the request was valid."""
return self.error is None


class MetricRecordsMessage(BaseServiceMessage):
"""Message from the result parser to the records manager to notify it
of the metric records for a single request."""

message_type: MessageTypeT = MessageType.METRIC_RECORDS

timestamp_ns: int = Field(
..., description="The wall clock timestamp of the request in nanoseconds."
)
x_request_id: str | None = Field(
default=None, description="The X-Request-ID header of the request."
)
x_correlation_id: str | None = Field(
default=None, description="The X-Correlation-ID header of the request."
)
worker_id: str = Field(
..., description="The ID of the worker that processed the request."
)
credit_phase: CreditPhase = Field(
..., description="The credit phase of the request."
metadata: MetricRecordMetadata = Field(
..., description="The metadata of the request record."
)
results: list[dict[MetricTagT, MetricValueTypeT]] = Field(
..., description="The record processor results"
..., description="The record processor metric results"
)
error: ErrorDetails | None = Field(
default=None, description="The error details if the request failed."
@@ -60,6 +65,24 @@ def valid(self) -> bool:
"""Whether the request was valid."""
return self.error is None

def to_data(self) -> MetricRecordsData:
"""Convert the metric records message to a MetricRecordsData for processing by the records manager."""
metrics = {}
for result in self.results:
for tag, value in result.items():
if tag in metrics:
_logger.warning(
f"Duplicate metric tag '{tag}' found in results. "
f"Overwriting previous value {metrics[tag]} with {value}."
)
metrics[tag] = value

return MetricRecordsData(
metadata=self.metadata,
metrics=metrics,
error=self.error,
)


class RealtimeMetricsMessage(BaseServiceMessage):
"""Message from the records manager to show real-time metrics for the profile run."""
6 changes: 6 additions & 0 deletions aiperf/common/models/__init__.py
@@ -50,7 +50,10 @@
BaseResponseData,
EmbeddingResponseData,
InferenceServerResponse,
MetricRecordInfo,
MetricRecordMetadata,
MetricResult,
MetricValue,
ParsedResponse,
ParsedResponseRecord,
ProcessRecordsResult,
@@ -90,7 +93,10 @@
"InferenceServerResponse",
"InputsFile",
"Media",
"MetricRecordInfo",
"MetricRecordMetadata",
"MetricResult",
"MetricValue",
"ParsedResponse",
"ParsedResponseRecord",
"ProcessHealth",
3 changes: 2 additions & 1 deletion aiperf/common/models/base_models.py
@@ -37,7 +37,8 @@ class AIPerfBaseModel(BaseModel):
are None. This is set by the @exclude_if_none decorator.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)
# Allow extras by default to be more flexible for end users
model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")

@model_serializer
def _serialize_model(self) -> dict[str, Any]:
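
A small sketch of what the relaxed model config means in practice; the `Example` model is hypothetical:

```python
from aiperf.common.models.base_models import AIPerfBaseModel


class Example(AIPerfBaseModel):
    name: str


# Unknown fields are now kept on the model instead of being silently dropped
# (Pydantic's default), so records written with newer fields still round-trip.
obj = Example.model_validate({"name": "ttft", "p99": 123.4})
assert obj.name == "ttft"
assert obj.p99 == 123.4  # the extra field is preserved and accessible
```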