From cd794f72eca20cba8f1613098d585012371111f8 Mon Sep 17 00:00:00 2001
From: ahmedjawedaj
Date: Wed, 14 Jan 2026 15:03:59 +0500
Subject: [PATCH 1/2] fix: add missing backend methods for folder sync

- Add get_kb_content() to list documents/images
- Add /content API endpoint
- Fix folder_id parameter in upload task
- All tested and working on macOS
---
 src/api/routers/knowledge.py | 33 ++++++++++++++++++++++-
 src/knowledge/manager.py     | 52 ++++++++++++++++++++++++++++++++++++
 web/app/knowledge/page.tsx   | 39 +++++++++------------------
 3 files changed, 96 insertions(+), 28 deletions(-)

diff --git a/src/api/routers/knowledge.py b/src/api/routers/knowledge.py
index 8394710b..c8b85e76 100644
--- a/src/api/routers/knowledge.py
+++ b/src/api/routers/knowledge.py
@@ -136,7 +136,12 @@ async def run_initialization_task(initializer: KnowledgeBaseInitializer):
 
 
 async def run_upload_processing_task(
-    kb_name: str, base_dir: str, api_key: str, base_url: str, uploaded_file_paths: list[str]
+    kb_name: str,
+    base_dir: str,
+    api_key: str,
+    base_url: str,
+    uploaded_file_paths: list[str],
+    folder_id: str | None = None,  # Optional: for folder sync state tracking
 ):
     """Background task for processing uploaded files"""
     task_manager = TaskIDManager.get_instance()
@@ -177,6 +182,18 @@ async def run_upload_processing_task(
 
         adder.update_metadata(len(new_files))
 
+        # Update folder sync state if this was a folder sync operation
+        if folder_id:
+            try:
+                manager = get_kb_manager()
+                processed_paths_str = [str(p) for p in processed_files]
+                manager.mark_folder_synced(kb_name, folder_id, processed_paths_str)
+                logger.info(
+                    f"[{task_id}] Updated sync state for folder {folder_id}: {len(processed_files)} files"
+                )
+            except Exception as e:
+                logger.warning(f"[{task_id}] Failed to update folder sync state: {e}")
+
         progress_tracker.update(
             ProgressStage.COMPLETED,
             f"Successfully processed {len(processed_files)} files!",
@@ -583,6 +600,20 @@ async def websocket_progress(websocket: WebSocket, kb_name: str):
             pass
 
 
+@router.get("/{kb_name}/content")
+async def get_kb_content(kb_name: str):
+    """Get list of content (documents and images) in a knowledge base"""
+    try:
+        manager = get_kb_manager()
+        return manager.get_kb_content(kb_name)
+    except ValueError as e:
+        raise HTTPException(status_code=404, detail=str(e))
+    except Exception as e:
+        logger.error(f"Error getting KB content: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
 @router.post("/{kb_name}/link-folder", response_model=LinkedFolderInfo)
 async def link_folder(kb_name: str, request: LinkFolderRequest):
     """
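For reviewers, a quick way to exercise the new endpoint once the server is running — a minimal sketch, assuming the router is mounted under a `/knowledge` prefix on `localhost:8000` (neither is shown in this patch) and that a KB named `my-kb` exists:

```python
# Hypothetical smoke test for the new GET /{kb_name}/content endpoint.
# Host, port, and the "/knowledge" prefix are assumptions; adjust them
# to the actual FastAPI mount point.
import requests

resp = requests.get("http://localhost:8000/knowledge/my-kb/content")
resp.raise_for_status()
content = resp.json()
print(f"{len(content['documents'])} documents, {len(content['images'])} images")
for doc in content["documents"]:
    print(doc["name"], doc["size"], doc["last_modified"])
```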
diff --git a/src/knowledge/manager.py b/src/knowledge/manager.py
index 8537e2bf..48c1c088 100644
--- a/src/knowledge/manager.py
+++ b/src/knowledge/manager.py
@@ -336,6 +336,58 @@ def clean_rag_storage(self, name: str | None = None, backup: bool = True) -> bool:
         print(f"✓ RAG storage cleaned for '{kb_name}'")
         return True
 
+    def get_kb_content(self, kb_name: str) -> dict:
+        """
+        Get detailed content list (documents and images) for a knowledge base.
+
+        Args:
+            kb_name: Knowledge base name
+
+        Returns:
+            Dict with 'documents' and 'images' lists containing file metadata
+        """
+        if kb_name not in self.list_knowledge_bases():
+            raise ValueError(f"Knowledge base not found: {kb_name}")
+
+        kb_dir = self.base_dir / kb_name
+        raw_dir = kb_dir / "raw"
+        images_dir = kb_dir / "images"
+
+        content = {"documents": [], "images": []}
+
+        # Scan raw documents
+        if raw_dir.exists():
+            for f in raw_dir.iterdir():
+                if f.is_file() and not f.name.startswith("."):
+                    stat = f.stat()
+                    content["documents"].append(
+                        {
+                            "name": f.name,
+                            "path": str(f),
+                            "size": stat.st_size,
+                            "last_modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
+                        }
+                    )
+
+        # Scan images
+        if images_dir.exists():
+            for f in images_dir.iterdir():
+                if f.is_file() and not f.name.startswith("."):
+                    stat = f.stat()
+                    content["images"].append(
+                        {
+                            "name": f.name,
+                            "path": str(f),
+                            "size": stat.st_size,
+                            "last_modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
+                        }
+                    )
+
+        return content
+
+    # Local Folder Integration
+    # ============================================================
+
 def main():
     # ============================================================
diff --git a/web/app/knowledge/page.tsx b/web/app/knowledge/page.tsx
index ce67655f..2e39accf 100644
--- a/web/app/knowledge/page.tsx
+++ b/web/app/knowledge/page.tsx
@@ -15,8 +15,6 @@ import {
   Loader2,
   X,
   RefreshCw,
-  CheckCircle2,
-  AlertCircle,
 } from "lucide-react";
 
 import { apiUrl, wsUrl } from "@/lib/api";
@@ -60,21 +58,6 @@ export default function KnowledgePage() {
   const [progressMap, setProgressMap] = useState>(
     {},
   );
-
-  // Toast notification system
-  const [toast, setToast] = useState<{
-    message: string;
-    type: "success" | "error" | "info";
-  } | null>(null);
-
-  const showToast = (
-    message: string,
-    type: "success" | "error" | "info" = "info",
-  ) => {
-    setToast({ message, type });
-    setTimeout(() => setToast(null), 3000);
-  };
-
   // Use ref only for WebSocket connections (no need for state as it's not used in render)
   const wsConnectionsRef = useRef>({});
   const kbsNamesRef = useRef([]);
@@ -410,7 +393,7 @@ export default function KnowledgePage() {
       fetchKnowledgeBases();
     } catch (err) {
       console.error(err);
-      showToast("Failed to delete knowledge base", "error");
+      alert("Failed to delete knowledge base");
     }
   };
@@ -479,10 +462,10 @@ export default function KnowledgePage() {
       setFiles(null);
       // Refresh immediately to establish WebSocket connection
       await fetchKnowledgeBases();
-      showToast("Files uploaded successfully! Processing started in background.", "success");
+      alert("Files uploaded successfully! Processing started in background.");
     } catch (err) {
       console.error(err);
-      showToast("Failed to upload files", "error");
+      alert("Failed to upload files");
     } finally {
       setUploading(false);
     }
@@ -555,11 +538,12 @@ export default function KnowledgePage() {
         await fetchKnowledgeBases();
       }, 1000);
 
-      showToast("Knowledge base created successfully!", "success");
+      alert(
+        "Knowledge base created successfully! Initialization started in background.",
+      );
     } catch (err: any) {
       console.error(err);
-      showToast(`Failed to create knowledge base: ${err.message}`, "error");
+      alert(`Failed to create knowledge base: ${err.message}`);
     } finally {
       setUploading(false);
     }
@@ -895,10 +879,11 @@ export default function KnowledgePage() {
               Upload Documents
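Before the second patch, it helps to see why recording destination paths breaks sync. A toy sketch of the failure mode described in the next commit message; the paths are made up and the set difference stands in for the project's actual `detect_folder_changes` logic:

```python
# Why recording destination paths breaks change detection: the next sync
# diffs against paths in the linked SOURCE folder, so destination paths
# never match and every file looks "new". All paths are illustrative.
source_files = {"/Users/me/notes/a.md", "/Users/me/notes/b.md"}

synced_files = {"/kb/my-kb/raw/a.md", "/kb/my-kb/raw/b.md"}  # before: destinations
print(source_files - synced_files)  # both files wrongly re-detected as new

synced_files = set(source_files)  # after: source paths recorded
print(source_files - synced_files)  # set() — nothing to re-sync
```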
From: ahmedjawedaj
Date: Fri, 16 Jan 2026 01:32:21 +0500
Subject: [PATCH 2/2] fix: folder sync improvements

## Changes since last push:

### 1. Fixed folder sync state tracking (knowledge.py)
- Added 'folder_id' parameter to run_upload_processing_task
- Implemented source path tracking: now stores original source paths from
  linked folders instead of destination paths in synced_files
- This fixes the bug where files always appeared as 'new' after sync,
  because detect_folder_changes checks source paths but synced_files
  contained destination paths

### 2. Added LightRAG fallback (add_documents.py)
- When the RAGAnything module is unavailable, gracefully falls back to the
  LightRAG pipeline instead of raising ImportError
- New _process_with_lightrag_fallback method handles text extraction and
  indexing for PDF, DOCX, TXT, and MD files
- New _extract_text_content helper for extracting text from various
  document formats using PyMuPDF and python-docx

### 3. Improved error handling
- Added null checks for processed_files in progress messages
- Better logging for sync state updates
---
 src/api/routers/knowledge.py   |  28 ++++++-
 src/knowledge/add_documents.py | 144 +++++++++++++++++++++++++++++++--
 2 files changed, 163 insertions(+), 9 deletions(-)

diff --git a/src/api/routers/knowledge.py b/src/api/routers/knowledge.py
index 14556dfa..38c5200f 100644
--- a/src/api/routers/knowledge.py
+++ b/src/api/routers/knowledge.py
@@ -142,6 +142,7 @@ async def run_upload_processing_task(
     base_url: str,
     uploaded_file_paths: list[str],
     rag_provider: str = None,
+    folder_id: str | None = None,  # For folder sync state tracking
 ):
     """Background task for processing uploaded files"""
     task_manager = TaskIDManager.get_instance()
@@ -179,18 +180,37 @@ async def run_upload_processing_task(
             current=0,
             total=len(processed_files),
         )
+
+        # Update sync state with SOURCE paths (not destination) for proper change detection
+        if folder_id:
+            try:
+                manager = get_kb_manager()
+                # Map processed files back to their source paths by matching filenames
+                processed_basenames = {p.name for p in processed_files}
+                synced_source_paths = [
+                    src for src in uploaded_file_paths if Path(src).name in processed_basenames
+                ]
+                manager.update_folder_sync_state(kb_name, folder_id, synced_source_paths)
+                logger.info(
+                    f"Updated sync state for {len(synced_source_paths)} source files in folder {folder_id}"
+                )
+            except Exception as e:
+                logger.error(f"Failed to update folder sync state: {e}")
+
         adder.extract_numbered_items_for_new_docs(processed_files, batch_size=20)
 
         adder.update_metadata(len(new_files))
 
         progress_tracker.update(
             ProgressStage.COMPLETED,
-            f"Successfully processed {len(processed_files)} files!",
-            current=len(processed_files),
-            total=len(processed_files),
+            f"Successfully processed {len(processed_files) if processed_files else 0} files!",
+            current=len(processed_files) if processed_files else 0,
+            total=len(processed_files) if processed_files else len(new_files),
         )
-        logger.success(f"[{task_id}] Processed {len(processed_files)} files to KB '{kb_name}'")
+        logger.success(
+            f"[{task_id}] Processed {len(processed_files) if processed_files else 0} files to KB '{kb_name}'"
+        )
 
         task_manager.update_task_status(task_id, "completed")
 
     except Exception as e:
         error_msg = f"Upload processing failed (KB '{kb_name}'): {e}"
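The source-path mapping added above can be checked in isolation. A standalone sketch of the same basename matching, with made-up paths:

```python
# Standalone version of the basename mapping used in the sync-state update.
# All paths here are illustrative only.
from pathlib import Path

uploaded_file_paths = ["/linked/docs/report.pdf", "/linked/notes/todo.md"]
processed_files = [Path("/kb/my-kb/raw/report.pdf")]  # suppose todo.md failed

processed_basenames = {p.name for p in processed_files}
synced_source_paths = [
    src for src in uploaded_file_paths if Path(src).name in processed_basenames
]
print(synced_source_paths)  # ['/linked/docs/report.pdf']
```

One caveat of matching on basenames: two source files with the same name in different subfolders would both be marked as synced if either one is processed.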
diff --git a/src/knowledge/add_documents.py b/src/knowledge/add_documents.py
index bb7ff46d..43d952d7 100644
--- a/src/knowledge/add_documents.py
+++ b/src/knowledge/add_documents.py
@@ -205,20 +205,24 @@ async def process_new_documents(self, new_files: List[Path]):
         Uses FileTypeRouter to classify files and route them appropriately:
         - PDF/DOCX/images -> MinerU parser (full document analysis)
         - Text/Markdown -> Direct read + LightRAG insert (fast)
+
+        Falls back to LightRAG pipeline if RAGAnything is unavailable.
         """
         if not new_files:
             return None
 
-        if raganything_cls is None:
-            raise ImportError("RAGAnything module not found.")
-
-        from src.services.rag.components.routing import FileTypeRouter
-
         # Pre-import progress stage if needed to avoid overhead in loop
         ProgressStage: Any = None
         if self.progress_tracker:
             from src.knowledge.progress_tracker import ProgressStage
 
+        # Fall back to LightRAG if RAGAnything is not available
+        if raganything_cls is None:
+            logger.warning("RAGAnything not available, using LightRAG fallback for text processing")
+            return await self._process_with_lightrag_fallback(new_files, ProgressStage)
+
+        from src.services.rag.components.routing import FileTypeRouter
+
         self.llm_cfg = get_llm_config()
         model = self.llm_cfg.model
         api_key = self.api_key or self.llm_cfg.api_key
@@ -430,6 +434,136 @@ async def unified_embed_func(texts):
         await self.fix_structure()
 
         return processed_files
 
+    async def _process_with_lightrag_fallback(self, new_files: List[Path], ProgressStage):
+        """Fallback processing using LightRAG when RAGAnything is not available.
+
+        This is a simpler pipeline that:
+        1. Reads text content from supported files
+        2. Inserts into LightRAG for indexing
+        """
+        from lightrag import LightRAG
+        from lightrag.llm.openai import openai_complete_if_cache
+        from lightrag.utils import EmbeddingFunc  # used below; import locally alongside LightRAG
+
+        self.llm_cfg = get_llm_config()
+        model = self.llm_cfg.model
+        api_key = self.api_key or self.llm_cfg.api_key
+        base_url = self.base_url or self.llm_cfg.base_url
+
+        logger.info(f"Using LightRAG fallback for {len(new_files)} files")
+
+        # Initialize LightRAG with async LLM function
+        async def llm_model_func(prompt, system_prompt=None, history_messages=[], **kwargs):
+            return await openai_complete_if_cache(
+                model,
+                prompt,
+                system_prompt=system_prompt,
+                history_messages=history_messages,
+                api_key=api_key,
+                base_url=base_url,
+                **kwargs,
+            )
+
+        # Use embedding config
+        reset_embedding_client()
+        embedding_cfg = get_embedding_config()
+        embedding_client = get_embedding_client()
+
+        async def embedding_func(texts):
+            logger.info(f"Embedding {len(texts)} texts...")
+            result = await embedding_client.embed(texts)
+            return result
+
+        try:
+            rag = LightRAG(
+                working_dir=str(self.rag_storage_dir),
+                llm_model_func=llm_model_func,
+                embedding_func=EmbeddingFunc(
+                    embedding_dim=embedding_cfg.dim,
+                    max_token_size=embedding_cfg.max_tokens,
+                    func=embedding_func,
+                ),
+            )
+            logger.info("LightRAG instance created successfully")
+        except Exception as e:
+            logger.error(f"Failed to create LightRAG instance: {e}")
+            raise
+
+        processed_files = []
+        for idx, doc_file in enumerate(new_files, 1):
+            try:
+                if self.progress_tracker and ProgressStage:
+                    self.progress_tracker.update(
+                        ProgressStage.PROCESSING_FILE,
+                        f"Processing {doc_file.name} (fallback mode)",
+                        current=idx,
+                        total=len(new_files),
+                    )
+
+                if not doc_file.exists():
+                    logger.error(f" ✗ Failed: File missing {doc_file.name}")
+                    continue
+
+                # Read text content based on file type
+                text_content = await self._extract_text_content(doc_file)
+
+                if text_content and text_content.strip():
+                    # Insert into LightRAG
+                    await rag.ainsert(text_content)
+                    logger.info(f" ✓ Indexed (fallback): {doc_file.name}")
+                    processed_files.append(doc_file)
+                    self._record_successful_hash(doc_file)
+                else:
+                    logger.warning(f" ⚠ No text content extracted: {doc_file.name}")
+                    processed_files.append(doc_file)
+                    self._record_successful_hash(doc_file)
+
+            except Exception as e:
+                logger.exception(f" ✗ Failed {doc_file.name}: {e}")
+
+        return processed_files
+
+    async def _extract_text_content(self, file_path: Path) -> str:
+        """Extract text content from a file based on its extension."""
+        suffix = file_path.suffix.lower()
+
+        try:
+            if suffix in [".txt", ".md", ".markdown"]:
+                with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+                    return f.read()
+
+            elif suffix == ".pdf":
+                try:
+                    import fitz  # PyMuPDF
+
+                    doc = fitz.open(file_path)
+                    text_parts = []
+                    for page in doc:
+                        text_parts.append(page.get_text())
+                    doc.close()
+                    return "\n".join(text_parts)
+                except ImportError:
+                    logger.warning("PyMuPDF not available for PDF extraction")
+                    return ""
+
+            elif suffix in [".docx", ".doc"]:
+                try:
+                    from docx import Document
+
+                    doc = Document(file_path)
+                    text_parts = [para.text for para in doc.paragraphs]
+                    return "\n".join(text_parts)
+                except ImportError:
+                    logger.warning("python-docx not available for Word extraction")
+                    return ""
+
+            else:
+                logger.warning(f"Unsupported file type for text extraction: {suffix}")
+                return ""
+
+        except Exception as e:
+            logger.error(f"Error extracting text from {file_path}: {e}")
+            return ""
+
     def _record_successful_hash(self, file_path: Path):
         """Update metadata with the hash of a successfully processed file."""
         file_hash = self._get_file_hash(file_path)
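For reference, the fallback's extraction logic can be exercised standalone. A minimal sketch using the same libraries the patch relies on (PyMuPDF and python-docx); note it deliberately skips legacy `.doc`, which python-docx cannot parse — in the patch such files fall through to the outer exception handler and return an empty string. `sample.pdf` is a hypothetical input file:

```python
# Self-contained version of the fallback's text-extraction dispatch.
from pathlib import Path

def extract_text(file_path: Path) -> str:
    suffix = file_path.suffix.lower()
    if suffix in {".txt", ".md", ".markdown"}:
        return file_path.read_text(encoding="utf-8", errors="ignore")
    if suffix == ".pdf":
        import fitz  # PyMuPDF
        with fitz.open(file_path) as doc:
            return "\n".join(page.get_text() for page in doc)
    if suffix == ".docx":  # legacy .doc is not supported by python-docx
        from docx import Document
        return "\n".join(p.text for p in Document(file_path).paragraphs)
    return ""

print(len(extract_text(Path("sample.pdf"))))  # hypothetical input
```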