Skip to content

Commit f42c39f

Browse files
trducngphv2312
authored and committed
(pump:minor) Allow the indexing pipeline to report the indexing progress onto the UI (Cinnamon#81)
* Turn the file indexing event to generator to report progress
* Fix React text's trimming function
* Refactor delete file into a method
1 parent 46c9f70 commit f42c39f

File tree

11 files changed

+509
-288
lines changed

11 files changed

+509
-288
lines changed

libs/kotaemon/kotaemon/base/component.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
from abc import abstractmethod
2-
from typing import AsyncGenerator, Iterator, Optional
2+
from typing import Any, AsyncGenerator, Iterator, Optional
33

44
from theflow import Function, Node, Param, lazy
55

@@ -58,7 +58,7 @@ def astream(self, *args, **kwargs) -> AsyncGenerator[Document, None] | None:
5858
@abstractmethod
5959
def run(
6060
self, *args, **kwargs
61-
) -> Document | list[Document] | Iterator[Document] | None:
61+
) -> Document | list[Document] | Iterator[Document] | None | Any:
6262
"""Run the component."""
6363
...
6464

libs/kotaemon/kotaemon/base/schema.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,12 +32,13 @@ class Document(BaseDocument):
3232
channel: the channel to show the document. Optional.:
3333
- chat: show in chat message
3434
- info: show in information panel
35+
- index: show in index panel
3536
- debug: show in debug panel
3637
"""
3738

3839
content: Any = None
3940
source: Optional[str] = None
40-
channel: Optional[Literal["chat", "info", "debug"]] = None
41+
channel: Optional[Literal["chat", "info", "index", "debug"]] = None
4142

4243
def __init__(self, content: Optional[Any] = None, *args, **kwargs):
4344
if content is None:

libs/kotaemon/kotaemon/indices/ingests/files.py

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
from pathlib import Path
22
from typing import Type
33

4+
from llama_index.readers import PDFReader
45
from llama_index.readers.base import BaseReader
56

67
from kotaemon.base import BaseComponent, Document, Param
@@ -17,18 +18,20 @@
1718
UnstructuredReader,
1819
)
1920

20-
KH_DEFAULT_FILE_EXTRACTORS: dict[str, Type[BaseReader]] = {
21-
".xlsx": PandasExcelReader,
22-
".docx": UnstructuredReader,
23-
".xls": UnstructuredReader,
24-
".doc": UnstructuredReader,
25-
".html": HtmlReader,
26-
".mhtml": MhtmlReader,
27-
".png": UnstructuredReader,
28-
".jpeg": UnstructuredReader,
29-
".jpg": UnstructuredReader,
30-
".tiff": UnstructuredReader,
31-
".tif": UnstructuredReader,
21+
unstructured = UnstructuredReader()
22+
KH_DEFAULT_FILE_EXTRACTORS: dict[str, BaseReader] = {
23+
".xlsx": PandasExcelReader(),
24+
".docx": unstructured,
25+
".xls": unstructured,
26+
".doc": unstructured,
27+
".html": HtmlReader(),
28+
".mhtml": MhtmlReader(),
29+
".png": unstructured,
30+
".jpeg": unstructured,
31+
".jpg": unstructured,
32+
".tiff": unstructured,
33+
".tif": unstructured,
34+
".pdf": PDFReader(),
3235
}
3336

3437

@@ -64,7 +67,7 @@ class DocumentIngestor(BaseComponent):
6467
def _get_reader(self, input_files: list[str | Path]):
6568
"""Get appropriate readers for the input files based on file extension"""
6669
file_extractors: dict[str, BaseReader] = {
67-
ext: cls() for ext, cls in KH_DEFAULT_FILE_EXTRACTORS.items()
70+
ext: reader for ext, reader in KH_DEFAULT_FILE_EXTRACTORS.items()
6871
}
6972
for ext, cls in self.override_file_extractors.items():
7073
file_extractors[ext] = cls()

libs/kotaemon/kotaemon/loaders/base.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,8 @@
88

99

1010
class BaseReader(BaseComponent):
11+
"""The base class for all readers"""
12+
1113
...
1214

1315

libs/ktem/ktem/index/base.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -126,7 +126,7 @@ def get_indexing_pipeline(
126126
...
127127

128128
def get_retriever_pipelines(
129-
self, settings: dict, selected: Any = None
129+
self, settings: dict, user_id: int, selected: Any = None
130130
) -> list["BaseComponent"]:
131131
"""Return the retriever pipelines to retrieve the entity from the index"""
132132
return []

libs/ktem/ktem/index/file/base.py

Lines changed: 46 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,18 @@
11
from pathlib import Path
2-
from typing import Optional
2+
from typing import Generator, Optional
33

4-
from kotaemon.base import BaseComponent
4+
from kotaemon.base import BaseComponent, Document, Param
55

66

77
class BaseFileIndexRetriever(BaseComponent):
8+
9+
Source = Param(help="The SQLAlchemy Source table")
10+
Index = Param(help="The SQLAlchemy Index table")
11+
VS = Param(help="The VectorStore")
12+
DS = Param(help="The DocStore")
13+
FSPath = Param(help="The file storage path")
14+
user_id = Param(help="The user id")
15+
816
@classmethod
917
def get_user_settings(cls) -> dict:
1018
"""Get the user settings for indexing
@@ -24,20 +32,6 @@ def get_pipeline(
2432
) -> "BaseFileIndexRetriever":
2533
raise NotImplementedError
2634

27-
def set_resources(self, resources: dict):
28-
"""Set the resources for the indexing pipeline
29-
30-
This will setup the tables, the vector store and docstore.
31-
32-
Args:
33-
resources (dict): the resources for the indexing pipeline
34-
"""
35-
self._Source = resources["Source"]
36-
self._Index = resources["Index"]
37-
self._VS = resources["VectorStore"]
38-
self._DS = resources["DocStore"]
39-
self._fs_path = resources["FileStoragePath"]
40-
4135

4236
class BaseFileIndexIndexing(BaseComponent):
4337
"""The pipeline to index information into the data store
@@ -54,11 +48,45 @@ class BaseFileIndexIndexing(BaseComponent):
5448
- self._DS: the docstore
5549
"""
5650

57-
def run(self, file_paths: str | Path | list[str | Path], *args, **kwargs):
51+
Source = Param(help="The SQLAlchemy Source table")
52+
Index = Param(help="The SQLAlchemy Index table")
53+
VS = Param(help="The VectorStore")
54+
DS = Param(help="The DocStore")
55+
FSPath = Param(help="The file storage path")
56+
user_id = Param(help="The user id")
57+
58+
def run(
59+
self, file_paths: str | Path | list[str | Path], *args, **kwargs
60+
) -> tuple[list[str | None], list[str | None]]:
5861
"""Run the indexing pipeline
5962
6063
Args:
6164
file_paths (str | Path | list[str | Path]): the file paths to index
65+
66+
Returns:
67+
- the indexed file ids (each file id corresponds to an input file path, or
68+
None if the indexing failed for that file path)
69+
- the error messages (each error message corresponds to an input file path,
70+
or None if the indexing was successful for that file path)
71+
"""
72+
raise NotImplementedError
73+
74+
def stream(
75+
self, file_paths: str | Path | list[str | Path], *args, **kwargs
76+
) -> Generator[Document, None, tuple[list[str | None], list[str | None]]]:
77+
"""Stream the indexing pipeline
78+
79+
Args:
80+
file_paths (str | Path | list[str | Path]): the file paths to index
81+
82+
Yields:
83+
Document: the output message to the UI, must have channel == index or debug
84+
85+
Returns:
86+
- the indexed file ids (each file id corresponds to an input file path, or
87+
None if the indexing failed for that file path)
88+
- the error messages (each error message corresponds to an input file path,
89+
or None if the indexing was successful for that file path)
6290
"""
6391
raise NotImplementedError
6492

@@ -78,20 +106,6 @@ def get_user_settings(cls) -> dict:
78106
"""
79107
return {}
80108

81-
def set_resources(self, resources: dict):
82-
"""Set the resources for the indexing pipeline
83-
84-
This will setup the tables, the vector store and docstore.
85-
86-
Args:
87-
resources (dict): the resources for the indexing pipeline
88-
"""
89-
self._Source = resources["Source"]
90-
self._Index = resources["Index"]
91-
self._VS = resources["VectorStore"]
92-
self._DS = resources["DocStore"]
93-
self._fs_path = resources["FileStoragePath"]
94-
95109
def copy_to_filestorage(
96110
self, file_paths: str | Path | list[str | Path]
97111
) -> list[str]:
@@ -113,7 +127,7 @@ def copy_to_filestorage(
113127
for file_path in file_paths:
114128
with open(file_path, "rb") as f:
115129
paths.append(sha256(f.read()).hexdigest())
116-
shutil.copy(file_path, self._fs_path / paths[-1])
130+
shutil.copy(file_path, self.FSPath / paths[-1])
117131

118132
return paths
119133

libs/ktem/ktem/index/file/index.py

Lines changed: 13 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -362,13 +362,17 @@ def get_indexing_pipeline(self, settings, user_id) -> BaseFileIndexIndexing:
362362
stripped_settings[key] = value
363363

364364
obj = self._indexing_pipeline_cls.get_pipeline(stripped_settings, self.config)
365-
obj.set_resources(resources=self._resources)
366-
obj._user_id = user_id
365+
obj.Source = self._resources["Source"]
366+
obj.Index = self._resources["Index"]
367+
obj.VS = self._vs
368+
obj.DS = self._docstore
369+
obj.FSPath = self._fs_path
370+
obj.user_id = user_id
367371

368372
return obj
369373

370374
def get_retriever_pipelines(
371-
self, settings: dict, selected: Any = None
375+
self, settings: dict, user_id: int, selected: Any = None
372376
) -> list["BaseFileIndexRetriever"]:
373377
# retrieval settings
374378
prefix = f"index.options.{self.id}."
@@ -387,7 +391,12 @@ def get_retriever_pipelines(
387391
obj = cls.get_pipeline(stripped_settings, self.config, selected_ids)
388392
if obj is None:
389393
continue
390-
obj.set_resources(self._resources)
394+
obj.Source = self._resources["Source"]
395+
obj.Index = self._resources["Index"]
396+
obj.VS = self._vs
397+
obj.DS = self._docstore
398+
obj.FSPath = self._fs_path
399+
obj.user_id = user_id
391400
retrievers.append(obj)
392401

393402
return retrievers

0 commit comments

Comments
 (0)