Skip to content

Commit

Permalink
[NA] Offline logging support (#115)
Browse files Browse the repository at this point in the history
* Add config variables for offline mode

* Add message processing

* Fix lint errors

* Fix lint errors, change some names

* Refactor names, add message_processing namespace

* Move prompt sending logic to online_senders

* Move sending logic for chains to online_senders.chain

* Fix some lint errors

* Update message processing implementation, split the logic and refactor unit tests

* Fix some lint errors

* Remove odd files from another branch that were added here accidentally

* Fix lint errors

* Add draft version of offline_message_processor (without file rotations)

* Fix lint errors, add file rotation logic

* Refactor message processing logic, add unit tests

* Exclude api_key from serialized messages, rename experiment_information to experiment_info or experiment_info_

* Add test for message to_dict and from_dict

* Add directory creation if it doesnt exist, add versions to messages, fix tests

* Don't raise errors if offline mode enabled

* Fix lint errors

* Enable error raising in test workflows, fix few tests

* Add new test for offline message processor

* Rename test

* Add random numbers to the end of the file

* Config renamings
  • Loading branch information
alexkuzmik authored Mar 26, 2024
1 parent 4ca66ba commit d4c7409
Show file tree
Hide file tree
Showing 29 changed files with 858 additions and 131 deletions.
1 change: 1 addition & 0 deletions .github/workflows/lib-integration-tests-runner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ on:
env:
SLACK_WEBHOOK_URL: ${{ secrets.ACTION_MONITORING_SLACK }}
LIBS: ${{ github.event.inputs.libs != '' && github.event.inputs.libs || 'all' }}
COMET_RAISE_EXCEPTIONS_ON_ERROR: "1"

jobs:
init_environment:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/lib-openai-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ name: Lib OpenAI Tests
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
OPENAI_ORG_ID: ${{ secrets.OPENAI_ORG_ID }}
COMET_RAISE_EXCEPTIONS_ON_ERROR: "1"
on:
workflow_call:

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/sanity-tests.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
name: Sanity Tests Comet LLM
env:
COMET_RAISE_EXCEPTIONS_ON_ERROR: "1"
on:
pull_request:

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
name: Unit Tests Comet LLM
env:
COMET_RAISE_EXCEPTIONS_ON_ERROR: "1"
on:
pull_request:

Expand Down
54 changes: 19 additions & 35 deletions src/comet_llm/chains/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
llm_result,
logging_messages,
)
from ..message_processing import api as message_processing_api, messages
from ..types import JSONEncodable
from . import chain, state

Expand All @@ -52,11 +53,11 @@ def start_chain(
tags: List[str] (optional) user-defined tags attached to the chain
"""

MESSAGE = """
CometLLM requires an API key. Please provide it as the
api_key argument to comet_llm.start_chain or as an environment
variable named COMET_API_KEY
"""
MESSAGE = (
None
if config.offline_enabled()
else (logging_messages.API_KEY_NOT_FOUND_MESSAGE % "comet_llm.start_chain")
)

experiment_info_ = experiment_info.get(
api_key,
Expand All @@ -77,7 +78,7 @@ def start_chain(
def end_chain(
outputs: Dict[str, JSONEncodable],
metadata: Optional[Dict[str, JSONEncodable]] = None,
) -> llm_result.LLMResult:
) -> Optional[llm_result.LLMResult]:
"""
Commits global chain and logs the result to Comet.
Args:
Expand All @@ -100,38 +101,21 @@ def end_chain(
return log_chain(global_chain)


def log_chain(chain: chain.Chain) -> llm_result.LLMResult:
def log_chain(chain: chain.Chain) -> Optional[llm_result.LLMResult]:
chain_data = chain.as_dict()

experiment_info_ = chain.experiment_info
experiment_api_ = experiment_api.ExperimentAPI.create_new(
api_key=experiment_info_.api_key,
workspace=experiment_info_.workspace,
project_name=experiment_info_.project_name,
message = messages.ChainMessage(
experiment_info_=chain.experiment_info,
tags=chain.tags,
chain_data=chain_data,
duration=chain_data["chain_duration"],
metadata=chain_data["metadata"],
others=chain.others,
)

if chain.tags is not None:
experiment_api_.log_tags(chain.tags)
result = message_processing_api.MESSAGE_PROCESSOR.process(message)

experiment_api_.log_asset_with_io(
name="comet_llm_data.json",
file=io.StringIO(json.dumps(chain_data)),
asset_type="llm_data",
)

experiment_api_.log_metric(
name="chain_duration", value=chain_data["chain_duration"]
)
if result is not None:
app.SUMMARY.add_log(result.project_url, "chain")

parameters = convert.chain_metadata_to_flat_parameters(chain_data["metadata"])
for name, value in parameters.items():
experiment_api_.log_parameter(name, value)

for name, value in chain.others.items():
experiment_api_.log_other(name, value)

app.SUMMARY.add_log(experiment_api_.project_url, "chain")

return llm_result.LLMResult(
id=experiment_api_.id, project_url=experiment_api_.project_url
)
return result
15 changes: 15 additions & 0 deletions src/comet_llm/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def _extend_comet_ml_config() -> None:
"comet.logging.console": {"type": str, "default": "INFO"},
"comet.raise_exceptions_on_error": {"type": int, "default": 0},
"comet.internal.check_tls_certificate": {"type": bool, "default": True},
"comet.online": {"type": bool, "default": True},
"comet.offline_directory": {"type": str, "default": ".cometllm-runs"},
"comet.offline_batch_duration_seconds": {"type": int, "default": 300},
}

comet_ml_config.CONFIG_MAP.update(CONFIG_MAP_EXTENSION)
Expand Down Expand Up @@ -95,6 +98,18 @@ def tls_verification_enabled() -> bool:
return _COMET_ML_CONFIG["comet.internal.check_tls_certificate"] # type: ignore


def offline_enabled() -> bool:
return not bool(_COMET_ML_CONFIG["comet.online"])


def offline_directory() -> str:
return str(_COMET_ML_CONFIG["comet.offline_directory"])


def offline_batch_duration_seconds() -> int:
return int(_COMET_ML_CONFIG["comet.offline_batch_duration_seconds"])


def init(
api_key: Optional[str] = None,
workspace: Optional[str] = None,
Expand Down
5 changes: 5 additions & 0 deletions src/comet_llm/logging_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
variable named COMET_API_KEY
"""

API_KEY_NOT_CONFIGURED = """
CometLLM requires an API key. Please provide it as the
as an environment variable named COMET_API_KEY
"""

NON_ALLOWED_SCORE = "Score can only be 0 or 1 when calling 'log_user_feedback'"

METADATA_KEY_COLLISION_DURING_DEEPMERGE = (
Expand Down
13 changes: 13 additions & 0 deletions src/comet_llm/message_processing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************
31 changes: 31 additions & 0 deletions src/comet_llm/message_processing/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************

from typing import Union

from .. import config
from . import offline_message_processor, online_message_processor

MESSAGE_PROCESSOR: Union[
offline_message_processor.OfflineMessageProcessor,
online_message_processor.OnlineMessageProcessor,
]

if config.offline_enabled():
MESSAGE_PROCESSOR = offline_message_processor.OfflineMessageProcessor(
offline_directory=config.offline_directory(),
file_usage_duration=config.offline_batch_duration_seconds(),
)
else:
MESSAGE_PROCESSOR = online_message_processor.OnlineMessageProcessor()
72 changes: 72 additions & 0 deletions src/comet_llm/message_processing/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2023 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************

import dataclasses
import inspect
from typing import Any, ClassVar, Dict, List, Optional, Union

from comet_llm.types import JSONEncodable

from .. import experiment_info, logging_messages


@dataclasses.dataclass
class BaseMessage:
experiment_info_: experiment_info.ExperimentInfo
VERSION: ClassVar[int]

@classmethod
def from_dict(
cls, d: Dict[str, Any], api_key: Optional[str] = None
) -> "BaseMessage":
d.pop("VERSION") #

experiment_info_dict: Dict[str, Optional[str]] = d.pop("experiment_info_")
experiment_info_ = experiment_info.get(
**experiment_info_dict,
api_key=api_key,
api_key_not_found_message=logging_messages.API_KEY_NOT_CONFIGURED
)

return cls(experiment_info_=experiment_info_, **d)

def to_dict(self) -> Dict[str, Any]:
result = dataclasses.asdict(self)

del result["experiment_info_"]["api_key"]
result["VERSION"] = self.VERSION

return result


@dataclasses.dataclass
class PromptMessage(BaseMessage):
prompt_asset_data: Dict[str, Any]
duration: Optional[float]
metadata: Optional[Dict[str, Union[str, bool, float, None]]]
tags: Optional[List[str]]

VERSION: ClassVar[int] = 1


@dataclasses.dataclass
class ChainMessage(BaseMessage):
chain_data: Dict[str, JSONEncodable]
duration: float
tags: Optional[List[str]]
metadata: Optional[Dict[str, JSONEncodable]]
others: Dict[str, JSONEncodable]
# 'other' - is a name of an attribute of experiment, logged via log_other

VERSION: ClassVar[int] = 1
71 changes: 71 additions & 0 deletions src/comet_llm/message_processing/offline_message_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************

import logging
import os
import pathlib
import random
import threading
import time
from typing import Optional

from . import messages
from .offline_senders import chain, prompt

LOGGER = logging.getLogger(__name__)


class OfflineMessageProcessor:
def __init__(self, offline_directory: str, file_usage_duration: float) -> None:
self._offline_directory = offline_directory
self._batch_duration_seconds = file_usage_duration
self._lock = threading.Lock()

self._current_file_started_at: Optional[float] = None
self._current_file_name: Optional[str] = None

os.makedirs(self._offline_directory, exist_ok=True)

def process(self, message: messages.BaseMessage) -> None:
with self._lock:
self._update_current_file_if_needed()
assert self._current_file_name is not None
file_path = pathlib.Path(self._offline_directory, self._current_file_name)

if isinstance(message, messages.PromptMessage):
try:
return prompt.send(message, str(file_path))
except Exception:
LOGGER.error("Failed to log prompt", exc_info=True)
elif isinstance(message, messages.ChainMessage):
try:
return chain.send(message, str(file_path))
except Exception:
LOGGER.error("Failed to log chain", exc_info=True)

LOGGER.debug(f"Unsupported message type {message}")
return None

def _update_current_file_if_needed(self) -> None:
current_time = time.time()

if (
self._current_file_started_at is None
or (current_time - self._current_file_started_at)
>= self._batch_duration_seconds
):
self._current_file_started_at = current_time
self._current_file_name = (
f"messages_{current_time}_{random.randint(1111,9999)}.jsonl"
)
13 changes: 13 additions & 0 deletions src/comet_llm/message_processing/offline_senders/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************
26 changes: 26 additions & 0 deletions src/comet_llm/message_processing/offline_senders/chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************

import json
import pathlib

from .. import messages


def send(message: messages.ChainMessage, file_name: str) -> None:
to_dump = {"type": "ChainMessage", "message": message.to_dict()}
with open(file_name, mode="at", encoding="utf-8") as out_stream:
out_stream.write(json.dumps(to_dump) + "\n")

return None
Loading

0 comments on commit d4c7409

Please sign in to comment.