Skip to content

Commit

Permalink
fix(js): Further reduce blocking behavior, try naive stringification …
Browse files Browse the repository at this point in the history
…instead of circular detection (#1133)

Further investigation into #1101
  • Loading branch information
jacoblee93 authored Oct 26, 2024
1 parent f4886a4 commit 4945163
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 49 deletions.
2 changes: 1 addition & 1 deletion js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "langsmith",
"version": "0.2.1",
"version": "0.2.2",
"description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.",
"packageManager": "yarn@1.22.19",
"files": [
Expand Down
45 changes: 18 additions & 27 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ const handle429 = async (response?: Response) => {
return false;
};

export class Queue {
export class AutoBatchQueue {
items: {
action: "create" | "update";
payload: RunCreate | RunUpdate;
Expand Down Expand Up @@ -461,7 +461,7 @@ export class Client {

private autoBatchTracing = true;

private autoBatchQueue = new Queue();
private autoBatchQueue = new AutoBatchQueue();

private autoBatchTimeout: ReturnType<typeof setTimeout> | undefined;

Expand Down Expand Up @@ -755,7 +755,7 @@ export class Client {
}
}

private async _getBatchSizeLimitBytes() {
private async _getBatchSizeLimitBytes(): Promise<number> {
const serverInfo = await this._ensureServerInfo();
return (
this.batchSizeBytesLimit ??
Expand All @@ -764,21 +764,18 @@ export class Client {
);
}

private async drainAutoBatchQueue() {
const batchSizeLimit = await this._getBatchSizeLimitBytes();
private drainAutoBatchQueue(batchSizeLimit: number) {
while (this.autoBatchQueue.items.length > 0) {
for (let i = 0; i < this.traceBatchConcurrency; i++) {
const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit);
if (!batch.length) {
done();
break;
}
await this.processBatch(batch, done);
const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit);
if (!batch.length) {
done();
break;
}
void this._processBatch(batch, done).catch(console.error);
}
}

private async processBatch(batch: AutoBatchQueueItem[], done: () => void) {
private async _processBatch(batch: AutoBatchQueueItem[], done: () => void) {
if (!batch.length) {
done();
return;
Expand All @@ -803,10 +800,7 @@ export class Client {
}
}

private async processRunOperation(
item: AutoBatchQueueItem,
immediatelyTriggerBatch?: boolean
) {
private async processRunOperation(item: AutoBatchQueueItem) {
const oldTimeout = this.autoBatchTimeout;
clearTimeout(this.autoBatchTimeout);
this.autoBatchTimeout = undefined;
Expand All @@ -815,19 +809,14 @@ export class Client {
}
const itemPromise = this.autoBatchQueue.push(item);
const sizeLimitBytes = await this._getBatchSizeLimitBytes();
if (
immediatelyTriggerBatch ||
this.autoBatchQueue.sizeBytes > sizeLimitBytes
) {
await this.drainAutoBatchQueue().catch(console.error);
if (this.autoBatchQueue.sizeBytes > sizeLimitBytes) {
this.drainAutoBatchQueue(sizeLimitBytes);
}
if (this.autoBatchQueue.items.length > 0) {
this.autoBatchTimeout = setTimeout(
() => {
this.autoBatchTimeout = undefined;
// This error would happen in the background and is uncatchable
// from the outside. So just log instead.
void this.drainAutoBatchQueue().catch(console.error);
this.drainAutoBatchQueue(sizeLimitBytes);
},
oldTimeout
? this.autoBatchAggregationDelayMs
Expand Down Expand Up @@ -1232,9 +1221,11 @@ export class Client {
data.parent_run_id === undefined &&
this.blockOnRootRunFinalization
) {
// Trigger a batch as soon as a root trace ends and block to ensure trace finishes
// Trigger batches as soon as a root trace ends and wait to ensure trace finishes
// in serverless environments.
await this.processRunOperation({ action: "update", item: data }, true);
await this.processRunOperation({ action: "update", item: data }).catch(
console.error
);
return;
} else {
void this.processRunOperation({ action: "update", item: data }).catch(
Expand Down
2 changes: 1 addition & 1 deletion js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js";
export { overrideFetchImplementation } from "./singletons/fetch.js";

// Update using yarn bump-version
export const __version__ = "0.2.1";
export const __version__ = "0.2.2";
52 changes: 32 additions & 20 deletions js/src/utils/fast-safe-stringify/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,45 @@ function defaultOptions() {

// Regular stringify
export function stringify(obj, replacer?, spacer?, options?) {
if (typeof options === "undefined") {
options = defaultOptions();
}

decirc(obj, "", 0, [], undefined, 0, options);
var res;
try {
if (replacerStack.length === 0) {
res = JSON.stringify(obj, replacer, spacer);
} else {
res = JSON.stringify(obj, replaceGetterValues(replacer), spacer);
return JSON.stringify(obj, replacer, spacer);
} catch (e: any) {
// Fall back to more complex stringify if circular reference
if (!e.message?.includes("Converting circular structure to JSON")) {
console.warn("[WARNING]: LangSmith received unserializable value.");
return "[Unserializable]";
}
} catch (_) {
return JSON.stringify(
"[unable to serialize, circular reference is too complex to analyze]"
console.warn(
"[WARNING]: LangSmith received circular JSON. This will decrease tracer performance."
);
} finally {
while (arr.length !== 0) {
var part = arr.pop();
if (part.length === 4) {
Object.defineProperty(part[0], part[1], part[3]);
if (typeof options === "undefined") {
options = defaultOptions();
}

decirc(obj, "", 0, [], undefined, 0, options);
var res;
try {
if (replacerStack.length === 0) {
res = JSON.stringify(obj, replacer, spacer);
} else {
part[0][part[1]] = part[2];
res = JSON.stringify(obj, replaceGetterValues(replacer), spacer);
}
} catch (_) {
return JSON.stringify(
"[unable to serialize, circular reference is too complex to analyze]"
);
} finally {
while (arr.length !== 0) {
var part = arr.pop();
if (part.length === 4) {
Object.defineProperty(part[0], part[1], part[3]);
} else {
part[0][part[1]] = part[2];
}
}
}
return res;
}
return res;
}

function setReplace(replace, val, k, parent) {
Expand Down

0 comments on commit 4945163

Please sign in to comment.