composer require 'arus/amqp-bridge'
declare(strict_types=1);
namespace App\QueueMessageHandler;
use Arus\AMQP\Bridge\PayloadDecoder\JsonDecoder;
use Arus\AMQP\Bridge\MessageHandlerInterface;
use Arus\AMQP\Bridge\MessageInterface;
use const JSON_OBJECT_AS_ARRAY;
/**
 * Example handler for messages received from a queue.
 *
 * The annotation below references the JSON schema file for this handler's
 * messages (presumably consumed by the consumer's JSON schema validator when
 * it is enabled; confirm against the library documentation).
 *
 * @JsonSchemaReference("config/json-schemas/SomeQueueMessage.json")
 */
final class SomeQueueMessageHandler implements MessageHandlerInterface
{
    /**
     * Decodes the JSON payload of the given message and processes it.
     *
     * {@inheritDoc}
     */
    public function handle(MessageInterface $message) : void
    {
        // The JSON_OBJECT_AS_ARRAY flag is handed to the decoder as-is.
        $decoder = new JsonDecoder();
        $payload = $decoder->decode($message, JSON_OBJECT_AS_ARRAY);

        // some code working with $payload...
    }
}
use App\QueueMessageHandler\SomeQueueMessageHandler;
use Arus\AMQP\Bridge\Consumer;
// Configure and open the connection to the AMQP broker.
$connection = new AMQPConnection();
$connection->setLogin('guest');
$connection->setPassword('guest');
$connection->setHost('localhost');
$connection->setPort(5672);
$connection->setVhost('/');
$connection->connect();

// Open a channel on that connection with a prefetch count of 100.
$channel = new AMQPChannel($connection);
$channel->setPrefetchCount(100);

// Point at the queue the consumer will read from.
$queue = new AMQPQueue($channel);
$queue->setName('queue.name');

// Create the message queue consumer around the application's handler.
$handler = new SomeQueueMessageHandler();
$consumer = new Consumer($handler);

// NOTE: $logger, $payloadValidator and $annotationReader are not defined in
// this snippet; they are assumed to be provided by the surrounding application.

// [optional] PSR-3 logger.
$consumer->setLogger($logger);

// [optional] custom payload validator.
$consumer->setPayloadValidator($payloadValidator);

// [optional] custom annotation reader.
$consumer->setAnnotationReader($annotationReader);

// [optional] validate queue messages with a JSON schema validator.
$consumer->useJsonSchemaValidator();

// [optional] callback invoked when a queue message is received.
$consumer->setMessageReceivedCallback(function ($message) {
    // e.g. re-open doctrine entity managers here...
});

// [optional] callback invoked when a queue message has been handled.
$consumer->setMessageHandledCallback(function ($message) {
    // e.g. clear doctrine entity managers here...
});

// Start consuming; on any error close the connection, then rethrow.
try {
    $queue->consume($consumer);
} catch (Throwable $failure) {
    $connection->disconnect();

    throw $failure;
}
- If a queue message was handled without errors, such a message will be automatically acknowledged;
- If a queue message contains an undecodable or invalid payload, such a message will be automatically rejected;
- If a queue message was handled with an unexpected error, such a message will be automatically requeued;
- If you need to reject a queue message in code, just throw an exception that implements `Arus\AMQP\Bridge\Exception\UnacknowledgableQueueMessageExceptionInterface`.
composer test