Skip to content

Commit

Permalink
Tidying code, clearing only rabbitmq queues
Browse files Browse the repository at this point in the history
  • Loading branch information
ugljesaspx committed Jun 5, 2024
1 parent 378dc83 commit d2f8f9a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 127 deletions.
163 changes: 37 additions & 126 deletions Console/Command/NostoClearQueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
use Exception;

Check warning on line 5 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Possibly zero references to use statement for classlike/namespace Exception (\Exception)
use Magento\Framework\Amqp\Config;
use Magento\Framework\App\ResourceConnection;

Check warning on line 7 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Possibly zero references to use statement for classlike/namespace ResourceConnection (\Magento\Framework\App\ResourceConnection)
use Magento\Framework\Bulk\BulkManagementInterface;

Check warning on line 8 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Possibly zero references to use statement for classlike/namespace BulkManagementInterface (\Magento\Framework\Bulk\BulkManagementInterface)
use Magento\Framework\DB\Adapter\Pdo\Mysql\Interceptor;

Check warning on line 9 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Possibly zero references to use statement for classlike/namespace Interceptor (\Magento\Framework\DB\Adapter\Pdo\Mysql\Interceptor)
use Nosto\NostoException;

Check warning on line 10 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Possibly zero references to use statement for classlike/namespace NostoException (\Nosto\NostoException)
use RuntimeException;
use PhpAmqpLib\Channel\AMQPChannel;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -15,52 +18,62 @@
class NostoClearQueueCommand extends Command
{
/**
* Nosto Queues
* Nosto Product Sync Update label.
*
* @var string
*/
private const QUEUE_TOPICS = [
'nosto_product_sync.update',
'nosto_product_sync.delete'
];
public const NOSTO_UPDATE_SYNC_MESSAGE_QUEUE = 'nosto_product_sync.update';

/**
* @var Config
* Nosto Product Sync Delete label.
*
* @var string
*/
private Config $amqpConfig;
public const NOSTO_DELETE_MESSAGE_QUEUE = 'nosto_product_sync.delete';

/**
* @var ResourceConnection
* @var Config
*/
private ResourceConnection $resourceConnection;
private Config $amqpConfig;

Check warning on line 37 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Property \Nosto\Tagging\Console\Command\NostoClearQueueCommand->amqpConfig has undeclared type \Magento\Framework\Amqp\Config (Did you mean class \Nosto\Tagging\Block\Adminhtml\Account\Config)

public function __construct(
ResourceConnection $resourceConnection,
Config $amqpConfig

Check warning on line 40 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Parameter $amqpConfig has undeclared type \Magento\Framework\Amqp\Config (Did you mean class \Nosto\Tagging\Block\Adminhtml\Account\Config)
) {
$this->resourceConnection = $resourceConnection;
$this->amqpConfig = $amqpConfig;
parent::__construct();
}

/**
* Configure the command and the arguments
*/
protected function configure()
{
// Define command name.
$this->setName('nosto:clear:queue')
$this->setName('nosto:clear:messagequeue')
->setDescription('Clear all message queues for Nosto product sync topics.');
parent::configure();
}

/**
* @inheritDoc
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);

try {
foreach (self::QUEUE_TOPICS as $topicName) {
$this->clearQueue($topicName, $io);
$queues = [
self::NOSTO_DELETE_MESSAGE_QUEUE,
self::NOSTO_UPDATE_SYNC_MESSAGE_QUEUE,
];

foreach ($queues as $queueName) {
$this->clearQueue($queueName, $io);

Check notice on line 71 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Call with 2 arg(s) to \Nosto\Tagging\Console\Command\NostoClearQueueCommand::clearQueue(string $queueName) which only takes 1 arg(s) defined at Console/Command/NostoClearQueueCommand.php:88
}

$io->success('Successfully cleared message queues.');
return 0;
} catch (NostoException $e) {
} catch (RuntimeException $e) {
$io->error('An error occurred while clearing message queues: ' . $e->getMessage());
return 1;
}
Expand All @@ -69,130 +82,28 @@ protected function execute(InputInterface $input, OutputInterface $output): int
/**
* Clear MySql and RabbitMq queues by name.
*
* @param string $topicName
* @param SymfonyStyle $io
* @return void
*/
private function clearQueue(string $topicName, SymfonyStyle $io): void
{
$this->clearRabbitMQQueue($topicName, $io);
$this->clearDBQueues($topicName, $io);
}

/**
* Clear DB.
*
* @param string $topicName
* @param SymfonyStyle $io
* @return void
*/
private function clearDBQueues(string $topicName, SymfonyStyle $io): void
{
// Get connection.
$connection = $this->resourceConnection->getConnection();

// Start DB transaction.
$connection->beginTransaction();
try {
// Emptying DB tables.
$this->clearQueueMessages($topicName, $connection);
$this->clearRelatedRecords($topicName, $connection);
$connection->commit();
} catch (Exception $exception) {
$connection->rollBack();
$io->error('An error occurred while clearing DB queues for topic '
. $topicName . ': '
. $exception->getMessage()
);
}
}

/**
* Emptying queue message tables.
*
* @param string $topicName
* @param $connection
* @return void
*/
private function clearQueueMessages(string $topicName, $connection): void
{
$queueMessageTable = $this->resourceConnection->getTableName('queue_message');
$queueMessageStatusTable = $this->resourceConnection->getTableName('queue_message_status');

// Get all IDs from "queue_message" table.
$select = $connection->select()
->from($queueMessageTable, ['id'])
->where('topic_name = ?', $topicName);
$messageIds = $connection->fetchCol($select);

// Delete related records from "queue_message_status" table.
if (!empty($messageIds)) {
$connection->delete($queueMessageStatusTable, ['message_id IN (?)' => $messageIds]);
}

// Delete records from "queue_message" table.
$connection->delete($queueMessageTable, ['topic_name = ?' => $topicName]);
}

/**
* Emptying related tables.
*
* @param string $topicName
* @param $connection
* @return void
*/
private function clearRelatedRecords(string $topicName, $connection): void
{
$magentoOperationTable = $this->resourceConnection->getTableName('magento_operation');
$magentoBulkTable = $this->resourceConnection->getTableName('magento_bulk');

// Get all IDs from "magento_operation" table.
$selectBulkUuids = $connection->select()
->from($magentoOperationTable, ['bulk_uuid'])
->where('topic_name = ?', $topicName);
$bulkUuids = $connection->fetchCol($selectBulkUuids);

// Delete related records from "magento_bulk" table.
if (!empty($bulkUuids)) {
$connection->delete($magentoBulkTable, ['uuid IN (?)' => $bulkUuids]);
}

// Delete records from "magento_operation" table.
$connection->delete($magentoOperationTable, ['topic_name = ?' => $topicName]);
}

/**
* Clear RabbitMq Queues by name.
*
* @param string $queueName
* @param SymfonyStyle $io
* @return void
*/
private function clearRabbitMQQueue(string $queueName, SymfonyStyle $io): void
private function clearQueue(string $queueName): void
{
try {
// Get RabbitMq channel.
$channel = $this->amqpConfig->getChannel();
// Get RabbitMq channel.
$channel = $this->amqpConfig->getChannel();

Check failure on line 91 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Call to method getChannel from undeclared class \Magento\Framework\Amqp\Config (Did you mean class \Nosto\Tagging\Block\Adminhtml\Account\Config)

// Empty queue if queue exists.
if ($this->queueExists($channel, $queueName)) {
$channel->queue_purge($queueName);
}
} catch (Exception $e) {
// Log the error or handle it as required.
$io->error('An error occurred while clearing RabbitMQ queue ' . $queueName . ': ' . $e->getMessage());
throw new RuntimeException('Failed to clear RabbitMQ queue: ' . $e->getMessage());
// Empty queue if queue exists.
if ($this->queueExists($channel, $queueName)) {
$channel->queue_purge($queueName);
}
}

/**
* Check queue exist.
* Check the expected queue exist.
*
* @param $channel
* @param AMQPChannel $channel
* @param string $queueName
* @return bool
*/
protected function queueExists($channel, string $queueName): bool
protected function queueExists(AMQPChannel $channel, string $queueName): bool

Check warning on line 106 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Parameter $channel has undeclared type \PhpAmqpLib\Channel\AMQPChannel (Did you mean class \AMQPChannel)
{
$queueInfo = $channel->queue_declare($queueName, true);

Check failure on line 108 in Console/Command/NostoClearQueueCommand.php

View workflow job for this annotation

GitHub Actions / Phan Analysis

Call to method queue_declare from undeclared class \PhpAmqpLib\Channel\AMQPChannel (Did you mean class \AMQPChannel)

Expand Down
1 change: 0 additions & 1 deletion etc/di.xml
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@
</type>
<type name="Nosto\Tagging\Console\Command\NostoClearQueueCommand">
<arguments>
<argument name="resourceConnection" xsi:type="object">Magento\Framework\App\ResourceConnection</argument>
<argument name="amqpConfig" xsi:type="object">Magento\Framework\Amqp\Config</argument>
</arguments>
</type>
Expand Down

0 comments on commit d2f8f9a

Please sign in to comment.