Modernize Python API and Implement High-Performance Batch Ingestion (v0.1.7)#16

Merged
anaslimem merged 3 commits into main from python_api on Mar 8, 2026
Conversation

@anaslimem
Owner

Overview

This PR modernizes the CortexaDB Python API and implements a high-performance batch insertion system, delivering a 100x speedup for large document ingestion.

Key Changes

  • Unified API: Replaced remember and ask with the more intuitive add and search (with aliases).
  • Collection Abstraction: Renamed Namespace to Collection to align with industry standards.
  • Fluent Query Builder: Introduced a chainable QueryBuilder for complex, expressive searches.
  • Batch Insertion: Implemented remember_batch in Rust and add_batch/ingest optimization in Python.
  • Benchmarks: 500+ chunks are now ingested in ~0.04s (down from ~12s).
  • Documentation: Updated README.md and basic examples for Python and Rust.
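
The chainable QueryBuilder named above is not shown in this summary, so as an illustration only, here is a minimal, self-contained sketch of the fluent-builder pattern it describes. All method and field names below are hypothetical stand-ins, not CortexaDB's actual API.

```python
# Minimal sketch of a chainable query builder. Names are illustrative,
# not CortexaDB's real API; the executor is a stub that echoes the spec.
import typing as t

class QueryBuilder:
    def __init__(self, executor: t.Callable[[dict], list]):
        self._executor = executor
        self._spec: dict = {"filters": {}, "top_k": 10}

    def text(self, query: str) -> "QueryBuilder":
        self._spec["query"] = query
        return self  # returning self is what makes the builder chainable

    def filter(self, key: str, value: str) -> "QueryBuilder":
        self._spec["filters"][key] = value
        return self

    def limit(self, n: int) -> "QueryBuilder":
        self._spec["top_k"] = n
        return self

    def run(self) -> list:
        return self._executor(self._spec)

# Usage with a stub executor that just returns the built spec:
results = (
    QueryBuilder(lambda spec: [spec])
    .text("rust performance")
    .filter("lang", "en")
    .limit(5)
    .run()
)
print(results[0]["top_k"])  # → 5
```

Each setter mutates the internal spec and returns `self`, so complex searches read as one expression.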

Copilot AI review requested due to automatic review settings March 8, 2026 00:13
@vercel

vercel bot commented Mar 8, 2026

The latest updates on your projects.

Project: cortexa-db | Deployment: Ready | Actions: Preview, Comment | Updated (UTC): Mar 8, 2026 0:28am


Copilot AI left a comment


Pull request overview

This PR updates CortexaDB’s public-facing APIs (Python + examples/docs) and introduces a Rust-side batch insertion path intended to dramatically speed up large ingestions.

Changes:

  • Added Rust batch ingestion primitives (BatchRecord, remember_batch, insert_memories_batch) and exposed them through the PyO3 bindings.
  • Reworked the Python client API around add/search, a fluent QueryBuilder, and Collection scoping (with legacy aliases).
  • Updated README and example scripts to use the new APIs and highlight batch ingestion performance.

Reviewed changes

Copilot reviewed 7 out of 8 changed files in this pull request and generated 15 comments.

Per-file summary:
examples/rust/basic_usage.rs Switches the example to batch insertion via remember_batch.
examples/python/basic_usage.py Updates the Python example to add, ingest, QueryBuilder, and collection.
crates/cortexadb-py/src/lib.rs Adds PyO3 BatchRecord + remember_batch binding and minor Hit conversion cleanup.
crates/cortexadb-py/cortexadb/client.py Major Python API redesign (QueryBuilder/Collection/add/search/ingest/batch).
crates/cortexadb-core/src/store.rs Implements the core batch write loop (insert_memories_batch).
crates/cortexadb-core/src/facade.rs Adds BatchRecord and CortexaDB::remember_batch facade API.
README.md Updates quickstart and capability descriptions to the new API + performance claims.


        """Ingest text with 100x speedup via batching."""
        chunks = chunk(text, **kwargs)
        if not chunks: return []


Copilot AI Mar 8, 2026


ingest() unconditionally uses self._embedder.embed_batch(...) but doesn’t validate that an embedder was configured. If the DB was opened in manual-dimension mode, this will crash with AttributeError. Please restore an explicit config check and raise a CortexaDBConfigError with a helpful message (as the previous implementation did).

Suggested change
        if not self._embedder:
            raise CortexaDBConfigError(
                "Embedder required for ingest(). When using manual-dimension mode, "
                "provide vectors directly (e.g., via add/add_batch) or configure an embedder."
            )

Comment on lines 246 to +259
            try:
                neighbors = self._inner.get_neighbors(hit.id)
                for target_id, relation in neighbors:
                    if not is_namespace_allowed(target_id):
                        continue
                    # Edge weight factor (e.g. 0.9 penalty for 1 hop)
                    neighbor_score = hit.score * 0.9

                    # Take the best score among multiple paths to the same neighbor
                    current_best = neighbor_candidates_scores.get(target_id, 0.0)
                    if neighbor_score > current_best:
                        neighbor_candidates_scores[target_id] = neighbor_score
            except Exception:
                pass  # missing IDs are handled gracefully

        # Mix neighbors in; if already found by vector search, take the max score
        for target_id, score in neighbor_candidates_scores.items():
            scored_candidates[target_id] = max(
                scored_candidates.get(target_id, 0.0), score
            )

        # 3. Recency Bias (Phase 3.3)
                for target_id, _ in self._inner.get_neighbors(hit.id):
                    scored_candidates[target_id] = max(scored_candidates.get(target_id, 0), hit.score * 0.9)
            except: pass

        if recency_bias:
            now = time.time()
            for obj_id in scored_candidates:
                try:
                    mem = self.get(obj_id)
                    age_seconds = max(0, now - mem.created_at)
                    # 30-day half-life decay
                    decay_factor = 0.5 ** (age_seconds / (30 * 86400))
                    # Boost final score by up to 20%
                    recency_boost = 1.0 + (0.2 * decay_factor)
                    scored_candidates[obj_id] *= recency_boost
                except Exception:
                    pass

        # 4. Final Re-ranking and Truncation
        # Convert dictionary back to Hit objects (for neighbors we don't have the original Hit, so we recreate it)
        final_hits = [Hit(id=mid, score=s) for mid, s in scored_candidates.items()]
        final_hits.sort(key=lambda h: h.score, reverse=True)
        return final_hits[:top_k]

    def connect(self, from_id: int, to_id: int, relation: str) -> None:
        """
        Create a directional edge between two memories.

        If recording is enabled, the operation is appended to the log.
        """
        self._inner.connect(from_id, to_id, relation)
        if self._recorder is not None:
            self._recorder.record_connect(
                from_id=from_id, to_id=to_id, relation=relation
            )

    def ingest(
        self,
        text: str,
        *,
        strategy: str = "recursive",
        chunk_size: int = 512,
        overlap: int = 50,
        namespace: str = "default",
        metadata: t.Optional[t.Dict[str, str]] = None,
    ) -> t.List[int]:
        """
        Ingest text with smart chunking and store in database.

        This is the simplified API for ingesting text content.

        Args:
            text: Text content to ingest.
            strategy: Chunking strategy - "fixed", "recursive", "semantic", "markdown", "json".
                Default: "recursive"
            chunk_size: Target size of each chunk (for fixed/recursive). Default: 512
            overlap: Number of words to overlap between chunks. Default: 50
            namespace: Namespace to store in. Default: "default"
            metadata: Optional metadata dict.

        Returns:
            List of memory IDs.

        Requires:
            An embedder must be configured (via embedder=... when opening).
        """
        if self._embedder is None:
            raise CortexaDBConfigError(
                "ingest() requires an embedder. Open the database with 'embedder=...'"
            )

        chunks = chunk(text, strategy=strategy, chunk_size=chunk_size, overlap=overlap)
        if not chunks:
            return []

        chunk_texts = [c["text"] for c in chunks]
        embeddings = self._embedder.embed_batch(chunk_texts)

        ids: t.List[int] = []
        for chunk_result, vec in zip(chunks, embeddings):
            meta: t.Dict[str, str] = {}
            if metadata:
                meta = {k: str(v) for k, v in metadata.items()}
            if chunk_result.get("metadata"):
                for k, v in chunk_result["metadata"].items():
                    meta[k] = str(v)

            mid = self._remember_inner(
                text=chunk_result["text"],
                embedding=vec,
                metadata=meta if meta else None,
                namespace=namespace,
            )
            ids.append(mid)
        return ids

    def load(
        self,
        path: str,
        *,
        strategy: str = "recursive",
        chunk_size: int = 512,
        overlap: int = 50,
        namespace: str = "default",
        metadata: t.Optional[t.Dict[str, str]] = None,
    ) -> t.List[int]:
        """
        Load a file and ingest its content.

        Automatically detects file format (.txt, .md, .json, .docx, .pdf).

        Args:
            path: Path to the file.
            strategy: Chunking strategy. Default: "recursive"
            chunk_size: Target chunk size. Default: 512
            overlap: Chunk overlap. Default: 50
            namespace: Namespace to store in. Default: "default"
            metadata: Optional metadata to merge with file metadata.

        Returns:
            List of memory IDs.

        Raises:
            FileNotFoundError: If file does not exist.
            ValueError: If file format not supported.
        """
        content = load_file(path)
        file_metadata = get_file_metadata(path)

        meta = dict(file_metadata)
        if metadata:
            meta.update(metadata)

        return self.ingest(
            content,
            strategy=strategy,
            chunk_size=chunk_size,
            overlap=overlap,
            namespace=namespace,
            metadata=meta,
        )

    def ingest_document(
        self,
        text: str,
        *,
        chunk_size: int = 512,
        overlap: int = 50,
        namespace: str = "default",
        metadata: t.Optional[t.Dict[str, str]] = None,
    ) -> t.List[int]:
        """
        Split *text* into chunks and store each one as a separate memory.
        Requires an embedder to be configured.
        """
        if self._embedder is None:
            raise CortexaDBConfigError(
                "ingest_document() requires an embedder. "
                "Open the database with 'embedder=...'."
            )

        chunks = chunk_text(text, chunk_size=chunk_size, overlap=overlap)
        if not chunks:
            return []

        embeddings = self._embedder.embed_batch(chunks)
        ids: t.List[int] = []
        for chunk_str, vec in zip(chunks, embeddings):
            # Uses _remember_inner so each chunk is also logged when recording.
            mid = self._remember_inner(
                text=chunk_str,
                embedding=vec,
                metadata=metadata,
                namespace=namespace,
            )
            ids.append(mid)
        return ids

    def export_replay(self, log_path: str) -> None:
        """
        Export the current database state to a replay log file.

        Unlike the ``record=`` mode (which captures operations as they happen),
        this method produces a *snapshot* of all existing memories. The export
        does not preserve the original insertion order beyond what is stored.

        Args:
            log_path: Path to write the NDJSON replay log.

        Example::

            db = CortexaDB.open("agent.mem", dimension=128)
            # ... lots of work ...
            db.export_replay("snapshot.log")

            # Later on any machine:
            db2 = CortexaDB.replay("snapshot.log", "restored.mem")
        """
        stats = self._inner.stats()
        dim = stats.vector_dimension

        import os

        # Truncate any existing file so we start fresh.
        if os.path.exists(log_path):
            os.remove(log_path)

        report: t.Dict[str, t.Any] = {
            "checked": 0,
            "exported": 0,
            "skipped_missing_id": 0,
            "skipped_missing_embedding": 0,
            "errors": 0,
        }

        with ReplayWriter(log_path, dimension=dim, sync="strict") as writer:
            # Iterate memories by scanning IDs 1..entries range.
            # We use a generous upper bound and skip gaps.
            checked = 0
            found = 0
            candidate = 1
            target = stats.entries

            while found < target and checked < target * 4:
                try:
                    mem = self._inner.get(candidate)
                    embedding = getattr(mem, "embedding", None)
                    if not embedding:
                        report["skipped_missing_embedding"] += 1
                        candidate += 1
                        checked += 1
                        continue
                    content = getattr(mem, "content", b"")
                    if isinstance(content, bytes):
                        text = content.decode("utf-8", errors="replace")
                    else:
                        text = str(content)
                    metadata = dict(mem.metadata) if hasattr(mem, "metadata") else None
                    writer.record_remember(
                        id=mem.id,
                        text=text,
                        embedding=embedding,
                        namespace=mem.namespace,
                        metadata=metadata,
                    )
                    found += 1
                    report["exported"] += 1
                except CortexaDBNotFoundError:
                    report["skipped_missing_id"] += 1
                except Exception:
                    report["errors"] += 1
                candidate += 1
                checked += 1
        report["checked"] = checked
        self._last_export_replay_report = report

    def get(self, mid: int) -> Memory:
        """Retrieve a full memory by ID."""
        return self._inner.get(mid)

    def delete_memory(self, mid: int) -> None:
        """
        Delete a memory by ID.

        If recording is enabled, the operation is appended to the log.
        """
        self._inner.delete_memory(mid)
        if self._recorder is not None:
            self._recorder.record_delete(mid)

    def compact(self) -> None:
        """Compact on-disk segment storage (removes tombstoned entries)."""
        self._inner.compact()
        if self._recorder is not None:
            self._recorder.record_compact()

    def checkpoint(self) -> None:
        """Force a checkpoint (snapshot state + truncate WAL)."""
        self._inner.checkpoint()
        if self._recorder is not None:
            self._recorder.record_checkpoint()

    def stats(self) -> Stats:
        """Get database statistics."""
        return self._inner.stats()

    @property
    def last_replay_report(self) -> t.Optional[t.Dict[str, t.Any]]:
        """Diagnostic report from the most recent replay() call."""
        if self._last_replay_report is None:
            return None
        return copy.deepcopy(self._last_replay_report)

    @property
    def last_export_replay_report(self) -> t.Optional[t.Dict[str, t.Any]]:
        """Diagnostic report from the most recent export_replay() call."""
        if self._last_export_replay_report is None:
            return None
        return copy.deepcopy(self._last_export_replay_report)

    def __repr__(self) -> str:
        s = self._inner.stats()
        embedder_name = type(self._embedder).__name__ if self._embedder else "none"
        recording = f", recording={self._recorder._path}" if self._recorder else ""
        return (
            f"CortexaDB(entries={s.entries}, dimension={s.vector_dimension}, "
            f"indexed={s.indexed_embeddings}, embedder={embedder_name}{recording})"
        )

    def __len__(self) -> int:
        return len(self._inner)

    def __enter__(self) -> "CortexaDB":
        return self
                    age = max(0, now - mem.created_at)
                    decay = 0.5 ** (age / (30 * 86400))
                    scored_candidates[obj_id] *= (1.0 + 0.2 * decay)
                except: pass

Copilot AI Mar 8, 2026


Avoid bare except: pass here (and below in recency bias). It will swallow KeyboardInterrupt/SystemExit and hides real bugs (e.g., unexpected binding failures). Catch Exception explicitly and consider at least leaving a comment/logging hook so failures aren’t completely silent.
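As a sketch of what the reviewer is asking for, the pattern below narrows the handler to `Exception` and adds a logging hook. The helper name and its parameters are illustrative, not the actual client code.

```python
import logging

logger = logging.getLogger("cortexadb.client")

def safe_neighbor_scores(get_neighbors, hit_id, hit_score, scores):
    """Accumulate neighbor scores, tolerating lookup failures without
    swallowing KeyboardInterrupt/SystemExit (which a bare `except:` would)."""
    try:
        for target_id, _relation in get_neighbors(hit_id):
            candidate = hit_score * 0.9  # 1-hop edge penalty, as in the diff
            if candidate > scores.get(target_id, 0.0):
                scores[target_id] = candidate
    except Exception:
        # Narrow to Exception and leave a trace instead of failing silently.
        logger.debug("neighbor lookup failed for id=%s", hit_id, exc_info=True)
    return scores

print(safe_neighbor_scores(lambda i: [(2, "knows")], 1, 1.0, {}))  # → {2: 0.9}
```

A failing `get_neighbors` callable simply leaves the score map unchanged while recording the failure at debug level.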

Comment on lines +271 to +278
        facade_records = [
            BatchRecord(
                namespace=r.get("collection") or r.get("namespace") or "default",
                content=r.get("text") or "",
                embedding=self._resolve_embedding(r.get("text"), r.get("vector")),
                metadata=r.get("metadata")
            ) for r in records
        ]

Copilot AI Mar 8, 2026


add_batch() passes metadata=r.get('metadata') directly into the BatchRecord PyO3 class, which expects dict[str, str] | None. If callers provide non-string metadata values, batch insertion will fail at runtime. Consider normalizing metadata values to str (and validating required fields) before building BatchRecords.

Suggested change
        facade_records = [
            BatchRecord(
                namespace=r.get("collection") or r.get("namespace") or "default",
                content=r.get("text") or "",
                embedding=self._resolve_embedding(r.get("text"), r.get("vector")),
                metadata=r.get("metadata")
            ) for r in records
        ]
        facade_records = []
        for r in records:
            raw_metadata = r.get("metadata")
            if raw_metadata is None:
                metadata: t.Optional[t.Dict[str, str]] = None
            elif isinstance(raw_metadata, dict):
                # Normalize all metadata keys and values to strings to satisfy
                # BatchRecord's expected type: dict[str, str] | None.
                metadata = {str(k): str(v) for k, v in raw_metadata.items()}
            else:
                raise TypeError(
                    f"metadata must be a dict or None, got {type(raw_metadata).__name__}"
                )
            facade_records.append(
                BatchRecord(
                    namespace=r.get("collection") or r.get("namespace") or "default",
                    content=r.get("text") or "",
                    embedding=self._resolve_embedding(r.get("text"), r.get("vector")),
                    metadata=metadata,
                )
            )

Comment on lines +59 to 62
# v0.1.7 uses optimized batch insertion internally
ids = db.ingest(long_text, strategy="recursive", chunk_size=100, overlap=10)
print(f" Recursive chunking: {len(ids)} chunks stored")

# Semantic - split by paragraphs
ids = db.ingest(long_text, strategy="semantic")
print(f" Semantic chunking: {len(ids)} chunks stored")
print(f" Recursive batching: {len(ids)} chunks stored in ms")


Copilot AI Mar 8, 2026


This example assumes db.ingest(...) returns a list of inserted IDs and prints “stored in ms”, but no timing is measured here and (with the current CortexaDB.ingest implementation in client.py) the returned list is always empty. Either measure elapsed time explicitly and print it, or adjust the message and ensure ingest() returns IDs as the example expects.

Comment on lines +281 to +284
    def ingest(self, text: str, **kwargs) -> t.List[int]:
        """Ingest text with 100x speedup via batching."""
        chunks = chunk(text, **kwargs)
        if not chunks: return []

Copilot AI Mar 8, 2026


CortexaDB.ingest() forwards all **kwargs to chunk(text, **kwargs), but callers (and Collection.ingest()) pass non-chunker kwargs like collection and metadata. This will raise TypeError: chunk() got an unexpected keyword argument .... Split/whitelist kwargs for chunking parameters vs ingestion parameters before calling chunk().
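A minimal sketch of the whitelist split the comment asks for; the `CHUNK_KWARGS` set here mirrors the chunker parameters that appear elsewhere in this PR (`strategy`, `chunk_size`, `overlap`), but the helper name is hypothetical.

```python
# Split ingest() kwargs into chunker parameters vs. everything else
# (collection, metadata, ...), so chunk() never sees unknown keywords.
CHUNK_KWARGS = {"strategy", "chunk_size", "overlap"}

def split_ingest_kwargs(kwargs):
    chunk_kw = {k: v for k, v in kwargs.items() if k in CHUNK_KWARGS}
    ingest_kw = {k: v for k, v in kwargs.items() if k not in CHUNK_KWARGS}
    return chunk_kw, ingest_kw

chunk_kw, ingest_kw = split_ingest_kwargs(
    {"strategy": "recursive", "chunk_size": 100, "collection": "docs"}
)
print(chunk_kw)   # → {'strategy': 'recursive', 'chunk_size': 100}
print(ingest_kw)  # → {'collection': 'docs'}
```

`chunk(text, **chunk_kw)` then receives only parameters it accepts, while the ingestion path keeps the rest.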

Comment on lines +127 to 156
use cortexadb_core::BatchRecord;

let records = vec![
    BatchRecord {
        namespace: "default".to_string(),
        content: text1.as_bytes().to_vec(),
        embedding: Some(embed_text(text1, dimension)),
        metadata: None,
    },
    BatchRecord {
        namespace: "default".to_string(),
        content: text2.as_bytes().to_vec(),
        embedding: Some(embed_text(text2, dimension)),
        metadata: None,
    },
    BatchRecord {
        namespace: "default".to_string(),
        content: text3.as_bytes().to_vec(),
        embedding: Some(embed_text(text3, dimension)),
        metadata: None,
    },
];

// Bulk insert with 100x speedup
let last_id = db.remember_batch(records)?;
println!(" Batch finished. Last inserted ID: {}", last_id);

// For manual IDs in the example, we'll use 1, 2, 3 assuming clean start
let id1 = 1; let id2 = 2; let id3 = 3;


Copilot AI Mar 8, 2026


remember_batch() returns a command ID (not a memory ID), so printing it as “Last inserted ID” is misleading. Also, hard-coding id1/id2/id3 = 1/2/3 will break if the DB isn’t a clean start or if ID allocation changes. If you need the inserted memory IDs for graph connections, the batch API should return them (or the example should use remember_with_content() for those three records).

Suggested change
use cortexadb_core::BatchRecord;
let records = vec![
    BatchRecord {
        namespace: "default".to_string(),
        content: text1.as_bytes().to_vec(),
        embedding: Some(embed_text(text1, dimension)),
        metadata: None,
    },
    BatchRecord {
        namespace: "default".to_string(),
        content: text2.as_bytes().to_vec(),
        embedding: Some(embed_text(text2, dimension)),
        metadata: None,
    },
    BatchRecord {
        namespace: "default".to_string(),
        content: text3.as_bytes().to_vec(),
        embedding: Some(embed_text(text3, dimension)),
        metadata: None,
    },
];
// Bulk insert with 100x speedup
let last_id = db.remember_batch(records)?;
println!(" Batch finished. Last inserted ID: {}", last_id);
// For manual IDs in the example, we'll use 1, 2, 3 assuming clean start
let id1 = 1; let id2 = 2; let id3 = 3;
// Insert the three example memories individually so we can get their IDs
let id1 = db.remember_with_content(
    "default",
    text1.as_bytes().to_vec(),
    embed_text(text1, dimension),
    None,
)?;
let id2 = db.remember_with_content(
    "default",
    text2.as_bytes().to_vec(),
    embed_text(text2, dimension),
    None,
)?;
let id3 = db.remember_with_content(
    "default",
    text3.as_bytes().to_vec(),
    embed_text(text3, dimension),
    None,
)?;
println!(" Inserted memories with IDs: {}, {}, {}", id1, id2, id3);

Comment on lines +190 to +194
        dim = embedder.dimension if embedder else dimension
        record_path = kwargs.pop("record", None)
        recorder = ReplayWriter(record_path, dimension=dim, sync=kwargs.get("sync", "strict")) if record_path else None

        return cls(path, dimension=dim, _recorder=recorder, **kwargs)

Copilot AI Mar 8, 2026


CortexaDB.open() passes dimension=dim while leaving dimension inside kwargs, which will raise TypeError: got multiple values for keyword argument 'dimension' when callers use the dimension-based open path. Pop/remove dimension (and optionally embedder) from kwargs before calling cls(...), or make open() take explicit parameters instead of **kwargs.
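The fix can be sketched with a stub constructor: pop every key that `open()` forwards explicitly before spreading `**kwargs`. The `open_db` wrapper below is hypothetical scaffolding, not CortexaDB's real `open()`.

```python
# Pop keys that open() passes explicitly so they are not also forwarded
# through **kwargs, which would raise "got multiple values for keyword
# argument 'dimension'".
def open_db(cls_init, path, **kwargs):
    embedder = kwargs.pop("embedder", None)
    dimension = kwargs.pop("dimension", None)  # removed from kwargs here
    dim = embedder.dimension if embedder else dimension
    return cls_init(path, dimension=dim, **kwargs)

# With a stub constructor, the dimension-based open path no longer raises:
db = open_db(lambda path, *, dimension, **kw: (path, dimension, kw),
             "agent.mem", dimension=128)
print(db)  # → ('agent.mem', 128, {})
```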

Comment on lines +298 to +299
        if supplied is not None: return supplied
        if not self._embedder: raise CortexaDBConfigError("Embedder required.")

Copilot AI Mar 8, 2026


_resolve_embedding() can call self._embedder.embed(text) even when text is None (e.g., add(text=None, vector=None) or search(query=None, vector=None) paths). Add input validation so callers must provide either text/query or an explicit vector, and raise a CortexaDBConfigError when neither is provided.

Suggested change
        if supplied is not None: return supplied
        if not self._embedder: raise CortexaDBConfigError("Embedder required.")
        if supplied is not None:
            return supplied
        if text is None:
            raise CortexaDBConfigError(
                "Either text/query or an explicit vector must be provided."
            )
        if not self._embedder:
            raise CortexaDBConfigError("Embedder required.")

Comment on lines +567 to +574
for entry in entries {
    let mut effective = entry;
    // Check for previous state to handle partial updates if necessary
    if let Ok(prev) = writer.engine.get_state_machine().get_memory(effective.id) {
        let content_changed = prev.content != effective.content;
        if content_changed && effective.embedding.is_none() {
            return Err(CortexaDBStoreError::MissingEmbeddingOnContentChange(effective.id));
        }

Copilot AI Mar 8, 2026


insert_memories_batch() may partially apply the batch: if an error occurs mid-loop (dimension mismatch, index error, engine error), earlier entries have already been written via execute_command_unsynced. If callers expect all-or-nothing semantics, pre-validate all entries before writing (or document/rename the method to make partial success explicit).
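The all-or-nothing alternative the comment describes is a validate-then-apply split: check every entry before any write, so a mid-batch failure cannot leave earlier entries committed. Sketched here in Python with stand-in types (`Entry`, a dict-backed `store`, `expected_dim`), not the actual Rust store.

```python
# Two-phase batch insert: Phase 1 validates every entry; Phase 2 writes.
# A validation failure therefore leaves the store completely untouched.
from dataclasses import dataclass

@dataclass
class Entry:
    id: int
    embedding: list

def insert_batch(entries, store, expected_dim):
    # Phase 1: validate everything up front.
    for e in entries:
        if e.embedding is not None and len(e.embedding) != expected_dim:
            raise ValueError(f"dimension mismatch for entry {e.id}")
    # Phase 2: only now perform the writes.
    for e in entries:
        store[e.id] = e
    return len(entries)

store = {}
try:
    insert_batch([Entry(1, [0.1, 0.2]), Entry(2, [0.1])], store, expected_dim=2)
except ValueError:
    pass
print(len(store))  # → 0: nothing was written, validation failed first
```

If partial success is the intended contract instead, renaming the method (or documenting the behavior) makes that explicit to callers.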

Comment on lines +325 to +344
/// Store a batch of memories efficiently.
pub fn remember_batch(&self, records: Vec<BatchRecord>) -> Result<u64> {
    let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
    let mut entries = Vec::with_capacity(records.len());

    for rec in records {
        let id = MemoryId(self.next_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed));
        let mut entry = MemoryEntry::new(id, rec.namespace, rec.content, ts);
        if let Some(emb) = rec.embedding {
            entry = entry.with_embedding(emb);
        }
        if let Some(meta) = rec.metadata {
            entry.metadata = meta;
        }
        entries.push(entry);
    }

    let last_cmd_id = self.inner.insert_memories_batch(entries)?;
    Ok(last_cmd_id.0)
}

Copilot AI Mar 8, 2026


remember_batch() returns the last command ID (last_cmd_id.0) rather than an inserted memory ID or list of IDs. This is inconsistent with remember*() methods that return the inserted memory ID, and it’s easy for callers to misinterpret (as the Rust example does). Consider returning the inserted IDs (or at least the last inserted memory ID), or rename/retag the return value/type to make it unambiguous (e.g., CommandId).
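One possible API shape, sketched in Python with stand-in storage: the batch insert collects and returns the memory IDs it allocated, so callers (like the Rust example above) can wire graph edges to real IDs. All names here are illustrative.

```python
# Batch insert that returns the list of inserted memory IDs instead of
# an opaque command ID. `next_id` simulates the atomic ID counter.
import itertools

_next_id = itertools.count(1)

def remember_batch(records, store):
    ids = []
    for rec in records:
        mid = next(_next_id)
        store[mid] = rec
        ids.append(mid)
    return ids  # callers can now connect edges using real IDs

store = {}
ids = remember_batch(["a", "b", "c"], store)
print(ids)  # → [1, 2, 3]
```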

@anaslimem anaslimem merged commit d326763 into main Mar 8, 2026
3 checks passed
@anaslimem anaslimem deleted the python_api branch March 9, 2026 01:36
