Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 68 additions & 8 deletions crates/codegraph-graph/src/surrealdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,56 @@ impl SurrealDbStorage {
Ok(())
}

/// Resilient edge upsert for large codebases.
///
/// Pre-chunks the input into slices of [`EDGE_BATCH_CHUNK_SIZE`] before
/// sending to SurrealDB. If a chunk still fails with a WebSocket
/// connection-reset or an "excessive computation depth" error (both caused
/// by SurrealDB's `FOR … IN $batch` loop hitting internal limits), the
/// chunk is split in half and retried up to 3 times. This mirrors the
/// existing `upsert_chunk_embeddings_resilient` strategy.
///
/// Use this instead of `upsert_edges_batch` whenever the total number of
/// edges is not tightly bounded (e.g. during full project indexing).
pub async fn upsert_edges_batch_resilient(&mut self, edges: &[CodeEdge]) -> Result<()> {
if edges.is_empty() {
return Ok(());
}

// Iterative queue: (batch_slice, retries_remaining)
let mut queue: Vec<(Vec<CodeEdge>, u8)> = edges
.chunks(EDGE_BATCH_CHUNK_SIZE)
.map(|c| (c.to_vec(), 3u8))
.collect();

while let Some((batch, remaining)) = queue.pop() {
if batch.is_empty() {
continue;
}
match self.upsert_edges_batch(&batch).await {
Ok(()) => {}
Err(err) => {
let msg = err.to_string();
let msg_lower = msg.to_lowercase();
let recoverable = msg_lower.contains("excessive computation depth")
|| msg_lower.contains("computationdepth")
|| msg_lower.contains("connection reset")
|| msg_lower.contains("broken pipe");
if recoverable && remaining > 0 && batch.len() > 1 {
let mid = batch.len() / 2;
let (left, right) = batch.split_at(mid);
queue.push((right.to_vec(), remaining - 1));
queue.push((left.to_vec(), remaining - 1));
} else {
return Err(err);
}
}
}
}

Ok(())
}

pub async fn upsert_symbol_embeddings_batch(
&self,
records: &[SymbolEmbeddingRecord],
Expand Down Expand Up @@ -683,10 +733,12 @@ impl SurrealDbStorage {
// First, try a bulk INSERT (shallow query, no FOR loop)
if let Err(err) = self.insert_chunk_embeddings_batch(records).await {
let msg = err.to_string();
let duplicate = msg.to_lowercase().contains("duplicate");
let depth_hit = msg.contains("excessive computation depth")
|| msg.contains("ComputationDepth")
|| msg.contains("connection reset");
let msg_lower = msg.to_lowercase();
let duplicate = msg_lower.contains("duplicate");
let depth_hit = msg_lower.contains("excessive computation depth")
|| msg_lower.contains("computationdepth")
|| msg_lower.contains("connection reset")
|| msg_lower.contains("broken pipe");

// Only fall through to upsert/backoff on duplicate or depth issues; otherwise fail fast
if !duplicate && !depth_hit {
Expand All @@ -708,9 +760,11 @@ impl SurrealDbStorage {
Ok(()) => {}
Err(err) => {
let msg = err.to_string();
let depth_hit = msg.contains("excessive computation depth")
|| msg.contains("ComputationDepth")
|| msg.contains("connection reset");
let msg_lower = msg.to_lowercase();
let depth_hit = msg_lower.contains("excessive computation depth")
|| msg_lower.contains("computationdepth")
|| msg_lower.contains("connection reset")
|| msg_lower.contains("broken pipe");

if depth_hit && remaining > 0 && batch.len() > 1 {
let mid = batch.len() / 2;
Expand Down Expand Up @@ -919,7 +973,7 @@ impl SurrealDbStorage {
}

pub async fn add_code_edges(&mut self, edges: Vec<CodeEdge>) -> Result<()> {
self.upsert_edges_batch(&edges).await
self.upsert_edges_batch_resilient(&edges).await
}

pub async fn upsert_symbol_embedding(&self, record: SymbolEmbeddingUpsert<'_>) -> Result<()> {
Expand Down Expand Up @@ -2148,6 +2202,12 @@ FOR $doc IN $batch {
}
"#;

/// Maximum number of edges sent in a single SurrealDB `FOR … IN $batch` query.
/// Larger batches cause WebSocket connection resets or "excessive computation
/// depth" errors in SurrealDB. `upsert_edges_batch_resilient` pre-chunks to
/// this size and halves further on transient errors (with its 3-retry budget,
/// the smallest batch it will attempt is 2_000 / 2^3 = 250 edges).
const EDGE_BATCH_CHUNK_SIZE: usize = 2_000;

const UPSERT_EDGES_QUERY: &str = r#"
LET $batch = $data;
FOR $doc IN $batch {
Expand Down
2 changes: 1 addition & 1 deletion crates/codegraph-mcp/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl SurrealWriterHandle {
}
if let Err(err) = {
let mut guard = storage.lock().await;
guard.upsert_edges_batch(&edges).await
guard.upsert_edges_batch_resilient(&edges).await
} {
error!("Surreal edge batch failed: {}", err);
last_error = Some(anyhow!(err.to_string()));
Expand Down