1 change: 1 addition & 0 deletions data/test_sync_folder/test.md
@@ -0,0 +1 @@
Test Image
1 change: 1 addition & 0 deletions data/test_sync_folder/test_doc.txt
@@ -0,0 +1 @@
Hello World
1 change: 1 addition & 0 deletions data/test_sync_folder_2/doc2.txt
@@ -0,0 +1 @@
Content 2
28 changes: 24 additions & 4 deletions src/api/routers/knowledge.py
@@ -142,6 +142,7 @@ async def run_upload_processing_task(
base_url: str,
uploaded_file_paths: list[str],
rag_provider: str = None,
folder_id: str | None = None, # For folder sync state tracking
):
"""Background task for processing uploaded files"""
task_manager = TaskIDManager.get_instance()
@@ -179,18 +180,37 @@ async def run_upload_processing_task(
current=0,
total=len(processed_files),
)

# Update sync state with SOURCE paths (not destination) for proper change detection
if folder_id:
try:
manager = get_kb_manager()
# Map processed files back to their source paths by matching filenames
processed_basenames = {p.name for p in processed_files}
synced_source_paths = [
src for src in uploaded_file_paths if Path(src).name in processed_basenames
]
manager.update_folder_sync_state(kb_name, folder_id, synced_source_paths)
logger.info(
f"Updated sync state for {len(synced_source_paths)} source files in folder {folder_id}"
)
except Exception as e:
logger.error(f"Failed to update folder sync state: {e}")

adder.extract_numbered_items_for_new_docs(processed_files, batch_size=20)

adder.update_metadata(len(new_files))

progress_tracker.update(
ProgressStage.COMPLETED,
f"Successfully processed {len(processed_files)} files!",
current=len(processed_files),
total=len(processed_files),
f"Successfully processed {len(processed_files) if processed_files else 0} files!",
current=len(processed_files) if processed_files else 0,
total=len(processed_files) if processed_files else len(new_files),
)

logger.success(f"[{task_id}] Processed {len(processed_files)} files to KB '{kb_name}'")
logger.success(
f"[{task_id}] Processed {len(processed_files) if processed_files else 0} files to KB '{kb_name}'"
)
task_manager.update_task_status(task_id, "completed")
except Exception as e:
error_msg = f"Upload processing failed (KB '{kb_name}'): {e}"
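A note on the sync-state hunk above: processed files end up at destination paths inside the KB, while change detection compares against the original source paths, so the handler maps results back to sources by filename. A minimal sketch of that mapping with illustrative paths (`update_folder_sync_state` is the call from the diff; the inputs here are hypothetical):

```python
from pathlib import Path

# Hypothetical inputs: source paths from the synced folder and the
# files that actually made it through processing (destination copies).
uploaded_file_paths = ["/sync/folder/a.txt", "/sync/folder/b.pdf"]
processed_files = [Path("/kb/docs/a.txt")]  # b.pdf failed to process

# Same mapping as in run_upload_processing_task: match by basename so
# the sync state records SOURCE paths, which change detection compares.
processed_basenames = {p.name for p in processed_files}
synced_source_paths = [
    src for src in uploaded_file_paths if Path(src).name in processed_basenames
]
assert synced_source_paths == ["/sync/folder/a.txt"]
```

One consequence of matching on basenames: if two source files in different subfolders share a name, both will be marked as synced when either one is processed.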
143 changes: 138 additions & 5 deletions src/knowledge/add_documents.py
@@ -207,22 +207,25 @@ async def process_new_documents(self, new_files: List[Path]):
Uses FileTypeRouter to classify files and route them appropriately:
- PDF/DOCX/images -> MinerU parser (full document analysis)
- Text/Markdown -> Direct read + LightRAG insert (fast)

Falls back to LightRAG pipeline if RAGAnything is unavailable.
"""
if not new_files:
return None

- if raganything_cls is None:
- raise ImportError("RAGAnything module not found.")

- from src.services.rag.components.routing import FileTypeRouter

# Pre-import progress stage if needed to avoid overhead in loop
ProgressStage: Any = None
if self.progress_tracker:
from src.knowledge.progress_tracker import ProgressStage

+ # Fall back to LightRAG if RAGAnything is not available
+ if raganything_cls is None:
+ logger.warning("RAGAnything not available, using LightRAG fallback for text processing")
+ return await self._process_with_lightrag_fallback(new_files, ProgressStage)

# Use unified LLM client from src/services/llm
from src.services.llm import get_llm_client
+ from src.services.rag.components.routing import FileTypeRouter

llm_client = get_llm_client()
self.llm_cfg = llm_client.config
@@ -349,6 +352,136 @@ async def unified_embed_func(texts):
await self.fix_structure()
return processed_files

async def _process_with_lightrag_fallback(self, new_files: List[Path], ProgressStage):
"""Fallback processing using LightRAG when RAGAnything is not available.

This is a simpler pipeline that:
1. Reads text content from supported files
2. Inserts into LightRAG for indexing
"""
from lightrag import LightRAG
from lightrag.llm.openai import openai_complete_if_cache

self.llm_cfg = get_llm_config()
model = self.llm_cfg.model
api_key = self.api_key or self.llm_cfg.api_key
base_url = self.base_url or self.llm_cfg.base_url

logger.info(f"Using LightRAG fallback for {len(new_files)} files")

# Initialize LightRAG with async LLM function
async def llm_model_func(prompt, system_prompt=None, history_messages=[], **kwargs):
return await openai_complete_if_cache(
model,
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=api_key,
base_url=base_url,
**kwargs,
)

# Use embedding config
reset_embedding_client()
embedding_cfg = get_embedding_config()
embedding_client = get_embedding_client()

async def embedding_func(texts):
logger.info(f"Embedding {len(texts)} texts...")
result = await embedding_client.embed(texts)
return result

try:
rag = LightRAG(
working_dir=str(self.rag_storage_dir),
llm_model_func=llm_model_func,
embedding_func=EmbeddingFunc(
embedding_dim=embedding_cfg.dim,
max_token_size=embedding_cfg.max_tokens,
func=embedding_func,
),
)
logger.info("LightRAG instance created successfully")
except Exception as e:
logger.error(f"Failed to create LightRAG instance: {e}")
raise

processed_files = []
for idx, doc_file in enumerate(new_files, 1):
try:
if self.progress_tracker and ProgressStage:
self.progress_tracker.update(
ProgressStage.PROCESSING_FILE,
f"Processing {doc_file.name} (fallback mode)",
current=idx,
total=len(new_files),
)

if not doc_file.exists():
logger.error(f" ✗ Failed: File missing {doc_file.name}")
continue

# Read text content based on file type
text_content = await self._extract_text_content(doc_file)

if text_content and text_content.strip():
# Insert into LightRAG
await rag.ainsert(text_content)
logger.info(f" ✓ Indexed (fallback): {doc_file.name}")
processed_files.append(doc_file)
self._record_successful_hash(doc_file)
else:
logger.warning(f" ⚠ No text content extracted: {doc_file.name}")
processed_files.append(doc_file)
self._record_successful_hash(doc_file)

except Exception as e:
logger.exception(f" ✗ Failed {doc_file.name}: {e}")

return processed_files

async def _extract_text_content(self, file_path: Path) -> str:
"""Extract text content from a file based on its extension."""
suffix = file_path.suffix.lower()

try:
if suffix in [".txt", ".md", ".markdown"]:
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()

elif suffix == ".pdf":
try:
import fitz # PyMuPDF

doc = fitz.open(file_path)
text_parts = []
for page in doc:
text_parts.append(page.get_text())
doc.close()
return "\n".join(text_parts)
except ImportError:
logger.warning("PyMuPDF not available for PDF extraction")
return ""

elif suffix in [".docx", ".doc"]:
try:
from docx import Document

doc = Document(file_path)
text_parts = [para.text for para in doc.paragraphs]
return "\n".join(text_parts)
except ImportError:
logger.warning("python-docx not available for Word extraction")
return ""

else:
logger.warning(f"Unsupported file type for text extraction: {suffix}")
return ""

except Exception as e:
logger.error(f"Error extracting text from {file_path}: {e}")
return ""

def _record_successful_hash(self, file_path: Path):
"""Update metadata with the hash of a successfully processed file."""
file_hash = self._get_file_hash(file_path)
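For context on the extraction helpers above: both parsers are optional dependencies, and the method degrades to an empty string whenever one is missing or fails. A self-contained sketch of the same dispatch, runnable outside the class (PyMuPDF's `fitz.open`/`page.get_text` and python-docx's `Document(...).paragraphs` are the same APIs the diff uses; the sample path points at a fixture added in this PR):

```python
from pathlib import Path


def extract_text(file_path: Path) -> str:
    """Mirror of _extract_text_content: dispatch on extension, degrade gracefully."""
    suffix = file_path.suffix.lower()
    if suffix in (".txt", ".md", ".markdown"):
        return file_path.read_text(encoding="utf-8", errors="ignore")
    if suffix == ".pdf":
        try:
            import fitz  # PyMuPDF
        except ImportError:
            return ""
        doc = fitz.open(file_path)
        try:
            return "\n".join(page.get_text() for page in doc)
        finally:
            doc.close()
    if suffix in (".docx", ".doc"):
        try:
            from docx import Document
        except ImportError:
            return ""
        return "\n".join(para.text for para in Document(file_path).paragraphs)
    return ""  # unsupported type


# Illustrative usage against a test fixture added in this PR:
print(extract_text(Path("data/test_sync_folder/test_doc.txt")))  # -> "Hello World"
```

Note that python-docx only parses OOXML `.docx` packages; a legacy binary `.doc` routed down that branch will raise, which in the real method is swallowed by the outer `except Exception` and returned as an empty string.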