@@ -4,11 +4,17 @@
 
 from opentelemetry import trace as trace_api
 from opentelemetry.instrumentation.instrumentor import BaseInstrumentor  # type: ignore
-from portkey_ai.api_resources.apis.chat_complete import Completions
+from portkey_ai.api_resources.apis.chat_complete import (
+    AsyncCompletions,
+    Completions,
+)
 from wrapt import wrap_function_wrapper
 
 from openinference.instrumentation import OITracer, TraceConfig
-from openinference.instrumentation.portkey._wrappers import _CompletionsWrapper
+from openinference.instrumentation.portkey._wrappers import (
+    _AsyncCompletionsWrapper,
+    _CompletionsWrapper,
+)
 from openinference.instrumentation.portkey.version import __version__
 
 logger = logging.getLogger(__name__)
@@ -20,7 +26,7 @@
 class PortkeyInstrumentor(BaseInstrumentor):  # type: ignore[misc]
     """An instrumentor for the Portkey AI framework."""
 
-    __slots__ = ("_original_completions_create", "_tracer")
+    __slots__ = ("_original_completions_create", "_original_async_completions_create", "_tracer")
 
     def instrumentation_dependencies(self) -> Collection[str]:
         return _instruments
@@ -49,7 +55,22 @@ def _instrument(self, **kwargs: Any) -> None:
             wrapper=_CompletionsWrapper(tracer=self._tracer),
         )
 
+        self._original_async_completions_create = AsyncCompletions.create
+        wrap_function_wrapper(
+            module="portkey_ai.api_resources.apis.chat_complete",
+            name="AsyncCompletions.create",
+            wrapper=_AsyncCompletionsWrapper(tracer=self._tracer),
+        )
+        wrap_function_wrapper(
+            module="portkey_ai.api_resources.apis.generation",
+            name="AsyncCompletions.create",
+            wrapper=_AsyncCompletionsWrapper(tracer=self._tracer),
+        )
+
     def _uninstrument(self, **kwargs: Any) -> None:
         portkey_module = import_module("portkey_ai.api_resources.apis.chat_complete")
         if self._original_completions_create is not None:
             portkey_module.Completions.create = self._original_completions_create
+
+        if self._original_async_completions_create is not None:
+            portkey_module.AsyncCompletions.create = self._original_async_completions_create
Bug: Incomplete uninstrumentation leaves generation wrap active

The _uninstrument method does not restore AsyncCompletions.create in the generation module. During instrumentation (lines 64-68), AsyncCompletions.create is wrapped in both portkey_ai.api_resources.apis.chat_complete and portkey_ai.api_resources.apis.generation. However, _uninstrument only imports and restores from the chat_complete module (lines 71-76), leaving the generation module's AsyncCompletions.create wrapped. As a result, async completions routed through the generation module remain instrumented even after uninstrument() is called.

Author reply:
If this is true, we would need to restore the sync function, in addition to the async function.
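
For reference, a minimal sketch of a restore that covers both modules, assuming the chat_complete and generation modules expose distinct Completions/AsyncCompletions classes and that _instrument saves one original per wrapped method; the _original_generation_* slot names below are hypothetical and not part of this PR:

    def _uninstrument(self, **kwargs: Any) -> None:
        chat_module = import_module("portkey_ai.api_resources.apis.chat_complete")
        generation_module = import_module("portkey_ai.api_resources.apis.generation")
        # Restore the sync create methods in both modules.
        if self._original_completions_create is not None:
            chat_module.Completions.create = self._original_completions_create
        if self._original_generation_completions_create is not None:  # hypothetical slot
            generation_module.Completions.create = self._original_generation_completions_create
        # Restore the async create methods in both modules.
        if self._original_async_completions_create is not None:
            chat_module.AsyncCompletions.create = self._original_async_completions_create
        if self._original_generation_async_completions_create is not None:  # hypothetical slot
            generation_module.AsyncCompletions.create = (
                self._original_generation_async_completions_create
            )

If the two modules instead re-export the same class objects, the second wrap in _instrument double-wraps a single method, and one assignment from the saved original already unwinds both layers.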

@@ -143,5 +143,62 @@ def __call__(
             except Exception:
                 logger.exception(f"Failed to finalize response of type {type(response)}")
                 span.finish_tracing()
-            finally:
-                return response
+            return response
+
+
+class _AsyncCompletionsWrapper(_WithTracer):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self._request_extractor = _RequestAttributesExtractor()
+        self._response_extractor = _ResponseAttributesExtractor()
+
+    async def __call__(
+        self,
+        wrapped: Callable[..., Any],
+        instance: Any,
+        args: Tuple[Any, ...],
+        kwargs: Mapping[str, Any],
+    ) -> Any:
+        if context_api.get_value(context_api._SUPPRESS_INSTRUMENTATION_KEY):
+            return await wrapped(*args, **kwargs)
+
+        # Prepare invocation parameters by merging args and kwargs
+        invocation_parameters = {**kwargs}
+        for arg in args:
+            if isinstance(arg, dict):
+                invocation_parameters.update(arg)
+
+        request_parameters = _parse_args(signature(wrapped), *args, **kwargs)
+        span_name = "AsyncCompletions"
+        with self._start_as_current_span(
+            span_name=span_name,
+            attributes=self._request_extractor.get_attributes_from_request(request_parameters),
+            context_attributes=get_attributes_from_context(),
+            extra_attributes=self._request_extractor.get_extra_attributes_from_request(
+                request_parameters
+            ),
+        ) as span:
+            try:
+                response = await wrapped(*args, **kwargs)
+            except Exception as exception:
+                span.record_exception(exception)
+                status = trace_api.Status(
+                    status_code=trace_api.StatusCode.ERROR,
+                    description=f"{type(exception).__name__}: {exception}",
+                )
+                span.finish_tracing(status=status)
+                raise
+            try:
+                _finish_tracing(
+                    with_span=span,
+                    attributes=self._response_extractor.get_attributes(response),
+                    extra_attributes=self._response_extractor.get_extra_attributes(
+                        response=response,
+                        request_parameters=request_parameters,
+                    ),
+                    status=trace_api.Status(status_code=trace_api.StatusCode.OK),
+                )
+            except Exception:
+                logger.exception(f"Failed to finalize response of type {type(response)}")
+                span.finish_tracing()
+            return response
@@ -0,0 +1,66 @@
+interactions:
+- request:
+    body: '{"messages":[{"role":"user","content":"What''s the weather like?"}],"model":"gpt-4o-mini"}'
+    headers:
+      accept:
+      - application/json
+      accept-encoding:
+      - gzip, deflate
+      authorization:
+      - Bearer OPENAI_API_KEY
+      connection:
+      - keep-alive
+      content-length:
+      - '89'
+      content-type:
+      - application/json
+      host:
+      - api.portkey.ai
+      user-agent:
+      - AsyncOpenAI/Python 2.2.0
+      x-stainless-arch:
+      - arm64
+      x-stainless-async:
+      - async:asyncio
+      x-stainless-lang:
+      - python
+      x-stainless-os:
+      - MacOS
+      x-stainless-package-version:
+      - 2.2.0
+      x-stainless-raw-response:
+      - 'true'
+      x-stainless-read-timeout:
+      - '600'
+      x-stainless-retry-count:
+      - '0'
+      x-stainless-runtime:
+      - CPython
+      x-stainless-runtime-version:
+      - 3.10.0
+    method: POST
+    uri: https://api.portkey.ai/v1/chat/completions
+  response:
+    body:
+      string: '{"id":"chatcmpl-AsyncTest123456789","object":"chat.completion","created":1743882894,"model":"gpt-4o-mini-2024-07-18","choices":[{"index":0,"message":{"role":"assistant","content":"I don''t have real-time data access to provide current weather updates. However, you can check a weather website or app for the latest information in your area. If you tell me your location, I can suggest typical weather patterns for this time of year!","refusal":null,"annotations":[]},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":12,"completion_tokens":51,"total_tokens":63,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"service_tier":"default","system_fingerprint":"fp_b376dfbbd5"}'
+    headers:
+      CF-RAY:
+      - 99984826acb4cb94-LAX
+      Connection:
+      - keep-alive
+      Content-Length:
+      - '694'
+      Content-Type:
+      - application/json;charset=UTF-8
+      Date:
+      - Wed, 05 Nov 2025 00:37:18 GMT
+      Server:
+      - cloudflare
+      Vary:
+      - accept-encoding
+      alt-svc:
+      - h3=":443"; ma=86400
+    status:
+      code: 200
+      message: OK
+version: 1
@@ -0,0 +1,74 @@
+interactions:
+- request:
+    body: '{"variables":{"location":"New York City"},"stream":false}'
+    headers:
+      accept:
+      - application/json
+      accept-encoding:
+      - gzip, deflate
+      connection:
+      - keep-alive
+      content-length:
+      - '57'
+      content-type:
+      - application/json
+      host:
+      - api.portkey.ai
+      user-agent:
+      - python-httpx/0.28.1
+    method: POST
+    uri: https://api.portkey.ai/v1/prompts/pp-weather-pr-b74c4f/completions
+  response:
+    body:
+      string: '{"id":"chatcmpl-AsyncPromptTest789","object":"chat.completion","created":1743882894,"model":"gpt-4.1-2025-04-14","choices":[{"finish_reason":"stop","index":0,"message":{"content":"Based on the current weather data, New York City is experiencing partly cloudy skies with a temperature of around 72°F (22°C). There''s a light breeze from the northwest at about 8 mph. Humidity levels are moderate at around 55%. It''s a pleasant spring day overall with no precipitation expected in the immediate forecast.","role":"assistant","refusal":null,"annotations":[]},"logprobs":null}],"usage":{"completion_tokens":66,"prompt_tokens":35,"total_tokens":101,"prompt_tokens_details":{"cached_tokens":0},"completion_tokens_details":{"reasoning_tokens":0}},"service_tier":"default","system_fingerprint":"fp_test123"}'
+    headers:
+      CF-Cache-Status:
+      - DYNAMIC
+      CF-Ray:
+      - 943debc01e8b1492-EWR
+      Connection:
+      - keep-alive
+      Content-Type:
+      - application/json
+      Date:
+      - Thu, 22 May 2025 17:09:26 GMT
+      Server:
+      - cloudflare
+      Strict-Transport-Security:
+      - max-age=31536000; includeSubDomains; preload
+      Transfer-Encoding:
+      - chunked
+      Vary:
+      - Accept-Encoding
+      X-Content-Type-Options:
+      - nosniff
+      access-control-expose-headers:
+      - X-Request-ID
+      alt-svc:
+      - h3=":443"; ma=86400
+      openai-organization:
+      - arize-ai-ewa7w1
+      openai-processing-ms:
+      - '3213'
+      openai-version:
+      - '2020-10-01'
+      x-envoy-upstream-service-time:
+      - '3221'
+      x-ratelimit-limit-requests:
+      - '10000'
+      x-ratelimit-limit-tokens:
+      - '30000000'
+      x-ratelimit-remaining-requests:
+      - '9999'
+      x-ratelimit-remaining-tokens:
+      - '29999965'
+      x-ratelimit-reset-requests:
+      - 6ms
+      x-ratelimit-reset-tokens:
+      - 0s
+      x-request-id:
+      - req_24c239d275655c5f601e70346c6cd0f6
+    status:
+      code: 200
+      message: OK
+version: 1
@@ -117,3 +117,118 @@ def test_prompt_template(
 
     for key, expected_value in expected_attributes.items():
         assert attributes.get(key) == expected_value
+
+
+@pytest.mark.asyncio
+@pytest.mark.vcr(
+    before_record_request=lambda request: setattr(  # type: ignore[func-returns-value]
+        request,
+        "headers",
+        {k: v for k, v in request.headers.items() if not k.lower().startswith("x-portkey")},
+    )
+    or request,
+    before_record_response=lambda response: {
+        **response,
+        "headers": {
+            k: v for k, v in response["headers"].items() if not k.lower().startswith("x-portkey")
+        },
+    },
+)
+async def test_async_chat_completion(
+    in_memory_span_exporter: InMemorySpanExporter,
+    tracer_provider: trace_api.TracerProvider,
+    setup_portkey_instrumentation: None,
+) -> None:
+    portkey = import_module("portkey_ai")
+    client = portkey.AsyncPortkey(
+        api_key="REDACTED",
+        virtual_key="REDACTED",
+    )
+    resp = await client.chat.completions.create(
+        messages=[{"role": "user", "content": "What's the weather like?"}], model="gpt-4o-mini"
+    )
+    spans = in_memory_span_exporter.get_finished_spans()
+    assert len(spans) == 1
+    span = spans[0]
+    assert span.name == "AsyncCompletions"
+    attributes = dict(span.attributes or {})
+
+    expected_attributes = {
+        f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.{MessageAttributes.MESSAGE_ROLE}": "user",
+        f"{SpanAttributes.LLM_INPUT_MESSAGES}.0."
+        f"{MessageAttributes.MESSAGE_CONTENT}": "What's the weather like?",
+        SpanAttributes.OUTPUT_MIME_TYPE: "application/json",
+        SpanAttributes.INPUT_MIME_TYPE: "application/json",
+        SpanAttributes.LLM_MODEL_NAME: "gpt-4o-mini-2024-07-18",
+        SpanAttributes.LLM_TOKEN_COUNT_TOTAL: resp.usage.total_tokens,
+        SpanAttributes.LLM_TOKEN_COUNT_PROMPT: resp.usage.prompt_tokens,
+        SpanAttributes.LLM_TOKEN_COUNT_COMPLETION: resp.usage.completion_tokens,
+        f"{SpanAttributes.LLM_OUTPUT_MESSAGES}.0.{MessageAttributes.MESSAGE_ROLE}": resp.choices[
+            0
+        ].message.role,
+        f"{SpanAttributes.LLM_OUTPUT_MESSAGES}.0.{MessageAttributes.MESSAGE_CONTENT}": resp.choices[
+            0
+        ].message.content,
+        SpanAttributes.OPENINFERENCE_SPAN_KIND: "LLM",
+    }
+
+    for key, expected_value in expected_attributes.items():
+        assert attributes.get(key) == expected_value
+
+
+@pytest.mark.asyncio
+@pytest.mark.vcr(
+    before_record_request=lambda request: setattr(  # type: ignore[func-returns-value]
+        request,
+        "headers",
+        {k: v for k, v in request.headers.items() if not k.lower().startswith("x-portkey")},
+    )
+    or request,
+    before_record_response=lambda response: {
+        **response,
+        "headers": {
+            k: v for k, v in response["headers"].items() if not k.lower().startswith("x-portkey")
+        },
+    },
+)
+async def test_async_prompt_template(
+    in_memory_span_exporter: InMemorySpanExporter,
+    tracer_provider: trace_api.TracerProvider,
+    setup_portkey_instrumentation: None,
+) -> None:
+    prompt_id = "pp-weather-pr-b74c4f"
+    portkey = import_module("portkey_ai")
+    variables = {"location": "New York City"}
+    client = portkey.AsyncPortkey(
+        api_key="REDACTED",
+        virtual_key="REDACTED",
+    )
+    resp = await client.prompts.completions.create(
+        prompt_id=prompt_id,
+        variables=variables,
+    )
+
+    spans = in_memory_span_exporter.get_finished_spans()
+    assert len(spans) == 1
+    span = spans[0]
+    assert span.name == "AsyncCompletions"
+    attributes = dict(span.attributes or {})
+
+    expected_attributes = {
+        SpanAttributes.OUTPUT_MIME_TYPE: "application/json",
+        SpanAttributes.INPUT_MIME_TYPE: "application/json",
+        SpanAttributes.LLM_MODEL_NAME: "gpt-4.1-2025-04-14",
+        SpanAttributes.LLM_TOKEN_COUNT_TOTAL: resp.usage.total_tokens,
+        SpanAttributes.LLM_TOKEN_COUNT_PROMPT: resp.usage.prompt_tokens,
+        SpanAttributes.LLM_TOKEN_COUNT_COMPLETION: resp.usage.completion_tokens,
+        SpanAttributes.PROMPT_ID: prompt_id,
+        SpanAttributes.LLM_PROMPT_TEMPLATE_VARIABLES: json.dumps(variables),
+        f"{SpanAttributes.LLM_OUTPUT_MESSAGES}.0.{MessageAttributes.MESSAGE_ROLE}": "assistant",
+        f"{SpanAttributes.LLM_OUTPUT_MESSAGES}.0.{MessageAttributes.MESSAGE_CONTENT}": resp.choices[
+            0
+        ].message.content,
+        SpanAttributes.OPENINFERENCE_SPAN_KIND: "LLM",
+    }
+
+    for key, expected_value in expected_attributes.items():
+        assert attributes.get(key) == expected_value