A RabbitMQ queue driver for Laravel featuring native Laravel Queue API integration with advanced message queuing capabilities.
- PHP 8.2 or higher
- Laravel 11.x or 12.x
- RabbitMQ Server 3.8+
- ext-amqp PHP extension
- ext-pcntl PHP extension (for parallel processing)
composer require iamfarhad/laravel-rabbitmq
php artisan vendor:publish --provider="iamfarhad\LaravelRabbitMQ\LaravelRabbitQueueServiceProvider" --tag="config"
Register the service provider in bootstrap/app.php
:
$app->register(iamfarhad\LaravelRabbitMQ\LaravelRabbitQueueServiceProvider::class);
Add the RabbitMQ connection to config/queue.php
:
'connections' => [
'rabbitmq' => [
'driver' => 'rabbitmq',
'queue' => env('RABBITMQ_QUEUE', 'default'),
'hosts' => [
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
'lazy' => env('RABBITMQ_LAZY_CONNECTION', true),
'keepalive' => env('RABBITMQ_KEEPALIVE_CONNECTION', false),
'heartbeat' => env('RABBITMQ_HEARTBEAT_CONNECTION', 0),
'secure' => env('RABBITMQ_SECURE', false),
],
'options' => [
'ssl_options' => [
'cafile' => env('RABBITMQ_SSL_CAFILE'),
'local_cert' => env('RABBITMQ_SSL_LOCALCERT'),
'local_key' => env('RABBITMQ_SSL_LOCALKEY'),
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE'),
],
'queue' => [
'job' => \iamfarhad\LaravelRabbitMQ\Jobs\RabbitMQJob::class,
'qos' => [
'prefetch_size' => 0,
'prefetch_count' => 10,
'global' => false
]
],
],
],
]
Add to your .env
file:
QUEUE_CONNECTION=rabbitmq
# RabbitMQ Connection
RABBITMQ_HOST=127.0.0.1
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
RABBITMQ_QUEUE=default
# Connection Options
RABBITMQ_LAZY_CONNECTION=true
RABBITMQ_KEEPALIVE_CONNECTION=false
RABBITMQ_HEARTBEAT_CONNECTION=0
# SSL/TLS Configuration (Optional)
RABBITMQ_SECURE=false
#RABBITMQ_SSL_CAFILE=/path/to/ca.pem
#RABBITMQ_SSL_LOCALCERT=/path/to/cert.pem
#RABBITMQ_SSL_LOCALKEY=/path/to/key.pem
#RABBITMQ_SSL_VERIFY_PEER=true
// Dispatch to default queue
dispatch(new ProcessPodcast($podcast));
// Dispatch to specific queue
dispatch(new ProcessPodcast($podcast))->onQueue('podcasts');
// Delayed dispatch
dispatch(new ProcessPodcast($podcast))->delay(now()->addMinutes(10));
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(
public Podcast $podcast
) {}
public function handle(): void
{
// Process the podcast...
}
}
php artisan queue:work rabbitmq --queue=default
php artisan rabbitmq:consume --queue=default
Option | Description | Default |
---|---|---|
--queue |
Queue names to process (comma separated) | default |
--once |
Process single job and exit | false |
--stop-when-empty |
Stop when queue is empty | false |
--delay |
Delay for failed jobs (seconds) | 0 |
--memory |
Memory limit in MB | 128 |
--timeout |
Job timeout in seconds | 60 |
--tries |
Number of attempts | 1 |
--sleep |
Sleep when no jobs available | 3 |
--num-processes |
Number of parallel processes | 2 |
--max-jobs |
Maximum jobs before restart | 0 |
--max-time |
Maximum runtime in seconds | 0 |
Enable priority-based processing:
// In config/queue.php
'options' => [
'queue' => [
'arguments' => [
'x-max-priority' => 10
]
]
]
// Dispatch with priority
dispatch(new UrgentJob($data))->onQueue('priority')->withPriority(8);
Configure message prefetching:
'options' => [
'queue' => [
'qos' => [
'prefetch_size' => 0,
'prefetch_count' => 10,
'global' => false
]
]
]
Enable secure connections:
'hosts' => [
'secure' => true,
// other host config...
],
'options' => [
'ssl_options' => [
'cafile' => '/path/to/ca.pem',
'local_cert' => '/path/to/cert.pem',
'local_key' => '/path/to/key.pem',
'verify_peer' => true,
],
]
Configure retry behavior in your job:
class ProcessPayment implements ShouldQueue
{
public $tries = 3;
public $maxExceptions = 2;
public $backoff = [1, 5, 10];
public function retryUntil(): DateTime
{
return now()->addMinutes(15);
}
public function failed(Throwable $exception): void
{
// Handle failure
}
}
use iamfarhad\LaravelRabbitMQ\Facades\RabbitMQ;
// Check if queue exists
if (RabbitMQ::queueExists('my-queue')) {
// Queue exists
}
// Declare a new queue
RabbitMQ::declareQueue('my-queue', $durable = true);
// Purge queue messages
RabbitMQ::purgeQueue('my-queue');
// Delete queue
RabbitMQ::deleteQueue('my-queue');
[program:laravel-rabbitmq]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/artisan rabbitmq:consume --queue=default --memory=256
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=4
redirect_stderr=true
stdout_logfile=/path/to/logs/rabbitmq.log
stopwaitsecs=3600
version: '3.8'
services:
app:
build: .
environment:
RABBITMQ_HOST: rabbitmq
RABBITMQ_PORT: 5672
RABBITMQ_USER: laravel
RABBITMQ_PASSWORD: secret
depends_on:
- rabbitmq
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: laravel
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:
# Run all tests
composer test
# Run specific test suites
composer test:unit
composer test:feature
# Code formatting
composer format
composer format-test
Set up RabbitMQ for testing:
docker run -d --name rabbitmq-test \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3-management
Ensure RabbitMQ is running and accessible:
# Check RabbitMQ status
docker ps | grep rabbitmq
# Test connection
telnet localhost 5672
Verify user permissions in RabbitMQ:
# Access RabbitMQ management UI
# http://localhost:15672
# Default: guest/guest
Adjust consumer memory limits:
php artisan rabbitmq:consume --memory=512
- RabbitQueue: Core queue implementation
- Consumer: Message consumer with daemon support
- RabbitMQJob: Job representation with RabbitMQ features
- RabbitMQConnector: Connection management
- ConsumeCommand: Artisan command for consumption
- RabbitQueueInterface: Extended queue contract
- Fork the repository
- Create feature branch (
git checkout -b feature/amazing-feature
) - Commit changes (
git commit -m 'Add amazing feature'
) - Push to branch (
git push origin feature/amazing-feature
) - Open a Pull Request
- Follow PSR-12 coding standards
- Write tests for new features
- Update documentation
- Use conventional commits
This package is open-sourced software licensed under the MIT license.