Skip to content

Commit

Permalink
Merge pull request #82 from TogetherCrew/fastapi
Browse files Browse the repository at this point in the history
implement fastapi for http server
  • Loading branch information
amindadgar authored Nov 1, 2024
2 parents b22b823 + bc71ffc commit 7ca89ee
Show file tree
Hide file tree
Showing 15 changed files with 715 additions and 87 deletions.
41 changes: 39 additions & 2 deletions docker-compose.example.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,50 @@
---

services:
server:
api:
build:
context: .
target: prod
command: python3 server.py
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
ports:
- 8000:8000
environment:
- RABBIT_USER=root
- RABBIT_PASSWORD=pass
- RABBIT_HOST=rabbitmq
- RABBIT_PORT=5672
- REDIS_PASSWORD=pass
- REDIS_HOST=redis
- REDIS_PORT=6379
volumes:
- ./:/project/
worker:
build:
context: .
target: prod
dockerfile: Dockerfile
environment:
- RABBIT_USER=root
- RABBIT_PASSWORD=pass
- RABBIT_HOST=rabbitmq
- RABBIT_PORT=5672
- REDIS_PASSWORD=pass
- REDIS_HOST=redis
- REDIS_PORT=6379
rabbitmq:
image: "rabbitmq:3-management-alpine"
environment:
- RABBITMQ_DEFAULT_USER=root
- RABBITMQ_DEFAULT_PASS=pass
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 30s
retries: 2
start_period: 40s
ports:
- 5672:5672
redis:
image: bitnami/redis
environment:
- REDIS_PASSWORD=pass
8 changes: 8 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from fastapi import FastAPI
from routers.amqp import router as amqpRouter
from routers.http import router as httpRouter

app = FastAPI()

app.include_router(httpRouter)
app.include_router(amqpRouter)
8 changes: 6 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ python-dotenv==1.0.0
tc-hivemind-backend==1.2.2
llama-index-question-gen-guidance==0.1.2
llama-index-vector-stores-postgres==0.1.2
celery>=5.3.6, <6.0.0
celery[redis]>=5.3.6, <6.0.0
guidance==0.1.14
tc-messageBroker==1.6.6
tc-messageBroker==1.7.1
traceloop-sdk==0.14.1
backoff==2.2.1
fastapi[standard]==0.114.1
faststream==0.5.23
aio_pika==9.4.0
mongomock==4.2.0.post1
61 changes: 61 additions & 0 deletions routers/amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from datetime import datetime

from faststream.rabbit import RabbitBroker
from faststream.rabbit.fastapi import Logger, RabbitRouter # type: ignore
from faststream.rabbit.schemas.queue import RabbitQueue
from pydantic import BaseModel
from schema import AMQPPayload, ResponseModel
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.queue import Queue
from utils.credentials import load_rabbitmq_credentials
from utils.persist_payload import PersistPayload
from worker.tasks import query_data_sources

rabbitmq_creds = load_rabbitmq_credentials()

router = RabbitRouter(rabbitmq_creds["url"])
broker = RabbitBroker(url=rabbitmq_creds["url"])


class Payload(BaseModel):
event: str
date: datetime | str
content: AMQPPayload


@router.subscriber(queue=RabbitQueue(name=Queue.HIVEMIND, durable=True))
async def ask(payload: Payload, logger: Logger):
if payload.event == Event.HIVEMIND.QUESTION_RECEIVED:
try:
question = payload.content.question.message
community_id = payload.content.communityId

logger.info(f"COMMUNITY_ID: {community_id} Received job")
response = query_data_sources(community_id=community_id, query=question)
logger.info(f"COMMUNITY_ID: {community_id} Job finished")

response_payload = AMQPPayload(
communityId=community_id,
route=payload.content.route,
question=payload.content.question,
response=ResponseModel(message=response),
metadata=payload.content.metadata,
)
# dumping the whole payload of question & answer to db
persister = PersistPayload()
persister.persist_amqp(response_payload)

result = Payload(
event=payload.content.route.destination.event,
date=str(datetime.now()),
content=response_payload.model_dump(),
)
await broker.publish(
message=result, queue=payload.content.route.destination.queue
)
except Exception as e:
logger.exception(f"Errors While processing job! {e}")
else:
logger.error(
f"No such `{payload.event}` event available for {Queue.HIVEMIND} queue!"
)
52 changes: 52 additions & 0 deletions routers/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from celery.result import AsyncResult
from fastapi import APIRouter
from pydantic import BaseModel
from schema import HTTPPayload, QuestionModel, ResponseModel
from utils.persist_payload import PersistPayload
from worker.tasks import ask_question_auto_search


class RequestPayload(BaseModel):
question: QuestionModel
communityId: str


router = APIRouter()


@router.post("/ask")
async def ask(payload: RequestPayload):
query = payload.question.message
community_id = payload.communityId
task = ask_question_auto_search.delay(
community_id=community_id,
query=query,
)
payload_http = HTTPPayload(
communityId=community_id,
question=payload.question,
taskId=task.id,
)
# persisting the payload
persister = PersistPayload()
persister.persist_http(payload_http)

return {"id": task.id}


@router.get("/status")
async def status(task_id: str):
task = AsyncResult(task_id)

# persisting the data updates in db
persister = PersistPayload()

http_payload = HTTPPayload(
communityId=task.result["community_id"],
question=QuestionModel(message=task.result["question"]),
response=ResponseModel(message=task.result["response"]),
taskId=task.id,
)
persister.persist_http(http_payload, update=True)

return {"id": task.id, "status": task.status, "result": task.result}
1 change: 1 addition & 0 deletions schema/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .payload import AMQPPayload, HTTPPayload, QuestionModel, ResponseModel
35 changes: 35 additions & 0 deletions schema/payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pydantic import BaseModel


class DestinationModel(BaseModel):
queue: str
event: str


class RouteModel(BaseModel):
source: str
destination: DestinationModel | None


class QuestionModel(BaseModel):
message: str
filters: dict | None = None


class ResponseModel(BaseModel):
message: str


class AMQPPayload(BaseModel):
communityId: str
route: RouteModel
question: QuestionModel
response: ResponseModel | None = None
metadata: dict | None


class HTTPPayload(BaseModel):
communityId: str
question: QuestionModel
response: ResponseModel | None = None
taskId: str
70 changes: 0 additions & 70 deletions server.py

This file was deleted.

26 changes: 26 additions & 0 deletions test_rb_send_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from tc_messageBroker import RabbitMQ
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.queue import Queue

if __name__ == "__main__":
broker_url = "localhost"
port = 5672
username = "root"
password = "pass"

rabbit_mq = RabbitMQ(
broker_url=broker_url, port=port, username=username, password=password
)

rabbit_mq.connect(Queue.HIVEMIND, queue_durable=False)

content = {
"community_id": "****",
"question": "What is Hivemind?",
}

rabbit_mq.publish(
Queue.HIVEMIND,
event=Event.DISCORD_ANALYZER.RUN,
content=content,
)
Loading

0 comments on commit 7ca89ee

Please sign in to comment.