Commit

up (#1556)
* up

* fix
emrgnt-cmplxty authored Nov 4, 2024
1 parent b93db38 commit 40233cc
Showing 11 changed files with 217 additions and 48 deletions.
10 changes: 10 additions & 0 deletions js/sdk/src/r2rClient.ts
@@ -494,6 +494,7 @@ export class r2rClient {
document_ids?: string[];
user_ids?: (string | null)[];
ingestion_config?: Record<string, any>;
collection_ids?: string[];
run_with_orchestration?: boolean;
} = {},
): Promise<any> {
@@ -560,6 +561,9 @@ export class r2rClient {
ingestion_config: options.ingestion_config
? JSON.stringify(options.ingestion_config)
: undefined,
collection_ids: options.collection_ids
? JSON.stringify(options.collection_ids)
: undefined,
run_with_orchestration:
options.run_with_orchestration != undefined
? String(options.run_with_orchestration)
@@ -601,6 +605,7 @@
document_ids: string[];
metadatas?: Record<string, any>[];
ingestion_config?: Record<string, any>;
collection_ids?: string[];
run_with_orchestration?: boolean;
},
): Promise<any> {
@@ -642,6 +647,9 @@
ingestion_config: options.ingestion_config
? JSON.stringify(options.ingestion_config)
: undefined,
collection_ids: options.collection_ids
? JSON.stringify(options.collection_ids)
: undefined,
run_with_orchestration:
options.run_with_orchestration != undefined
? String(options.run_with_orchestration)
@@ -675,13 +683,15 @@ export class r2rClient {
chunks: RawChunk[],
documentId?: string,
metadata?: Record<string, any>,
collection_ids?: string[],
run_with_orchestration?: boolean,
): Promise<Record<string, any>> {
this._ensureAuthenticated();
let inputData: Record<string, any> = {
chunks: chunks,
document_id: documentId,
metadata: metadata,
collection_ids: collection_ids,
run_with_orchestration: run_with_orchestration,
};

2 changes: 1 addition & 1 deletion py/core/base/abstractions/__init__.py
@@ -20,14 +20,14 @@
)
from shared.abstractions.graph import (
Community,
CommunityInfo,
CommunityReport,
Entity,
EntityLevel,
EntityType,
KGExtraction,
RelationshipType,
Triple,
CommunityInfo,
)
from shared.abstractions.ingestion import (
ChunkEnrichmentSettings,
17 changes: 17 additions & 0 deletions py/core/main/api/ingestion_router.py
@@ -122,6 +122,10 @@ async def ingest_files_app(
None,
description=ingest_files_descriptions.get("document_ids"),
),
collection_ids: Optional[Json[list[list[UUID]]]] = Form(
None,
description="Optional collection IDs for the documents, if provided the document will be assigned to them at ingestion.",
),
metadatas: Optional[Json[list[dict]]] = Form(
None, description=ingest_files_descriptions.get("metadatas")
),
@@ -181,6 +185,9 @@ async def ingest_files_app(
"ingestion_config": ingestion_config,
"user": auth_user.model_dump_json(),
"size_in_bytes": content_length,
"collection_ids": (
collection_ids[it] if collection_ids else None
),
"is_update": False,
}

@@ -241,6 +248,10 @@ async def update_files_app(
document_ids: Optional[Json[list[UUID]]] = Form(
None, description=ingest_files_descriptions.get("document_ids")
),
collection_ids: Optional[Json[list[list[UUID]]]] = Form(
None,
description="Optional collection IDs for the documents, if provided the document will be assigned to them at ingestion.",
),
metadatas: Optional[Json[list[dict]]] = Form(
None, description=ingest_files_descriptions.get("metadatas")
),
@@ -314,6 +325,7 @@ async def update_files_app(
"ingestion_config": ingestion_config,
"user": auth_user.model_dump_json(),
"is_update": True,
"collection_ids": collection_ids,
}

if run_with_orchestration:
@@ -357,6 +369,10 @@ async def ingest_chunks_app(
metadata: Optional[dict] = Body(
None, description=ingest_files_descriptions.get("metadata")
),
collection_ids: Optional[Json[list[list[UUID]]]] = Body(
None,
description="Optional collection IDs for the documents, if provided the document will be assigned to them at ingestion.",
),
run_with_orchestration: Optional[bool] = Body(
True,
description=ingest_files_descriptions.get(
@@ -388,6 +404,7 @@ async def ingest_chunks_app(
"chunks": [chunk.model_dump() for chunk in chunks],
"metadata": metadata or {},
"user": auth_user.model_dump_json(),
"collection_ids": collection_ids,
}
if run_with_orchestration:
raw_message = await self.orchestration_provider.run_workflow(
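The router accepts the new `collection_ids` value as a JSON-encoded form field on `ingest_files`/`update_files` and as a body field on `ingest_chunks`, typed as a list of UUID lists (one inner list per document). The sketch below shows how a caller might pass it over HTTP when ingesting a file; the base URL, endpoint path, auth header, and the `files` form-field name are assumptions for illustration and are not taken from this diff.

```python
# Illustrative only: URL, endpoint path, token, and the "files" field name are assumed.
import json
import uuid

import requests

BASE_URL = "http://localhost:7272/v2"          # assumed deployment URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

collection_id = str(uuid.uuid4())

with open("report.pdf", "rb") as f:
    resp = requests.post(
        f"{BASE_URL}/ingest_files",            # assumed route for ingest_files_app
        headers=HEADERS,
        files=[("files", ("report.pdf", f, "application/pdf"))],
        data={
            # Json[list[list[UUID]]]: the outer list parallels the uploaded files,
            # so this single document is assigned to one collection at ingestion.
            "collection_ids": json.dumps([[collection_id]]),
            "run_with_orchestration": "true",
        },
    )
print(resp.json())
```

The `ingest_chunks` endpoint takes the same `collection_ids` shape in its JSON request body instead of a multipart form field.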
91 changes: 72 additions & 19 deletions py/core/main/orchestration/hatchet/ingestion_workflow.py
@@ -154,17 +154,43 @@ async def parse(self, context: Context) -> dict:
status=IngestionStatus.SUCCESS,
)

# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id, collection_id=collection_id
collection_ids = context.workflow_input()["request"].get(
"collection_ids"
)
if not collection_ids:
# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
else:
for collection_id in collection_ids:
try:
await service.providers.database.create_collection(
name=document_info.title,
collection_id=collection_id,
description="",
)
except Exception as e:
logger.warning(
f"Warning, could not create collection with error: {str(e)}"
)

await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)

# get server chunk enrichment settings and override parts of it if provided in the ingestion config
server_chunk_enrichment_settings = getattr(
@@ -450,16 +476,43 @@ async def finalize(self, context: Context) -> dict:

try:
# TODO - Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await self.ingestion_service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await self.ingestion_service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id, collection_id=collection_id
collection_ids = context.workflow_input()["request"].get(
"collection_ids"
)
if not collection_ids:
# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
else:
for collection_id in collection_ids:
try:
await service.providers.database.create_collection(
name=document_info.title or "N/A",
collection_id=collection_id,
description="",
)
except Exception as e:
logger.warning(
f"Warning, could not create collection with error: {str(e)}"
)

await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
except Exception as e:
logger.error(
f"Error during assigning document to collection: {str(e)}"
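The same branch — fall back to the user's default collection when no `collection_ids` are supplied, otherwise best-effort create each collection and assign the document to it — now appears in both the `parse` and `finalize` steps here, and again twice in the simple orchestration below. One possible consolidation is sketched as a hypothetical helper; it is not part of this commit, and only the provider method names are taken from the diff.

```python
# Hypothetical helper, not present in the codebase; provider method names come from the diff.
import logging
from typing import Optional
from uuid import UUID

# Import path assumed; the workflow modules above already import this helper.
from core.utils import generate_default_user_collection_id

logger = logging.getLogger()


async def assign_document_to_collections(
    providers,                                  # object exposing .database, as in the workflows
    document_info,                              # has .id, .title, .user_id
    collection_ids: Optional[list[UUID]] = None,
) -> None:
    if not collection_ids:
        # No explicit collections: fall back to the user's default collection.
        collection_ids = [
            generate_default_user_collection_id(document_info.user_id)
        ]
    else:
        for collection_id in collection_ids:
            try:
                # Best-effort creation; an already-existing collection only logs a warning.
                await providers.database.create_collection(
                    name=document_info.title or "N/A",
                    collection_id=collection_id,
                    description="",
                )
            except Exception as e:
                logger.warning(
                    f"Warning, could not create collection with error: {str(e)}"
                )

    for collection_id in collection_ids:
        await providers.database.assign_document_to_collection_relational(
            document_id=document_info.id,
            collection_id=collection_id,
        )
        await providers.database.assign_document_to_collection_vector(
            document_id=document_info.id,
            collection_id=collection_id,
        )
```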
93 changes: 72 additions & 21 deletions py/core/main/orchestration/simple/ingestion_workflow.py
@@ -67,18 +67,43 @@ async def ingest_files(input_data):
document_info, status=IngestionStatus.SUCCESS
)

collection_ids = parsed_data.get("collection_ids")

try:
# TODO - Move logic onto management service
collection_id = generate_default_user_collection_id(
str(document_info.user_id)
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_info.id, collection_id
)
if not collection_ids:
# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
else:
for collection_id in collection_ids:
try:
await service.providers.database.create_collection(
name=document_info.title,
collection_id=collection_id,
description="",
)
except Exception as e:
logger.warning(
f"Warning, could not create collection with error: {str(e)}"
)

await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
except Exception as e:
logger.error(
f"Error during assigning document to collection: {str(e)}"
@@ -229,18 +254,44 @@ async def ingest_chunks(input_data):
document_info, status=IngestionStatus.SUCCESS
)

collection_ids = parsed_data.get("collection_ids")

try:
# TODO - Move logic onto management service
collection_id = generate_default_user_collection_id(
str(document_info.user_id)
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id, collection_id=collection_id
)
if not collection_ids:
# TODO: Move logic onto the `management service`
collection_id = generate_default_user_collection_id(
document_info.user_id
)
await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
else:
for collection_id in collection_ids:
try:
await service.providers.database.create_collection(
name=document_info.title,
collection_id=collection_id,
description="",
)
except Exception as e:
logger.warning(
f"Warning, could not create collection with error: {str(e)}"
)

await service.providers.database.assign_document_to_collection_relational(
document_id=document_info.id,
collection_id=collection_id,
)
await service.providers.database.assign_document_to_collection_vector(
document_id=document_info.id,
collection_id=collection_id,
)
except Exception as e:
logger.error(
f"Error during assigning document to collection: {str(e)}"
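Note that `parse_ingest_file_input` in the service-layer change further down defaults `collection_ids` to an empty list, while the chunk path may pass `None`; both workflow branches above test truthiness (`if not collection_ids`), so either value falls back to the user's default collection. A tiny illustration (the UUID string is an arbitrary example):

```python
# Minimal illustration of the fallback: None and the empty-list default behave the same.
for collection_ids in (None, [], ["9fbe403b-c11c-5aae-8ade-ef22980c3ad1"]):
    if not collection_ids:
        print(collection_ids, "-> assign to the user's default collection")
    else:
        print(collection_ids, "-> create each collection (best effort) and assign to it")
```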
5 changes: 2 additions & 3 deletions py/core/main/orchestration/simple/kg_workflow.py
@@ -3,11 +3,10 @@
import math
import uuid

from core import GenerationConfig
from core import R2RException
from core import GenerationConfig, R2RException
from core.base.abstractions import KGEnrichmentStatus

from ...services import KgService
from core.base.abstractions import KGEnrichmentStatus

logger = logging.getLogger()

2 changes: 2 additions & 0 deletions py/core/main/services/ingestion_service.py
@@ -73,6 +73,7 @@ async def ingest_file_ingress(
metadata: Optional[dict] = None,
version: Optional[str] = None,
is_update: bool = False,
collection_ids: Optional[list[UUID]] = None,
*args: Any,
**kwargs: Any,
) -> dict:
@@ -634,6 +635,7 @@ def parse_ingest_file_input(data: dict) -> dict:
"is_update": data.get("is_update", False),
"file_data": data["file_data"],
"size_in_bytes": data["size_in_bytes"],
"collection_ids": data.get("collection_ids", []),
}

@staticmethod
3 changes: 2 additions & 1 deletion py/core/pipes/kg/deduplication.py
@@ -1,7 +1,8 @@
import json
import logging
from typing import Any, Optional, Union
from uuid import UUID
import json

from core.base import AsyncState, R2RException
from core.base.abstractions import Entity, KGEntityDeduplicationType
from core.base.pipes import AsyncPipe
4 changes: 2 additions & 2 deletions py/core/providers/database/kg.py
@@ -17,11 +17,11 @@
Triple,
)
from core.base.abstractions import (
EntityLevel,
CommunityInfo,
KGEnrichmentStatus,
EntityLevel,
KGCreationSettings,
KGEnrichmentSettings,
KGEnrichmentStatus,
KGEntityDeduplicationSettings,
VectorQuantizationType,
)
