Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove async and non-streaming blocking clients. #1419

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};

use pyo3::prelude::*;

use langsmith_tracing_client::client::streaming::{
use langsmith_tracing_client::client::{
ClientConfig as RustClientConfig, TracingClient as RustTracingClient,
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,21 @@
use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use langsmith_tracing_client::client::async_enabled::{ClientConfig, TracingClient};
use langsmith_tracing_client::client::blocking::{
ClientConfig as BlockingClientConfig, TracingClient as BlockingTracingClient,
};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use langsmith_tracing_client::client::{
Attachment, EventType, RunCommon, RunCreate, RunCreateExtended, RunEventBytes, RunIO, TimeValue,
Attachment, ClientConfig as BlockingClientConfig, RunCommon, RunCreate, RunCreateExtended,
RunIO, TimeValue, TracingClient as BlockingTracingClient,
};
use mockito::Server;
use serde_json::{json, Value};
use std::time::Duration;
use tokio::runtime::Runtime;

/// Build an async-client `ClientConfig` pointed at the given mock server URL.
///
/// The queue capacity is fixed at one million entries; a batch is flushed once
/// it reaches `batch_size` items or after a one-second timeout, whichever
/// comes first. No extra headers are attached.
fn create_mock_client_config(server_url: &str, batch_size: usize) -> ClientConfig {
    let endpoint = String::from(server_url);
    ClientConfig {
        endpoint,
        headers: Default::default(),
        batch_timeout: Duration::from_secs(1),
        batch_size,
        queue_capacity: 1_000_000,
    }
}

// Build a blocking-client config pointed at the given mock server URL.
// NOTE(review): this span appears to merge both sides of a diff — the old
// fields (`batch_size`, `batch_timeout`) coexist with the new ones
// (`send_at_batch_size`, `send_at_batch_time`, `num_worker_threads`,
// `compression_level`); confirm against the actual `BlockingClientConfig`
// struct definition before relying on this listing.
fn create_mock_client_config_sync(server_url: &str, batch_size: usize) -> BlockingClientConfig {
BlockingClientConfig {
endpoint: server_url.to_string(),
// Placeholder key; the mock server presumably ignores auth — TODO confirm.
api_key: "anything".into(),
queue_capacity: 1_000_000,
batch_size,
batch_timeout: Duration::from_secs(1),
// Flush once a batch reaches `batch_size` items or after one second.
send_at_batch_size: batch_size,
send_at_batch_time: Duration::from_secs(1),
headers: Default::default(),
num_worker_threads: 1,
compression_level: 1,
}
}

Expand Down Expand Up @@ -67,26 +53,6 @@ fn create_run_create(
}
}

/// Build a pre-serialized `RunEventBytes` "create" event from optional
/// attachments, inputs, and outputs.
///
/// Inputs/outputs are serialized to JSON byte vectors when present; the run
/// payload itself is produced via `create_run_create` and serialized whole.
/// Panics if any `serde_json` serialization fails (acceptable in a benchmark).
fn create_run_bytes(
    attachments: Option<Vec<Attachment>>,
    inputs: Option<Value>,
    outputs: Option<Value>,
) -> RunEventBytes {
    // Serialize the optional payloads before `inputs`/`outputs` are moved
    // into `create_run_create` below.
    let inputs_bytes = inputs.as_ref().map(|v| serde_json::to_vec(v).unwrap());
    let outputs_bytes = outputs.as_ref().map(|v| serde_json::to_vec(v).unwrap());

    let extended = create_run_create(attachments, inputs, outputs);
    let run_bytes = serde_json::to_vec(&extended.run_create).unwrap();

    RunEventBytes {
        run_id: extended.run_create.common.id,
        event_type: EventType::Create,
        run_bytes,
        inputs_bytes,
        outputs_bytes,
        attachments: extended.attachments,
    }
}

fn create_large_json(len: usize) -> Value {
let large_array: Vec<Value> = (0..len)
.map(|i| {
Expand All @@ -113,186 +79,6 @@ fn create_large_json(len: usize) -> Value {
})
}

// Benchmark the async `TracingClient::submit_run_create` path against a
// mockito server, sweeping batch size, JSON payload length, and run count.
// `dead_code` is expected: this bench is not wired into the active
// criterion group — presumably kept for ad-hoc comparison; TODO confirm.
#[expect(dead_code)]
fn bench_run_create(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
// Spin up a mock server that accepts the multipart run endpoint with 202.
let server = rt.block_on(async {
let mut server = Server::new_async().await;
server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
server
});

let mut group = c.benchmark_group("run_create");
for batch_size in [50] {
for json_len in [1_000, 5_000] {
for num_runs in [500, 1_000] {
group.bench_with_input(
BenchmarkId::new(
"run_create_async",
format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
),
&(batch_size, json_len, num_runs),
|b, &(batch_size, json_len, num_runs)| {
b.to_async(&rt).iter_batched(
// Setup closure: build the runs and a fresh client OUTSIDE the
// measured section, so only submission + shutdown are timed.
|| {
let runs: Vec<RunCreateExtended> = (0..num_runs)
.map(|i| {
let mut run = create_run_create(
None,
Some(create_large_json(json_len)),
Some(create_large_json(json_len)),
);
// Give each run a unique id.
run.run_create.common.id = format!("test_id_{}", i);
run
})
.collect();
let client_config =
create_mock_client_config(&server.url(), batch_size);
let client = TracingClient::new(client_config).unwrap();
(client, runs)
},
// Measured routine: submit every run, then shut the client down so
// the queue is fully flushed within the timed section.
|(client, runs)| async {
for run in runs {
client.submit_run_create(black_box(run)).await.unwrap();
}
// shutdown the client to flush the queue
client.shutdown().await.unwrap();
},
// LargeInput: setup is expensive, so criterion batches conservatively.
BatchSize::LargeInput,
);
},
);
}
}
}
group.finish();
}

// Variant of the run-create benchmark using `iter_custom` so the measured
// span can be controlled by hand with `Instant`. Each loop parameter is a
// single-element array (hence the `single_element_loop` expectation), and
// `dead_code` suggests it is not registered with the active criterion
// group — TODO confirm.
#[expect(dead_code, clippy::single_element_loop)]
fn bench_run_create_iter_custom(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
// Mock server accepting the multipart run endpoint with a 202 status.
let server = rt.block_on(async {
let mut server = Server::new_async().await;
server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
server
});

let mut group = c.benchmark_group("run_create_custom_iter");
let server_url = server.url();
for batch_size in [100] {
for json_len in [3_000] {
for num_runs in [1_000] {
group.bench_function(
BenchmarkId::new(
"run_create_async",
format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
),
|b| {
// `iter_custom` hands us the iteration count; we return the total
// elapsed time we chose to measure ourselves.
b.to_async(&rt).iter_custom(|iters| {
let mut elapsed_time = Duration::default();
let server_url = server_url.clone();
async move {
for _ in 0..iters {
// Run construction and client setup happen BEFORE the timer
// starts, so they are excluded from the measurement.
let runs: Vec<RunCreateExtended> = (0..num_runs)
.map(|i| {
let mut run = create_run_create(
None,
Some(create_large_json(json_len)),
Some(create_large_json(json_len)),
);
run.run_create.common.id = format!("test_id_{}", i);
run
})
.collect();
let client_config =
create_mock_client_config(&server_url, batch_size);
let client = TracingClient::new(client_config).unwrap();

let start = std::time::Instant::now();
for run in runs {
client.submit_run_create(black_box(run)).await.unwrap();
}

// shutdown the client to flush the queue
// NOTE(review): the measured span (`start.elapsed()` below) includes
// shutdown AND these debug println!s; shutdown is also timed
// separately via `start_shutdown` for diagnostics.
let start_shutdown = std::time::Instant::now();
println!("----------SHUTDOWN----------");
client.shutdown().await.unwrap();
println!("----------SHUTDOWN END----------");
println!(
"Elapsed time for shutdown: {:?}",
start_shutdown.elapsed()
);
elapsed_time += start.elapsed();
println!("Elapsed time: {:?}", elapsed_time);
}
elapsed_time
}
})
},
);
}
}
}
group.finish();
}

// Benchmark the pre-serialized byte path (`submit_run_bytes`) with hand-timed
// `iter_custom` measurement. `dead_code` is expected: the bench appears to be
// excluded from the active criterion group — TODO confirm.
#[expect(dead_code)]
fn bench_run_bytes_iter_custom(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
// Mock server accepting the multipart run endpoint with a 202 status.
let server = rt.block_on(async {
let mut server = Server::new_async().await;
server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
server
});

let mut group = c.benchmark_group("run_create_bytes_iter");
let server_url = server.url();
for batch_size in [50] {
for json_len in [1_000, 5_000] {
for num_runs in [500, 1_000] {
group.bench_function(
BenchmarkId::new(
"run_create_async",
format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
),
|b| {
b.to_async(&rt).iter_custom(|iters| {
let mut elapsed_time = Duration::default();
let server_url = server_url.clone();
async move {
for _ in 0..iters {
// Events are serialized to bytes up front (outside the timed
// span), so the benchmark isolates submission + flush cost.
let runs: Vec<RunEventBytes> = (0..num_runs)
.map(|_i| {
create_run_bytes(
None,
Some(create_large_json(json_len)),
Some(create_large_json(json_len)),
)
})
.collect();
let client_config =
create_mock_client_config(&server_url, batch_size);
let client = TracingClient::new(client_config).unwrap();

let start = std::time::Instant::now();
for run in runs {
client.submit_run_bytes(black_box(run)).await.unwrap();
}
// shutdown the client to flush the queue
// NOTE(review): shutdown is inside the measured span, so the
// reported time covers submission plus the final queue flush.
client.shutdown().await.unwrap();
elapsed_time += start.elapsed();
}
elapsed_time
}
})
},
);
}
}
}
group.finish();
}

#[expect(unused_variables, clippy::single_element_loop)]
fn bench_run_create_sync_iter_custom(c: &mut Criterion) {
let server = {
Expand Down

This file was deleted.

Loading
Loading