Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

up #1556

Merged
merged 2 commits into from
Nov 4, 2024
Merged

up #1556

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions js/sdk/src/r2rClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ export class r2rClient {
document_ids?: string[];
user_ids?: (string | null)[];
ingestion_config?: Record<string, any>;
collection_ids?: string[];
run_with_orchestration?: boolean;
} = {},
): Promise<any> {
Expand Down Expand Up @@ -560,6 +561,9 @@ export class r2rClient {
ingestion_config: options.ingestion_config
? JSON.stringify(options.ingestion_config)
: undefined,
collection_ids: options.collection_ids
? JSON.stringify(options.collection_ids)
: undefined,
run_with_orchestration:
options.run_with_orchestration != undefined
? String(options.run_with_orchestration)
Expand Down Expand Up @@ -601,6 +605,7 @@ export class r2rClient {
document_ids: string[];
metadatas?: Record<string, any>[];
ingestion_config?: Record<string, any>;
collection_ids?: string[];
run_with_orchestration?: boolean;
},
): Promise<any> {
Expand Down Expand Up @@ -642,6 +647,9 @@ export class r2rClient {
ingestion_config: options.ingestion_config
? JSON.stringify(options.ingestion_config)
: undefined,
collection_ids: options.collection_ids
? JSON.stringify(options.collection_ids)
: undefined,
run_with_orchestration:
options.run_with_orchestration != undefined
? String(options.run_with_orchestration)
Expand Down Expand Up @@ -675,13 +683,15 @@ export class r2rClient {
chunks: RawChunk[],
documentId?: string,
metadata?: Record<string, any>,
collection_ids?: string[],
run_with_orchestration?: boolean,
): Promise<Record<string, any>> {
this._ensureAuthenticated();
let inputData: Record<string, any> = {
chunks: chunks,
document_id: documentId,
metadata: metadata,
collection_ids: collection_ids,
run_with_orchestration: run_with_orchestration,
};

Expand Down
2 changes: 1 addition & 1 deletion py/core/base/abstractions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
)
from shared.abstractions.graph import (
Community,
CommunityInfo,
CommunityReport,
Entity,
EntityLevel,
EntityType,
KGExtraction,
RelationshipType,
Triple,
CommunityInfo,
)
from shared.abstractions.ingestion import (
ChunkEnrichmentSettings,
Expand Down
17 changes: 17 additions & 0 deletions py/core/main/api/ingestion_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ async def ingest_files_app(
None,
description=ingest_files_descriptions.get("document_ids"),
),
collection_ids: Optional[Json[list[list[UUID]]]] = Form(
None,
description="Optional collection IDs for the documents, if provided the document will be assigned to them at ingestion.",
),
metadatas: Optional[Json[list[dict]]] = Form(
None, description=ingest_files_descriptions.get("metadatas")
),
Expand Down Expand Up @@ -181,6 +185,9 @@ async def ingest_files_app(
"ingestion_config": ingestion_config,
"user": auth_user.model_dump_json(),
"size_in_bytes": content_length,
"collection_ids": (
collection_ids[it] if collection_ids else None
),
"is_update": False,
}

Expand Down Expand Up @@ -241,6 +248,10 @@ async def update_files_app(
document_ids: Optional[Json[list[UUID]]] = Form(
None, description=ingest_files_descriptions.get("document_ids")
),
collection_ids: Optional[Json[list[list[UUID]]]] = Form(
None,
description="Optional collection IDs for the documents, if provided the document will be assigned to them at ingestion.",
),
metadatas: Optional[Json[list[dict]]] = Form(
None, description=ingest_files_descriptions.get("metadatas")
),
Expand Down Expand Up @@ -314,6 +325,7 @@ async def update_files_app(
"ingestion_config": ingestion_config,
"user": auth_user.model_dump_json(),
"is_update": True,
"collection_ids": collection_ids,
}

if run_with_orchestration:
Expand Down Expand Up @@ -357,6 +369,10 @@ async def ingest_chunks_app(
metadata: Optional[dict] = Body(
None, description=ingest_files_descriptions.get("metadata")
),
collection_ids: Optional[Json[list[list[UUID]]]] = Body(
None,
description="Optional collection IDs for the documents, if provided the document will be assigned to them at ingestion.",
),
run_with_orchestration: Optional[bool] = Body(
True,
description=ingest_files_descriptions.get(
Expand Down Expand Up @@ -388,6 +404,7 @@ async def ingest_chunks_app(
"chunks": [chunk.model_dump() for chunk in chunks],
"metadata": metadata or {},
"user": auth_user.model_dump_json(),
"collection_ids": collection_ids,
}
if run_with_orchestration:
raw_message = await self.orchestration_provider.run_workflow(
Expand Down
91 changes: 72 additions & 19 deletions py/core/main/orchestration/hatchet/ingestion_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,43 @@ async def parse(self, context: Context) -> dict:
status=IngestionStatus.SUCCESS,
)

# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id, collection_id=collection_id
collection_ids = context.workflow_input()["request"].get(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate logic for assigning documents to collections is already present in simple/ingestion_workflow.py. Consider reusing or refactoring the existing code.

"collection_ids"
)
if not collection_ids:
# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
else:
for collection_id in collection_ids:
try:
await service.providers.database.create_collection(
name=document_info.title,
collection_id=collection_id,
description="",
)
except Exception as e:
logger.warning(
f"Warning, could not create collection with error: {str(e)}"
)

await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)

# get server chunk enrichment settings and override parts of it if provided in the ingestion config
server_chunk_enrichment_settings = getattr(
Expand Down Expand Up @@ -450,16 +476,43 @@ async def finalize(self, context: Context) -> dict:

try:
# TODO - Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await self.ingestion_service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await self.ingestion_service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id, collection_id=collection_id
collection_ids = context.workflow_input()["request"].get(
"collection_ids"
)
if not collection_ids:
# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
else:
for collection_id in collection_ids:
try:
await service.providers.database.create_collection(
name=document_info.title or "N/A",
collection_id=collection_id,
description="",
)
except Exception as e:
logger.warning(
f"Warning, could not create collection with error: {str(e)}"
)

await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
except Exception as e:
logger.error(
f"Error during assigning document to collection: {str(e)}"
Expand Down
93 changes: 72 additions & 21 deletions py/core/main/orchestration/simple/ingestion_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,43 @@ async def ingest_files(input_data):
document_info, status=IngestionStatus.SUCCESS
)

collection_ids = parsed_data.get("collection_ids")

try:
# TODO - Move logic onto management service
collection_id = generate_default_user_collection_id(
str(document_info.user_id)
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_info.id, collection_id
)
if not collection_ids:
# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
else:
for collection_id in collection_ids:
try:
await service.providers.database.create_collection(
name=document_info.title,
collection_id=collection_id,
description="",
)
except Exception as e:
logger.warning(
f"Warning, could not create collection with error: {str(e)}"
)

await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
except Exception as e:
logger.error(
f"Error during assigning document to collection: {str(e)}"
Expand Down Expand Up @@ -229,18 +254,44 @@ async def ingest_chunks(input_data):
document_info, status=IngestionStatus.SUCCESS
)

collection_ids = parsed_data.get("collection_ids")

try:
# TODO - Move logic onto management service
collection_id = generate_default_user_collection_id(
str(document_info.user_id)
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id, collection_id=collection_id
)
if not collection_ids:
# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
else:
for collection_id in collection_ids:
try:
await service.providers.database.create_collection(
name=document_info.title,
collection_id=collection_id,
description="",
)
except Exception as e:
logger.warning(
f"Warning, could not create collection with error: {str(e)}"
)

await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
except Exception as e:
logger.error(
f"Error during assigning document to collection: {str(e)}"
Expand Down
5 changes: 2 additions & 3 deletions py/core/main/orchestration/simple/kg_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import math
import uuid

from core import GenerationConfig
from core import R2RException
from core import GenerationConfig, R2RException
from core.base.abstractions import KGEnrichmentStatus

from ...services import KgService
from core.base.abstractions import KGEnrichmentStatus

logger = logging.getLogger()

Expand Down
2 changes: 2 additions & 0 deletions py/core/main/services/ingestion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def ingest_file_ingress(
metadata: Optional[dict] = None,
version: Optional[str] = None,
is_update: bool = False,
collection_ids: Optional[list[UUID]] = None,
*args: Any,
**kwargs: Any,
) -> dict:
Expand Down Expand Up @@ -634,6 +635,7 @@ def parse_ingest_file_input(data: dict) -> dict:
"is_update": data.get("is_update", False),
"file_data": data["file_data"],
"size_in_bytes": data["size_in_bytes"],
"collection_ids": data.get("collection_ids", []),
}

@staticmethod
Expand Down
3 changes: 2 additions & 1 deletion py/core/pipes/kg/deduplication.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json
import logging
from typing import Any, Optional, Union
from uuid import UUID
import json

from core.base import AsyncState, R2RException
from core.base.abstractions import Entity, KGEntityDeduplicationType
from core.base.pipes import AsyncPipe
Expand Down
4 changes: 2 additions & 2 deletions py/core/providers/database/kg.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
Triple,
)
from core.base.abstractions import (
EntityLevel,
CommunityInfo,
KGEnrichmentStatus,
EntityLevel,
KGCreationSettings,
KGEnrichmentSettings,
KGEnrichmentStatus,
KGEntityDeduplicationSettings,
VectorQuantizationType,
)
Expand Down
Loading
Loading