i3 worker integration (#568)
ciur authored Dec 14, 2024
1 parent ef234dc commit 25c2c7f
Showing 13 changed files with 120 additions and 103 deletions.
7 changes: 6 additions & 1 deletion docker/standard/entrypoint.sh
@@ -23,13 +23,18 @@ exec_createsuperuser() {
 }
 
 exec_index_schema_apply() {
-    VIRTUAL_ENV=/core_app/.venv && cd /core_app && poetry run ./manage.py index_schema apply
+    echo "RUNNING: exec_index_schema_apply"
+    if [[ -n "${PAPERMERGE__SEARCH__URL}" ]]; then
+        echo "env var PAPERMERGE__SEARCH__URL is NON-EMPTY... running..."
+        cd /core_app && poetry run paper-cli index-schema apply
+    fi
 }
 
 exec_init() {
     exec_migrate
     exec_perms_sync
     exec_createsuperuser
+    exec_index_schema_apply
 }
 
 rm -f /etc/nginx/nginx.conf
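Note: with this change the container applies the search index schema during init only when a search backend is configured. A minimal manual invocation, assuming the container layout above (the URL value is illustrative, not from this diff):

    export PAPERMERGE__SEARCH__URL="http://solr:8983/solr/index"   # illustrative value
    cd /core_app && poetry run paper-cli index-schema apply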
2 changes: 2 additions & 0 deletions papermerge/cli.py
@@ -7,6 +7,7 @@
 from papermerge.core.cli import token as token_cli
 from papermerge.search.cli import search
 from papermerge.search.cli import index
+from papermerge.search.cli import index_schema
 
 app = typer.Typer(help="Papermerge DMS command line management tool")
 app.add_typer(usr_cli.app, name="users")
@@ -16,6 +17,7 @@
 app.add_typer(token_cli.app, name="tokens")
 app.add_typer(search.app, name="search")
 app.add_typer(index.app, name="index")
+app.add_typer(index_schema.app, name="index-schema")
 
 if __name__ == "__main__":
    app()
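Note: registering the sub-typer under the name "index-schema" exposes the schema commands on the existing CLI. Assuming the entry point is installed as paper-cli, as entrypoint.sh above suggests, the new command is invoked as:

    paper-cli index-schema apply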
6 changes: 5 additions & 1 deletion papermerge/core/dbapi.py
@@ -8,8 +8,10 @@
     get_docs_count_by_type,
     update_doc_type,
     update_doc_cfv,
-    get_doc_cfv
+    get_doc_cfv,
+    get_doc_ver_pages
 )
+from .features.nodes.db.api import get_nodes
 from .features.document_types.db.api import (
     create_document_type,
     get_document_types,
@@ -21,8 +23,10 @@
 from .features.custom_fields.db.api import create_custom_field
 
 __all__ = [
+    "get_nodes",
     "move_pages",
     "get_last_doc_ver",
+    "get_doc_ver_pages",
     "get_doc_ver",
     "get_doc",
     "get_doc_cfv",
1 change: 1 addition & 0 deletions papermerge/core/features/document/schema.py
@@ -203,6 +203,7 @@ class Document(BaseModel):
     ocr: bool = True  # will this document be OCRed?
     ocr_status: OCRStatusEnum = OCRStatusEnum.unknown
     thumbnail_url: ThumbnailUrl = None
+    user_id: UUID
 
     @field_validator("thumbnail_url", mode="before")
     def thumbnail_url_validator(cls, value, info):
15 changes: 8 additions & 7 deletions papermerge/core/features/nodes/db/api.py
@@ -45,7 +45,7 @@ def str2colexpr(keys: list[str]):
 
 
 def get_nodes(
-    db_session: Session, user_id: UUID, node_ids: list[UUID] | None = None
+    db_session: Session, user_id: UUID | None = None, node_ids: list[UUID] | None = None
 ) -> list[schema.Document | schema.Folder]:
     items = []
     if node_ids is None:
@@ -55,18 +55,19 @@ def get_nodes(
         stmt = (
             select(orm.Node)
             .options(selectinload(orm.Node.tags))
-            .filter(orm.Node.id.in_(node_ids), orm.Node.user_id == user_id)
+            .filter(orm.Node.id.in_(node_ids))
         )
     else:
-        stmt = (
-            select(orm.Node)
-            .options(selectinload(orm.Node.tags))
-            .filter(orm.Node.user_id == user_id)
-        )
+        stmt = select(orm.Node).options(selectinload(orm.Node.tags))
+
+    if user_id is not None:
+        stmt = stmt.filter(orm.Node.user_id == user_id)
 
     nodes = db_session.scalars(stmt).all()
 
     for node in nodes:
         breadcrumb = get_ancestors(db_session, node.id, include_self=False)
         node.breadcrumb = breadcrumb
         if node.ctype == "folder":
             items.append(schema.Folder.model_validate(node))
         else:
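Note: user_id is now optional, so the i3 indexing worker can fetch nodes across all users while API callers keep per-user filtering. A minimal sketch of the two call styles (session setup and owner_id are illustrative):

    from papermerge.core import dbapi
    from papermerge.core.db.engine import Session

    with Session() as db_session:
        all_nodes = dbapi.get_nodes(db_session)                     # every node, any owner
        user_nodes = dbapi.get_nodes(db_session, user_id=owner_id)  # one user's nodes, as before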
28 changes: 15 additions & 13 deletions papermerge/core/features/nodes/router.py
@@ -7,17 +7,16 @@
 from fastapi import APIRouter, Depends, HTTPException, Query, Security
 from sqlalchemy.exc import NoResultFound, IntegrityError
 
-from papermerge.celery_app import app as celery_app
+from papermerge.core.constants import INDEX_REMOVE_NODE
+from papermerge.core.tasks import send_task
 from papermerge.core import utils, schema, config
-from papermerge.core.features.auth import get_current_user
-from papermerge.core.features.auth import scopes
+from papermerge.core.features.auth import scopes, get_current_user
 from papermerge.core.constants import INDEX_ADD_NODE
 from papermerge.core.db.engine import Session
 from papermerge.core.features.document.db import api as doc_dbapi
 from papermerge.core.features.nodes.db import api as nodes_dbapi
 from papermerge.core.routers.common import OPEN_API_GENERIC_JSON_DETAIL
 from papermerge.core.routers.params import CommonQueryParams
-from papermerge.core.utils.decorators import if_redis_present
 from papermerge.core.exceptions import EntityNotFound

@@ -121,6 +120,7 @@ def create_node(
     if error:
         raise HTTPException(status_code=400, detail=error.model_dump())
 
+    send_task(INDEX_ADD_NODE, kwargs={"node_id": str(created_node.id)}, route_name="i3")
     return created_node


@@ -146,6 +146,7 @@ def update_node(
         db_session, node_id=node_id, user_id=user.id, attrs=node
     )
 
+    send_task(INDEX_ADD_NODE, kwargs={"node_id": str(updated_node.id)}, route_name="i3")
     return updated_node


@@ -173,6 +174,12 @@ def delete_nodes(
     if error:
         raise HTTPException(status_code=400, detail=error.model_dump())
 
+    send_task(
+        INDEX_REMOVE_NODE,
+        kwargs={"item_ids": [str(i) for i in list_of_uuids]},
+        route_name="i3",
+    )
+
 
 @router.post(
     "/move",
@@ -284,7 +291,7 @@ def assign_node_tags(
     if error:
         raise HTTPException(status_code=400, detail=error.model_dump())
 
-    _notify_index(node_id)
+    send_task(INDEX_ADD_NODE, kwargs={"node_id": str(node_id)}, route_name="i3")
 
     return node

@@ -355,7 +362,7 @@ def update_node_tags(
     if error:
         raise HTTPException(status_code=400, detail=error.model_dump())
 
-    _notify_index(node.id)
+    send_task(INDEX_ADD_NODE, kwargs={"node_id": str(node_id)}, route_name="i3")
 
     return node

@@ -412,11 +419,6 @@ def remove_node_tags(
     if error:
         raise HTTPException(status_code=400, detail=error.model_dump())
 
-    _notify_index(node.id)
-    return node
-
-
-@if_redis_present
-def _notify_index(node_id: uuid.UUID):
-    id_as_str = str(node_id)  # just in case, make sure it is str
-    celery_app.send_task(INDEX_ADD_NODE, kwargs={"node_id": id_as_str}, route_name="i3")
+    send_task(INDEX_ADD_NODE, kwargs={"node_id": str(node_id)}, route_name="i3")
+    return node
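Note: the router now dispatches through send_task from papermerge.core.tasks instead of the per-module @if_redis_present helper. The wrapper itself is not part of this diff; a plausible sketch, inferred from its call sites and from the old helper it replaces (the body is an assumption, not the actual implementation):

    from papermerge.celery_app import app as celery_app
    from papermerge.core.utils.decorators import if_redis_present

    @if_redis_present
    def send_task(name: str, kwargs: dict | None = None, route_name: str | None = None):
        # No-op when no Redis broker is configured; otherwise dispatch to Celery.
        # route_name selects the worker queue ("i3", "s3", "s3preview").
        celery_app.send_task(name, kwargs=kwargs, route_name=route_name)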
1 change: 1 addition & 0 deletions papermerge/core/features/nodes/schema.py
@@ -140,6 +140,7 @@ class Folder(NewFolder):
     tags: List[Tag] = []
     created_at: datetime
     updated_at: datetime
+    user_id: UUID
 
     breadcrumb: List[Tuple[UUID, str]] = []
 
26 changes: 18 additions & 8 deletions papermerge/core/features/page_mngm/db/api.py
@@ -10,7 +10,7 @@
 from pikepdf import Pdf
 from sqlalchemy import select, delete
 
-from papermerge.celery_app import app as current_app
+from papermerge.core import tasks
 from papermerge.core import constants as const
 from papermerge.core.pathlib import abs_page_path
 from papermerge.core.storage import get_storage_instance
@@ -706,14 +706,18 @@ def get_docver_ids(db_session, document_ids: list[uuid.UUID]) -> list[uuid.UUID]
 @if_redis_present
 def notify_version_update(add_ver_id: str, remove_ver_id: str):
     # Send tasks to the index to remove/add pages
-    current_app.send_task(const.INDEX_UPDATE, (add_ver_id, remove_ver_id))
+    tasks.send_task(
+        const.INDEX_UPDATE,
+        kwargs={"add_ver_id": add_ver_id, "remove_ver_id": str(remove_ver_id)},
+        route_name="i3",
+    )
 
-    current_app.send_task(
+    tasks.send_task(
         const.S3_WORKER_ADD_DOC_VER,
         kwargs={"doc_ver_ids": [add_ver_id]},
         route_name="s3",
     )
-    current_app.send_task(
+    tasks.send_task(
         const.S3_WORKER_REMOVE_DOC_VER,
         kwargs={"doc_ver_ids": [remove_ver_id]},
         route_name="s3",
@@ -724,13 +728,19 @@ def notify_version_update(add_ver_id: str, remove_ver_id: str):
 def notify_add_docs(db_session, add_doc_ids: List[uuid.UUID]):
     # send task to index
     logger.debug(f"Sending task {const.INDEX_ADD_DOCS} with {add_doc_ids}")
-    current_app.send_task(const.INDEX_ADD_DOCS, (add_doc_ids,))
+    tasks.send_task(
+        const.INDEX_ADD_DOCS,
+        kwargs={
+            "doc_ids": [str(i) for i in add_doc_ids],
+        },
+        route_name="i3",
+    )
 
     ids = [
         str(doc_id) for doc_id in get_docver_ids(db_session, document_ids=add_doc_ids)
     ]
 
-    current_app.send_task(
+    tasks.send_task(
         const.S3_WORKER_ADD_DOC_VER,
         kwargs={"doc_ver_ids": ids},
         route_name="s3",
@@ -740,15 +750,15 @@ def notify_add_docs(db_session, add_doc_ids: List[uuid.UUID]):
 @if_redis_present
 def notify_generate_previews(doc_id: list[str] | str):
     if isinstance(doc_id, str):
-        current_app.send_task(
+        tasks.send_task(
             const.S3_WORKER_GENERATE_PREVIEW,
             kwargs={"doc_id": doc_id},
             route_name="s3preview",
         )
         return
     elif isinstance(doc_id, list):
         for item in doc_id:
-            current_app.send_task(
+            tasks.send_task(
                 const.S3_WORKER_GENERATE_PREVIEW,
                 kwargs={"doc_id": item},
                 route_name="s3preview",
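Note: every dispatch now uses named kwargs plus an explicit route_name instead of positional tuples, so the receiving worker can declare named parameters. A hypothetical i3 receiver (task name and signature are assumptions; the worker code is not in this diff):

    from celery import shared_task

    @shared_task(name="index_update")  # assumed value of const.INDEX_UPDATE
    def index_update(add_ver_id: str, remove_ver_id: str):
        # Re-index pages of add_ver_id and drop pages of remove_ver_id.
        ...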
65 changes: 33 additions & 32 deletions papermerge/search/cli/index.py
@@ -7,7 +7,8 @@
 from salinic import IndexRW, create_engine
 from typing_extensions import Annotated
 
-from papermerge.core import db, schemas
+from papermerge.core import dbapi, schema
+from papermerge.core.db.engine import Session
 from papermerge.search.schema import FOLDER, PAGE, SearchIndex
 
 app = typer.Typer(help="Index commands")
@@ -25,42 +26,42 @@ def index_cmd(node_ids: NodeIDsType = None, dry_run: bool = False):
 
     engine = create_engine(SEARCH_URL)
     index = IndexRW(engine, schema=SearchIndex)
-    db_session = db.get_session()
-
-    nodes = db.get_nodes(db_session, node_ids)
-    items = []  # to be added to the index
-    for node in nodes:
-        if isinstance(node, schemas.Document):
-            last_ver = db.get_last_doc_ver(
-                db_session, user_id=node.user_id, doc_id=node.id
-            )
-            pages = db.get_doc_ver_pages(db_session, last_ver.id)
-            for page in pages:
-                item = SearchIndex(
-                    id=str(page.id),
-                    title=node.title,
-                    user_id=str(node.user_id),
-                    document_id=str(node.id),
-                    document_version_id=str(last_ver.id),
-                    page_number=page.number,
-                    text=page.text,
-                    entity_type=PAGE,
-                    tags=[tag.name for tag in node.tags],
-                )
-                items.append(item)
-        else:
-            item = SearchIndex(
-                id=str(node.id),
-                title=node.title,
-                user_id=str(node.user_id),
-                entity_type=FOLDER,
-                tags=[tag.name for tag in node.tags],
-            )
-            items.append(item)
-
-    if dry_run:
-        for item in items:
-            print_json(data=item.model_dump())
-    else:
-        for item in items:
-            index.add(item)
+    with Session() as db_session:
+        nodes = dbapi.get_nodes(db_session, node_ids)
+        items = []  # to be added to the index
+        for node in nodes:
+            if isinstance(node, schema.Document):
+                last_ver = dbapi.get_last_doc_ver(
+                    db_session, user_id=node.user_id, doc_id=node.id
+                )
+                pages = dbapi.get_doc_ver_pages(db_session, last_ver.id)
+                for page in pages:
+                    item = SearchIndex(
+                        id=str(page.id),
+                        title=node.title,
+                        user_id=str(node.user_id),
+                        document_id=str(node.id),
+                        document_version_id=str(last_ver.id),
+                        page_number=page.number,
+                        text=page.text,
+                        entity_type=PAGE,
+                        tags=[tag.name for tag in node.tags],
+                    )
+                    items.append(item)
+            else:
+                item = SearchIndex(
+                    id=str(node.id),
+                    title=node.title,
+                    user_id=str(node.user_id),
+                    entity_type=FOLDER,
+                    tags=[tag.name for tag in node.tags],
+                )
+                items.append(item)
+
+        if dry_run:
+            for item in items:
+                print_json(data=item.model_dump())
+        else:
+            for item in items:
+                index.add(item)
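Note: the command's interface is unchanged; the body now opens its own session via the Session context manager and goes through the dbapi facade. Assuming the same paper-cli entry point, a safe way to preview what would be indexed is:

    paper-cli index --dry-run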
2 changes: 1 addition & 1 deletion papermerge/search/cli/index_schema.py
@@ -8,7 +8,7 @@
 
 app = typer.Typer(help="Index Schema Management")
 
-SEARCH_URL = os.environ.get('PAPERMERGE__SEARCH__URL')
+SEARCH_URL = os.environ.get("PAPERMERGE__SEARCH__URL")
 if not SEARCH_URL:
     raise ValueError("missing PAPERMERGE__SEARCH__URL")
 
11 changes: 7 additions & 4 deletions papermerge/search/cli/search.py
@@ -1,4 +1,5 @@
 import os
+import uuid
 
 import typer
 from salinic import IndexRO, Search, create_engine
@@ -9,7 +10,9 @@
 
 
 @app.callback(name="search", invoke_without_command=True)
-def search_cmd(query: str):
+def search_cmd(
+    query: str, user_id: uuid.UUID, page_number: int = 1, page_size: int = 10
+):
     SEARCH_URL = os.environ.get("PAPERMERGE__SEARCH__URL")
     if not SEARCH_URL:
         print("[red][bold]PAPERMERGE__SEARCH__URL[/bold] is missing[/red]")
@@ -19,7 +22,7 @@ def search_cmd(query: str):
     engine = create_engine(SEARCH_URL)
     index = IndexRO(engine, schema=Index)
 
-    sq = Search(Index).query(query)
+    sq = Search(Index).query(query, page_number=page_number, page_size=page_size)
 
-    for entity in index.search(sq):
-        print(entity.model_dump())
+    results = index.search(sq, user_id=str(user_id))
+    print(results)
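Note: search is now scoped to a user and paginated. Assuming the paper-cli entry point, an example invocation (the UUID is illustrative; typer maps page_number and page_size to dashed options):

    paper-cli search "invoice 2024" 1d9b6c8e-0c54-4d4e-9d5a-7b7f3b6f2a10 --page-number 1 --page-size 10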