Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the indexing pipeline to report the indexing progress onto the UI #81

Merged
merged 3 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/kotaemon/kotaemon/base/component.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import abstractmethod
from typing import AsyncGenerator, Iterator, Optional
from typing import Any, AsyncGenerator, Iterator, Optional

from theflow import Function, Node, Param, lazy

Expand Down Expand Up @@ -58,7 +58,7 @@ def astream(self, *args, **kwargs) -> AsyncGenerator[Document, None] | None:
@abstractmethod
def run(
self, *args, **kwargs
) -> Document | list[Document] | Iterator[Document] | None:
) -> Document | list[Document] | Iterator[Document] | None | Any:
"""Run the component."""
...

Expand Down
3 changes: 2 additions & 1 deletion libs/kotaemon/kotaemon/base/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ class Document(BaseDocument):
channel: the channel to show the document. Optional.:
- chat: show in chat message
- info: show in information panel
- index: show in index panel
- debug: show in debug panel
"""

content: Any = None
source: Optional[str] = None
channel: Optional[Literal["chat", "info", "debug"]] = None
channel: Optional[Literal["chat", "info", "index", "debug"]] = None

def __init__(self, content: Optional[Any] = None, *args, **kwargs):
if content is None:
Expand Down
29 changes: 16 additions & 13 deletions libs/kotaemon/kotaemon/indices/ingests/files.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from typing import Type

from llama_index.readers import PDFReader
from llama_index.readers.base import BaseReader

from kotaemon.base import BaseComponent, Document, Param
Expand All @@ -17,18 +18,20 @@
UnstructuredReader,
)

KH_DEFAULT_FILE_EXTRACTORS: dict[str, Type[BaseReader]] = {
".xlsx": PandasExcelReader,
".docx": UnstructuredReader,
".xls": UnstructuredReader,
".doc": UnstructuredReader,
".html": HtmlReader,
".mhtml": MhtmlReader,
".png": UnstructuredReader,
".jpeg": UnstructuredReader,
".jpg": UnstructuredReader,
".tiff": UnstructuredReader,
".tif": UnstructuredReader,
unstructured = UnstructuredReader()
KH_DEFAULT_FILE_EXTRACTORS: dict[str, BaseReader] = {
".xlsx": PandasExcelReader(),
".docx": unstructured,
".xls": unstructured,
".doc": unstructured,
".html": HtmlReader(),
".mhtml": MhtmlReader(),
".png": unstructured,
".jpeg": unstructured,
".jpg": unstructured,
".tiff": unstructured,
".tif": unstructured,
".pdf": PDFReader(),
}


Expand Down Expand Up @@ -64,7 +67,7 @@ class DocumentIngestor(BaseComponent):
def _get_reader(self, input_files: list[str | Path]):
"""Get appropriate readers for the input files based on file extension"""
file_extractors: dict[str, BaseReader] = {
ext: cls() for ext, cls in KH_DEFAULT_FILE_EXTRACTORS.items()
ext: reader for ext, reader in KH_DEFAULT_FILE_EXTRACTORS.items()
}
for ext, cls in self.override_file_extractors.items():
file_extractors[ext] = cls()
Expand Down
2 changes: 2 additions & 0 deletions libs/kotaemon/kotaemon/loaders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@


class BaseReader(BaseComponent):
"""The base class for all readers"""

...


Expand Down
2 changes: 1 addition & 1 deletion libs/ktem/ktem/index/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def get_indexing_pipeline(
...

def get_retriever_pipelines(
self, settings: dict, selected: Any = None
self, settings: dict, user_id: int, selected: Any = None
) -> list["BaseComponent"]:
"""Return the retriever pipelines to retrieve the entity from the index"""
return []
78 changes: 46 additions & 32 deletions libs/ktem/ktem/index/file/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from pathlib import Path
from typing import Optional
from typing import Generator, Optional

from kotaemon.base import BaseComponent
from kotaemon.base import BaseComponent, Document, Param


class BaseFileIndexRetriever(BaseComponent):

Source = Param(help="The SQLAlchemy Source table")
Index = Param(help="The SQLAlchemy Index table")
VS = Param(help="The VectorStore")
DS = Param(help="The DocStore")
FSPath = Param(help="The file storage path")
user_id = Param(help="The user id")

@classmethod
def get_user_settings(cls) -> dict:
"""Get the user settings for indexing
Expand All @@ -24,20 +32,6 @@ def get_pipeline(
) -> "BaseFileIndexRetriever":
raise NotImplementedError

def set_resources(self, resources: dict):
"""Set the resources for the indexing pipeline

This will setup the tables, the vector store and docstore.

Args:
resources (dict): the resources for the indexing pipeline
"""
self._Source = resources["Source"]
self._Index = resources["Index"]
self._VS = resources["VectorStore"]
self._DS = resources["DocStore"]
self._fs_path = resources["FileStoragePath"]


class BaseFileIndexIndexing(BaseComponent):
"""The pipeline to index information into the data store
Expand All @@ -54,11 +48,45 @@ class BaseFileIndexIndexing(BaseComponent):
- self._DS: the docstore
"""

def run(self, file_paths: str | Path | list[str | Path], *args, **kwargs):
Source = Param(help="The SQLAlchemy Source table")
Index = Param(help="The SQLAlchemy Index table")
VS = Param(help="The VectorStore")
DS = Param(help="The DocStore")
FSPath = Param(help="The file storage path")
user_id = Param(help="The user id")

def run(
self, file_paths: str | Path | list[str | Path], *args, **kwargs
) -> tuple[list[str | None], list[str | None]]:
"""Run the indexing pipeline

Args:
file_paths (str | Path | list[str | Path]): the file paths to index

Returns:
- the indexed file ids (each file id corresponds to an input file path, or
None if the indexing failed for that file path)
- the error messages (each error message corresponds to an input file path,
or None if the indexing was successful for that file path)
"""
raise NotImplementedError

def stream(
self, file_paths: str | Path | list[str | Path], *args, **kwargs
) -> Generator[Document, None, tuple[list[str | None], list[str | None]]]:
"""Stream the indexing pipeline

Args:
file_paths (str | Path | list[str | Path]): the file paths to index

Yields:
Document: the output message to the UI, must have channel == index or debug

Returns:
- the indexed file ids (each file id corresponds to an input file path, or
None if the indexing failed for that file path)
- the error messages (each error message corresponds to an input file path,
or None if the indexing was successful for that file path)
"""
raise NotImplementedError

Expand All @@ -78,20 +106,6 @@ def get_user_settings(cls) -> dict:
"""
return {}

def set_resources(self, resources: dict):
"""Set the resources for the indexing pipeline

This will setup the tables, the vector store and docstore.

Args:
resources (dict): the resources for the indexing pipeline
"""
self._Source = resources["Source"]
self._Index = resources["Index"]
self._VS = resources["VectorStore"]
self._DS = resources["DocStore"]
self._fs_path = resources["FileStoragePath"]

def copy_to_filestorage(
self, file_paths: str | Path | list[str | Path]
) -> list[str]:
Expand All @@ -113,7 +127,7 @@ def copy_to_filestorage(
for file_path in file_paths:
with open(file_path, "rb") as f:
paths.append(sha256(f.read()).hexdigest())
shutil.copy(file_path, self._fs_path / paths[-1])
shutil.copy(file_path, self.FSPath / paths[-1])

return paths

Expand Down
17 changes: 13 additions & 4 deletions libs/ktem/ktem/index/file/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,17 @@ def get_indexing_pipeline(self, settings, user_id) -> BaseFileIndexIndexing:
stripped_settings[key] = value

obj = self._indexing_pipeline_cls.get_pipeline(stripped_settings, self.config)
obj.set_resources(resources=self._resources)
obj._user_id = user_id
obj.Source = self._resources["Source"]
obj.Index = self._resources["Index"]
obj.VS = self._vs
obj.DS = self._docstore
obj.FSPath = self._fs_path
obj.user_id = user_id

return obj

def get_retriever_pipelines(
self, settings: dict, selected: Any = None
self, settings: dict, user_id: int, selected: Any = None
) -> list["BaseFileIndexRetriever"]:
# retrieval settings
prefix = f"index.options.{self.id}."
Expand All @@ -387,7 +391,12 @@ def get_retriever_pipelines(
obj = cls.get_pipeline(stripped_settings, self.config, selected_ids)
if obj is None:
continue
obj.set_resources(self._resources)
obj.Source = self._resources["Source"]
obj.Index = self._resources["Index"]
obj.VS = self._vs
obj.DS = self._docstore
obj.FSPath = self._fs_path
obj.user_id = user_id
retrievers.append(obj)

return retrievers
Loading
Loading