Skip to content

Commit

Permalink
LIMS-1354: Migrate from ActiveMQ to RabbitMQ (#826)
Browse files Browse the repository at this point in the history
* LIMS-1354: Migrate from ActiveMQ to RabbitMQ

* Unwrap try...catch

* Add routing_key and vhost

* Add polyfill for BCMath as php-bcmath extension is not installed but required by php-amqplib for AMQP
  • Loading branch information
JPHall-DLS authored Oct 1, 2024
1 parent d09aa59 commit ca7badb
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 38 deletions.
5 changes: 3 additions & 2 deletions api/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
"mpdf/mpdf": "8.1.2",
"ralouphie/getallheaders": "2.0.5",
"slim/slim": "2.6.2",
"stomp-php/stomp-php": "3.0.6",
"php-amqplib/php-amqplib": "^2.0",
"symfony/http-foundation": "^5.4",
"symfony/filesystem": "^5.4",
"mpdf/qrcode": "^1.2",
"mtcmedia/dhl-api": "dev-master#9b4b6315",
"maennchen/zipstream-php": "2.1.0"
"maennchen/zipstream-php": "2.1.0",
"phpseclib/bcmath_compat": "^2.0"
},
"autoload": {
"psr-4": {
Expand Down
16 changes: 9 additions & 7 deletions api/src/Page.php
Original file line number Diff line number Diff line change
Expand Up @@ -1109,14 +1109,16 @@ function _submit_zocalo_recipe($recipe, $parameters, $error_code = 500)
}


function _send_zocalo_message($zocalo_queue, $zocalo_message, $error_code = 500)
function _send_zocalo_message($rabbitmq_zocalo_vhost, $zocalo_message, $error_code = 500)
{
global
$zocalo_server,
$zocalo_username,
$zocalo_password;
$rabbitmq_zocalo_host,
$rabbitmq_zocalo_port,
$rabbitmq_zocalo_username,
$rabbitmq_zocalo_password,
$rabbitmq_zocalo_routing_key;

if (empty($zocalo_server) || empty($zocalo_queue))
if (empty($rabbitmq_zocalo_host) || empty($rabbitmq_zocalo_vhost))
{
$message = 'Zocalo server or queue not specified.';
error_log($message);
Expand All @@ -1129,8 +1131,8 @@ function _send_zocalo_message($zocalo_queue, $zocalo_message, $error_code = 500)
try
{
error_log("Sending message" . var_export($zocalo_message, true));
$queue = new Queue($zocalo_server, $zocalo_username, $zocalo_password);
$queue->send($zocalo_queue, $zocalo_message, true, $this->user->loginId);
$queue = new Queue($rabbitmq_zocalo_host, $rabbitmq_zocalo_port, $rabbitmq_zocalo_username, $rabbitmq_zocalo_password, $rabbitmq_zocalo_vhost);
$queue->send($zocalo_message, $rabbitmq_zocalo_routing_key);
}
catch (Exception $e)
{
Expand Down
4 changes: 2 additions & 2 deletions api/src/Page/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ function _add_reprocessing_sweep($args) {

function _enqueue()
{
global $zocalo_mx_reprocess_queue;
global $rabbitmq_zocalo_vhost;

if (!$this->has_arg('PROCESSINGJOBID')) $this->_error('No processing job specified');

Expand All @@ -379,7 +379,7 @@ function _enqueue()
'ispyb_process' => intval($this->arg('PROCESSINGJOBID')),
)
);
$this->_send_zocalo_message($zocalo_mx_reprocess_queue, $message);
$this->_send_zocalo_message($rabbitmq_zocalo_vhost, $message);

$this->_output(new \stdClass);
}
Expand Down
46 changes: 19 additions & 27 deletions api/src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,34 @@

namespace SynchWeb;

use Stomp\Exception\StompException;
use Stomp\Stomp;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class Queue
{
private $server, $username, $password;
private $host, $port, $username, $password, $vhost;

function __construct($server, $username, $password)
function __construct($host, $port, $username, $password, $vhost)
{
$this->server = $server;
$this->host = $host;
$this->port = $port;
$this->username = $username;
$this->password = $password;
$this->vhost = $vhost;
}

function send($queue, array $message, $persistent = false, $login = null)
function send(array $message, $routing_key)
{
try {
$connection = new Stomp($this->server);

$connection->connect($this->username, $this->password);

$connection->send(
$queue,
json_encode($message, JSON_UNESCAPED_SLASHES),
array(
'persistent' => ($persistent === true),
'synchweb.host' => gethostname(),
'synchweb.user' => $login,
)
);

$connection->disconnect();
} catch (StompException $e) {
/** @noinspection PhpUnhandledExceptionInspection */

throw $e;
}
$connection = new AMQPStreamConnection($this->host, $this->port, $this->username, $this->password, $this->vhost);
$channel = $connection->channel();

$msg = new AMQPMessage(
json_encode($message, JSON_UNESCAPED_SLASHES)
);

$channel->basic_publish($msg, null, $routing_key);

$channel->close();
$connection->close();
}
}

0 comments on commit ca7badb

Please sign in to comment.