From 7e6b3fa2cf77f0f748cf194bf05fa0ab73c3d672 Mon Sep 17 00:00:00 2001
From: Guillaume Fradet
Date: Fri, 26 Sep 2025 10:35:16 +0200
Subject: [PATCH] Count read/write cache tokens with anthropic streaming

---
 .../instrumentation/anthropic/_stream.py      | 32 +++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)

diff --git a/python/instrumentation/openinference-instrumentation-anthropic/src/openinference/instrumentation/anthropic/_stream.py b/python/instrumentation/openinference-instrumentation-anthropic/src/openinference/instrumentation/anthropic/_stream.py
index 2e450d043a..b5e19476ea 100644
--- a/python/instrumentation/openinference-instrumentation-anthropic/src/openinference/instrumentation/anthropic/_stream.py
+++ b/python/instrumentation/openinference-instrumentation-anthropic/src/openinference/instrumentation/anthropic/_stream.py
@@ -249,6 +249,8 @@ def __init__(self) -> None:
                 ),
                 stop_reason=_SimpleStringReplace(),
                 input_tokens=_SimpleStringReplace(),
+                cache_creation_input_tokens=_SimpleStringReplace(),
+                cache_read_input_tokens=_SimpleStringReplace(),
                 output_tokens=_SimpleStringReplace(),
             ),
         ),
@@ -272,6 +274,13 @@ def process_chunk(self, chunk: "RawContentBlockDeltaEvent") -> None:
                     "input_tokens": str(chunk.message.usage.input_tokens),
                 }
             }
+            # Capture Anthropic prompt cache usage details when present
+            cache_write = getattr(chunk.message.usage, "cache_creation_input_tokens", None)
+            cache_read = getattr(chunk.message.usage, "cache_read_input_tokens", None)
+            if cache_write is not None:
+                value["messages"]["cache_creation_input_tokens"] = str(cache_write)
+            if cache_read is not None:
+                value["messages"]["cache_read_input_tokens"] = str(cache_read)
             self._values += value
         elif isinstance(chunk, RawContentBlockStartEvent):
             self._current_content_block_type = chunk.content_block
@@ -341,6 +350,8 @@ def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]:
         idx = 0
         total_completion_token_count = 0
         total_prompt_token_count = 0
+        total_cache_read_token_count = 0
+        total_cache_write_token_count = 0
         # TODO(harrison): figure out if we should always assume messages is 1.
         # The current non streaming implementation assumes the same
         for message in messages:
@@ -353,6 +364,10 @@ def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]:
                 total_completion_token_count += int(output_tokens)
             if input_tokens := message.get("input_tokens"):
                 total_prompt_token_count += int(input_tokens)
+            if cache_read_tokens := message.get("cache_read_input_tokens"):
+                total_cache_read_token_count += int(cache_read_tokens)
+            if cache_write_tokens := message.get("cache_creation_input_tokens"):
+                total_cache_write_token_count += int(cache_write_tokens)

             # TODO(harrison): figure out if we should always assume the first message
             # will always be a message output generally this block feels really
@@ -377,10 +392,23 @@ def get_extra_attributes(self) -> Iterator[Tuple[str, AttributeValue]]:
                     tool_idx += 1
                 idx += 1
         yield SpanAttributes.LLM_TOKEN_COUNT_COMPLETION, total_completion_token_count
-        yield SpanAttributes.LLM_TOKEN_COUNT_PROMPT, total_prompt_token_count
+        total_prompt_with_cache = (
+            total_prompt_token_count + total_cache_read_token_count + total_cache_write_token_count
+        )
+        yield SpanAttributes.LLM_TOKEN_COUNT_PROMPT, total_prompt_with_cache
+        if total_cache_read_token_count:
+            yield (
+                SpanAttributes.LLM_TOKEN_COUNT_PROMPT_DETAILS_CACHE_READ,
+                total_cache_read_token_count,
+            )
+        if total_cache_write_token_count:
+            yield (
+                SpanAttributes.LLM_TOKEN_COUNT_PROMPT_DETAILS_CACHE_WRITE,
+                total_cache_write_token_count,
+            )
         yield (
             SpanAttributes.LLM_TOKEN_COUNT_TOTAL,
-            total_completion_token_count + total_prompt_token_count,
+            total_completion_token_count + total_prompt_with_cache,
         )
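Note for reviewers: a minimal, untested sketch of how the new counts can be exercised end to end, assuming an OpenTelemetry tracer provider is already configured and AnthropicInstrumentor is applied as usual; the model name, prompt text, and padding factor below are placeholders chosen only so the system block is large enough to be cacheable, not part of this patch.

# Sketch only: stream a request with prompt caching enabled so the accumulator
# sees cache_creation_input_tokens / cache_read_input_tokens in the usage block.
import anthropic
from openinference.instrumentation.anthropic import AnthropicInstrumentor

AnthropicInstrumentor().instrument()  # assumes a tracer provider is configured elsewhere

client = anthropic.Anthropic()
cached_system_text = "You are a meticulous assistant. " * 200  # padded past the cache minimum

with client.messages.stream(
    model="claude-3-5-sonnet-latest",  # placeholder model name
    max_tokens=256,
    system=[
        {
            "type": "text",
            "text": cached_system_text,
            "cache_control": {"type": "ephemeral"},
        }
    ],
    messages=[{"role": "user", "content": "Summarize your instructions in one sentence."}],
) as stream:
    for _ in stream.text_stream:  # drain the stream so the span is finalized
        pass

# On the finished span, get_extra_attributes() now reports:
#   LLM_TOKEN_COUNT_PROMPT                       input + cache read + cache write tokens
#   LLM_TOKEN_COUNT_PROMPT_DETAILS_CACHE_READ    only when the count is non-zero
#   LLM_TOKEN_COUNT_PROMPT_DETAILS_CACHE_WRITE   only when the count is non-zero
#   LLM_TOKEN_COUNT_TOTAL                        completion + prompt (including cache)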