diff --git a/src/Thruway/Event/RouterStopEvent.php b/src/Thruway/Event/RouterStopEvent.php new file mode 100644 index 00000000..de58c273 --- /dev/null +++ b/src/Thruway/Event/RouterStopEvent.php @@ -0,0 +1,9 @@ +getEventDispatcher()->dispatch('router.stop', new RouterStopEvent()); + } + /** * Handle close transport * diff --git a/src/Thruway/Peer/RouterInterface.php b/src/Thruway/Peer/RouterInterface.php index d262ece9..dbc9a460 100644 --- a/src/Thruway/Peer/RouterInterface.php +++ b/src/Thruway/Peer/RouterInterface.php @@ -86,4 +86,17 @@ public function addInternalClient(ClientInterface $client); * @return Session */ public function createNewSession(TransportInterface $transport); + + /** + * This is to stop the router. + * + * Note that this should bring down all connections and timers associated with the router + * which will cause the loop to exit once there is nothing being watched. + * + * If there are other things added to the loop through clients or otherwise, the loop + * will continue running. + * + * @param bool $gracefully + */ + public function stop($gracefully = true); } diff --git a/src/Thruway/Transport/InternalClientTransportProvider.php b/src/Thruway/Transport/InternalClientTransportProvider.php index 7c88117a..bd1ff8ab 100644 --- a/src/Thruway/Transport/InternalClientTransportProvider.php +++ b/src/Thruway/Transport/InternalClientTransportProvider.php @@ -5,6 +5,7 @@ use Thruway\Event\ConnectionOpenEvent; use Thruway\Event\RouterStartEvent; +use Thruway\Event\RouterStopEvent; use Thruway\Peer\ClientInterface; use Thruway\Session; @@ -20,6 +21,11 @@ class InternalClientTransportProvider extends AbstractRouterTransportProvider */ private $internalClient; + /** + * @var Session + */ + private $session; + /** * Constructor * @@ -50,6 +56,7 @@ public function handleRouterStart(RouterStartEvent $event) { $transport->setTrusted($this->trusted); $session = $this->router->createNewSession($transport); + $this->session = $session; // connect the transport to the Router/Peer $this->router->getEventDispatcher()->dispatch("connection_open", new ConnectionOpenEvent($session)); @@ -57,14 +64,24 @@ public function handleRouterStart(RouterStartEvent $event) { // open the client side $this->internalClient->onOpen($clientTransport); + // internal client shouldn't retry + $this->internalClient->setAttemptRetry(false); + // tell the internal client to start up $this->internalClient->start(false); } + public function handleRouterStop(RouterStopEvent $event) { + if ($this->session) { + $this->session->shutdown(); + } + } + public static function getSubscribedEvents() { return [ - "router.start" => ['handleRouterStart', 10] + "router.start" => ['handleRouterStart', 10], + "router.stop" => ['handleRouterStop', 10] ]; } diff --git a/src/Thruway/Transport/RatchetTransportProvider.php b/src/Thruway/Transport/RatchetTransportProvider.php index 0fc20cc8..9a9ac2a5 100644 --- a/src/Thruway/Transport/RatchetTransportProvider.php +++ b/src/Thruway/Transport/RatchetTransportProvider.php @@ -7,6 +7,7 @@ use Thruway\Event\ConnectionOpenEvent; use Thruway\Event\MessageEvent; use Thruway\Event\RouterStartEvent; +use Thruway\Event\RouterStopEvent; use Thruway\Exception\DeserializationException; use Thruway\Logging\Logger; use Thruway\Message\HelloMessage; @@ -157,10 +158,21 @@ public function handleRouterStart(RouterStartEvent $event) $this->server = new IoServer(new HttpServer($ws), $socket, $this->loop); } + public function handleRouterStop(RouterStopEvent $event) { + if ($this->server) { + $this->server->socket->shutdown(); + } + + foreach ($this->sessions as $k) { + $this->sessions[$k]->shutdown(); + } + } + public static function getSubscribedEvents() { return [ - "router.start" => ["handleRouterStart", 10] + "router.start" => ["handleRouterStart", 10], + "router.stop" => ["handleRouterStop", 10] ]; } } diff --git a/src/Thruway/Transport/RawSocketTransportProvider.php b/src/Thruway/Transport/RawSocketTransportProvider.php index 82e2d3e8..eb02048c 100644 --- a/src/Thruway/Transport/RawSocketTransportProvider.php +++ b/src/Thruway/Transport/RawSocketTransportProvider.php @@ -7,6 +7,7 @@ use Thruway\Event\ConnectionCloseEvent; use Thruway\Event\ConnectionOpenEvent; use Thruway\Event\RouterStartEvent; +use Thruway\Event\RouterStopEvent; use Thruway\Logging\Logger; use Thruway\Serializer\JsonSerializer; @@ -35,6 +36,11 @@ class RawSocketTransportProvider extends AbstractRouterTransportProvider */ private $sessions; + /** + * @var Server + */ + private $server; + /** * Constructor * @@ -98,12 +104,25 @@ public function handleRouterStart(RouterStartEvent $event) { Logger::info($this, "Raw socket listening on " . $this->address . ":" . $this->port); $socket->listen($this->port, $this->address); + + $this->server = $socket; + } + + public function handleRouterStop(RouterStopEvent $event) { + if ($this->server) { + $this->server->shutdown(); + } + + foreach ($this->sessions as $k) { + $this->sessions[$k]->shutdown(); + } } public static function getSubscribedEvents() { return [ - "router.start" => ['handleRouterStart', 10] + "router.start" => ['handleRouterStart', 10], + "router.stop" => ['handleRouterStop', 10] ]; } diff --git a/tests/Unit/Peer/RouterTest.php b/tests/Unit/Peer/RouterTest.php index af8304d2..d6f2cb39 100644 --- a/tests/Unit/Peer/RouterTest.php +++ b/tests/Unit/Peer/RouterTest.php @@ -1126,4 +1126,100 @@ public function testStateRestoreWithNoQueue() { $sessionStateHandler->dispatchMessage($publish); } } + + public function testRouterStop() { + $loop = \React\EventLoop\Factory::create(); + $router = new \Thruway\Peer\Router($loop); + $router->addTransportProvider(new \Thruway\Transport\RatchetTransportProvider("127.0.0.1", 18080)); + $loop->addTimer(.1, function () use ($router) { + $router->stop(); + $this->_result = "Stop was called"; + }); + $router->start(); + // if the execution makes it here, stop worked + $this->assertEquals("Stop was called", $this->_result); + } + + public function testRouterStopWithLiveSession() { + $loop = \React\EventLoop\Factory::create(); + $router = new \Thruway\Peer\Router($loop); + $router->addTransportProvider(new \Thruway\Transport\RatchetTransportProvider("127.0.0.1", 18080)); + $client = new \Thruway\Peer\Client("some_realm", $loop); + $client->on('open', function () use ($loop, $router) { + $router->stop(); + $this->_result = "Stop was called"; + }); + $client->setAttemptRetry(false); // we are running on the same loop so if we allow retry, we will hang + $client->addTransportProvider(new \Thruway\Transport\PawlTransportProvider("ws://127.0.0.1:18080")); + $loop->addTimer(0.1, function () use ($client) { + $client->start(false); // don't start loop yet + }); + $router->start(); + // if the execution makes it here, stop worked + $this->assertEquals("Stop was called", $this->_result); + } + public function testRouterStopWithRawSocketLiveSession() { + $loop = \React\EventLoop\Factory::create(); + $router = new \Thruway\Peer\Router($loop); + $router->addTransportProvider(new \Thruway\Transport\RawSocketTransportProvider("127.0.0.1", 18080)); + $client = new \Thruway\Peer\Client("some_realm", $loop); + $client->on('open', function () use ($loop, $router) { + $router->stop(); + $this->_result = "Stop was called"; + }); + $client->setAttemptRetry(false); // we are running on the same loop so if we allow retry, we will hang + $client->addTransportProvider(new \Thruway\Transport\RawSocketClientTransportProvider("127.0.0.1", 18080)); + $loop->addTimer(0.1, function () use ($client) { + $client->start(false); // don't start loop yet + }); + $router->start(); + // if the execution makes it here, stop worked + $this->assertEquals("Stop was called", $this->_result); + } + public function testRouterStopWithInternalClientLiveSession() { + $loop = \React\EventLoop\Factory::create(); + $router = new \Thruway\Peer\Router($loop); + // just so we have another transport + $router->addTransportProvider(new \Thruway\Transport\RawSocketTransportProvider("127.0.0.1", 18080)); + $client = new \Thruway\Peer\Client("some_realm", $loop); + $client->on('open', function () use ($loop, $router) { + $router->stop(); + $this->_result = "Stop was called"; + }); + $client->setAttemptRetry(false); // we are running on the same loop so if we allow retry, we will hang + $router->addInternalClient($client); + $loop->addTimer(0.1, function () use ($client) { + $client->start(false); // don't start loop yet + }); + $router->start(); + // if the execution makes it here, stop worked + $this->assertEquals("Stop was called", $this->_result); + } + + // This came over from 0.3 but things work differently now + // still should implement removeModule or something for the same type of thing +// public function testRemoveInternalClient() { +// $clientRemovalDeferred = new \React\Promise\Deferred(); +// $loop = \React\EventLoop\Factory::create(); +// $router = new \Thruway\Peer\Router($loop); +// $client = new \Thruway\Peer\Client("some_realm", $loop); +// $client->on('open', function (\Thruway\ClientSession $session, $transport, $details) use ($client, $loop, $router) { +// $session->register('internal_echo', function ($args) { +// return $args; +// })->then(function () use ($client, $loop, $router) { +// $loop->addTimer(0.001, function () use ($client, $router) { +// $router->removeInternalClient($client); +// }); +// }); +// }); +// $client->on('close', function () use ($router) { +// $this->_result = "Client closed"; +// $router->stop(); +// }); +// $router->addInternalClient($client); +// // setup a real listening thing +// $router->addTransportProvider(new \Thruway\Transport\RatchetTransportProvider()); +// $router->start(); +// $this->assertEquals("Client closed", $this->_result); +// } } \ No newline at end of file diff --git a/tests/WAMP/WampErrorExceptionTest.php b/tests/WAMP/WampErrorExceptionTest.php index a0fada66..2bc33e98 100644 --- a/tests/WAMP/WampErrorExceptionTest.php +++ b/tests/WAMP/WampErrorExceptionTest.php @@ -7,7 +7,7 @@ function testWampErrorException() { $router = new \Thruway\Peer\Router($loop); - $router->addTransportProvider(new \Thruway\Transport\RawSocketTransportProvider()); + //$router->addTransportProvider(new \Thruway\Transport\RawSocketTransportProvider()); $client = new \Thruway\Peer\Client("realm1", $loop); $client->setAttemptRetry(false);