Integrates php-amqplib with Zend Framework 3 and RabbitMq.
Inspired from RabbitMqBundle for Symfony 2
You can configure multiple connections in configuration:
return [
'rabbitmq_module' => [
'connection' => [
// connection name
'default' => [ // default values
'type' => 'stream', // Available: stream, socket, ssl, lazy
'host' => 'localhost',
'port' => 5672,
'username' => 'guest',
'password' => 'guest',
'vhost' => '/',
'insist' => false,
'read_write_timeout' => 2,
'keep_alive' => false,
'connection_timeout' => 3,
'heartbeat' => 0
]
]
]
]
You can find all available options here:
You can retrieve the connection from service locator:
// Getting the 'default' connection
/** @var \Zend\ServiceManager\ServiceLocatorInterface $serviceLocator **/
$connection = $serviceLocator->get('rabbitmq.connection.default');
You can configure multiple producers in configuration:
return [
'rabbitmq_module' => [
'producer' => [
'producer_name' => [
'connection' => 'default', // the connection name
'exchange' => [
'type' => 'direct',
'name' => 'exchange-name',
'durable' => true, // (default)
'auto_delete' => false, // (default)
'internal' => false, // (default)
'no_wait' => false, // (default)
'declare' => true, // (default)
'arguments' => [], // (default)
'ticket' => 0, // (default)
'exchange_binds' => [] // (default)
],
'queue' => [ // optional queue
'name' => 'queue-name' // can be an empty string,
'type' => null, // (default)
'passive' => false, // (default)
'durable' => true, // (default)
'auto_delete' => false, // (default)
'exclusive' => false, // (default)
'no_wait' => false, // (default)
'arguments' => [], // (default)
'ticket' => 0, // (default)
'routing_keys' => [] // (default)
],
'auto_setup_fabric_enabled' => true // auto-setup exchanges and queues
]
]
]
]
You can find all available options here:
You can retrieve the producer from service locator:
// Getting a producer
/** @var \Zend\ServiceManager\ServiceLocatorInterface $serviceLocator **/
/** @var \RabbitMqModule\ProducerInterface $producer **/
$producer = $serviceLocator->get('rabbitmq.producer.producer_name');
// Sending a message
$producer->publish(json_encode(['foo' => 'bar']));
You can configure multiple consumers in configuration:
return [
'rabbitmq_module' => [
'consumer' => [
'consumer_name' => [
'description' => 'Consumer description',
'connection' => 'default', // the connection name
'exchange' => [
'type' => 'direct',
'name' => 'exchange-name'
],
'queue' => [
'name' => 'queue-name' // can be an empty string,
'routing_keys' => [
// optional routing keys
]
],
'auto_setup_fabric_enabled' => true, // auto-setup exchanges and queues
'qos' => [
// optional QOS options for RabbitMQ
'prefetch_size' => 0,
'prefetch_count' => 1,
'global' => false
],
'callback' => 'my-service-name',
]
]
]
]
You can find all available options here:
The callback
key must contain one of the following:
- A
callable
: a closure or an invokable object that receive anPhpAmqpLib\Message\AMQPMessage
object. - An instance of
RabbitMqModule\\ConsumerInterface
. - A string service name in service locator (can be anything
callable
or an instance ofRabbitMqModule\\ConsumerInterface
.
Take a look on RabbitMqModule\\ConsumerInterface
class constants for available return values.
If your callback return false
than the message will be rejected and requeued.
If your callback return anything else different from false
and one of ConsumerInterface
constants, the default response is like MSG_ACK
constant.
You can retrieve the consumer from service locator:
// Getting a consumer
/** @var \Zend\ServiceManager\ServiceLocatorInterface $serviceLocator **/
/** @var \RabbitMqModule\Consumer $consumer **/
$consumer = $serviceLocator->get('rabbitmq.consumer.consumer_name');
// Start consumer
$consumer->consume();
There is a console command available to list and start consumers. See below.
use PhpAmqpLib\Message\AMQPMessage;
use RabbitMqModule\ConsumerInterface;
class FetchProposalsConsumer implements ConsumerInterface
{
/**
* @param AMQPMessage $message
*
* @return int
*/
public function execute(AMQPMessage $message)
{
$data = json_decode($message->body, true);
try {
// do something...
} catch (\PDOException $e) {
return ConsumerInterface::MSG_REJECT_REQUEUE;
} catch (\Exception $e) {
return ConsumerInterface::MSG_REJECT;
}
return ConsumerInterface::MSG_ACK;
}
}
You can configure exchange2exchange binding in producers or consumers. Example:
return [
'rabbitmq_module' => [
'consumer' => [
'consumer_name' => [
// ...
'exchange' => [
'type' => 'fanout',
'name' => 'exchange_to_bind_to',
'exchange_binds' => [
[
'exchange' => [
'type' => 'fanout',
'name' => 'main_exchange'
],
'routing_keys' => [
'#'
]
]
]
],
]
]
]
]
There are some console commands available:
rabbitmq-module setup-fabric
: Setup fabric for each service, declaring exchanges and queuesrabbitmq-module list consumers
: List available consumersrabbitmq-module consumer <name> [--without-signals|-w]
: Start a consumer by namerabbitmq-module rpc_server <name> [--without-signals|-w]
: Start a rpc server by namerabbitmq-module stdin-producer <name> [--route=] <msg>
: Send a message with a producer