Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class BaseKnowledgeSource(BaseModel, ABC):

chunk_size: int = 4000
chunk_overlap: int = 200
chunks: list[str] = Field(default_factory=list)
chunks: list[dict[str, Any]] = Field(default_factory=list)
chunk_embeddings: list[np.ndarray] = Field(default_factory=list)

model_config = ConfigDict(arbitrary_types_allowed=True)
Expand All @@ -39,7 +39,7 @@ def _chunk_text(self, text: str) -> list[str]:
for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
]

def _save_documents(self):
def _save_documents(self) -> None:
"""
Save the documents to the storage.
This method should be called after the chunks and embeddings are generated.
Expand Down
192 changes: 147 additions & 45 deletions lib/crewai/src/crewai/knowledge/source/crew_docling_source.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,53 @@
from __future__ import annotations

from collections.abc import Iterator
import importlib
from pathlib import Path
from typing import Any
from urllib.parse import urlparse


# --- third-party/optional imports (OK to keep in try/except) ---
try:
from docling.datamodel.base_models import ( # type: ignore[import-not-found]
InputFormat,
)
from docling.document_converter import ( # type: ignore[import-not-found]
DocumentConverter,
)
from docling.exceptions import ConversionError # type: ignore[import-not-found]
from docling_core.transforms.chunker.hierarchical_chunker import ( # type: ignore[import-not-found]
HierarchicalChunker,
)
from docling_core.types.doc.document import ( # type: ignore[import-not-found]
DoclingDocument,
)

DOCLING_AVAILABLE = True
except ImportError:
DOCLING_AVAILABLE = False

from pydantic import Field
# Ensure the converter module is present too; otherwise the flag is misleading.
if DOCLING_AVAILABLE:
import importlib.util as _ilu

if (
_ilu.find_spec("docling.document_converter") is None
or _ilu.find_spec("docling.exceptions") is None
):
DOCLING_AVAILABLE = False

# --- regular imports must stay together, before any non-import statements ---
from pydantic import Field, PrivateAttr

from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger


# Safe default; will be overwritten at runtime if docling is present
DoclingConversionError: type[BaseException] | None = None


class CrewDoclingSource(BaseKnowledgeSource):
"""Default Source class for converting documents to markdown or json
This will auto support PDF, DOCX, and TXT, XLSX, Images, and HTML files without any additional dependencies and follows the docling package as the source of truth.
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
if not DOCLING_AVAILABLE:
raise ImportError(
"The docling package is required to use CrewDoclingSource. "
Expand All @@ -48,11 +59,57 @@ def __init__(self, *args, **kwargs):

file_path: list[Path | str] | None = Field(default=None)
file_paths: list[Path | str] = Field(default_factory=list)
chunks: list[str] = Field(default_factory=list)
chunks: list[dict[str, Any]] = Field(default_factory=list)
safe_file_paths: list[Path | str] = Field(default_factory=list)
content: list[DoclingDocument] = Field(default_factory=list)
document_converter: DocumentConverter = Field(
default_factory=lambda: DocumentConverter(
content: list[Any] = Field(default_factory=list)
_aligned_paths: list[Path | str] = PrivateAttr(default_factory=list)
document_converter: Any = Field(default=None)

def model_post_init(self, __context: Any) -> None:
if self.file_path:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path

self.safe_file_paths = self.validate_content()

# Import docling pieces dynamically to avoid mypy missing-import issues.
try:
docling_mod = importlib.import_module("docling.document_converter")
except Exception as e:
raise ImportError(
"docling is partially installed: 'docling.document_converter' not found."
"Please install/upgrade docling: `uv add docling` ."
) from e
document_converter_cls = docling_mod.DocumentConverter

# Resolve ConversionError dynamically (no static import)
try:
exc_mod = importlib.import_module("docling.exceptions")
exc_cls = getattr(exc_mod, "ConversionError", None)
if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException):
global DoclingConversionError
DoclingConversionError = exc_cls
else:
self._logger.log(
"warning",
"docling.exceptions.ConversionError not found or invalid; using generic handling.",
color="yellow",
)
DoclingConversionError = None
except Exception as err:
# Log instead of bare `pass` to satisfy ruff S110
self._logger.log(
"warning",
f"docling.exceptions not available ({err!s}); using generic handling.",
color="yellow",
)
DoclingConversionError = None

self.document_converter = document_converter_cls(
allowed_formats=[
InputFormat.MD,
InputFormat.ASCIIDOC,
Expand All @@ -62,48 +119,94 @@ def __init__(self, *args, **kwargs):
InputFormat.IMAGE,
InputFormat.XLSX,
InputFormat.PPTX,
InputFormat.CSV,
]
)
)

def model_post_init(self, _) -> None:
if self.file_path:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
self.safe_file_paths = self.validate_content()
self.content = self._load_content()

def _load_content(self) -> list[DoclingDocument]:
def _load_content(self) -> list[Any]:
try:
return self._convert_source_to_docling_documents()
except ConversionError as e:
self._logger.log(
"error",
f"Error loading content: {e}. Supported formats: {self.document_converter.allowed_formats}",
"red",
)
raise e
except Exception as e:
self._logger.log("error", f"Error loading content: {e}")
raise e
if DoclingConversionError is not None and isinstance(
e, DoclingConversionError
):
self._logger.log(
"error",
f"Error loading content: {e}. Supported formats: {self.document_converter.allowed_formats}",
"red",
)
else:
self._logger.log("error", f"Error loading content: {e}")
raise
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: ConversionError Import Failure Masks Exceptions

If docling.exceptions.ConversionError fails to import, the DoclingConversionError variable is left as None. The isinstance guard in _load_content then routes every exception through the generic branch, so genuine conversion failures are logged without the supported-formats detail rather than being distinguished from unexpected errors.

Fix in Cursor Fix in Web


def add(self) -> None:
if self.content is None:
"""Convert each document to chunks, attach filepath metadata, and persist."""
if not self.content:
return
for doc in self.content:
new_chunks_iterable = self._chunk_doc(doc)
self.chunks.extend(list(new_chunks_iterable))
self._save_documents()

def _convert_source_to_docling_documents(self) -> list[DoclingDocument]:
conv_results_iter = self.document_converter.convert_all(self.safe_file_paths)
return [result.document for result in conv_results_iter]
for filepath, doc in zip(self._aligned_paths, self.content, strict=True):
chunk_idx = 0
for chunk in self._chunk_doc(doc):
self.chunks.append(
{
"content": chunk,
"metadata": {
"filepath": str(filepath),
"chunk_index": chunk_idx,
"source_type": "docling",
},
}
)
chunk_idx += 1

def _chunk_doc(self, doc: DoclingDocument) -> Iterator[str]:
self._save_documents()

def _convert_one(self, fp: Path | str) -> tuple[Any, Path | str] | None:
"""Convert a single file; on failure, log and return None."""
try:
result = self.document_converter.convert(fp)
return result.document, fp
except Exception as e:
if DoclingConversionError is not None and isinstance(
e, DoclingConversionError
):
self._logger.log(
"warning",
f"Skipping {fp!s}: conversion failed with {e!s}",
color="yellow",
)
else:
self._logger.log(
"warning",
f"Skipping {fp!s}: unexpected error during conversion {e!s}",
color="yellow",
)
return None

def _convert_source_to_docling_documents(self) -> list[Any]:
"""
Convert files one-by-one to preserve (filepath, document) alignment.

Any file that fails conversion is skipped (with a warning). For all successful
conversions, we maintain a parallel list of source paths so the add() step can
attach correct per-chunk filepath metadata without relying on zip truncation.
"""
aligned_docs: list[Any] = []
aligned_paths: list[Path | str] = []

for fp in self.safe_file_paths:
item = self._convert_one(fp)
if item is None:
continue
doc, aligned_fp = item
aligned_docs.append(doc)
aligned_paths.append(aligned_fp)

self._aligned_paths = aligned_paths
return aligned_docs

def _chunk_doc(self, doc: Any) -> Iterator[str]:
    """Yield the text of each chunk docling's HierarchicalChunker produces for *doc*."""
    hierarchical = HierarchicalChunker()
    for piece in hierarchical.chunk(doc):
        yield piece.text
Expand All @@ -127,7 +230,6 @@ def validate_content(self) -> list[Path | str]:
else:
raise FileNotFoundError(f"File not found: {local_path}")
else:
# this is an instance of Path
processed_paths.append(path)
return processed_paths

Expand All @@ -138,7 +240,7 @@ def _validate_url(self, url: str) -> bool:
[
result.scheme in ("http", "https"),
result.netloc,
len(result.netloc.split(".")) >= 2, # Ensure domain has TLD
len(result.netloc.split(".")) >= 2,
]
)
except Exception:
Expand Down
26 changes: 18 additions & 8 deletions lib/crewai/src/crewai/knowledge/source/csv_knowledge_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,25 @@ def load_content(self) -> dict[Path, str]:

def add(self) -> None:
    """
    Add CSV file content to the knowledge source, chunk it per file,
    attach filepath metadata, and persist via the configured storage.
    """
    for filepath, text in self.content.items():
        content_str = str(text)
        for chunk_idx, chunk in enumerate(self._chunk_text(content_str)):
            self.chunks.append(
                {
                    "content": chunk,
                    "metadata": {
                        "filepath": str(filepath),
                        "chunk_index": chunk_idx,
                        "source_type": "csv",
                    },
                }
            )
    self._save_documents()

def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
Expand Down
Loading