Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added time for data dump + fix 500 error! #87

Merged
merged 4 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions routers/http.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

from celery.result import AsyncResult
from fastapi import APIRouter
from pydantic import BaseModel
Expand Down Expand Up @@ -37,16 +39,23 @@ async def ask(payload: RequestPayload):
@router.get("/status")
async def status(task_id: str):
    """
    Return the status of a Celery task, plus its result when finished.

    Parameters
    ------------
    task_id : str
        the Celery task id previously handed to the client

    Returns
    ---------
    results : dict
        always contains `id` and `status`; `result` is included only
        when the task finished successfully.
    """
    task = AsyncResult(task_id)

    # Only a successful task has a usable `result` payload — touching
    # `task.result` for pending/failed tasks is what caused 500 errors.
    if task.status == "SUCCESS":
        http_payload = HTTPPayload(
            communityId=task.result["community_id"],
            question=QuestionModel(message=task.result["question"]),
            response=ResponseModel(message=task.result["response"]),
            taskId=task.id,
        )

        # Persist best-effort: a DB hiccup must not turn a finished
        # task's status check into an error response.
        try:
            persister = PersistPayload()
            persister.persist_http(http_payload, update=True)
        except Exception as e:
            logging.error(f"Failed to persist task result: {e}")

        results = {"id": task.id, "status": task.status, "result": task.result}
    else:
        results = {"id": task.id, "status": task.status}

    return results
17 changes: 15 additions & 2 deletions utils/persist_payload.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime, timezone

from schema import AMQPPayload, HTTPPayload
from utils.mongo import MongoSingleton
Expand All @@ -24,7 +25,11 @@ def persist_amqp(self, payload: AMQPPayload) -> None:
community_id = payload.communityId
try:
self.client[self.db][self.internal_msgs_collection].insert_one(
payload.model_dump()
{
**payload.model_dump(),
"createdAt": datetime.now().replace(tzinfo=timezone.utc),
"updatedAt": datetime.now().replace(tzinfo=timezone.utc),
}
)
logging.info(
f"Payload for community id: {community_id} persisted successfully!"
Expand Down Expand Up @@ -60,7 +65,15 @@ def persist_http(self, payload: HTTPPayload, update: bool = False) -> None:
else:
self.client[self.db][self.external_msgs_collection].update_one(
{"taskId": payload.taskId},
{"$set": {"response": payload.response.model_dump()}},
{
"$set": {
"response": payload.response.model_dump(),
"updatedAt": datetime.now().replace(tzinfo=timezone.utc),
},
"$setOnInsert": {
"createdAt": datetime.now().replace(tzinfo=timezone.utc),
},
},
upsert=True,
)
logging.info(
Expand Down
115 changes: 8 additions & 107 deletions worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,125 +1,26 @@
import gc
import json
import logging
from typing import Any

from celery.signals import task_postrun, task_prerun
from subquery import query_multiple_source
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.payload.discord_bot.base_types.interaction_callback_data import (
InteractionCallbackData,
)
from tc_messageBroker.rabbit_mq.payload.discord_bot.chat_input_interaction import (
ChatInputCommandInteraction,
)
from tc_messageBroker.rabbit_mq.payload.payload import Payload
from tc_messageBroker.rabbit_mq.queue import Queue
from utils.data_source_selector import DataSourceSelector
from utils.traceloop import init_tracing
from worker.celery import app
from worker.utils.fire_event import job_send


@app.task
def ask_question_auto_search_discord_interaction(
    question: str,
    community_id: str,
    bot_given_info: dict[str, Any],
) -> None:
    """
    this task is for the case that the user asks a question
    and use the discord interaction schema
    it would query the community's data sources for an answer and send
    the result back to the discord-bot as an interaction EDIT event

    Parameters
    ------------
    question : str
        the user question
    community_id : str
        the community that the question was asked in
    bot_given_info : dict[str, Any]
        the information data that needed to be sent back to the bot again.
        This would be a dictionary representing the keys
        - `event`
        - `date`
        - `content`: which is the `ChatInputCommandInteraction` as a dictionary
    """
    prefix = f"COMMUNITY_ID: {community_id} | "
    logging.info(f"{prefix}Processing question!")
    interaction = json.loads(bot_given_info["content"]["interaction"])
    chat_input_interaction = ChatInputCommandInteraction.from_dict(interaction)

    try:
        logging.info(f"{prefix}Querying the data sources!")
        # for now we have just the discord platform
        response = query_data_sources(community_id=community_id, query=question)

        results = f"**Question:** {question}\n**Answer:** {response}"

        response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit(
            interaction=chat_input_interaction,
            data=InteractionCallbackData(content=results),
        ).to_dict()

        logging.info(f"{prefix}Sending Edit response to discord-bot!")
        job_send(
            event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT,
            queue_name=Queue.DISCORD_BOT,
            content=response_payload,
        )
    except Exception as exp:
        # Fallback: always edit the interaction so the Discord user is
        # not left waiting on a message that will never update.
        logging.error(f"Exception {exp} | during processing the question {question}")
        response_payload = Payload.DISCORD_BOT.INTERACTION_RESPONSE.Edit(
            interaction=chat_input_interaction,
            data=InteractionCallbackData(
                content="Sorry, We cannot process your question at the moment."
            ),
        ).to_dict()
        job_send(
            event=Event.DISCORD_BOT.INTERACTION_RESPONSE.EDIT,
            queue_name=Queue.DISCORD_BOT,
            content=response_payload,
        )


@app.task
def ask_question_auto_search(
community_id: str,
query: str,
) -> dict[str, str]:
response = query_data_sources(community_id=community_id, query=query)
try:
response = query_data_sources(community_id=community_id, query=query)
except Exception:
response = "Sorry, We cannot process your question at the moment."
logging.error(
f"Errors raised while processing the question for community: {community_id}!"
)
amindadgar marked this conversation as resolved.
Show resolved Hide resolved

return {
"community_id": community_id,
"question": query,
Expand Down
Loading