diff --git a/routers/http.py b/routers/http.py index a000a5b..d53e790 100644 --- a/routers/http.py +++ b/routers/http.py @@ -1,3 +1,5 @@ +import logging + from celery.result import AsyncResult from fastapi import APIRouter from pydantic import BaseModel @@ -37,16 +39,23 @@ async def ask(payload: RequestPayload): @router.get("/status") async def status(task_id: str): task = AsyncResult(task_id) - - # persisting the data updates in db - persister = PersistPayload() - - http_payload = HTTPPayload( - communityId=task.result["community_id"], - question=QuestionModel(message=task.result["question"]), - response=ResponseModel(message=task.result["response"]), - taskId=task.id, - ) - persister.persist_http(http_payload, update=True) - - return {"id": task.id, "status": task.status, "result": task.result} + if task.status == "SUCCESS": + http_payload = HTTPPayload( + communityId=task.result["community_id"], + question=QuestionModel(message=task.result["question"]), + response=ResponseModel(message=task.result["response"]), + taskId=task.id, + ) + + # persisting the data updates in db + try: + persister = PersistPayload() + persister.persist_http(http_payload, update=True) + except Exception as e: + logging.error(f"Failed to persist task result: {e}") + + results = {"id": task.id, "status": task.status, "result": task.result} + else: + results = {"id": task.id, "status": task.status} + + return results diff --git a/utils/persist_payload.py b/utils/persist_payload.py index 14e3107..42a04dc 100644 --- a/utils/persist_payload.py +++ b/utils/persist_payload.py @@ -1,4 +1,5 @@ import logging +from datetime import datetime, timezone from schema import AMQPPayload, HTTPPayload from utils.mongo import MongoSingleton @@ -24,7 +25,11 @@ def persist_amqp(self, payload: AMQPPayload) -> None: community_id = payload.communityId try: self.client[self.db][self.internal_msgs_collection].insert_one( - payload.model_dump() + { + **payload.model_dump(), + "createdAt": 
datetime.now(timezone.utc), + "updatedAt": datetime.now(timezone.utc), + } ) logging.info( f"Payload for community id: {community_id} persisted successfully!" ) @@ -60,7 +65,15 @@ def persist_http(self, payload: HTTPPayload, update: bool = False) -> None: else: self.client[self.db][self.external_msgs_collection].update_one( {"taskId": payload.taskId}, - {"$set": {"response": payload.response.model_dump()}}, + { + "$set": { + "response": payload.response.model_dump(), + "updatedAt": datetime.now(timezone.utc), + }, + "$setOnInsert": { + "createdAt": datetime.now(timezone.utc), + }, + }, upsert=True, ) logging.info( diff --git a/worker/tasks.py b/worker/tasks.py index 3a411d2..7764b70 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -1,117 +1,11 @@ import gc -import json import logging -from typing import Any from celery.signals import task_postrun, task_prerun from subquery import query_multiple_source -from tc_messageBroker.rabbit_mq.event import Event -from tc_messageBroker.rabbit_mq.payload.discord_bot.base_types.interaction_callback_data import ( - InteractionCallbackData, -) -from tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction import ( - ChatInputCommandInteraction, -) -from tc_messageBroker.rabbit_mq.payload.payload import Payload -from tc_messageBroker.rabbit_mq.queue import Queue from utils.data_source_selector import DataSourceSelector from utils.traceloop import init_tracing from worker.celery import app -from worker.utils.fire_event import job_send - - -@app.task -def ask_question_auto_search_discord_interaction( - question: str, - community_id: str, - bot_given_info: dict[str, Any], -) -> None: - """ - this task is for the case that the user asks a question - and use the discord interaction schema - it would first retrieve the search metadata from summaries - then perform a query on the filetred raw data to find answer - - Parameters - ------------ - question : 
str - the user question - community_id : str - the community that the question was asked in - bot_given_info : tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction.ChatInputCommandInteraction - the information data that needed to be sent back to the bot again. - This would be a dictionary representing the keys - - `event` - - `date` - - `content`: which is the `ChatInputCommandInteraction` as a dictionary - """ - - prefix = f"COMMUNITY_ID: {community_id} | " - logging.info(f"{prefix}Processing question!") - interaction = json.loads(bot_given_info["content"]["interaction"]) - chat_input_interaction = ChatInputCommandInteraction.from_dict(interaction) - - try: - # create_interaction_content = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Create( - # interaction=chat_input_interaction, - # data=InteractionResponse( - # type=4, - # data=InteractionCallbackData( - # content="Processing your question ...", flags=64 - # ), - # ), - # ).to_dict() - - # logging.info(f"{prefix}Sending process question to discord-bot!") - # job_send( - # event=Event.DISCORD_BOT.INTERACTION_RESPONSE.CREATE, - # queue_name=Queue.DISCORD_BOT, - # content=create_interaction_content, - # ) - logging.info(f"{prefix}Querying the data sources!") - # for now we have just the discord platform - response = query_data_sources(community_id=community_id, query=question) - - # source_nodes_dict: list[dict[str, Any]] = [] - # for node in source_nodes: - # node_dict = dict(node) - # node_dict.pop("relationships", None) - # source_nodes_dict.append(node_dict) - - # results = { - # "response": response, - # The source of answers is commented for now - # "source_nodes": source_nodes_dict, - # } - results = f"**Question:** {question}\n**Answer:** {response}" - - response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit( - interaction=chat_input_interaction, - data=InteractionCallbackData( - # content=json.dumps(results) - content=results - ), - ).to_dict() - - logging.info(f"{prefix}Sending 
Edit response to discord-bot!") - job_send( - event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT, - queue_name=Queue.DISCORD_BOT, - content=response_payload, - ) - except Exception as exp: - logging.error(f"Exception {exp} | during processing the question {question}") - response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit( - interaction=chat_input_interaction, - data=InteractionCallbackData( - content="Sorry, We cannot process your question at the moment." - ), - ).to_dict() - job_send( - event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT, - queue_name=Queue.DISCORD_BOT, - content=response_payload, - ) @@ -119,7 +13,14 @@ def ask_question_auto_search( community_id: str, query: str, ) -> dict[str, str]: - response = query_data_sources(community_id=community_id, query=query) + try: + response = query_data_sources(community_id=community_id, query=query) + except Exception: + response = "Sorry, we cannot process your question at the moment." + logging.error( + f"Errors raised while processing the question for community: {community_id}!" + ) + return { "community_id": community_id, "question": query,