diff --git a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
index 116632ff0..a6a2b1933 100644
--- a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
+++ b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
@@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};
 
 use pyo3::prelude::*;
 
-use langsmith_tracing_client::client::streaming::{
+use langsmith_tracing_client::client::{
     ClientConfig as RustClientConfig, TracingClient as RustTracingClient,
 };
 
diff --git a/rust/crates/langsmith-tracing-client/benches/tracing_client_benchmark.rs b/rust/crates/langsmith-tracing-client/benches/tracing_client_benchmark.rs
index 4cc2cc421..19eb2c02e 100644
--- a/rust/crates/langsmith-tracing-client/benches/tracing_client_benchmark.rs
+++ b/rust/crates/langsmith-tracing-client/benches/tracing_client_benchmark.rs
@@ -1,35 +1,21 @@
-use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
-use langsmith_tracing_client::client::async_enabled::{ClientConfig, TracingClient};
-use langsmith_tracing_client::client::blocking::{
-    ClientConfig as BlockingClientConfig, TracingClient as BlockingTracingClient,
-};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
 use langsmith_tracing_client::client::{
-    Attachment, EventType, RunCommon, RunCreate, RunCreateExtended, RunEventBytes, RunIO, TimeValue,
+    Attachment, ClientConfig as BlockingClientConfig, RunCommon, RunCreate, RunCreateExtended,
+    RunIO, TimeValue, TracingClient as BlockingTracingClient,
 };
 use mockito::Server;
 use serde_json::{json, Value};
 use std::time::Duration;
-use tokio::runtime::Runtime;
-
-fn create_mock_client_config(server_url: &str, batch_size: usize) -> ClientConfig {
-    ClientConfig {
-        endpoint: server_url.to_string(),
-        queue_capacity: 1_000_000,
-        batch_size,
-        batch_timeout: Duration::from_secs(1),
-        headers: Default::default(),
-    }
-}
 
 fn create_mock_client_config_sync(server_url: &str, batch_size: usize) -> BlockingClientConfig {
     BlockingClientConfig {
         endpoint: server_url.to_string(),
         api_key: "anything".into(),
         queue_capacity: 1_000_000,
-        batch_size,
-        batch_timeout: Duration::from_secs(1),
+        send_at_batch_size: batch_size,
+        send_at_batch_time: Duration::from_secs(1),
         headers: Default::default(),
-        num_worker_threads: 1,
+        compression_level: 1,
     }
 }
 
@@ -67,26 +53,6 @@ fn create_run_create(
     }
 }
 
-fn create_run_bytes(
-    attachments: Option<Vec<Attachment>>,
-    inputs: Option<Value>,
-    outputs: Option<Value>,
-) -> RunEventBytes {
-    let inputs_bytes = inputs.as_ref().map(|i| serde_json::to_vec(&i).unwrap());
-    let outputs_bytes = outputs.as_ref().map(|o| serde_json::to_vec(&o).unwrap());
-    let run_create = create_run_create(attachments, inputs, outputs);
-    let run_bytes = serde_json::to_vec(&run_create.run_create).unwrap();
-
-    RunEventBytes {
-        run_id: run_create.run_create.common.id,
-        event_type: EventType::Create,
-        run_bytes,
-        inputs_bytes,
-        outputs_bytes,
-        attachments: run_create.attachments,
-    }
-}
-
 fn create_large_json(len: usize) -> Value {
     let large_array: Vec<Value> = (0..len)
         .map(|i| {
             serde_json::json!({
                 "index": i,
                 "data": format!("This is element number {}", i),
                 "nested": {
                     "id": i,
                     "value": format!("Nested value for element {}", i),
                 }
             })
         })
         .collect();
 
     serde_json::json!({
         "name": "Huge JSON",
         "description": "This is a very large JSON object for benchmarking purposes.",
         "array": large_array,
         "metadata": {
             "created_at": "2024-10-22T19:00:00Z",
             "author": "Rust Program",
             "version": 1.0
         }
     })
 }
 
@@ -113,186 +79,6 @@
-#[expect(dead_code)]
-fn bench_run_create(c: &mut Criterion) {
-    let rt = Runtime::new().unwrap();
-    let server = rt.block_on(async {
-        let mut server = Server::new_async().await;
-        server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
-        server
-    });
-
-    let mut group = c.benchmark_group("run_create");
-    for batch_size in [50] {
-        for json_len in [1_000, 5_000] {
-            for num_runs in [500, 1_000] {
-                group.bench_with_input(
-                    BenchmarkId::new(
-                        "run_create_async",
-                        format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
-                    ),
-                    &(batch_size, json_len, num_runs),
-                    |b, &(batch_size, json_len, num_runs)| {
-                        b.to_async(&rt).iter_batched(
-                            || {
-                                let runs: Vec<RunCreateExtended> = (0..num_runs)
-                                    .map(|i| {
-                                        let mut run = create_run_create(
-                                            None,
-                                            Some(create_large_json(json_len)),
-                                            Some(create_large_json(json_len)),
-                                        );
-                                        run.run_create.common.id = format!("test_id_{}", i);
-                                        run
-                                    })
-                                    .collect();
-                                let client_config =
-                                    create_mock_client_config(&server.url(), batch_size);
-                                let client = TracingClient::new(client_config).unwrap();
-                                (client, runs)
-                            },
-                            |(client, runs)| async {
-                                for run in runs {
-                                    client.submit_run_create(black_box(run)).await.unwrap();
-                                }
-                                // shutdown the client to flush the queue
-                                client.shutdown().await.unwrap();
-                            },
-                            BatchSize::LargeInput,
-                        );
-                    },
-                );
-            }
-        }
-    }
-    group.finish();
-}
-
-#[expect(dead_code, clippy::single_element_loop)]
-fn bench_run_create_iter_custom(c: &mut Criterion) {
-    let rt = Runtime::new().unwrap();
-    let server = rt.block_on(async {
-        let mut server = Server::new_async().await;
-        server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
-        server
-    });
-
-    let mut group = c.benchmark_group("run_create_custom_iter");
-    let server_url = server.url();
-    for batch_size in [100] {
-        for json_len in [3_000] {
-            for num_runs in [1_000] {
-                group.bench_function(
-                    BenchmarkId::new(
-                        "run_create_async",
-                        format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
-                    ),
-                    |b| {
-                        b.to_async(&rt).iter_custom(|iters| {
-                            let mut elapsed_time = Duration::default();
-                            let server_url = server_url.clone();
-                            async move {
-                                for _ in 0..iters {
-                                    let runs: Vec<RunCreateExtended> = (0..num_runs)
-                                        .map(|i| {
-                                            let mut run = create_run_create(
-                                                None,
-                                                Some(create_large_json(json_len)),
-                                                Some(create_large_json(json_len)),
-                                            );
-                                            run.run_create.common.id = format!("test_id_{}", i);
-                                            run
-                                        })
-                                        .collect();
-                                    let client_config =
-                                        create_mock_client_config(&server_url, batch_size);
-                                    let client = TracingClient::new(client_config).unwrap();
-
-                                    let start = std::time::Instant::now();
-                                    for run in runs {
-                                        client.submit_run_create(black_box(run)).await.unwrap();
-                                    }
-
-                                    // shutdown the client to flush the queue
-                                    let start_shutdown = std::time::Instant::now();
-                                    println!("----------SHUTDOWN----------");
-                                    client.shutdown().await.unwrap();
-                                    println!("----------SHUTDOWN END----------");
-                                    println!(
-                                        "Elapsed time for shutdown: {:?}",
-                                        start_shutdown.elapsed()
-                                    );
-                                    elapsed_time += start.elapsed();
-                                    println!("Elapsed time: {:?}", elapsed_time);
-                                }
-                                elapsed_time
-                            }
-                        })
-                    },
-                );
-            }
-        }
-    }
-    group.finish();
-}
-
-#[expect(dead_code)]
-fn bench_run_bytes_iter_custom(c: &mut Criterion) {
-    let rt = Runtime::new().unwrap();
-    let server = rt.block_on(async {
-        let mut server = Server::new_async().await;
-        server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
-        server
-    });
-
-    let mut group = c.benchmark_group("run_create_bytes_iter");
-    let server_url = server.url();
-    for batch_size in [50] {
-        for json_len in [1_000, 5_000] {
-            for num_runs in [500, 1_000] {
-                group.bench_function(
-                    BenchmarkId::new(
-                        "run_create_async",
-                        format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
-                    ),
-                    |b| {
-                        b.to_async(&rt).iter_custom(|iters| {
-                            let mut elapsed_time = Duration::default();
-                            let server_url = server_url.clone();
-                            async move {
-                                for _ in 0..iters {
-                                    let runs: Vec<RunEventBytes> = (0..num_runs)
-                                        .map(|_i| {
-                                            create_run_bytes(
-                                                None,
-                                                Some(create_large_json(json_len)),
-                                                Some(create_large_json(json_len)),
-                                            )
-                                        })
-                                        .collect();
-                                    let client_config =
-                                        create_mock_client_config(&server_url, batch_size);
-                                    let client = TracingClient::new(client_config).unwrap();
-
-                                    let start = std::time::Instant::now();
-                                    for run in runs {
-                                        client.submit_run_bytes(black_box(run)).await.unwrap();
-                                    }
-                                    // shutdown the client to flush the queue
-                                    client.shutdown().await.unwrap();
-                                    elapsed_time += start.elapsed();
-                                }
-                                elapsed_time
-                            }
-                        })
-                    },
-                );
-            }
-        }
-    }
-    group.finish();
-}
-
 #[expect(unused_variables, clippy::single_element_loop)]
 fn bench_run_create_sync_iter_custom(c: &mut Criterion) {
     let server = {
diff --git a/rust/crates/langsmith-tracing-client/src/client/async_enabled/mod.rs b/rust/crates/langsmith-tracing-client/src/client/async_enabled/mod.rs
deleted file mode 100644
index 7d4f1fcba..000000000
--- a/rust/crates/langsmith-tracing-client/src/client/async_enabled/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-mod processor;
-mod tracing_client;
-
-pub use processor::RunProcessor;
-pub use tracing_client::{ClientConfig, TracingClient};
diff --git a/rust/crates/langsmith-tracing-client/src/client/async_enabled/processor.rs b/rust/crates/langsmith-tracing-client/src/client/async_enabled/processor.rs
deleted file mode 100644
index 69f0a6d2e..000000000
--- a/rust/crates/langsmith-tracing-client/src/client/async_enabled/processor.rs
+++ /dev/null
@@ -1,468 +0,0 @@
-use futures::stream::{FuturesUnordered, StreamExt};
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
-use reqwest::multipart::{Form, Part};
-use serde_json::to_vec;
-use tokio::sync::mpsc::Receiver;
-use tokio::task;
-use tokio::time::{sleep, Instant};
-use tokio_util::io::ReaderStream;
-
-use super::tracing_client::ClientConfig;
-use crate::client::errors::TracingClientError;
-use crate::client::run::{Attachment, EventType, QueuedRun, RunEventBytes};
-use crate::client::run::{RunCreateExtended, RunUpdateExtended};
-
-pub struct RunProcessor {
-    receiver: Receiver<QueuedRun>,
-    http_client: reqwest::Client,
-    config: ClientConfig,
-}
-
-impl RunProcessor {
-    pub(crate) fn new(receiver: Receiver<QueuedRun>, config: ClientConfig) -> Self {
-        let http_client = reqwest::Client::new();
-
-        Self { receiver, http_client, config }
-    }
-
-    pub(crate) async fn run(mut self) -> Result<(), TracingClientError> {
-        let mut buffer = Vec::new();
-        let mut last_send_time = Instant::now();
-
-        loop {
-            tokio::select! {
-                Some(queued_run) = self.receiver.recv() => {
-                    match queued_run {
-                        QueuedRun::Shutdown => {
-                            println!("shutdown signal received.");
-                            if !buffer.is_empty() {
-                                println!("sending remaining buffer before shutdown.");
-                                self.send_and_clear_buffer(&mut buffer).await?;
-                            }
-                            break;
-                        },
-                        _ => {
-                            // println!("received a queued run.");
-                            buffer.push(queued_run);
-                            if buffer.len() >= self.config.batch_size {
-                                println!("batch size limit, sending batch.");
-                                self.send_and_clear_buffer(&mut buffer).await?;
-                                last_send_time = Instant::now();
-                            }
-                        }
-                    }
-                }
-                _ = sleep(self.config.batch_timeout) => {
-                    if !buffer.is_empty() && last_send_time.elapsed() >= self.config.batch_timeout {
-                        // println!("batch timeout, sending batch.");
-                        self.send_and_clear_buffer(&mut buffer).await?;
-                        last_send_time = Instant::now();
-                    }
-                }
-                else => {
-                    // println!("channel closed.");
-                    if !buffer.is_empty() {
-                        // println!("sending remaining buffer.");
-                        self.send_and_clear_buffer(&mut buffer).await?;
-                    }
-                    break;
-                }
-            }
-        }
-        // println!("exiting loop.");
-        Ok(())
-    }
-
-    async fn send_and_clear_buffer(
-        &self,
-        buffer: &mut Vec<QueuedRun>,
-    ) -> Result<(), TracingClientError> {
-        if let Err(e) = self.send_batch(std::mem::take(buffer)).await {
-            // todo: retry logic?
-            eprintln!("Error sending batch: {}", e);
-        }
-        Ok(())
-    }
-
-    // async fn send_batch(&self, batch: Vec<QueuedRun>) -> Result<(), TracingClientError> {
-    //     let mut form = Form::new();
-    //     // println!("Sending batch of {} runs", batch.len());
-    //
-    //     for queued_run in batch {
-    //         match queued_run {
-    //             QueuedRun::Create(run_create_extended) => {
-    //                 self.consume_run_create(run_create_extended, &mut form)
-    //                     .await?;
-    //             }
-    //             QueuedRun::Update(run_update_extended) => {
-    //                 self.consume_run_update(run_update_extended, &mut form)
-    //                     .await?;
-    //             }
-    //             QueuedRun::RunBytes(run_event_bytes) => {
-    //                 self.consume_run_bytes(run_event_bytes, &mut form).await?;
-    //             }
-    //             QueuedRun::Shutdown => {
-    //                 return Err(TracingClientError::UnexpectedShutdown);
-    //             }
-    //         }
-    //     }
-    //
-    //     // Send the multipart POST request
-    //     let response = self
-    //         .http_client
-    //         .post(format!("{}/runs/multipart", self.config.endpoint))
-    //         .multipart(form)
-    //         .headers(self.config.headers.clone().unwrap_or_default())
-    //         .send()
-    //         .await?;
-    //
-    //     if response.status().is_success() {
-    //         Ok(())
-    //     } else {
-    //         Err(TracingClientError::HttpError(response.status()))
-    //     }
-    // }
-
-    #[expect(dead_code)]
-    async fn consume_run_create(
-        &self,
-        run_create_extended: RunCreateExtended,
-        form: &mut Form,
-    ) -> Result<(), TracingClientError> {
-        let RunCreateExtended { run_create, io, attachments } = run_create_extended;
-        let run_id = &run_create.common.id;
-
-        // conditionally add the run_create and io parts to the form
-        self.add_json_part_to_form(form, format!("post.{}", run_id), &run_create)?;
-
-        if let Some(inputs) = io.inputs {
-            self.add_json_part_to_form(form, format!("post.{}.inputs", run_id), &inputs)?;
-        }
-
-        if let Some(outputs) = io.outputs {
-            self.add_json_part_to_form(form, format!("post.{}.outputs", run_id), &outputs)?;
-        }
-
-        if let Some(attachments) = attachments {
-            for attachment in attachments {
-                self.add_attachment_to_form(form, run_id, attachment).await?;
-            }
-        }
-
-        Ok(())
-    }
-
-    #[expect(dead_code)]
-    async fn consume_run_update(
-        &self,
-        run_update_extended: RunUpdateExtended,
-        form: &mut Form,
-    ) -> Result<(), TracingClientError> {
-        let RunUpdateExtended { run_update, io, attachments } = run_update_extended;
-        let run_id = &run_update.common.id;
-
-        self.add_json_part_to_form(form, format!("patch.{}", run_id), &run_update)?;
-
-        if let Some(outputs) = io.outputs {
-            self.add_json_part_to_form(form, format!("patch.{}.outputs", run_id), &outputs)?;
-        }
-
-        if let Some(attachments) = attachments {
-            for attachment in attachments {
-                self.add_attachment_to_form(form, run_id, attachment).await?;
-            }
-        }
-
-        Ok(())
-    }
-
-    #[expect(dead_code)]
-    async fn consume_run_bytes(
-        &self,
-        run_event_bytes: RunEventBytes,
-        form: &mut Form,
-    ) -> Result<(), TracingClientError> {
-        let RunEventBytes {
-            run_id,
-            event_type,
-            run_bytes,
-            inputs_bytes,
-            outputs_bytes,
-            attachments,
-        } = run_event_bytes;
-
-        let event_type_str = match event_type {
-            EventType::Create => "post",
-            EventType::Update => "patch",
-        };
-
-        let part_size = run_bytes.len() as u64;
-        *form = std::mem::take(form).part(
-            format!("{}.{}", event_type_str, run_id),
-            Part::bytes(run_bytes).mime_str(&format!("application/json; length={}", part_size))?,
-        );
-
-        if let Some(inputs_bytes) = inputs_bytes {
-            let part_size = inputs_bytes.len() as u64;
-            *form = std::mem::take(form).part(
-                format!("{}.{}.inputs", event_type_str, run_id),
-                Part::bytes(inputs_bytes)
-                    .mime_str(&format!("application/json; length={}", part_size))?,
-            );
-        }
-
-        if let Some(outputs_bytes) = outputs_bytes {
-            let part_size = outputs_bytes.len() as u64;
-            *form = std::mem::take(form).part(
-                format!("{}.{}.outputs", event_type_str, run_id),
-                Part::bytes(outputs_bytes)
-                    .mime_str(&format!("application/json; length={}", part_size))?,
-            );
-        }
-
-        if let Some(attachments) = attachments {
-            for attachment in attachments {
-                self.add_attachment_to_form(form, &run_id, attachment).await?;
-            }
-        }
-
-        Ok(())
-    }
-
-    fn add_json_part_to_form(
-        &self,
-        form: &mut Form,
-        part_name: String,
-        data: &impl serde::Serialize,
-    ) -> Result<(), TracingClientError> {
-        let data_bytes = to_vec(data).unwrap(); // TODO: get rid of unwrap
-        let part_size = data_bytes.len() as u64;
-        *form = std::mem::take(form).part(
-            part_name,
-            Part::bytes(data_bytes).mime_str(&format!("application/json; length={}", part_size))?,
-        );
-        Ok(())
-    }
-
-    async fn add_attachment_to_form(
-        &self,
-        form: &mut Form,
-        run_id: &str,
-        attachment: Attachment,
-    ) -> Result<(), TracingClientError> {
-        let part_name = format!("attachment.{}.{}", run_id, attachment.ref_name);
-        if let Some(data) = attachment.data {
-            let part_size = data.len() as u64;
-            *form = std::mem::take(form).part(
-                part_name,
-                Part::bytes(data)
-                    .file_name(attachment.filename)
-                    .mime_str(&format!("{}; length={}", &attachment.content_type, part_size))?,
-            );
-        } else {
-            // stream the file from disk to avoid loading the entire file into memory
-            let file_path = std::path::Path::new(&attachment.filename);
-            let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
-                TracingClientError::IoError(format!("Failed to read file metadata: {}", e))
-            })?;
-            let file_size = metadata.len();
-            let file = tokio::fs::File::open(file_path)
-                .await
-                .map_err(|e| TracingClientError::IoError(format!("Failed to open file: {}", e)))?;
-            let stream = ReaderStream::new(file);
-            let body = reqwest::Body::wrap_stream(stream);
-
-            // extract filename from path
-            let file_name = file_path
-                .file_name()
-                .ok_or_else(|| {
-                    TracingClientError::IoError("Failed to extract filename from path".to_string())
-                })?
-                .to_string_lossy()
-                .into_owned();
-
-            let part = Part::stream_with_length(body, file_size)
-                .file_name(file_name)
-                .mime_str(&format!("{}; length={}", &attachment.content_type, file_size))?;
-
-            *form = std::mem::take(form).part(part_name, part);
-        }
-        Ok(())
-    }
-
-    #[expect(unused_variables)]
-    async fn send_batch(&self, batch: Vec<QueuedRun>) -> Result<(), TracingClientError> {
-        let start_send_batch = Instant::now();
-        let mut json_data = Vec::new();
-        let mut attachment_futures = Vec::new();
-
-        for queued_run in batch {
-            match queued_run {
-                QueuedRun::Create(run_create_extended) => {
-                    let RunCreateExtended { run_create, io, attachments } = run_create_extended;
-                    let run_id = run_create.common.id.clone();
-
-                    // Collect JSON data
-                    json_data.push((
-                        format!("post.{}", run_id),
-                        to_vec(&run_create).unwrap(), // TODO: get rid of unwrap
-                    ));
-
-                    if let Some(inputs) = io.inputs {
-                        json_data.push((format!("post.{}.inputs", run_id), inputs));
-                    }
-
-                    if let Some(outputs) = io.outputs {
-                        json_data.push((format!("post.{}.outputs", run_id), outputs));
-                    }
-
-                    if let Some(attachments) = attachments {
-                        for attachment in attachments {
-                            attachment_futures.push((
-                                format!("attachment.{}.{}", run_id, attachment.ref_name),
-                                self.create_attachment_part(attachment),
-                            ));
-                        }
-                    }
-                }
-                QueuedRun::Update(run_update_extended) => {
-                    let RunUpdateExtended { run_update, io, attachments } = run_update_extended;
-                    let run_id = run_update.common.id.clone();
-
-                    // Collect JSON data
-                    json_data.push((
-                        format!("patch.{}", run_id),
-                        to_vec(&run_update).unwrap(), // TODO: get rid of unwrap
-                    ));
-
-                    if let Some(outputs) = io.outputs {
-                        json_data.push((format!("patch.{}.outputs", run_id), outputs));
-                    }
-
-                    if let Some(attachments) = attachments {
-                        for attachment in attachments {
-                            attachment_futures.push((
-                                format!("attachment.{}.{}", run_id, attachment.ref_name),
-                                self.create_attachment_part(attachment),
-                            ));
-                        }
-                    }
-                }
-                QueuedRun::RunBytes(_) => {
-                    // TODO: fix this
-                    return Err(TracingClientError::UnexpectedShutdown);
-                }
-                QueuedRun::Drain => {
-                    unreachable!("drain message in batch");
-                }
-                QueuedRun::Shutdown => {
-                    return Err(TracingClientError::UnexpectedShutdown);
-                }
-            }
-        }
-
-        // println!("Batch processing took {:?}", start_send_batch.elapsed());
-        // process JSON serialization in a blocking thread with Rayon parallel iterator
-        let start = Instant::now();
-        let json_parts = task::spawn_blocking(move || {
-            println!("Parallel processing a batch of {} runs", json_data.len());
-            let start_time_in_parallel = Instant::now();
-            json_data
-                .into_par_iter()
-                .map(|(part_name, data_bytes)| {
-                    let part_size = data_bytes.len() as u64;
-                    let part = Part::bytes(data_bytes)
-                        .mime_str(&format!("application/json; length={}", part_size))?;
-                    Ok::<(String, Part), TracingClientError>((part_name, part))
-                })
-                .collect::<Result<Vec<_>, TracingClientError>>()
-        })
-        .await
-        .unwrap()?; // TODO: get rid of unwrap
-        println!("JSON processing took {:?}", start.elapsed());
-
-        // process attachments asynchronously
-        let attachment_parts_results = FuturesUnordered::from_iter(
-            attachment_futures.into_iter().map(|(part_name, future)| async {
-                let part = future.await?;
-                Ok((part_name, part))
-            }),
-        )
-        .collect::<Vec<Result<(String, Part), TracingClientError>>>()
-        .await;
-        let mut attachment_parts = Vec::new();
-        for result in attachment_parts_results {
-            match result {
-                Ok((part_name, part)) => {
-                    attachment_parts.push((part_name, part));
-                }
-                Err(e) => {
-                    eprintln!("Error processing attachment: {}", e);
-                }
-            }
-        }
-
-        // assemble form
-        let mut form = Form::new();
-        for (part_name, part) in json_parts.into_iter().chain(attachment_parts) {
-            form = form.part(part_name, part);
-        }
-
-        // println!("Assembling form took {:?}", start.elapsed());
-
-        // send the multipart POST request
-        let start_send_batch = std::time::Instant::now();
-        let response = self
-            .http_client
-            .post(format!("{}/runs/multipart", self.config.endpoint))
-            .multipart(form)
-            .headers(self.config.headers.clone().unwrap_or_default())
-            .send()
-            .await?;
-        println!("Sending batch took {:?}", start_send_batch.elapsed());
-
-        // println!("Sending batch took {:?}", start_send_batch.elapsed());
-        if response.status().is_success() {
-            Ok(())
-        } else {
-            Err(TracingClientError::HttpError(response.status()))
-        }
-    }
-
-    async fn create_attachment_part(
-        &self,
-        attachment: Attachment,
-    ) -> Result<Part, TracingClientError> {
-        let part = if let Some(data) = attachment.data {
-            let part_size = data.len() as u64;
-            Part::bytes(data)
-                .file_name(attachment.filename)
-                .mime_str(&format!("{}; length={}", &attachment.content_type, part_size))?
-        } else {
-            let file_path = std::path::Path::new(&attachment.filename);
-            let metadata = tokio::fs::metadata(file_path).await.map_err(|e| {
-                TracingClientError::IoError(format!("Failed to read file metadata: {}", e))
-            })?;
-            let file_size = metadata.len();
-            let file = tokio::fs::File::open(file_path)
-                .await
-                .map_err(|e| TracingClientError::IoError(format!("Failed to open file: {}", e)))?;
-            let stream = ReaderStream::new(file);
-            let body = reqwest::Body::wrap_stream(stream);
-
-            let file_name = file_path
-                .file_name()
-                .ok_or_else(|| {
-                    TracingClientError::IoError("Failed to extract filename from path".to_string())
-                })?
-                .to_string_lossy()
-                .into_owned();
-
-            Part::stream_with_length(body, file_size)
-                .file_name(file_name)
-                .mime_str(&format!("{}; length={}", &attachment.content_type, file_size))?
-        };
-
-        Ok(part)
-    }
-}
diff --git a/rust/crates/langsmith-tracing-client/src/client/async_enabled/tracing_client.rs b/rust/crates/langsmith-tracing-client/src/client/async_enabled/tracing_client.rs
deleted file mode 100644
index 940c136b0..000000000
--- a/rust/crates/langsmith-tracing-client/src/client/async_enabled/tracing_client.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-use std::time::Duration;
-
-use reqwest::header::HeaderMap;
-use tokio::sync::mpsc::{self, Sender};
-use tokio::task::JoinHandle;
-
-use super::processor::RunProcessor;
-use crate::client::errors::TracingClientError;
-use crate::client::run::{QueuedRun, RunEventBytes};
-use crate::client::run::{RunCreateExtended, RunUpdateExtended};
-
-pub struct ClientConfig {
-    pub endpoint: String,
-    pub queue_capacity: usize,
-    pub batch_size: usize,
-    pub batch_timeout: Duration,
-    pub headers: Option<HeaderMap>,
-}
-
-pub struct TracingClient {
-    sender: Sender<QueuedRun>,
-    handle: JoinHandle<Result<(), TracingClientError>>,
-}
-
-impl TracingClient {
-    pub fn new(config: ClientConfig) -> Result<Self, TracingClientError> {
-        let (sender, receiver) = mpsc::channel(config.queue_capacity);
-
-        let processor = RunProcessor::new(receiver, config);
-
-        let handle = tokio::spawn(async move {
-            let result = processor.run().await;
-            if let Err(e) = &result {
-                eprintln!("RunProcessor exited with error: {}", e);
-            }
-            result
-        });
-
-        Ok(Self { sender, handle })
-    }
-
-    pub async fn submit_run_create(
-        &self,
-        run: RunCreateExtended,
-    ) -> Result<(), TracingClientError> {
-        let queued_run = QueuedRun::Create(run);
-
-        self.sender.send(queued_run).await.map_err(|_| TracingClientError::QueueFull)
-    }
-
-    pub async fn submit_run_update(
-        &self,
-        run: RunUpdateExtended,
-    ) -> Result<(), TracingClientError> {
-        let queued_run = QueuedRun::Update(run);
-
-        self.sender.send(queued_run).await.map_err(|_| TracingClientError::QueueFull)
-    }
-
-    pub async fn submit_run_bytes(&self, run: RunEventBytes) -> Result<(), TracingClientError> {
-        let queued_run = QueuedRun::RunBytes(run);
-
-        self.sender.send(queued_run).await.map_err(|_| TracingClientError::QueueFull)
-    }
-
-    pub async fn shutdown(self) -> Result<(), TracingClientError> {
-        self.sender.send(QueuedRun::Shutdown).await.map_err(|_| TracingClientError::QueueFull)?;
-
-        self.handle.await.unwrap()
-    }
-}
diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/mod.rs b/rust/crates/langsmith-tracing-client/src/client/blocking/mod.rs
deleted file mode 100644
index 7d4f1fcba..000000000
--- a/rust/crates/langsmith-tracing-client/src/client/blocking/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-mod processor;
-mod tracing_client;
-
-pub use processor::RunProcessor;
-pub use tracing_client::{ClientConfig, TracingClient};
diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs
deleted file mode 100644
index 495c546d9..000000000
--- a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs
+++ /dev/null
@@ -1,317 +0,0 @@
-use std::collections::HashMap;
-use std::sync::mpsc::{Receiver, Sender};
-use std::sync::{mpsc, Arc, Mutex};
-use std::time::{Duration, Instant};
-
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
-use reqwest::blocking::multipart::{Form, Part};
-use serde_json::to_vec;
-
-use super::tracing_client::ClientConfig;
-use crate::client::errors::TracingClientError;
-use crate::client::run::{Attachment, QueuedRun};
-use crate::client::run::{RunCreateExtended, RunUpdateExtended};
-
-pub struct RunProcessor {
-    receiver: Arc<Mutex<Receiver<QueuedRun>>>,
-    drain_sender: Sender<()>,
-    config: Arc<ClientConfig>,
-    http_client: reqwest::blocking::Client,
-}
-
-impl RunProcessor {
-    pub(crate) fn new(
-        receiver: Arc<Mutex<Receiver<QueuedRun>>>,
-        drain_sender: Sender<()>,
-        config: Arc<ClientConfig>,
-    ) -> Self {
-        let http_client = reqwest::blocking::Client::new();
-
-        Self { receiver, drain_sender, http_client, config }
-    }
-
-    pub(crate) fn run(&self) -> Result<(), TracingClientError> {
-        let mut buffer = Vec::new();
-        let batch_timeout = self.config.batch_timeout;
-        let batch_size = self.config.batch_size;
-        let mut last_send_time = Instant::now();
-
-        loop {
-            let queued_run = {
-                let receiver = self.receiver.lock().unwrap();
-                receiver.recv_timeout(Duration::from_millis(100))
-            };
-
-            match queued_run {
-                Ok(queued_run) => match queued_run {
-                    QueuedRun::Shutdown => {
-                        if !buffer.is_empty() {
-                            self.send_and_clear_buffer(&mut buffer)?;
-                        }
-                        break;
-                    }
-                    QueuedRun::Drain => {
-                        if !buffer.is_empty() {
-                            self.send_and_clear_buffer(&mut buffer)?;
-                        }
-
-                        self.drain_sender.send(()).expect("drain_sender should never fail");
-                        break;
-                    }
-                    _ => {
-                        buffer.push(queued_run);
-                        if buffer.len() >= batch_size {
-                            self.send_and_clear_buffer(&mut buffer)?;
-                            last_send_time = Instant::now();
-                        }
-                    }
-                },
-                Err(mpsc::RecvTimeoutError::Timeout) => {
-                    if !buffer.is_empty() && last_send_time.elapsed() >= batch_timeout {
-                        self.send_and_clear_buffer(&mut buffer)?;
-                        last_send_time = Instant::now();
-                    }
-                }
-                Err(mpsc::RecvTimeoutError::Disconnected) => {
-                    if !buffer.is_empty() {
-                        self.send_and_clear_buffer(&mut buffer)?;
-                    }
-                    break;
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    fn send_and_clear_buffer(&self, buffer: &mut Vec<QueuedRun>) -> Result<(), TracingClientError> {
-        if let Err(e) = self.send_batch(std::mem::take(buffer)) {
-            // todo: retry logic?
-            eprintln!("Error sending batch: {}", e);
-        }
-        Ok(())
-    }
-
-    // If we have a `QueuedRun::Create` and `QueuedRun::Update` for the same run ID in the batch,
-    // combine the update data into the create so we can send just one operation instead of two.
-    fn combine_batch_operations(batch: Vec<QueuedRun>) -> Vec<QueuedRun> {
-        let mut output = Vec::with_capacity(batch.len());
-        let mut id_to_index = HashMap::with_capacity(batch.len());
-
-        for queued_run in batch {
-            match queued_run {
-                QueuedRun::Create(ref run_create_extended) => {
-                    // Record the `Create` operation's ID and index,
-                    // in case we need to modify it later.
-                    let RunCreateExtended { run_create, .. } = run_create_extended;
-                    let run_id = run_create.common.id.clone();
-                    let index = output.len();
-                    id_to_index.insert(run_id, index);
-                    output.push(queued_run);
-                }
-                QueuedRun::Update(run_update_extended) => {
-                    let run_id = run_update_extended.run_update.common.id.as_str();
-                    if let Some(create_index) = id_to_index.get(run_id) {
-                        // This `run_id` matches a `Create` in this batch.
-                        // Merge the `Update` data into the `Create` and
-                        // drop the separate `Update` operation from the batch.
-                        let RunUpdateExtended { run_update, io, attachments } = run_update_extended;
-                        let QueuedRun::Create(matching_create) = &mut output[*create_index] else {
-                            panic!("index {create_index} did not point to a Create operation in {output:?}");
-                        };
-                        debug_assert_eq!(
-                            run_update.common.id, matching_create.run_create.common.id,
-                            "Create operation at index {create_index} did not have expected ID {}: {matching_create:?}",
-                            run_update.common.id,
-                        );
-
-                        matching_create.run_create.common.merge(run_update.common);
-                        matching_create.run_create.end_time = Some(run_update.end_time);
-                        matching_create.io.merge(io);
-                        if let Some(mut _existing_attachments) =
-                            matching_create.attachments.as_mut()
-                        {
-                            unimplemented!("figure out how to merge attachments -- in Python they are a dict but here they are a Vec");
-                        } else {
-                            matching_create.attachments = attachments;
-                        }
-                    } else {
-                        // No matching `Create` operations for this `Update`, add it as-is.
-                        output.push(QueuedRun::Update(run_update_extended));
-                    }
-                }
-                // Allow other operations to pass through unchanged.
-                _ => output.push(queued_run),
-            }
-        }
-
-        output
-    }
-
-    #[expect(unused_variables)]
-    fn send_batch(&self, batch: Vec<QueuedRun>) -> Result<(), TracingClientError> {
-        //println!("Handling a batch of {} runs", batch.len());
-        let start_send_batch = tokio::time::Instant::now();
-        let mut json_data = Vec::new();
-        let mut attachment_parts = Vec::new();
-
-        let batch = Self::combine_batch_operations(batch);
-
-        let start_iter = Instant::now();
-        for queued_run in batch {
-            match queued_run {
-                QueuedRun::Create(run_create_extended) => {
-                    let RunCreateExtended { run_create, io, attachments } = run_create_extended;
-                    let run_id = run_create.common.id.clone();
-
-                    // Collect JSON data
-                    json_data.push((
-                        format!("post.{}", run_id),
-                        to_vec(&run_create).unwrap(), // TODO: get rid of unwrap
-                    ));
-
-                    // Ensure that pre-formatted JSON data represented as bytes
-                    // doesn't end in trailing null bytes, since we'll be pasting it verbatim
-                    // into an HTTP multipart request which carries an explicit length header.
-                    if let Some(mut inputs) = io.inputs {
-                        if inputs.last() == Some(&0) {
-                            inputs.pop().expect("popping trailing null byte failed");
-                        }
-                        json_data.push((format!("post.{}.inputs", run_id), inputs));
-                    }
-                    if let Some(mut outputs) = io.outputs {
-                        if outputs.last() == Some(&0) {
-                            outputs.pop().expect("popping trailing null byte failed");
-                        }
-                        json_data.push((format!("post.{}.outputs", run_id), outputs));
-                    }
-
-                    if let Some(attachments) = attachments {
-                        for attachment in attachments {
-                            let part_name =
-                                format!("attachment.{}.{}", run_id, attachment.ref_name);
-                            match self.create_attachment_part(attachment) {
-                                Ok(part) => {
-                                    attachment_parts.push((part_name, part));
-                                }
-                                Err(e) => {
-                                    eprintln!("Error processing attachment: {}", e);
-                                }
-                            }
-                        }
-                    }
-                }
-                QueuedRun::Update(run_update_extended) => {
-                    let RunUpdateExtended { run_update, io, attachments } = run_update_extended;
-                    let run_id = run_update.common.id.clone();
-
-                    // Collect JSON data
-                    json_data.push((
-                        format!("patch.{}", run_id),
-                        to_vec(&run_update).unwrap(), // TODO: get rid of unwrap
-                    ));
-
-                    // Ensure that pre-formatted JSON data represented as bytes
-                    // doesn't end in trailing null bytes, since we'll be pasting it verbatim
-                    // into an HTTP multipart request which carries an explicit length header.
-                    if let Some(mut outputs) = io.outputs {
-                        if outputs.last() == Some(&0) {
-                            outputs.pop().expect("popping trailing null byte failed");
-                        }
-                        json_data.push((format!("patch.{}.outputs", run_id), outputs));
-                    }
-
-                    if let Some(attachments) = attachments {
-                        for attachment in attachments {
-                            let part_name =
-                                format!("attachment.{}.{}", run_id, attachment.ref_name);
-                            match self.create_attachment_part(attachment) {
-                                Ok(part) => {
-                                    attachment_parts.push((part_name, part));
-                                }
-                                Err(e) => {
-                                    eprintln!("Error processing attachment: {}", e);
-                                }
-                            }
-                        }
-                    }
-                }
-                QueuedRun::RunBytes(_) => {
-                    // TODO: fix this
-                    return Err(TracingClientError::UnexpectedShutdown);
-                }
-                QueuedRun::Drain => {
-                    unreachable!("drain message in batch");
-                }
-                QueuedRun::Shutdown => {
-                    return Err(TracingClientError::UnexpectedShutdown);
-                }
-            }
-        }
-        //println!("Iterating over batch took {:?}", start_iter.elapsed());
-
-        let start = Instant::now();
-        let json_parts = json_data
-            .into_par_iter()
-            .map(|(part_name, data_bytes)| {
-                let part_size = data_bytes.len() as u64;
-                let part = Part::bytes(data_bytes)
-                    .mime_str(&format!("application/json; length={}", part_size))?;
-                Ok::<(String, Part), TracingClientError>((part_name, part))
-            })
-            .collect::<Result<Vec<_>, TracingClientError>>()?;
-        // println!("JSON processing took {:?}", start.elapsed());
-
-        let mut form = Form::new();
-        for (part_name, part) in json_parts.into_iter().chain(attachment_parts) {
-            form = form.part(part_name, part);
-        }
-
-        // send the multipart POST request
-        let start_send_batch = Instant::now();
-        let response = self
-            .http_client
-            .post(format!("{}/runs/multipart", self.config.endpoint))
-            .multipart(form)
-            .headers(self.config.headers.as_ref().cloned().unwrap_or_default())
-            .send()?;
-        // println!("Sending batch took {:?}", start_send_batch.elapsed());
-
-        if response.status().is_success() {
-            Ok(())
-        } else {
-            Err(TracingClientError::HttpError(response.status()))
-        }
-    }
-
-    fn create_attachment_part(&self, attachment: Attachment) -> Result<Part, TracingClientError> {
-        let part = if let Some(data) = attachment.data {
-            let part_size = data.len() as u64;
-            Part::bytes(data)
-                .file_name(attachment.filename)
-                .mime_str(&format!("{}; length={}", &attachment.content_type, part_size))?
-        } else {
-            let file_path = std::path::Path::new(&attachment.filename);
-            let metadata = std::fs::metadata(file_path).map_err(|e| {
-                TracingClientError::IoError(format!("Failed to read file metadata: {}", e))
-            })?;
-            let file_size = metadata.len();
-            let file = std::fs::File::open(file_path)
-                .map_err(|e| TracingClientError::IoError(format!("Failed to open file: {}", e)))?;
-
-            let file_name = file_path
-                .file_name()
-                .ok_or_else(|| {
-                    TracingClientError::IoError("Failed to extract filename from path".to_string())
-                })?
-                .to_string_lossy()
-                .into_owned();
-
-            Part::reader_with_length(file, file_size)
-                .file_name(file_name)
-                .mime_str(&format!("{}; length={}", &attachment.content_type, file_size))?
-        };
-
-        Ok(part)
-    }
-}
diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs b/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs
deleted file mode 100644
index 4861ffe3d..000000000
--- a/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use std::sync::mpsc::{self, Receiver, Sender};
-use std::sync::{Arc, Mutex};
-use std::thread;
-use std::time::Duration;
-
-use reqwest::header::{HeaderMap, HeaderValue};
-
-use super::processor::RunProcessor;
-use crate::client::errors::TracingClientError;
-use crate::client::run::{QueuedRun, RunEventBytes};
-use crate::client::run::{RunCreateExtended, RunUpdateExtended};
-
-#[derive(Clone)]
-pub struct ClientConfig {
-    pub endpoint: String,
-    pub api_key: String,
-    pub queue_capacity: usize,
-    pub batch_size: usize,
-    pub batch_timeout: Duration,
-    pub headers: Option<HeaderMap>,
-    pub num_worker_threads: usize,
-}
-
-pub struct TracingClient {
-    sender: Sender<QueuedRun>,
-    drain: Mutex<Receiver<()>>,
-    handles: Vec<thread::JoinHandle<()>>, // Handles to worker threads
-}
-
-impl TracingClient {
-    pub fn new(mut config: ClientConfig) -> Result<Self, TracingClientError> {
-        let (sender, receiver) = mpsc::channel::<QueuedRun>();
-        let (drain_sender, drain_receiver) = mpsc::channel::<()>();
-        let receiver = Arc::new(Mutex::new(receiver));
-
-        // Ensure our headers include the API key.
-        config.headers.get_or_insert_with(Default::default).append(
-            "X-API-KEY",
-            HeaderValue::from_str(&config.api_key).expect("failed to convert API key into header"),
-        );
-
-        // We're going to share the config across threads.
-        // It's immutable from this point onward, so Arc it for efficiency.
-        let config = Arc::from(config);
-
-        let mut handles = Vec::new();
-
-        for _ in 0..config.num_worker_threads {
-            let worker_receiver = Arc::clone(&receiver);
-            let worker_config = Arc::clone(&config);
-            let cloned_drain_sender = drain_sender.clone();
-
-            let handle = thread::spawn(move || {
-                let processor =
-                    RunProcessor::new(worker_receiver, cloned_drain_sender, worker_config);
-                processor.run().expect("run failed");
-            });
-
-            handles.push(handle);
-        }
-
-        Ok(Self { sender, drain: drain_receiver.into(), handles })
-    }
-
-    pub fn submit_run_create(&self, run: RunCreateExtended) -> Result<(), TracingClientError> {
-        let queued_run = QueuedRun::Create(run);
-
-        self.sender.send(queued_run).map_err(|_| TracingClientError::QueueFull)
-    }
-
-    // Similar methods for submit_run_update and submit_run_bytes
-
-    pub fn submit_run_bytes(&self, run_bytes: RunEventBytes) -> Result<(), TracingClientError> {
-        let queued_run = QueuedRun::RunBytes(run_bytes);
-
-        self.sender.send(queued_run).map_err(|_| TracingClientError::QueueFull)
-    }
-
-    pub fn submit_run_update(&self, run: RunUpdateExtended) -> Result<(), TracingClientError> {
-        let queued_run = QueuedRun::Update(run);
-
-        self.sender.send(queued_run).map_err(|_| TracingClientError::QueueFull)
-    }
-
-    /// Complete all in-progress requests, then allow the worker threads to exit.
-    ///
-    /// Convenience function for the PyO3 bindings, which cannot use [`Self::shutdown`]
-    /// due to its by-value `self`. This means we cannot `.join()` the threads,
-    /// but the client is nevertheless unusable after this call.
-    ///
-    /// Sending further data after a [`Self::drain()`] call has unspecified behavior.
-    /// It will not cause *undefined behavior* in the programming language sense,
-    /// but it may e.g. cause errors, panics, or even silently fail, with no guarantees.
-    pub fn drain(&self) -> Result<(), TracingClientError> {
-        for _ in &self.handles {
-            self.sender.send(QueuedRun::Drain).map_err(|_| TracingClientError::QueueFull)?;
-        }
-
-        let drain_guard = self.drain.lock().expect("locking failed");
-        for _ in &self.handles {
-            drain_guard.recv().expect("failed to receive drained message");
-        }
-        drop(drain_guard);
-
-        Ok(())
-    }
-
-    pub fn shutdown(self) -> Result<(), TracingClientError> {
-        // Send a Shutdown message to each worker thread
-        for _ in &self.handles {
-            self.sender.send(QueuedRun::Shutdown).map_err(|_| TracingClientError::QueueFull)?;
-        }
-
-        // Wait for all worker threads to finish
-        for handle in self.handles {
-            handle.join().unwrap();
-        }
-
-        Ok(())
-    }
-}
diff --git a/rust/crates/langsmith-tracing-client/src/client/mod.rs b/rust/crates/langsmith-tracing-client/src/client/mod.rs
index b07ad7722..7a4159454 100644
--- a/rust/crates/langsmith-tracing-client/src/client/mod.rs
+++ b/rust/crates/langsmith-tracing-client/src/client/mod.rs
@@ -1,12 +1,11 @@
 mod errors;
 mod run;
 
-pub mod async_enabled;
-pub mod blocking;
-pub mod streaming;
+mod streaming;
 
 pub use errors::TracingClientError;
 pub use run::{
-    Attachment, EventType, RunCommon, RunCreate, RunCreateExtended, RunEventBytes, RunIO,
-    RunUpdate, RunUpdateExtended, TimeValue,
+    Attachment, RunCommon, RunCreate, RunCreateExtended, RunIO, RunUpdate, RunUpdateExtended,
+    TimeValue,
 };
+pub use streaming::{ClientConfig, TracingClient};
diff --git a/rust/crates/langsmith-tracing-client/src/client/run.rs b/rust/crates/langsmith-tracing-client/src/client/run.rs
index f1484e50e..5eb6f58cc 100644
--- a/rust/crates/langsmith-tracing-client/src/client/run.rs
+++ b/rust/crates/langsmith-tracing-client/src/client/run.rs
@@ -25,6 +25,7 @@ pub struct RunIO {
 }
 
 impl RunIO {
+    #[allow(dead_code)]
     #[inline]
     pub(crate) fn merge(&mut self, other: RunIO) {
         if other.inputs.is_some() {
@@ -52,6 +53,7 @@ pub struct RunCommon {
 }
 
 impl RunCommon {
+    #[allow(dead_code)]
     #[inline]
     pub(crate) fn merge(&mut self, other: RunCommon) {
         if other.parent_run_id.is_some() {
@@ -113,28 +115,10 @@ pub struct RunUpdateExtended {
     pub attachments: Option<Vec<Attachment>>,
 }
 
-#[derive(Debug)]
-pub struct RunEventBytes {
-    pub run_id: String,
-    pub event_type: EventType,
-    pub run_bytes: Vec<u8>,
-    pub inputs_bytes: Option<Vec<u8>>,
-    pub outputs_bytes: Option<Vec<u8>>,
-    pub attachments: Option<Vec<Attachment>>,
-}
-
-#[derive(Debug)]
-pub enum EventType {
-    Create,
-    Update,
-}
-
 #[derive(Debug)]
 pub(crate) enum QueuedRun {
     Create(RunCreateExtended),
     Update(RunUpdateExtended),
-    #[expect(dead_code)]
-    RunBytes(RunEventBytes),
     Drain, // Like `Shutdown`, but explicitly sends a message confirming draining is complete.
     Shutdown,
 }
diff --git a/rust/crates/langsmith-tracing-client/src/client/streaming/processor.rs b/rust/crates/langsmith-tracing-client/src/client/streaming/processor.rs
index 01ae2cacd..10fb0ebed 100644
--- a/rust/crates/langsmith-tracing-client/src/client/streaming/processor.rs
+++ b/rust/crates/langsmith-tracing-client/src/client/streaming/processor.rs
@@ -240,9 +240,6 @@ impl RunProcessor {
                     Ok(())
                 }
-                QueuedRun::RunBytes(_) => {
-                    unimplemented!("RunBytes is not supported")
-                }
                 QueuedRun::Drain => {
                     unreachable!("drain message that wasn't handled earlier");
                 }
diff --git a/rust/crates/langsmith-tracing-client/src/main.rs b/rust/crates/langsmith-tracing-client/src/main.rs
deleted file mode 100644
index b9bc17185..000000000
--- a/rust/crates/langsmith-tracing-client/src/main.rs
+++ /dev/null
@@ -1,201 +0,0 @@
-#![expect(unused_imports)]
-
-use std::fs::File;
-use std::io::Write;
-
-use langsmith_tracing_client::client::async_enabled::{ClientConfig, TracingClient};
-use langsmith_tracing_client::client::{
-    Attachment, RunCommon, RunCreate, RunCreateExtended, RunIO, RunUpdate, RunUpdateExtended,
-    TimeValue,
-};
-use rayon::prelude::*;
-use reqwest::header::{HeaderMap, HeaderValue};
-use serde_json::Value;
-use tempfile::TempDir;
-use tokio::time::Duration;
-use uuid::Uuid;
-
-// #[tokio::main]
-// async fn main() -> Result<(), Box<dyn std::error::Error>> {
-//     let tmp_dir = TempDir::new().unwrap();
-//     let test_file_path = tmp_dir.path().join("test_file_create.txt");
-//     let mut test_file = File::create(&test_file_path).unwrap();
-//     writeln!(test_file, "Test file content for create").unwrap();
-//
-//     let mut attachments = Vec::new();
-//     attachments.push(Attachment {
-//         ref_name: "attachment_1".to_string(),
-//         filename: "file1.txt".to_string(),
-//         data: Some(vec![1, 2, 3]),
-//         content_type: "application/octet-stream".to_string(),
-//     });
-//     attachments.push(Attachment {
-//         ref_name: "attachment_2".to_string(),
-//         filename: test_file_path.into_os_string().into_string().unwrap(),
-//         data: None, // this will cause the processor to read from disk
-//         content_type: "text/plain".to_string(),
-//     });
-//
-//     let run_id = Uuid::new_v4().to_string();
-//     println!("Run ID: {}", run_id);
-//
-//     let run_create = RunCreateExtended {
-//         run_create: RunCreate {
-//             common: RunCommon {
-//                 id: String::from(&run_id),
-//                 trace_id: String::from(&run_id),
-//                 dotted_order: String::from("20241009T223747383001Z{}".to_string() + &run_id),
-//                 parent_run_id: None,
-//                 extra: Some(serde_json::json!({"extra_data": "value"})),
-//                 error: None,
-//                 serialized: None,
-//                 events: Some(serde_json::json!([{ "event": "event_data" }])),
-//                 tags: Some(serde_json::json!(["tag1", "tag2"])),
-//                 session_id: None,
-//                 session_name: Some("Rust Session Name".to_string()),
-//             },
-//             name: String::from("Rusty"),
-//             start_time: TimeValue::UnsignedInt(1728513467383),
-//             end_time: Some(TimeValue::UnsignedInt(1728513468236)),
-//             run_type: String::from("chain"),
-//             reference_example_id: None,
-//         },
-//         attachments: Some(attachments),
-//         io: RunIO {
-//             inputs: Some(serde_json::json!({"input": "value"})),
-//             outputs: Some(serde_json::json!({"output": "value"})),
-//         },
-//     };
-//
-//     let mut attachments_two = Vec::new();
-//     attachments_two.push(Attachment {
-//         ref_name: "attachment_1".to_string(),
-//         filename: "file1.txt".to_string(),
-//         data: Some(vec![1, 2, 3]),
-//         content_type: "application/octet-stream".to_string(),
-//     });
-//
-//     let run_id_two = Uuid::new_v4().to_string();
-//     println!("Run ID Two: {}", run_id_two);
-//     let run_create_two = RunCreateExtended {
-//         run_create: RunCreate {
-//             common: RunCommon {
-//                 id: String::from(&run_id_two),
-//                 trace_id: String::from(&run_id_two),
-//                 dotted_order: String::from("20241009T223747383001Z{}".to_string() + &run_id_two),
-//                 parent_run_id: None,
-//                 extra: Some(serde_json::json!({"extra_data": "value"})),
-//                 error: None,
-//                 serialized: None,
-//                 events: Some(serde_json::json!([{ "event": "event_data" }])),
-//                 tags: Some(serde_json::json!(["tag1", "tag2"])),
-//                 session_id: None,
-//                 session_name: Some("Rust Session Name".to_string()),
-//             },
-//             name: String::from("Rusty two"),
-//             start_time: TimeValue::UnsignedInt(1728513467383),
-//             end_time: None,
-//             run_type: String::from("chain"),
-//             reference_example_id: None,
-//         },
-//         attachments: Some(attachments_two),
-//         io: RunIO {
-//             inputs: Some(serde_json::json!({"input": "value"})),
-//             outputs: None,
-//         },
-//     };
-//
-//     let run_update_two = RunUpdateExtended {
-//         run_update: RunUpdate {
-//             common: RunCommon {
-//                 id: String::from(&run_id_two),
-//                 trace_id: String::from(&run_id_two),
-//                 dotted_order: String::from("20241009T223747383001Z{}".to_string() + &run_id_two),
-//                 parent_run_id: None,
-//                 extra: Some(serde_json::json!({"extra_data": "value"})),
-//                 error: None,
-//                 serialized: None,
-//                 events: None,
-//                 tags: Some(serde_json::json!(["tag1", "tag2"])),
-//                 session_id: None,
-//                 session_name: Some("Rust Session Name".to_string()),
-//             },
-//             end_time: TimeValue::UnsignedInt(1728513468236),
-//         },
-//         io: RunIO {
-//             inputs: None,
-//             outputs: Some(serde_json::json!({"output": "value"})),
-//         },
-//         attachments: None,
-//     };
-//
-//     let mut headers = HeaderMap::new();
-//     headers.insert("X-API-KEY", HeaderValue::from_static("test_key"));
-//     let config = ClientConfig {
-//         endpoint: String::from("http://localhost:1984"),
-//         queue_capacity: 10,
-//         batch_size: 5, // batch size is 5 to ensure shutdown flushes the queue
-//         batch_timeout: Duration::from_secs(1),
-//         headers: None,
-//     };
-//
-//     let client = TracingClient::new(config).unwrap();
-//     client.submit_run_create(run_create).await.unwrap();
-//     client.submit_run_create(run_create_two).await.unwrap();
-//     client.submit_run_update(run_update_two).await.unwrap();
-//
-//     client.shutdown().await.unwrap();
-//     Ok(())
-// }
-
-fn create_large_json(len: usize) -> Value {
-    let large_array: Vec<Value> = (0..len)
-        .map(|i| {
-            serde_json::json!({
-                "index": i,
-                "data": format!("This is element number {}", i),
-                "nested": {
-                    "id": i,
-                    "value": format!("Nested value for element {}", i),
-                }
-            })
-        })
-        .collect();
-
-    serde_json::json!({
-        "name": "Huge JSON",
-        "description": "This is a very large JSON object for benchmarking purposes.",
-        "array": large_array,
-        "metadata": {
-            "created_at": "2024-10-22T19:00:00Z",
-            "author": "Rust Program",
-            "version": 1.0
-        }
-    })
-}
-
-// Sequential processing
-fn benchmark_sequential(data: &[Value]) -> Vec<Vec<u8>> {
-    data.iter().map(|json| serde_json::to_vec(json).expect("Failed to serialize JSON")).collect()
-}
-
-// Parallel processing
-fn benchmark_parallel(data: &[Value]) -> Vec<Vec<u8>> {
-    data.par_iter()
-        .map(|json| serde_json::to_vec(json).expect("Failed to serialize JSON"))
-        .collect()
-}
-
-fn main() {
-    let num_json_objects = 1000;
-    let json_length = 3000;
-    let data: Vec<Value> = (0..num_json_objects).map(|_| create_large_json(json_length)).collect();
-
-    let start = std::time::Instant::now();
-    let _ = benchmark_parallel(&data);
-    println!("Parallel serialization took: {:?}", start.elapsed());
-
-    let start = std::time::Instant::now();
-    let _ = benchmark_sequential(&data);
-    println!("Sequential serialization took: {:?}", start.elapsed());
-}
diff --git a/rust/crates/langsmith-tracing-client/tests/tracing_client_test.rs b/rust/crates/langsmith-tracing-client/tests/tracing_client_test.rs
deleted file mode 100644
index 2b66b5fa1..000000000
--- a/rust/crates/langsmith-tracing-client/tests/tracing_client_test.rs
+++ /dev/null
@@ -1,341 +0,0 @@
-use std::error::Error;
-use std::fs::File;
-use std::io::Write;
-use std::sync::{Arc, Mutex};
-use std::time::Duration;
-
-use mockito::Server;
-use multer::Multipart;
-use reqwest::header::{HeaderMap, HeaderValue};
-use serde_json::{from_str, json, to_vec, Value};
-use tempfile::TempDir;
-
-use langsmith_tracing_client::client::async_enabled::{ClientConfig, TracingClient};
-use langsmith_tracing_client::client::{
-    Attachment, RunCommon, RunCreate, RunCreateExtended, RunIO, RunUpdate, RunUpdateExtended,
-    TimeValue,
-};
-
-#[derive(Debug)]
-struct MultipartField {
-    name: String,
-    content_type: Option<String>,
-    filename: Option<String>,
-    data: String,
-}
-
-async fn handle_request(body: Vec<u8>, content_type_str: String) -> Vec<MultipartField> {
-    assert!(content_type_str.starts_with("multipart/form-data"));
-
-    let boundary = content_type_str.split("boundary=").nth(1).unwrap();
-    let stream = futures::stream::once(async move {
-        Ok::<_, Box<dyn Error + Send + Sync>>(multer::bytes::Bytes::copy_from_slice(
-            body.as_slice(),
-        ))
-    });
-    let mut mp = Multipart::new(stream, boundary);
-
-    let mut fields = Vec::new();
-
-    while let Some(field) = mp.next_field().await.expect("reading failed") {
-        let field_name = field.name().expect("field had no name").to_string();
-        let field_content_type = field.content_type().map(|ct| ct.to_string());
-        let field_filename = field.file_name().map(String::from);
-
-        let content =
-            String::from_utf8(field.bytes().await.expect("failed to read field bytes").into())
-                .expect("failed to turn field data into string");
-
-        let multipart_field = MultipartField {
-            name: field_name,
-            content_type: field_content_type,
-            filename: field_filename,
-            data: content,
-        };
-
-        fields.push(multipart_field);
-    }
-
-    fields
-}
-
-#[tokio::test]
-async fn test_tracing_client_submit_run_create() {
-    let mut server = Server::new_async().await;
-    // NOTE: we can't use an async mutex here because mockito doesn't support async functions
-    // in `with_body_from_request`.
-    let captured_request: Arc<Mutex<(Vec<u8>, String)>> =
-        Arc::new(Mutex::new((Vec::new(), String::new())));
-    let captured_request_clone = Arc::clone(&captured_request);
-
-    let m = server
-        .mock("POST", "/runs/multipart")
-        .expect(1)
-        .with_status(200)
-        .with_body_from_request(move |req| {
-            let mut request = captured_request_clone.lock().unwrap();
-            request.0 = req.body().unwrap().to_vec();
-            let content_type_headers = req.header("content-type");
-            let content_type_str: String = content_type_headers
-                .iter()
-                .filter_map(|h| h.to_str().ok())
-                .collect::<Vec<_>>()
-                .join(", ");
-            request.1 = content_type_str;
-            vec![] // return empty response body
-        })
-        .create_async()
-        .await;
-
-    let config = ClientConfig {
-        endpoint: server.url(),
-        queue_capacity: 10,
-        batch_size: 5, // batch size is 5 to ensure shutdown flushes the queue
-        batch_timeout: Duration::from_secs(1),
-        headers: None,
-    };
-
-    let client = TracingClient::new(config).unwrap();
-
-    // Write a test file to disk for streaming
-    let tmp_dir = TempDir::new().unwrap();
-    let test_file_path = tmp_dir.path().join("test_file_create.txt");
-    let mut test_file = File::create(&test_file_path).unwrap();
-    writeln!(test_file, "Test file content for create").unwrap();
-
-    let attachments = vec![
-        Attachment {
-            ref_name: "attachment_1".to_string(),
-            filename: "file1.txt".to_string(),
-            data: Some(vec![1, 2, 3]),
-            content_type: "application/octet-stream".to_string(),
-        },
-        Attachment {
-            ref_name: "attachment_2".to_string(),
-            filename: test_file_path.into_os_string().into_string().unwrap(),
-            data: None, // this will cause the processor to read from disk
-            content_type: "text/plain".to_string(),
-        },
-    ];
-
-    let run_create = RunCreateExtended {
-        run_create: RunCreate {
-            common: RunCommon {
-                id: String::from("test_id"),
-                trace_id: String::from("trace_id"),
-                dotted_order: String::from("1.1"),
-                parent_run_id: None,
-                extra: Some(json!({"extra_data": "value"})),
-                error: None,
-                serialized: Some(json!({"key": "value"})),
-                events: Some(Value::from(vec![json!({"event": "event_data"})])),
-                tags: Some(Value::from(vec!["tag1", "tag2"])),
-                session_id: None,
-                session_name: Some("Session Name".to_string()),
-            },
-            name: String::from("Run Name"),
-            start_time: TimeValue::UnsignedInt(1697462400000),
-            end_time: Some(TimeValue::UnsignedInt(1697466000000)),
-            run_type: String::from("chain"),
-            reference_example_id: None,
-        },
-        attachments: Some(attachments),
-        io: RunIO {
-            inputs: Some(to_vec(&json!({"input": "value"})).expect("to_vec failed")),
-            outputs: Some(to_vec(&json!({"output": "value"})).expect("to_vec failed")),
-        },
-    };
-
-    client.submit_run_create(run_create).await.unwrap();
-
-    // shutdown the client to ensure all messages are processed
-    client.shutdown().await.unwrap();
-    m.assert_async().await;
-
-    let req = captured_request.lock().unwrap().clone();
-    let fields = handle_request(req.0, req.1).await;
-
-    assert_eq!(fields.len(), 5);
-
-    // assert run fields
-    assert_eq!(fields[0].name, "post.test_id");
-    assert_eq!(fields[0].content_type, Some("application/json; length=375".to_string()));
-    assert_eq!(fields[0].filename, None);
-    let received_run: Value = from_str(&fields[0].data).unwrap();
-    assert_eq!(received_run["id"], "test_id");
-    assert_eq!(received_run["trace_id"], "trace_id");
-    assert_eq!(received_run["dotted_order"], "1.1");
-    assert_eq!(received_run["parent_run_id"], json!(null));
-    assert_eq!(received_run["extra"], json!({"extra_data": "value"}));
-    assert_eq!(received_run["error"], json!(null));
assert_eq!(received_run["serialized"], json!({"key": "value"})); - assert_eq!(received_run["events"], Value::from(vec![json!({"event": "event_data"})])); - assert_eq!(received_run["tags"], Value::from(vec!["tag1", "tag2"])); - assert_eq!(received_run["session_name"], "Session Name"); - assert_eq!(received_run["session_id"], json!(null)); - assert_eq!(received_run["name"], "Run Name"); - assert_eq!(received_run["start_time"], 1697462400000i64); - assert_eq!(received_run["end_time"], 1697466000000i64); - assert_eq!(received_run["run_type"], "chain"); - assert_eq!(received_run["reference_example_id"], json!(null)); - - // assert inputs fields - assert_eq!(fields[1].name, "post.test_id.inputs"); - assert_eq!(fields[1].content_type, Some("application/json; length=17".to_string())); - assert_eq!(fields[1].filename, None); - let received_inputs: Value = from_str(&fields[1].data).unwrap(); - assert_eq!(received_inputs, json!({"input": "value"})); - - // assert outputs fields - assert_eq!(fields[2].name, "post.test_id.outputs"); - assert_eq!(fields[2].content_type, Some("application/json; length=18".to_string())); - assert_eq!(fields[2].filename, None); - let received_outputs: Value = from_str(&fields[2].data).unwrap(); - assert_eq!(received_outputs, json!({"output": "value"})); - - // assert attachment_1 fields - assert_eq!(fields[3].name, "attachment.test_id.attachment_1"); - assert_eq!(fields[3].content_type, Some("application/octet-stream; length=3".to_string())); - assert_eq!(fields[3].filename, Some("file1.txt".to_string())); - assert_eq!(fields[3].data, "\u{1}\u{2}\u{3}"); - - // assert attachment_2 fields - assert_eq!(fields[4].name, "attachment.test_id.attachment_2"); - assert_eq!(fields[4].content_type, Some("text/plain; length=29".to_string())); - assert_eq!(fields[4].filename, Some("test_file_create.txt".to_string())); - assert_eq!(fields[4].data, "Test file content for create\n"); -} - -#[tokio::test] -async fn test_tracing_client_submit_run_update() { - // NOTE: we can't use an async mutex here because mockito doesn't support async functions - // in `with_body_from_request`. 
-    let mut server = Server::new_async().await;
-    let captured_request: Arc<Mutex<(Vec<u8>, String)>> =
-        Arc::new(Mutex::new((Vec::new(), String::new())));
-    let captured_request_clone = Arc::clone(&captured_request);
-
-    let m = server
-        .mock("POST", "/runs/multipart")
-        .expect(1)
-        .with_status(200)
-        .with_body_from_request(move |req| {
-            let mut request = captured_request_clone.lock().unwrap();
-            request.0 = req.body().unwrap().to_vec();
-            let content_type_headers = req.header("content-type");
-            let content_type_str: String = content_type_headers
-                .iter()
-                .filter_map(|h| h.to_str().ok())
-                .collect::<Vec<_>>()
-                .join(", ");
-            request.1 = content_type_str;
-
-            let auth_headers = req.header("X-API-KEY");
-            assert!(auth_headers.iter().any(|h| h.to_str().unwrap() == "test_key"));
-            vec![] // return empty response body
-        })
-        .create_async()
-        .await;
-
-    let mut headers = HeaderMap::new();
-    headers.insert("X-API-KEY", HeaderValue::from_static("test_key"));
-    let config = ClientConfig {
-        endpoint: server.url(),
-        queue_capacity: 10,
-        batch_size: 5, // batch size is 5 to ensure shutdown flushes the queue
-        batch_timeout: Duration::from_secs(1),
-        headers: Some(headers),
-    };
-
-    let client = TracingClient::new(config).unwrap();
-
-    // Write a test file to disk for streaming
-    let tmp_dir = TempDir::new().unwrap();
-    let test_file_path = tmp_dir.path().join("test_file_update.txt");
-    let mut test_file = File::create(&test_file_path).unwrap();
-    writeln!(test_file, "Test file content for update").unwrap();
-
-    let attachments = vec![
-        Attachment {
-            ref_name: "attachment_1".to_string(),
-            filename: "file1_update.txt".to_string(),
-            data: Some(vec![4, 5, 6]),
-            content_type: "application/octet-stream".to_string(),
-        },
-        Attachment {
-            ref_name: "attachment_2".to_string(),
-            filename: test_file_path.to_string_lossy().into_owned(),
-            data: None, // this will cause the processor to read from disk
-            content_type: "text/plain".to_string(),
-        },
-    ];
-
-    let run_update = RunUpdateExtended {
-        run_update: RunUpdate {
-            common: RunCommon {
-                id: String::from("test_id"),
-                trace_id: String::from("trace_id"),
-                dotted_order: String::from("1.1"),
-                parent_run_id: None,
-                extra: Some(json!({"extra_data": "value"})),
-                error: None,
-                serialized: Some(json!({"key": "value"})),
-                events: Some(Value::from(vec![json!({"event": "event_data"})])),
-                tags: Some(Value::from(vec!["tag1", "tag2"])),
-                session_id: None,
-                session_name: Some("Session Name".to_string()),
-            },
-            end_time: TimeValue::String("2024-10-16T12:00:00Z".to_string()),
-        },
-        attachments: Some(attachments),
-        io: RunIO {
-            inputs: None,
-            outputs: Some(to_vec(&json!({"updated_output": "value"})).expect("to_vec failed")),
-        },
-    };
-
-    client.submit_run_update(run_update).await.unwrap();
-
-    // shutdown the client to ensure all messages are processed
-    client.shutdown().await.unwrap();
-    m.assert_async().await;
-
-    let req = captured_request.lock().unwrap().clone();
-    let fields = handle_request(req.0, req.1).await;
-
-    assert_eq!(fields.len(), 4);
-
-    // assert run fields
-    assert_eq!(fields[0].name, "patch.test_id");
-    assert_eq!(fields[0].content_type, Some("application/json; length=292".to_string()));
-    assert_eq!(fields[0].filename, None);
-    let received_run: Value = from_str(&fields[0].data).unwrap();
-    assert_eq!(received_run["id"], "test_id");
-    assert_eq!(received_run["trace_id"], "trace_id");
-    assert_eq!(received_run["extra"], json!({"extra_data": "value"}));
-    assert_eq!(received_run["error"], json!(null));
-    assert_eq!(received_run["serialized"], json!({"key": "value"}));
-    assert_eq!(received_run["events"], Value::from(vec![json!({"event": "event_data"})]));
-    assert_eq!(received_run["tags"], Value::from(vec!["tag1", "tag2"]));
-    assert_eq!(received_run["session_name"], "Session Name");
-    assert_eq!(received_run["end_time"], "2024-10-16T12:00:00Z");
-
-    // assert outputs fields
-    assert_eq!(fields[1].name, "patch.test_id.outputs");
-    assert_eq!(fields[1].content_type, Some("application/json; length=26".to_string()));
-    assert_eq!(fields[1].filename, None);
-    let received_outputs: Value = from_str(&fields[1].data).unwrap();
-    assert_eq!(received_outputs, json!({"updated_output": "value"}));
-
-    // assert attachment_1 fields
-    assert_eq!(fields[2].name, "attachment.test_id.attachment_1");
-    assert_eq!(fields[2].content_type, Some("application/octet-stream; length=3".to_string()));
-    assert_eq!(fields[2].filename, Some("file1_update.txt".to_string()));
-    assert_eq!(fields[2].data, "\u{4}\u{5}\u{6}");
-
-    // assert attachment_2 fields
-    assert_eq!(fields[3].name, "attachment.test_id.attachment_2");
-    assert_eq!(fields[3].content_type, Some("text/plain; length=29".to_string()));
-    assert_eq!(fields[3].filename, Some("test_file_update.txt".to_string()));
-    assert_eq!(fields[3].data, "Test file content for update\n");
-}