Skip to content

Commit

Permalink
Merge pull request #7 from daron0ne/master
Browse files Browse the repository at this point in the history
Виртуальный хост как параметр
  • Loading branch information
Serganbus authored May 13, 2019
2 parents e669fd8 + 976c1fc commit 781c379
Showing 1 changed file with 77 additions and 41 deletions.
118 changes: 77 additions & 41 deletions src/StompClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,35 @@

/**
* @author Sergey Ivanov(ivanov@tochka.com)
*
* @property Stomp|null stomp
* @property string login
* @property array hosts
* @property array queues
* @property string pw
* @property string|null vhost
*/
class StompClient
{
use Traits\Loggable;

/**
* @var Stomp
* @var Stomp|null
*/
private $stomp;

/** @var array $hosts */
private $hosts;

/** @var string $login */
private $login;

/** @var string $pw */
private $pw;

/** @var string|null $vhost */
private $vhost;

/**
* @var array
*/
Expand All @@ -38,12 +53,19 @@ class StompClient
* - Use only one broker uri: tcp://localhost:61614
* - use failover in given order: failover://(tcp://localhost:61614,ssl://localhost:61612).
*
* @param string $connectionString hosts url
* @param string $login login
* @param string $pw password
* @param string $connectionString hosts url
* @param string $login login
* @param string $pw password
* @param string|null $vhost
*
* @throws StompClientException
*/
public function __construct(string $connectionString, string $login, string $pw)
public function __construct(
string $connectionString,
string $login,
string $pw,
string $vhost = null
)
{
$hosts = [];

Expand All @@ -69,6 +91,7 @@ public function __construct(string $connectionString, string $login, string $pw)
$this->hosts = $hosts;
$this->login = $login;
$this->pw = $pw;
$this->vhost = $vhost;
}

public function __destruct()
Expand All @@ -78,21 +101,23 @@ public function __destruct()
$this->pw = null;
$this->stomp = null;
$this->queues = [];
$this->vhost = null;
}

/**
* Опубликовать сообщение в очереди
*
* @param string $destination Очередь, куда класть сообщение
* @param string $body Тело сообщения
* @param array $headers
* @param string $transactionId ID транзакции
* @param array $headers
*
* @return boolean
*/
public function send(
string $destination,
string $body,
array $headers = []) {
array $headers = [])
{

$result = true;
try {
Expand All @@ -103,7 +128,7 @@ public function send(
// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp::send failed.', [
'Message' => $e->getMessage(),
'headers' => implode(', ', $headers)
'headers' => implode(', ', $headers),
]);
}

Expand All @@ -127,9 +152,9 @@ public function getNextFrame()
// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp::getNextFrame failed', [
'Message' => $ex->getMessage(),
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine()
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
]);

throw $ex;
Expand All @@ -155,10 +180,10 @@ public function ack($frame)
// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp::ack failed', [
'Message' => $ex->getMessage(),
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
'frame' => $frame
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
'frame' => $frame,
]);

return false;
Expand All @@ -184,10 +209,10 @@ public function nack($frame)
// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp::nack failed', [
'Message' => $ex->getMessage(),
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
'frame' => $frame
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
'frame' => $frame,
]);

return false;
Expand All @@ -204,7 +229,7 @@ public function nack($frame)
public function subscribe($queues)
{
if (is_string($queues)) {
$this->queues = [ $queues ];
$this->queues = [$queues];
} else {
$this->queues = $queues;
}
Expand All @@ -223,10 +248,10 @@ public function subscribe($queues)
// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp::subscribe failed', [
'Message' => $ex->getMessage(),
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
'queues' => $this->queues
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
'queues' => $this->queues,
]);

throw $ex;
Expand Down Expand Up @@ -260,10 +285,10 @@ public function unsubscribe()
// Логирование в случае, если установлен логгер
$this->putInLog(LogLevel::ERROR, 'Stomp::unsubscribe failed', [
'Message' => $ex->getMessage(),
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
'queues' => $this->queues
'Code' => $ex->getCode(),
'File' => $ex->getFile(),
'Line' => $ex->getLine(),
'queues' => $this->queues,
]);

throw $ex;
Expand Down Expand Up @@ -306,10 +331,11 @@ private function initStomp()
{
$errors = [];
$i = 0;
/** @var string $host */
foreach ($this->hosts as $host) {
$i++;
try {
$connect = $this->connect($host, $this->login, $this->pw);
$connect = $this->connect($host, $this->login, $this->pw, $this->vhost);
$this->putInLog(LogLevel::WARNING, 'Stomp::initStomp connected', [
'host' => $host,
]);
Expand All @@ -318,7 +344,7 @@ private function initStomp()
} catch (Exception $ex) {
$errors[] = $ex->getMessage();
if ($i === count($this->hosts)) {
throw new StompClientException("Cannot connect to: " . implode(', ',$this->hosts) . '.Errors: ' . implode(', ',$errors));
throw new StompClientException("Cannot connect to: " . implode(', ', $this->hosts) . '.Errors: ' . implode(', ', $errors));
}
}
}
Expand All @@ -327,22 +353,32 @@ private function initStomp()
/**
* Подключение к брокеру по ссылке.
*
* @param string $url
* @param string $login
* @param string $pw
* @param string $url
* @param string $login
* @param string $pw
* @param string|null $vhost
*
* @return Stomp|null
*
* @throws Exception
*/
private function connect($url, $login, $pw)
private function connect(
string $url,
string $login,
string $pw,
string $vhost = null
)
{
$headers = [
'accept-version' => '1.2',
'RECEIPT' => true,
];

if ($vhost) {
$headers['host'] = $vhost;
}

try {
return new Stomp($url, $login, $pw, [
'accept-version' => '1.2',
'RECEIPT' => true,
'host' => $login
]);
return new Stomp($url, $login, $pw, $headers);
} catch (StompException $e) {
throw $e;
}
Expand Down

0 comments on commit 781c379

Please sign in to comment.