Skip to content

Commit

Permalink
[OPIK-337] anthropic integration (#571)
Browse files Browse the repository at this point in the history
* Initial implementation for [Anthropic/AsyncAnthropic].messages.create

* Add library integration tests for anthropic

* Add base url to metadata

* Fix lint errors

* Update docstring for track_anthropic

* Add anthropic integration tests github workflow

* Implement support for messages.stream (sync mode)

* Draft async stream support

* Rename stream_wrappers -> stream_patchers

* Implement async stream support for messages

* Implement async streaming support

* Add integration tests for anthropic messages streams in async mode

* Fix lint errors

* Add a warning message when batches.create is called for tracked anthropic client

* Rename opik_tracked_client -> opik_tracked

* Make some renamings

* Implement customized client properties logging for vertex and bedrock providers

* Fix test, small refactor

* Update track_anthropic docstring

* Fix lint errors

* Add support for stream=True argument

* Add integration test for create(..., stream=True)

* Refactor stream wrappers, add support for AsyncStream return type

* Fix lint errors

* Add warning for completion calls

* Update docstrings
  • Loading branch information
alexkuzmik authored Nov 14, 2024
1 parent 84e34de commit dc8685c
Show file tree
Hide file tree
Showing 14 changed files with 1,752 additions and 5 deletions.
50 changes: 50 additions & 0 deletions .github/workflows/lib-anthropic-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Workflow to run Anthropic tests
#
# Please read inputs to provide correct values.
#
name: SDK Lib Anthropic Tests
run-name: "SDK Lib Anthropic Tests ${{ github.ref_name }} by @${{ github.actor }}"
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
on:
workflow_call:

jobs:
tests:
name: Anthropic Python ${{matrix.python_version}}
runs-on: ubuntu-latest
defaults:
run:
working-directory: sdks/python

strategy:
fail-fast: true
matrix:
python_version: ["3.8", "3.12"]

steps:
- name: Check out code
uses: actions/checkout@v4

- name: Setup Python ${{matrix.python_version}}
uses: actions/setup-python@v5
with:
python-version: ${{matrix.python_version}}

- name: Install opik
run: pip install .

- name: Install test tools
run: |
cd ./tests
pip install --no-cache-dir --disable-pip-version-check -r test_requirements.txt
- name: Install lib
run: |
cd ./tests
pip install --no-cache-dir --disable-pip-version-check -r library_integration/anthropic/requirements.txt
- name: Run tests
run: |
cd ./tests/library_integration/anthropic/
python -m pytest -vv .
7 changes: 7 additions & 0 deletions .github/workflows/lib-integration-tests-runner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
- openai
- langchain
- llama_index
- anthropic
schedule:
- cron: "0 0 */1 * *"
pull_request:
Expand Down Expand Up @@ -55,3 +56,9 @@ jobs:
uses: ./.github/workflows/lib-llama-index-tests.yml
secrets: inherit

anthropic_tests:
needs: [init_environment]
if: contains(fromJSON('["anthropic", "all"]'), needs.init_environment.outputs.LIBS)
uses: ./.github/workflows/lib-anthropic-tests.yml
secrets: inherit

4 changes: 4 additions & 0 deletions sdks/python/src/opik/integrations/anthropic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .opik_tracker import track_anthropic


__all__ = ["track_anthropic"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import logging
import functools

from typing import Callable, Any


def warning_decorator(message: str, logger: logging.Logger) -> Callable:
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
logger.warning(message)
return func(*args, **kwargs)

return wrapper

return decorator
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import logging
from typing import List, Any, Dict, Optional, Callable, Tuple, Union
from opik.types import SpanType
from opik.decorator import base_track_decorator, arguments_helpers
from opik import dict_utils

import anthropic
from anthropic.types import Message as AnthropicMessage

from . import stream_patchers

LOGGER = logging.getLogger(__name__)

KWARGS_KEYS_TO_LOG_AS_INPUTS = ["messages", "system", "tools"]
RESPONSE_KEYS_TO_LOG_AS_OUTPUT = ["content"]


class AnthropicMessagesCreateDecorator(base_track_decorator.BaseTrackDecorator):
"""
An implementation of BaseTrackDecorator designed specifically for tracking
calls of `[Anthropic.AsyncAnthropic].messages.create` method.
"""

def _start_span_inputs_preprocessor(
self,
func: Callable,
name: Optional[str],
type: SpanType,
tags: Optional[List[str]],
metadata: Optional[Dict[str, Any]],
capture_input: bool,
args: Optional[Tuple],
kwargs: Optional[Dict[str, Any]],
project_name: Optional[str],
) -> arguments_helpers.StartSpanParameters:
assert (
kwargs is not None
), "Expected kwargs to be not None in Antropic.messages.create(**kwargs)"
metadata = metadata if metadata is not None else {}

input, metadata_from_kwargs = dict_utils.split_dict_by_keys(
kwargs, KWARGS_KEYS_TO_LOG_AS_INPUTS
)
metadata.update(metadata_from_kwargs)
metadata["created_from"] = "anthropic"
tags = ["anthropic"]
name = name if name is not None else func.__name__

result = arguments_helpers.StartSpanParameters(
name=name,
input=input,
type=type,
tags=tags,
metadata=metadata,
project_name=project_name,
)

return result

def _end_span_inputs_preprocessor(
self, output: AnthropicMessage, capture_output: bool
) -> arguments_helpers.EndSpanParameters:
usage = {
"prompt_tokens": output.usage.input_tokens,
"completion_tokens": output.usage.output_tokens,
"total_tokens": output.usage.input_tokens + output.usage.output_tokens,
}

output_dict = output.model_dump()
span_output, metadata = dict_utils.split_dict_by_keys(
output_dict, RESPONSE_KEYS_TO_LOG_AS_OUTPUT
)

result = arguments_helpers.EndSpanParameters(
output=span_output, usage=usage, metadata=metadata
)

return result

def _generators_handler(
self,
output: Any,
capture_output: bool,
generations_aggregator: Optional[Callable[[List[Any]], Any]],
) -> Union[
None, anthropic.MessageStreamManager, anthropic.AsyncMessageStreamManager
]:
if isinstance(output, anthropic.MessageStreamManager):
span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()
return stream_patchers.patch_sync_message_stream_manager(
output,
span_to_end=span_to_end,
trace_to_end=trace_to_end,
finally_callback=self._after_call,
)

if isinstance(output, anthropic.AsyncMessageStreamManager):
span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()
return stream_patchers.patch_async_message_stream_manager(
output,
span_to_end=span_to_end,
trace_to_end=trace_to_end,
finally_callback=self._after_call,
)

if isinstance(output, anthropic.Stream):
span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()
return stream_patchers.patch_sync_stream(
output,
span_to_end=span_to_end,
trace_to_end=trace_to_end,
finally_callback=self._after_call,
)

if isinstance(output, anthropic.AsyncStream):
span_to_end, trace_to_end = base_track_decorator.pop_end_candidates()
return stream_patchers.patch_async_stream(
output,
span_to_end=span_to_end,
trace_to_end=trace_to_end,
finally_callback=self._after_call,
)

NOT_A_STREAM = None

return NOT_A_STREAM
120 changes: 120 additions & 0 deletions sdks/python/src/opik/integrations/anthropic/opik_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from typing import Optional
import logging

import anthropic
from . import messages_create_decorator
from . import messages_batch_decorator
from typing import TypeVar, Dict, Any

AnthropicClient = TypeVar(
"AnthropicClient",
anthropic.AsyncAnthropic,
anthropic.Anthropic,
anthropic.AsyncAnthropicBedrock,
anthropic.AnthropicBedrock,
anthropic.AsyncAnthropicVertex,
anthropic.AnthropicVertex,
)

LOGGER = logging.getLogger(__name__)


def track_anthropic(
anthropic_client: AnthropicClient,
project_name: Optional[str] = None,
) -> AnthropicClient:
"""Adds Opik tracking to an Anthropic client.
Integrates with the following anthropic library objects:
* AsyncAnthropic,
* Anthropic,
* AsyncAnthropicBedrock,
* AnthropicBedrock,
* AsyncAnthropicVertex,
* AnthropicVertex,
Supported methods (for all classes above) are:
* `client.messages.create()`
* `client.messages.stream()`
Can be used within other Opik-tracked functions.
Args:
anthropic_client: An instance of Anthropic client.
project_name: The name of the project to log data.
Returns:
Anthropic client with integrated Opik tracking logic.
"""

if hasattr(anthropic_client, "opik_tracked"):
return anthropic_client

anthropic_client.opik_tracked = True
decorator_factory = messages_create_decorator.AnthropicMessagesCreateDecorator()

metadata = _extract_metadata_from_client(anthropic_client)

create_decorator = decorator_factory.track(
type="llm",
name="anthropic_messages_create",
project_name=project_name,
metadata=metadata,
)
stream_decorator = decorator_factory.track(
type="llm",
name="anthropic_messages_stream",
project_name=project_name,
metadata=metadata,
)
batch_create_decorator = messages_batch_decorator.warning_decorator(
"At the moment Opik Anthropic integration does not support tracking for `client.beta.messages.batches.create` calls",
LOGGER,
)
completions_create_decorator = messages_batch_decorator.warning_decorator(
"Opik Anthropic integration does not support tracking for `client.completions.create` calls",
LOGGER,
)

anthropic_client.messages.create = create_decorator(
anthropic_client.messages.create
)
anthropic_client.messages.stream = stream_decorator(
anthropic_client.messages.stream
)
try:
anthropic_client.beta.messages.batches.create = batch_create_decorator(
anthropic_client.beta.messages.batches.create
)
except Exception:
LOGGER.debug(
"Failed to patch `client.messages.batches.create` method. It is likely because it was not implemented in the provided anthropic client",
exc_info=True,
)

try:
anthropic_client.completions.create = completions_create_decorator(
anthropic_client.completions.create
)
except Exception:
LOGGER.debug(
"Failed to patch `client.completions.create` method. It is likely because it was not implemented in the provided anthropic client",
exc_info=True,
)

return anthropic_client


def _extract_metadata_from_client(client: AnthropicClient) -> Dict[str, Any]:
metadata = {"base_url": client.base_url}
if isinstance(
client, (anthropic.AnthropicBedrock, anthropic.AsyncAnthropicBedrock)
):
metadata["aws_region"] = client.aws_region
elif isinstance(
client, (anthropic.AnthropicVertex, anthropic.AsyncAnthropicVertex)
):
metadata["region"] = client.region
metadata["project_id"] = client.project_id

return metadata
Loading

0 comments on commit dc8685c

Please sign in to comment.