This PHP SDK aims to make it easy to integrate message queues into projects such as microservices. It only supports publishing and fetching messages: you cannot create, delete, or purge queues, or perform any other action.
- Amazon SQS v.2
- AMQP v.0.9.1
- Batch sending, receiving and deletion not supported
- Message attribute binary type not supported
- Publishing to exchange not supported
See the CHANGE LOG for version release information.
Then run the command:
$ composer require eexit/php-mq-sdk:~1.0
See the contents of the examples directory.
Example with a PSR-3 logger such as Monolog:
<?php
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
$handler = new StreamHandler(__DIR__ . '/sandbox.log', Logger::INFO);
$logger = new Logger('Sandbox');
$logger->pushHandler($handler);
/** @var \Eexit\Mq\MessageQueue $mq */
$mq->setLogger($logger);
Example log output at the INFO level:
[2015-07-08 13:33:57] Sandbox.INFO: Open a connection [] []
[2015-07-08 13:33:59] Sandbox.INFO: Published message abb12d0a-97c3-4dcd-a45f-8be097bbe6bf in 1.6550381183624 ms [] []
[2015-07-08 13:33:59] Sandbox.INFO: Start listening to on incoming messages [] []
[2015-07-08 13:33:59] Sandbox.INFO: Fetched message 5c13c13e-86e5-4100-8e50-5168a0bd9608 in 0.15714406967163 ms [] []
[2015-07-08 13:33:59] Sandbox.INFO: Acked message 5c13c13e-86e5-4100-8e50-5168a0bd9608 in 0.13068604469299 ms [] []
[2015-07-08 13:33:59] Sandbox.INFO: Close the connection [] []
If you use the DEBUG level, you'll get much more information, such as the message content and error stack traces.
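To switch to DEBUG output, only the handler threshold needs to change. The fragment below is a minimal sketch that reuses the Monolog setup from the INFO example; the autoloader path is an assumption, and the `setLogger()` call is left commented out because `$mq` is not bootstrapped here.

```php
<?php
// Assumption: Monolog was installed with Composer and its autoloader lives at this path.
require __DIR__ . '/vendor/autoload.php';

use Monolog\Logger;
use Monolog\Handler\StreamHandler;

// Same setup as the INFO example, with the handler threshold lowered to DEBUG
$handler = new StreamHandler(__DIR__ . '/sandbox.log', Logger::DEBUG);
$logger = new Logger('Sandbox');
$logger->pushHandler($handler);

/** @var \Eexit\Mq\MessageQueue $mq */
// $mq->setLogger($logger); // Uncomment once $mq has been bootstrapped

// Records at DEBUG level and above are now written to sandbox.log
$logger->debug('Debug records now reach sandbox.log');
```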
The SDK supports Unix signal handling (via the PCNTL extension) so that you can shut down your processes gracefully:
<?php
// MQ signal handler.
// Note: $this->mq assumes this closure is defined inside a class that holds the queue;
// in a plain script, capture the queue with `use ($mq)` instead.
$signalHandler = function ($signal) {
    switch ($signal) {
        case SIGINT:
        case SIGQUIT:
        case SIGTERM:
            $this->mq->stop();
            \pcntl_signal($signal, SIG_DFL); // Restores the default signal handler
            break;
    }
};
// If the extension is loaded, registers the signal handlers
if (extension_loaded('pcntl')) {
    \pcntl_signal(SIGINT, $signalHandler);
    \pcntl_signal(SIGQUIT, $signalHandler);
    \pcntl_signal(SIGTERM, $signalHandler);
}
/*
MQ bootstrap...
*/
$mq->listen($queue, function (EnvelopeInterface $message, MessageQueue $mq) {
    // The process can also be stopped from inside the callback
    return $mq->stop();
    throw new WillNeverBeThrown(); // Unreachable: the callback has already returned
});
// Closes the connections and gathers logs and metrics accordingly
$mq->close();
There is a working example of signal handling for AMQP.
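The stop-on-signal pattern can also be tried without a broker. The sketch below is not the SDK's implementation: `StoppableLoop` is a hypothetical stand-in for the queue's `listen()`/`stop()` pair, and `pcntl_async_signals()` assumes PHP 7.1 or later. It shows both ways of ending the loop: from a signal handler and from inside the callback.

```php
<?php
// Hypothetical stand-in for a queue consumer; it only models listen() and stop().
final class StoppableLoop
{
    private $running = false;

    // Calls $onTick repeatedly until stop() is called; returns the number of ticks.
    public function listen(callable $onTick)
    {
        $this->running = true;
        $ticks = 0;
        while ($this->running) {
            $onTick(++$ticks, $this);
        }
        return $ticks;
    }

    public function stop()
    {
        $this->running = false;
    }
}

$loop = new StoppableLoop();

// Register the handler only when the PCNTL extension is available
if (extension_loaded('pcntl')) {
    pcntl_async_signals(true); // Deliver signals without declare(ticks=1)
    pcntl_signal(SIGTERM, function ($signal) use ($loop) {
        $loop->stop();
        pcntl_signal($signal, SIG_DFL); // Restore the default handler
    });
}

// The loop can also be stopped from inside the callback
$ticks = $loop->listen(function ($tick, StoppableLoop $loop) {
    if ($tick === 3) {
        $loop->stop();
    }
});

echo "Stopped after {$ticks} ticks", PHP_EOL;
```

Capturing the loop with `use ($loop)` keeps the handler usable in a plain script, where `$this` is not available.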
This library uses the Collector interface of the beberlei/metrics library, which lets you use any of its supported metric backends.
Here's an example with StatsD:
<?php
use Eexit\Mq\Adapter\Sqs\Sqs;
use Beberlei\Metrics\Collector\StatsD;
$collector = new StatsD(/* backend host */);
// Adds the collector and a prefix to avoid metric naming conflicts
// You can use the adapter prefix if you want
/** @var \Eexit\Mq\MessageQueue $mq */
$mq->setMetricCollector($collector, Sqs::METRIC_PREFIX);
// In your worker business code you can add other metrics
// Note: the metric prefix is only used internally. You may use your own prefix here
$mq->getMetricCollector()->increment('my_app.my_metric.succeed');
Description | Metric name |
---|---|
Connection open success count | {prefix}.connection.open.succeed |
Connection open duration | {prefix}.connection.open_time |
Connection open failure count | {prefix}.connection.open.failed |
Connection stop success count | {prefix}.connection.stop.succeed |
Connection stop duration | {prefix}.connection.stop_time |
Connection stop failure count | {prefix}.connection.stop.failed |
Connection close success count | {prefix}.connection.close.succeed |
Connection close duration | {prefix}.connection.close_time |
Connection close failure count | {prefix}.connection.close.failed |
Message publication success count | {prefix}.message.publish.succeed |
Message publication duration | {prefix}.message.publish_time |
Message publication failure count | {prefix}.message.publish.failed |
Message fetch success count | {prefix}.message.fetch.succeed |
Message fetch duration | {prefix}.message.fetch_time |
Message listen failure count | {prefix}.message.listen.failed |
Message ack success count | {prefix}.message.ack.succeed |
Message ack duration | {prefix}.message.ack_time |
Message ack failure count | {prefix}.message.ack.failed |
Message nack success count | {prefix}.message.nack.succeed |
Message nack duration | {prefix}.message.nack_time |
Message nack failure count | {prefix}.message.nack.failed |
Message processing duration | {prefix}.message.process_time |
For example, if you use the SQS adapter with the Sqs::METRIC_PREFIX prefix, your metrics will look like this:
mq.sqs.connection.open_time
mq.sqs.message.publish.succeed
mq.sqs.message.publish_time
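The `{prefix}` placeholder in the table expands by simple concatenation. The sketch below illustrates this under two assumptions: that `Sqs::METRIC_PREFIX` resolves to `mq.sqs` (inferred from the sample names above, not confirmed from the SDK source), and that a helper such as the hypothetical `metricName()` is a fair model of how the names are built.

```php
<?php
// Assumption: Sqs::METRIC_PREFIX resolves to 'mq.sqs', as the sample names suggest.
const ASSUMED_SQS_PREFIX = 'mq.sqs';

// Hypothetical helper: joins a prefix and a metric suffix with a single dot.
function metricName($prefix, $suffix)
{
    return rtrim($prefix, '.') . '.' . ltrim($suffix, '.');
}

// Suffixes taken from the table above
$suffixes = ['connection.open_time', 'message.publish.succeed', 'message.publish_time'];

$names = array_map(function ($suffix) {
    return metricName(ASSUMED_SQS_PREFIX, $suffix);
}, $suffixes);

echo implode(PHP_EOL, $names), PHP_EOL;
```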