Skip to content

Commit

Permalink
Merge pull request #87 from TogetherCrew/fastapi
Browse files Browse the repository at this point in the history
feat: Added time for data dump + fix 500 error!
  • Loading branch information
amindadgar authored Nov 4, 2024
2 parents 5cc5aee + b2c84c7 commit db72e35
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 122 deletions.
35 changes: 22 additions & 13 deletions routers/http.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

from celery.result import AsyncResult
from fastapi import APIRouter
from pydantic import BaseModel
Expand Down Expand Up @@ -37,16 +39,23 @@ async def ask(payload: RequestPayload):
@router.get("/status")
async def status(task_id: str):
task = AsyncResult(task_id)

# persisting the data updates in db
persister = PersistPayload()

http_payload = HTTPPayload(
communityId=task.result["community_id"],
question=QuestionModel(message=task.result["question"]),
response=ResponseModel(message=task.result["response"]),
taskId=task.id,
)
persister.persist_http(http_payload, update=True)

return {"id": task.id, "status": task.status, "result": task.result}
if task.status == "SUCCESS":
http_payload = HTTPPayload(
communityId=task.result["community_id"],
question=QuestionModel(message=task.result["question"]),
response=ResponseModel(message=task.result["response"]),
taskId=task.id,
)

# persisting the data updates in db
try:
persister = PersistPayload()
persister.persist_http(http_payload, update=True)
except Exception as e:
logging.error(f"Failed to persist task result: {e}")

results = {"id": task.id, "status": task.status, "result": task.result}
else:
results = {"id": task.id, "status": task.status}

return results
17 changes: 15 additions & 2 deletions utils/persist_payload.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime, timezone

from schema import AMQPPayload, HTTPPayload
from utils.mongo import MongoSingleton
Expand All @@ -24,7 +25,11 @@ def persist_amqp(self, payload: AMQPPayload) -> None:
community_id = payload.communityId
try:
self.client[self.db][self.internal_msgs_collection].insert_one(
payload.model_dump()
{
**payload.model_dump(),
"createdAt": datetime.now().replace(tzinfo=timezone.utc),
"updatedAt": datetime.now().replace(tzinfo=timezone.utc),
}
)
logging.info(
f"Payload for community id: {community_id} persisted successfully!"
Expand Down Expand Up @@ -60,7 +65,15 @@ def persist_http(self, payload: HTTPPayload, update: bool = False) -> None:
else:
self.client[self.db][self.external_msgs_collection].update_one(
{"taskId": payload.taskId},
{"$set": {"response": payload.response.model_dump()}},
{
"$set": {
"response": payload.response.model_dump(),
"updatedAt": datetime.now().replace(tzinfo=timezone.utc),
},
"$setOnInsert": {
"createdAt": datetime.now().replace(tzinfo=timezone.utc),
},
},
upsert=True,
)
logging.info(
Expand Down
115 changes: 8 additions & 107 deletions worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,125 +1,26 @@
import gc
import json
import logging
from typing import Any

from celery.signals import task_postrun, task_prerun
from subquery import query_multiple_source
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.payload.discord_bot.base_types.interaction_callback_data import (
InteractionCallbackData,
)
from tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction import (
ChatInputCommandInteraction,
)
from tc_messageBroker.rabbit_mq.payload.payload import Payload
from tc_messageBroker.rabbit_mq.queue import Queue
from utils.data_source_selector import DataSourceSelector
from utils.traceloop import init_tracing
from worker.celery import app
from worker.utils.fire_event import job_send


@app.task
def ask_question_auto_search_discord_interaction(
question: str,
community_id: str,
bot_given_info: dict[str, Any],
) -> None:
"""
this task is for the case that the user asks a question
and use the discord interaction schema
it would first retrieve the search metadata from summaries
then perform a query on the filetred raw data to find answer
Parameters
------------
question : str
the user question
community_id : str
the community that the question was asked in
bot_given_info : tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction.ChatInputCommandInteraction
the information data that needed to be sent back to the bot again.
This would be a dictionary representing the keys
- `event`
- `date`
- `content`: which is the `ChatInputCommandInteraction` as a dictionary
"""

prefix = f"COMMUNITY_ID: {community_id} | "
logging.info(f"{prefix}Processing question!")
interaction = json.loads(bot_given_info["content"]["interaction"])
chat_input_interaction = ChatInputCommandInteraction.from_dict(interaction)

try:
# create_interaction_content = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Create(
# interaction=chat_input_interaction,
# data=InteractionResponse(
# type=4,
# data=InteractionCallbackData(
# content="Processing your question ...", flags=64
# ),
# ),
# ).to_dict()

# logging.info(f"{prefix}Sending process question to discord-bot!")
# job_send(
# event=Event.DISCORD_BOT.INTERACTION_RESPONSE.CREATE,
# queue_name=Queue.DISCORD_BOT,
# content=create_interaction_content,
# )
logging.info(f"{prefix}Querying the data sources!")
# for now we have just the discord platform
response = query_data_sources(community_id=community_id, query=question)

# source_nodes_dict: list[dict[str, Any]] = []
# for node in source_nodes:
# node_dict = dict(node)
# node_dict.pop("relationships", None)
# source_nodes_dict.append(node_dict)

# results = {
# "response": response,
# The source of answers is commented for now
# "source_nodes": source_nodes_dict,
# }
results = f"**Question:** {question}\n**Answer:** {response}"

response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit(
interaction=chat_input_interaction,
data=InteractionCallbackData(
# content=json.dumps(results)
content=results
),
).to_dict()

logging.info(f"{prefix}Sending Edit response to discord-bot!")
job_send(
event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT,
queue_name=Queue.DISCORD_BOT,
content=response_payload,
)
except Exception as exp:
logging.error(f"Exception {exp} | during processing the question {question}")
response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit(
interaction=chat_input_interaction,
data=InteractionCallbackData(
content="Sorry, We cannot process your question at the moment."
),
).to_dict()
job_send(
event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT,
queue_name=Queue.DISCORD_BOT,
content=response_payload,
)


@app.task
def ask_question_auto_search(
community_id: str,
query: str,
) -> dict[str, str]:
response = query_data_sources(community_id=community_id, query=query)
try:
response = query_data_sources(community_id=community_id, query=query)
except Exception:
response = "Sorry, We cannot process your question at the moment."
logging.error(
f"Errors raised while processing the question for community: {community_id}!"
)

return {
"community_id": community_id,
"question": query,
Expand Down

0 comments on commit db72e35

Please sign in to comment.