Skip to content

Commit

Permalink
Merge pull request #91 from TogetherCrew/fix/rabbitmq-connection-not-…
Browse files Browse the repository at this point in the history
…initialized

fix: rabbitMQ connection was not initialized!
  • Loading branch information
amindadgar authored Nov 7, 2024
2 parents 589f607 + 50606d6 commit 55726aa
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
14 changes: 8 additions & 6 deletions routers/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from utils.credentials import load_rabbitmq_credentials
from utils.persist_payload import PersistPayload
from worker.tasks import query_data_sources
from worker.utils.fire_event import job_send

rabbitmq_creds = load_rabbitmq_credentials()

router = RabbitRouter(rabbitmq_creds["url"])
broker = RabbitBroker(url=rabbitmq_creds["url"])


class Payload(BaseModel):
Expand Down Expand Up @@ -45,14 +45,16 @@ async def ask(payload: Payload, logger: Logger):
persister = PersistPayload()
persister.persist_amqp(response_payload)

result = Payload(
# result = Payload(
# event=payload.content.route.destination.event,
# date=str(datetime.now()),
# content=response_payload.model_dump(),
# )
job_send(
event=payload.content.route.destination.event,
date=str(datetime.now()),
queue_name=payload.content.route.destination.queue,
content=response_payload.model_dump(),
)
await broker.publish(
message=result, queue=payload.content.route.destination.queue
)
except Exception as e:
logger.exception(f"Errors While processing job! {e}")
else:
Expand Down
5 changes: 0 additions & 5 deletions worker/utils/fire_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ def job_send(event: str, queue_name: str, content: dict[str, Any]) -> None:
content : dict[str, Any]
the content to send messages to
"""
logging.info(f"IN JOB_SEND!, event: {event}")

rabbit_creds = load_rabbitmq_credentials()
username = rabbit_creds["user"]
password = rabbit_creds["password"]
Expand All @@ -28,15 +26,12 @@ def job_send(event: str, queue_name: str, content: dict[str, Any]) -> None:
rabbit_mq = RabbitMQ(
broker_url=broker_url, port=port, username=username, password=password
)
logging.info("Connecting to rabbitMQ!")
rabbit_mq.connect(queue_name)
logging.info("Trying to publish on rabbitMQ")
rabbit_mq.publish(
queue_name=queue_name,
event=event,
content=content,
)
logging.info("Published to RabbitMQ!")
try:
rabbit_mq.connection.close()
except Exception as e:
Expand Down

0 comments on commit 55726aa

Please sign in to comment.