Skip to content

Commit

Permalink
Use Magento\Framework\MessageQueue to clear message queues
Browse files Browse the repository at this point in the history
  • Loading branch information
supercid committed Jun 6, 2024
1 parent f23fdcf commit f7c2eca
Showing 1 changed file with 44 additions and 46 deletions.
90 changes: 44 additions & 46 deletions Console/Command/NostoClearQueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@

namespace Nosto\Tagging\Console\Command;

use Exception;
use Magento\Framework\Amqp\Config as AmqpConfig;
use Magento\Framework\App\ResourceConnection;
use Magento\Framework\Bulk\BulkManagementInterface;
use Magento\Framework\DB\Adapter\Pdo\Mysql\Interceptor;
use Nosto\NostoException;
use Magento\Framework\Exception\LocalizedException;
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
use Magento\Framework\MessageQueue\QueueRepository;
use RuntimeException;
use PhpAmqpLib\Channel\AMQPChannel;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -32,14 +28,32 @@ class NostoClearQueueCommand extends Command
public const NOSTO_DELETE_MESSAGE_QUEUE = 'nosto_product_sync.delete';

/**
* @var AmqpConfig
* @var ConsumerConfig
*/
private AmqpConfig $amqpConfig;
private $consumerConfig;

/**
* @var QueueRepository
*/
private $queueRepository;

private array $consumers = [
self::NOSTO_DELETE_MESSAGE_QUEUE,
self::NOSTO_UPDATE_SYNC_MESSAGE_QUEUE,
];

/**
* NostoClearQueueCommand constructor.
*
* @param ConsumerConfig $consumerConfig
* @param QueueRepository $queueRepository
*/
public function __construct(
AmqpConfig $amqpConfig
ConsumerConfig $consumerConfig,
QueueRepository $queueRepository
) {
$this->amqpConfig = $amqpConfig;
$this->consumerConfig = $consumerConfig;
$this->queueRepository = $queueRepository;
parent::__construct();
}

Expand All @@ -48,8 +62,7 @@ public function __construct(
*/
protected function configure()
{
// Define command name.
$this->setName('nosto:clear:messagequeue')
$this->setName('nosto:clear:message-queue')
->setDescription('Clear all message queues for Nosto product sync topics.');
parent::configure();
}
Expand All @@ -62,51 +75,36 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$io = new SymfonyStyle($input, $output);

try {
$queues = [
self::NOSTO_DELETE_MESSAGE_QUEUE,
self::NOSTO_UPDATE_SYNC_MESSAGE_QUEUE,
];

foreach ($queues as $queueName) {
$this->clearQueue($queueName);
foreach ($this->consumers as $queueName) {
$this->clearQueue($io, $queueName);
}

$io->success('Successfully cleared message queues.');
return 0;
} catch (RuntimeException $e) {
} catch (RuntimeException|LocalizedException $e) {
$io->error('An error occurred while clearing message queues: ' . $e->getMessage());
return 1;
}
return 0;
}

/**
* Clear MySql and RabbitMq queues by name.
* Clear message queues by consumer name.
*
* @param string $queueName
* @param SymfonyStyle $io
* @param string $consumerName
* @return void
* @throws LocalizedException
*/
private function clearQueue(string $queueName): void
private function clearQueue(SymfonyStyle $io, string $consumerName): void
{
// Get RabbitMq channel.
$channel = $this->amqpConfig->getChannel();

// Empty queue if queue exists.
if ($this->queueExists($channel, $queueName)) {
$channel->queue_purge($queueName);
$io->writeln(sprintf('Clearing messages from %s', $consumerName));
$io->createProgressBar();
$io->progressStart();
$consumerConfig = $this->consumerConfig->getConsumer($consumerName);
$queue = $this->queueRepository->get($consumerConfig->getConnection(), $consumerConfig->getQueue());
while ($message = $queue->dequeue()) {
$io->progressAdvance(1);
$queue->acknowledge($message);
}
}

/**
* Check the expected queue exist.
*
* @param AMQPChannel $channel
* @param string $queueName
* @return bool
*/
protected function queueExists(AMQPChannel $channel, string $queueName): bool
{
$queueInfo = $channel->queue_declare($queueName, true);

return !empty($queueInfo);
$io->progressFinish();
}
}

0 comments on commit f7c2eca

Please sign in to comment.