Commit 341798c
merge
baskaryan committed Jan 21, 2025
2 parents 4084685 + b812149 commit 341798c
Showing 26 changed files with 2,946 additions and 1,891 deletions.
45 changes: 0 additions & 45 deletions .github/workflows/test_docker_compose.yml

This file was deleted.

100 changes: 81 additions & 19 deletions python/langsmith/_internal/_background_thread.py
@@ -6,6 +6,7 @@
import logging
import sys
import threading
import time
import weakref
from multiprocessing import cpu_count
from queue import Empty, Queue
@@ -20,6 +21,7 @@

from langsmith import schemas as ls_schemas
from langsmith import utils as ls_utils
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._constants import (
_AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
_AUTO_SCALE_UP_NTHREADS_LIMIT,
@@ -100,7 +102,8 @@ def _tracing_thread_drain_queue(
def _tracing_thread_drain_compressed_buffer(
client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
) -> Tuple[Optional[io.BytesIO], Optional[Tuple[int, int]]]:
assert client.compressed_runs is not None
if client.compressed_runs is None:
return None, None
with client.compressed_runs.lock:
client.compressed_runs.compressor_writer.flush()
current_size = client.compressed_runs.buffer.tell()
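
For context, the change above replaces a hard assert with a soft guard: when compression state has not been initialized, the drain helper simply reports that there is nothing to send. A minimal sketch of that contract (a hypothetical drain_compressed_buffer standing in for the real helper; the buffer-reset step the full function performs is elided):

import io
from typing import Optional, Tuple


def drain_compressed_buffer(
    compressed_runs, size_limit: int, size_limit_bytes: int
) -> Tuple[Optional[io.BytesIO], Optional[Tuple[int, int]]]:
    # Soft guard: compression state may legitimately be uninitialized.
    if compressed_runs is None:
        return None, None
    with compressed_runs.lock:
        compressed_runs.compressor_writer.flush()
        current_size = compressed_runs.buffer.tell()
        # Only drain once either threshold is reached.
        if compressed_runs.run_count < size_limit and current_size < size_limit_bytes:
            return None, None
        # ... swap in a fresh buffer here and return the filled one ...
        return compressed_runs.buffer, (compressed_runs.run_count, current_size)
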
@@ -214,6 +217,24 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
use_multipart = batch_ingest_config.get("use_multipart_endpoint", False)

disable_compression = ls_utils.get_env_var("DISABLE_RUN_COMPRESSION")
if not ls_utils.is_truish(disable_compression) and use_multipart:
if not (client.info.instance_flags or {}).get(
"zstd_compression_enabled", False
):
logger.warning(
"Run compression is not enabled. Please update to the latest "
"version of LangSmith. Falling back to regular multipart ingestion."
)
else:
client._futures = set()
client.compressed_runs = CompressedRuns()
client._data_available_event = threading.Event()
threading.Thread(
target=tracing_control_thread_func_compress_parallel,
args=(weakref.ref(client),),
).start()

sub_threads: List[threading.Thread] = []
# 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
num_known_refs = 3
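
The block above gates the new compressed ingestion path: an opt-out environment variable and the server's zstd_compression_enabled instance flag must both allow it before the parallel worker thread is started. A hedged sketch of that gating, where maybe_start_compression and the LANGSMITH_DISABLE_RUN_COMPRESSION spelling are illustrative assumptions rather than SDK API:

import os
import threading
import weakref

from langsmith._internal._compressed_runs import CompressedRuns


def maybe_start_compression(client, worker) -> bool:
    """Start the parallel compression thread only when allowed and supported."""
    # Env var name assumed for illustration; the SDK resolves it via ls_utils.get_env_var.
    disabled = os.getenv("LANGSMITH_DISABLE_RUN_COMPRESSION", "")
    if disabled.strip().lower() in ("1", "true", "yes"):
        return False
    if not (client.info.instance_flags or {}).get("zstd_compression_enabled", False):
        # Older LangSmith server: fall back to regular multipart ingestion.
        return False
    client._futures = set()
    client.compressed_runs = CompressedRuns()
    client._data_available_event = threading.Event()
    threading.Thread(target=worker, args=(weakref.ref(client),)).start()
    return True
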
@@ -256,6 +277,7 @@ def keep_thread_active() -> bool:
_tracing_thread_handle_batch(
client, tracing_queue, next_batch, use_multipart
)

# drain the queue on exit
while next_batch := _tracing_thread_drain_queue(
tracing_queue, limit=size_limit, block=False
@@ -264,12 +286,20 @@ def keep_thread_active() -> bool:


def tracing_control_thread_func_compress_parallel(
client_ref: weakref.ref[Client],
client_ref: weakref.ref[Client], flush_interval: float = 0.5
) -> None:
client = client_ref()
if client is None:
return

if (
client.compressed_runs is None
or client._data_available_event is None
or client._futures is None
):
logger.error("Required compression attributes not initialized")
return

batch_ingest_config = _ensure_ingest_config(client.info)
size_limit: int = batch_ingest_config["size_limit"]
size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520)
@@ -300,35 +330,67 @@ def keep_thread_active() -> bool:
# for now, keep thread alive
return True

last_flush_time = time.monotonic()

while True:
triggered = client._data_available_event.wait(timeout=0.05)
if not keep_thread_active():
break
if not triggered:
continue
client._data_available_event.clear()

data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
# If data arrived, clear the event and attempt a drain
if triggered:
client._data_available_event.clear()

if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
# If we have data, submit the send request
if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(
data_stream,
compressed_runs_info,
)
last_flush_time = time.monotonic()

else:
if (time.monotonic() - last_flush_time) >= flush_interval:
data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
)
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(data_stream, compressed_runs_info)
if data_stream is not None:
try:
cf.wait(
[
HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
]
)
except RuntimeError:
client._send_compressed_multipart_req(
data_stream,
compressed_runs_info,
)
last_flush_time = time.monotonic()

# Drain the buffer on exit
# Drain the buffer on exit (final flush)
try:
final_data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
) # Force final drain
)
)
if final_data_stream is not None:
try:
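
Putting the new tracing_control_thread_func_compress_parallel loop together: the thread wakes on _data_available_event (or every 50 ms), drains the compressed buffer when the size threshold is reached, and otherwise forces a drain once flush_interval seconds have passed since the last send. A condensed sketch of that control flow, with the drain helper and HTTP thread pool passed in explicitly so the example stands alone:

import concurrent.futures as cf
import time


def compress_flush_loop(
    client,
    drain,                      # e.g. _tracing_thread_drain_compressed_buffer
    pool,                       # e.g. HTTP_REQUEST_THREAD_POOL
    keep_thread_active,
    size_limit: int,
    size_limit_bytes: int,
    flush_interval: float = 0.5,
) -> None:
    last_flush_time = time.monotonic()
    while True:
        triggered = client._data_available_event.wait(timeout=0.05)
        if not keep_thread_active():
            break
        if triggered:
            client._data_available_event.clear()
            data_stream, info = drain(client, size_limit, size_limit_bytes)
            if data_stream is not None:
                try:
                    client._futures.add(
                        pool.submit(client._send_compressed_multipart_req, data_stream, info)
                    )
                except RuntimeError:
                    # Pool already shut down: send synchronously instead.
                    client._send_compressed_multipart_req(data_stream, info)
                last_flush_time = time.monotonic()
        elif time.monotonic() - last_flush_time >= flush_interval:
            # Periodic flush: drain whatever is buffered, however small.
            data_stream, info = drain(client, size_limit=1, size_limit_bytes=1)
            if data_stream is not None:
                try:
                    cf.wait(
                        [pool.submit(client._send_compressed_multipart_req, data_stream, info)]
                    )
                except RuntimeError:
                    client._send_compressed_multipart_req(data_stream, info)
                last_flush_time = time.monotonic()
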
17 changes: 1 addition & 16 deletions python/langsmith/_internal/_compressed_runs.py
@@ -1,12 +1,7 @@
import io
import threading

try:
from zstandard import ZstdCompressor # type: ignore[import]

HAVE_ZSTD = True
except ImportError:
HAVE_ZSTD = False
from zstandard import ZstdCompressor # type: ignore[import]

from langsmith import utils as ls_utils

@@ -20,11 +15,6 @@ def __init__(self):
self.lock = threading.Lock()
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
"Install with 'pip install langsmith[compression]'"
)
self.compressor_writer = ZstdCompressor(
level=compression_level, threads=-1
).stream_writer(self.buffer, closefd=False)
@@ -34,11 +24,6 @@ def reset(self):
self.run_count = 0
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
"Install with 'pip install langsmith[compression]'"
)
self.compressor_writer = ZstdCompressor(
level=compression_level, threads=-1
).stream_writer(self.buffer, closefd=False)
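
With the try/except fallback and the HAVE_ZSTD checks removed, zstandard becomes an unconditional import for this module. A small self-contained sketch of the underlying pattern, incremental compression into a shared BytesIO through a zstd stream writer (the compression level and thread count here are illustrative, not necessarily the SDK's defaults):

import io

from zstandard import ZstdCompressor, ZstdDecompressor

buffer = io.BytesIO()
writer = ZstdCompressor(level=3, threads=-1).stream_writer(buffer, closefd=False)

# Append serialized run payloads incrementally.
writer.write(b'{"name": "example-run"}\n')
writer.flush()  # make the compressed bytes visible in the buffer without closing it

buffer.seek(0)
print(ZstdDecompressor().decompressobj().decompress(buffer.read()))
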
53 changes: 37 additions & 16 deletions python/langsmith/_internal/_operations.py
@@ -2,8 +2,10 @@

import itertools
import logging
import os
import uuid
from typing import Literal, Optional, Union, cast
from io import BufferedReader
from typing import Dict, Literal, Optional, Union, cast

from langsmith import schemas as ls_schemas
from langsmith._internal import _orjson
@@ -212,9 +214,9 @@ def serialized_feedback_operation_to_multipart_parts_and_context(

def serialized_run_operation_to_multipart_parts_and_context(
op: SerializedRunOperation,
) -> MultipartPartsAndContext:
) -> tuple[MultipartPartsAndContext, Dict[str, BufferedReader]]:
acc_parts: list[MultipartPart] = []

opened_files_dict: Dict[str, BufferedReader] = {}
# this is main object, minus inputs/outputs/events/attachments
acc_parts.append(
(
@@ -247,7 +249,7 @@ def serialized_run_operation_to_multipart_parts_and_context(
),
)
if op.attachments:
for n, (content_type, valb) in op.attachments.items():
for n, (content_type, data_or_path) in op.attachments.items():
if "." in n:
logger.warning(
f"Skipping logging of attachment '{n}' "
@@ -257,20 +259,39 @@
)
continue

acc_parts.append(
(
f"attachment.{op.id}.{n}",
if isinstance(data_or_path, bytes):
acc_parts.append(
(
None,
valb,
content_type,
{"Content-Length": str(len(valb))},
),
f"attachment.{op.id}.{n}",
(
None,
data_or_path,
content_type,
{"Content-Length": str(len(data_or_path))},
),
)
)
)
return MultipartPartsAndContext(
acc_parts,
f"trace={op.trace_id},id={op.id}",
else:
file_size = os.path.getsize(data_or_path)
file = open(data_or_path, "rb")
opened_files_dict[str(data_or_path) + str(uuid.uuid4())] = file
acc_parts.append(
(
f"attachment.{op.id}.{n}",
(
None,
file, # type: ignore[arg-type]
f"{content_type}; length={file_size}",
{},
),
)
)
return (
MultipartPartsAndContext(
acc_parts,
f"trace={op.trace_id},id={op.id}",
),
opened_files_dict,
)


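
The updated helper now accepts an attachment either as in-memory bytes or as a filesystem path; for a path it opens the file, records the handle in opened_files_dict so the caller can close it after the request completes, and embeds the file size in the part's content type. A hedged sketch of how a caller might consume the new return value (post_multipart is a placeholder, and the parts/context attribute names on MultipartPartsAndContext are assumed):

def send_run_operation(op, post_multipart):
    """Illustrative caller: send the parts, then close files opened for path attachments."""
    parts_and_context, opened_files = (
        serialized_run_operation_to_multipart_parts_and_context(op)
    )
    try:
        # Each part is (field_name, (filename, data, content_type, headers)).
        post_multipart(parts_and_context.parts, parts_and_context.context)
    finally:
        # File handles for path-based attachments must be closed by the caller.
        for handle in opened_files.values():
            handle.close()
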
