diff --git a/src/StompClient.php b/src/StompClient.php index 907086f..7e5da1e 100644 --- a/src/StompClient.php +++ b/src/StompClient.php @@ -196,7 +196,10 @@ public function nack($frame) /** * @param array $queues Массив очередей, к которым нужно подписаться + * * @return void + * + * @throws Exception */ public function subscribe($queues) { @@ -209,7 +212,10 @@ public function subscribe($queues) try { foreach ($this->queues as $queue) { $stomp = $this->getStomp(); - if (!$stomp->subscribe($queue, ['id' => $stomp->getSessionId()])) { + + $sessionId = $stomp->getSessionId() . '_' . $queue; + + if (!$stomp->subscribe($queue, ['id' => $sessionId])) { throw new Exception('Queue: ' . $queue); } } @@ -231,6 +237,8 @@ public function subscribe($queues) * Отписываемся от всех очередей * * @return void + * + * @throws Exception */ public function unsubscribe() { @@ -241,7 +249,10 @@ public function unsubscribe() try { foreach ($this->queues as $queue) { $stomp = $this->getStomp(); - if (!$stomp->unsubscribe($queue, ['id' => $stomp->getSessionId()])) { + + $sessionId = $stomp->getSessionId() . '_' . $queue; + + if (!$stomp->unsubscribe($queue, ['id' => $sessionId])) { throw new Exception('Queue: ' . $queue); } } @@ -264,6 +275,8 @@ public function unsubscribe() * В случае необходимости устанавливает коннект * * @return null|Stomp + * + * @throws StompClientException */ private function getStomp() { @@ -286,6 +299,8 @@ private function getStomp() * Возвращает коннект первого доступного брокера. * * @return Stomp + * + * @throws StompClientException */ private function initStomp() { @@ -312,10 +327,13 @@ private function initStomp() /** * Подключение к брокеру по ссылке. * - * @param string $url - * @param string $login - * @param string $pw + * @param string $url + * @param string $login + * @param string $pw + * * @return Stomp|null + * + * @throws Exception */ private function connect($url, $login, $pw) {