Requirements • Set up the Project • PostgreSQL • RabbitMQ
The project runs with Python 3.12.
The recommended way to install Python on Linux or macOS is with pyenv. Here is a summary of the steps; see the pyenv documentation for more details.
Install Python with pyenv
- Install pyenv:
curl https://pyenv.run | bash
- Set your shell profile to load pyenv. In my case I use fish:
set -Ux PYENV_ROOT $HOME/.pyenv
fish_add_path $PYENV_ROOT/bin
Then, append the pyenv init line to ~/.config/fish/config.fish:
echo 'pyenv init - | source' >> ~/.config/fish/config.fish
- Install the selected Python version (you can see the available versions with pyenv install --list):
pyenv install 3.12
- Go to your project folder and select this Python version for the folder:
pyenv local 3.12
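To confirm that the interpreter selected in the project folder is the expected one, a small check like the following may help. It is only a sketch: the helper name is_supported and the REQUIRED constant are made up for this example and are not part of the project.

```python
import sys

# Assumption: the project targets the 3.12 series, as stated in the requirements.
REQUIRED = (3, 12)


def is_supported(version_info=sys.version_info) -> bool:
    """Return True when the major.minor version equals REQUIRED."""
    return tuple(version_info[:2]) == REQUIRED


if __name__ == "__main__":
    current = ".".join(str(part) for part in sys.version_info[:3])
    status = "OK" if is_supported() else "unexpected version"
    print(f"Python {current}: {status}")
```

Running this script from the project folder should report the 3.12 interpreter once pyenv local has been applied.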
After installing Python with pyenv, you only need to install the package manager; in this case I prefer pdm. Run the following command in your project folder:
pip install pdm
To install directly all dependencies, run:
make install
In order to set up the project, you need to follow the steps below:
- Clone the repository on your local machine:
git clone <repo_url>
- Run the make local-setup command to be able to run the hooks inside the hooks folder.
Note
If you want to ignore the hooks folder, you can remove it and just run the make install command.
- Run infra containers declared in the docker-compose.yml file:
docker-compose up -d
- Run the tests to check if everything is working:
make test
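The containers may need a few seconds before they accept connections, so tests started right after docker-compose up -d can fail. A minimal sketch of a readiness check follows; the helper names are hypothetical, and the ports to pass in depend on what docker-compose.yml actually maps (5672 for RabbitMQ and 5432 for PostgreSQL are only the usual defaults, assumed here).

```python
import socket
import time


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_port(host: str, port: int, attempts: int = 30, delay: float = 1.0) -> bool:
    """Poll host:port until it accepts connections or the attempts run out."""
    for _ in range(attempts):
        if port_is_open(host, port):
            return True
        time.sleep(delay)
    return False
```

For example, wait_for_port("localhost", 5672) could be called before make test. An open port only shows that the container is listening, not that the service inside has finished starting.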
@TODO
Python tutorial
- Producers publish messages to exchanges.
- The exchanges take those messages and route them to queues. Exchanges distribute the messages to the queues based on the bindings.
- The consumers are subscribed to queues and consume the messages from them.
Exchanges are responsible for receiving the producers' messages and routing them to the queues.
Exchanges can be configured with different attributes:
- exchange: The name of the exchange. Unlike a queue name, it is not generated by the server, so it must always be set.
- durable: If set to True, the exchange will survive server restarts; otherwise it will be deleted.
- auto_delete: If set to True, the exchange will be deleted when no queues are bound to it.
- exchange_type: The type of the exchange. The default is direct, but there are other types like fanout and topic.
  - 'direct': The message is routed to the queues whose binding key exactly matches the routing key of the message.
  - 'fanout': The message is routed to all the queues bound to the exchange. Here the routing key is ignored.
  - 'topic': The message is routed to the queues whose binding key pattern matches the routing key of the message. In the pattern, * stands for exactly one word and # for zero or more words.
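The matching rules of the three exchange types can be illustrated without a broker. The following is a simplified pure-Python sketch, not RabbitMQ's implementation; the function names topic_matches and routes_to are invented for this example.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if a topic binding key matches a routing key.

    Words are separated by dots; '*' stands for exactly one word and
    '#' for zero or more words.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: it matches only if every word was consumed.
            return w == len(words)
        if pattern[p] == "#":
            # '#' either consumes nothing, or consumes one word and stays active.
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        return pattern[p] in ("*", words[w]) and match(p + 1, w + 1)

    return match(0, 0)


def routes_to(exchange_type: str, binding_key: str, routing_key: str) -> bool:
    """Decide whether a message reaches a queue bound with binding_key."""
    if exchange_type == "fanout":
        return True  # the routing key is ignored
    if exchange_type == "direct":
        return binding_key == routing_key
    if exchange_type == "topic":
        return topic_matches(binding_key, routing_key)
    raise ValueError(f"unsupported exchange type: {exchange_type}")
```

With these rules, a queue bound with videos.* would receive videos.created but not videos.created.hd, while a binding of videos.# would receive both.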
To create a new exchange we need to run the following command:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(
    exchange="videos",
    exchange_type="topic"
)
Queues store messages until they are consumed by the consumers.
Queues can be configured with different attributes:
- queue: The name of the queue. If not set, a random queue name will be generated.
- durable: If set to True, the queue will survive server restarts; otherwise it will be deleted.
- exclusive: If set to True, the queue will be used by only one connection and will be deleted when the connection closes.
- auto_delete: If set to True, the queue will be deleted when no consumers are connected to it.
To create a new queue we need to run the following command:
channel.queue_declare(
    queue="users.send_email_on_video_created",
    durable=True,
    exclusive=True
)
Declaring a queue is an idempotent operation as long as the attributes are the same, so we can run the same command multiple times without any side effects. If we don't know who will create the queue first, we can declare it in both the producer and the consumer code.
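One way to keep repeated declarations identical is to put them in a single helper that both sides call. This is a minimal sketch: ensure_queue, QUEUE_NAME and QUEUE_ARGS are hypothetical names, and channel is assumed to be an open pika channel. The exclusive flag is left out on purpose, because an exclusive queue can only be used from the connection that declared it, which does not fit a queue shared between a producer and a consumer.

```python
QUEUE_NAME = "users.send_email_on_video_created"

# A single definition of the attributes, so every caller declares the queue
# in exactly the same way. exclusive is omitted (see the note above).
QUEUE_ARGS = {"durable": True}


def ensure_queue(channel, queue: str = QUEUE_NAME) -> None:
    """Declare the queue; safe to call from both producer and consumer."""
    channel.queue_declare(queue=queue, **QUEUE_ARGS)
```

Calling ensure_queue(channel) at startup on both sides means that whichever service starts first creates the queue, and the other call simply confirms that it exists.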
To let the exchange know where to send the messages, we need to create a binding between the exchange and the queues.
channel.queue_bind(
    exchange="videos",
    queue="users.send_email_on_video_created",
    routing_key="videos.created"
)
The queue will receive a message when the message's routing_key matches the queue's binding key, that is, the routing_key given to queue_bind.
Producers are the services that publish messages to the exchanges.
Producers are intended to be long-lived and open their connections on startup.
To publish an event we need an exchange; we can't send a message directly to a queue.
- Declare the exchange they want to publish the message to (same steps as in the exchanges section):
channel.exchange_declare(
    exchange="videos",
    exchange_type="topic"
)
- Publish the message specifying the exchange (name) and, if the exchange is declared with type topic or direct, the routing_key. This routing key should match the binding key of the queue that will receive the message.
channel.basic_publish(
    exchange="videos",
    routing_key="videos.created",
    body="Video Created!"
)
If we want to ensure that the event survives a server restart, we need to set the delivery_mode to Persistent:
import pika

channel.basic_publish(
    exchange="videos",
    routing_key="videos.created",
    body="Video Created!",
    properties=pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent
    )
)
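The examples above send a plain string as the body, while a real event usually carries structured data. The sketch below shows one possible way to serialize an event as JSON before publishing. The helper publish_event and the payload fields are hypothetical, and channel is assumed to be an open pika channel.

```python
import json


def publish_event(channel, routing_key: str, payload: dict,
                  exchange: str = "videos", properties=None) -> None:
    """Serialize payload as JSON and publish it to the exchange."""
    body = json.dumps(payload).encode("utf-8")
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=body,
        # Pass pika.BasicProperties(...) here, for example to make the
        # message persistent; None keeps the library defaults.
        properties=properties,
    )
```

A call could look like publish_event(channel, "videos.created", {"video_id": 42}), and the consumer would then decode the body with json.loads.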
Consumers are the services that consume the messages from the queues.
Consumers are intended to be long-lived and open their connections on startup. We will say that a consumer is subscribed to a queue when it starts consuming messages from it.
All consumers need to:
- Define the queue they want to consume messages from. Additionally, they can define the exchange the queue will be bound to. As with creating a queue, declaring an exchange is an idempotent operation, so only one exchange will be created.
channel.exchange_declare(
    exchange="videos",
    exchange_type="topic"
)
channel.queue_declare(
    queue="users.send_email_on_video_created",
    durable=True,
    exclusive=True
)
- Bind that queue to the exchange with the routing_key.
channel.queue_bind(
    exchange="videos",
    queue="users.send_email_on_video_created",
    routing_key="videos.created"
)
- Define a callback function that will be called when a message is received. This function will be responsible for processing the message.
from pika.channel import Channel
from pika.spec import BasicProperties, Basic

def callback(channel: Channel, method: Basic.Deliver, properties: BasicProperties, body: bytes):
    print(f"[x] Received {method.routing_key}: {body.decode()}")
To ensure that the message is not lost if the consumer crashes, it's recommended to add a manual message acknowledgment in the callback:
from pika.channel import Channel
from pika.spec import BasicProperties, Basic

def callback(channel: Channel, method: Basic.Deliver, properties: BasicProperties, body: bytes):
    print(f"[x] Received {method.routing_key}: {body.decode()}")
    channel.basic_ack(delivery_tag=method.delivery_tag)
- Start consuming messages by subscribing to the queue.
channel.basic_consume(
    queue="users.send_email_on_video_created",
    on_message_callback=callback,
    auto_ack=False  # Set to True if you want to automatically acknowledge the message
)
channel.start_consuming()
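With manual acknowledgments, the callback also has to decide what happens when processing fails. The following sketch wraps a processing function so that the message is acknowledged on success and rejected on error. make_callback and handler are hypothetical names. Using requeue=False is one possible choice: it avoids redelivering a message that keeps failing, but the message is dropped unless the queue has a dead-letter exchange configured.

```python
def make_callback(handler):
    """Wrap handler so the message is acked on success and nacked on failure."""

    def callback(channel, method, properties, body: bytes) -> None:
        try:
            handler(body)
        except Exception:
            # Reject without requeueing, so a message that always fails
            # is not redelivered in a loop.
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            channel.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

The result can be passed as on_message_callback, for example on_message_callback=make_callback(send_email), where send_email stands for whatever function processes the message body.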
When consuming, we can configure the channel so that the broker does not send a new message to the consumer until it has processed and acknowledged the previous one. This is called fair dispatch and can be set as follows, before calling basic_consume:
channel.basic_qos(prefetch_count=1)
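The consumer steps above can be gathered into a single function, which makes the order of the calls explicit. This is only a sketch: start_consumer is a hypothetical helper, channel is assumed to be an open pika channel, and the default names are the ones used in the examples of this section.

```python
def start_consumer(channel, callback,
                   queue: str = "users.send_email_on_video_created",
                   exchange: str = "videos",
                   routing_key: str = "videos.created",
                   prefetch_count: int = 1) -> None:
    """Declare, bind, limit prefetch and start consuming, in that order."""
    channel.exchange_declare(exchange=exchange, exchange_type="topic")
    channel.queue_declare(queue=queue, durable=True, exclusive=True)
    channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key)
    # Set the prefetch limit before subscribing, so it applies to this consumer.
    channel.basic_qos(prefetch_count=prefetch_count)
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)
    channel.start_consuming()  # blocks until consuming is stopped
```

Since auto_ack is False here, the callback passed in is expected to acknowledge each message itself.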