Skip to content

Commit

Permalink
Ported Router::stop to dev master
Browse files Browse the repository at this point in the history
  • Loading branch information
mbonneau committed Apr 22, 2015
1 parent 78ceb29 commit 40d2691
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 6 deletions.
9 changes: 9 additions & 0 deletions src/Thruway/Event/RouterStopEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php


namespace Thruway\Event;


class RouterStopEvent extends Event {

}
13 changes: 11 additions & 2 deletions src/Thruway/Peer/Router.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

namespace Thruway\Peer;

use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Thruway\Authentication\AllPermissiveAuthorizationManager;
use Thruway\Authentication\AuthorizationManagerInterface;
use Thruway\Common\Utils;
use Thruway\Event\ConnectionCloseEvent;
use Thruway\Event\EventDispatcher;
use Thruway\Event\EventDispatcherInterface;
use Thruway\Event\EventSubscriberInterface;
use Thruway\Event\ConnectionOpenEvent;
use Thruway\Event\RouterStartEvent;
use Thruway\Event\RouterStopEvent;
use Thruway\Exception\InvalidRealmNameException;
use Thruway\Exception\RealmNotFoundException;
use Thruway\Logging\Logger;
Expand Down Expand Up @@ -167,6 +168,14 @@ public function start($runLoop = true)
}
}

/**
* @inheritdoc
*/
public function stop($gracefully = true)
{
$this->getEventDispatcher()->dispatch('router.stop', new RouterStopEvent());
}

/**
* Handle close transport
*
Expand Down
13 changes: 13 additions & 0 deletions src/Thruway/Peer/RouterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,17 @@ public function addInternalClient(ClientInterface $client);
* @return Session
*/
public function createNewSession(TransportInterface $transport);

/**
* This is to stop the router.
*
* Note that this should bring down all connections and timers associated with the router
* which will cause the loop to exit once there is nothing being watched.
*
* If there are other things added to the loop through clients or otherwise, the loop
* will continue running.
*
* @param bool $gracefully
*/
public function stop($gracefully = true);
}
19 changes: 18 additions & 1 deletion src/Thruway/Transport/InternalClientTransportProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use Thruway\Event\ConnectionOpenEvent;
use Thruway\Event\RouterStartEvent;
use Thruway\Event\RouterStopEvent;
use Thruway\Peer\ClientInterface;
use Thruway\Session;

Expand All @@ -20,6 +21,11 @@ class InternalClientTransportProvider extends AbstractRouterTransportProvider
*/
private $internalClient;

/**
* @var Session
*/
private $session;

/**
* Constructor
*
Expand Down Expand Up @@ -50,21 +56,32 @@ public function handleRouterStart(RouterStartEvent $event) {
$transport->setTrusted($this->trusted);

$session = $this->router->createNewSession($transport);
$this->session = $session;

// connect the transport to the Router/Peer
$this->router->getEventDispatcher()->dispatch("connection_open", new ConnectionOpenEvent($session));

// open the client side
$this->internalClient->onOpen($clientTransport);

// internal client shouldn't retry
$this->internalClient->setAttemptRetry(false);

// tell the internal client to start up
$this->internalClient->start(false);
}

public function handleRouterStop(RouterStopEvent $event) {
if ($this->session) {
$this->session->shutdown();
}
}

public static function getSubscribedEvents()
{
return [
"router.start" => ['handleRouterStart', 10]
"router.start" => ['handleRouterStart', 10],
"router.stop" => ['handleRouterStop', 10]
];
}

Expand Down
14 changes: 13 additions & 1 deletion src/Thruway/Transport/RatchetTransportProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Thruway\Event\ConnectionOpenEvent;
use Thruway\Event\MessageEvent;
use Thruway\Event\RouterStartEvent;
use Thruway\Event\RouterStopEvent;
use Thruway\Exception\DeserializationException;
use Thruway\Logging\Logger;
use Thruway\Message\HelloMessage;
Expand Down Expand Up @@ -157,10 +158,21 @@ public function handleRouterStart(RouterStartEvent $event)
$this->server = new IoServer(new HttpServer($ws), $socket, $this->loop);
}

public function handleRouterStop(RouterStopEvent $event) {
if ($this->server) {
$this->server->socket->shutdown();
}

foreach ($this->sessions as $k) {
$this->sessions[$k]->shutdown();
}
}

public static function getSubscribedEvents()
{
return [
"router.start" => ["handleRouterStart", 10]
"router.start" => ["handleRouterStart", 10],
"router.stop" => ["handleRouterStop", 10]
];
}
}
21 changes: 20 additions & 1 deletion src/Thruway/Transport/RawSocketTransportProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Thruway\Event\ConnectionCloseEvent;
use Thruway\Event\ConnectionOpenEvent;
use Thruway\Event\RouterStartEvent;
use Thruway\Event\RouterStopEvent;
use Thruway\Logging\Logger;
use Thruway\Serializer\JsonSerializer;

Expand Down Expand Up @@ -35,6 +36,11 @@ class RawSocketTransportProvider extends AbstractRouterTransportProvider
*/
private $sessions;

/**
* @var Server
*/
private $server;

/**
* Constructor
*
Expand Down Expand Up @@ -98,12 +104,25 @@ public function handleRouterStart(RouterStartEvent $event) {
Logger::info($this, "Raw socket listening on " . $this->address . ":" . $this->port);

$socket->listen($this->port, $this->address);

$this->server = $socket;
}

public function handleRouterStop(RouterStopEvent $event) {
if ($this->server) {
$this->server->shutdown();
}

foreach ($this->sessions as $k) {
$this->sessions[$k]->shutdown();
}
}

public static function getSubscribedEvents()
{
return [
"router.start" => ['handleRouterStart', 10]
"router.start" => ['handleRouterStart', 10],
"router.stop" => ['handleRouterStop', 10]
];
}

Expand Down
96 changes: 96 additions & 0 deletions tests/Unit/Peer/RouterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -1126,4 +1126,100 @@ public function testStateRestoreWithNoQueue() {
$sessionStateHandler->dispatchMessage($publish);
}
}

public function testRouterStop() {
$loop = \React\EventLoop\Factory::create();
$router = new \Thruway\Peer\Router($loop);
$router->addTransportProvider(new \Thruway\Transport\RatchetTransportProvider("127.0.0.1", 18080));
$loop->addTimer(.1, function () use ($router) {
$router->stop();
$this->_result = "Stop was called";
});
$router->start();
// if the execution makes it here, stop worked
$this->assertEquals("Stop was called", $this->_result);
}

public function testRouterStopWithLiveSession() {
$loop = \React\EventLoop\Factory::create();
$router = new \Thruway\Peer\Router($loop);
$router->addTransportProvider(new \Thruway\Transport\RatchetTransportProvider("127.0.0.1", 18080));
$client = new \Thruway\Peer\Client("some_realm", $loop);
$client->on('open', function () use ($loop, $router) {
$router->stop();
$this->_result = "Stop was called";
});
$client->setAttemptRetry(false); // we are running on the same loop so if we allow retry, we will hang
$client->addTransportProvider(new \Thruway\Transport\PawlTransportProvider("ws://127.0.0.1:18080"));
$loop->addTimer(0.1, function () use ($client) {
$client->start(false); // don't start loop yet
});
$router->start();
// if the execution makes it here, stop worked
$this->assertEquals("Stop was called", $this->_result);
}
public function testRouterStopWithRawSocketLiveSession() {
$loop = \React\EventLoop\Factory::create();
$router = new \Thruway\Peer\Router($loop);
$router->addTransportProvider(new \Thruway\Transport\RawSocketTransportProvider("127.0.0.1", 18080));
$client = new \Thruway\Peer\Client("some_realm", $loop);
$client->on('open', function () use ($loop, $router) {
$router->stop();
$this->_result = "Stop was called";
});
$client->setAttemptRetry(false); // we are running on the same loop so if we allow retry, we will hang
$client->addTransportProvider(new \Thruway\Transport\RawSocketClientTransportProvider("127.0.0.1", 18080));
$loop->addTimer(0.1, function () use ($client) {
$client->start(false); // don't start loop yet
});
$router->start();
// if the execution makes it here, stop worked
$this->assertEquals("Stop was called", $this->_result);
}
public function testRouterStopWithInternalClientLiveSession() {
$loop = \React\EventLoop\Factory::create();
$router = new \Thruway\Peer\Router($loop);
// just so we have another transport
$router->addTransportProvider(new \Thruway\Transport\RawSocketTransportProvider("127.0.0.1", 18080));
$client = new \Thruway\Peer\Client("some_realm", $loop);
$client->on('open', function () use ($loop, $router) {
$router->stop();
$this->_result = "Stop was called";
});
$client->setAttemptRetry(false); // we are running on the same loop so if we allow retry, we will hang
$router->addInternalClient($client);
$loop->addTimer(0.1, function () use ($client) {
$client->start(false); // don't start loop yet
});
$router->start();
// if the execution makes it here, stop worked
$this->assertEquals("Stop was called", $this->_result);
}

// This came over from 0.3 but things work differently now
// still should implement removeModule or something for the same type of thing
// public function testRemoveInternalClient() {
// $clientRemovalDeferred = new \React\Promise\Deferred();
// $loop = \React\EventLoop\Factory::create();
// $router = new \Thruway\Peer\Router($loop);
// $client = new \Thruway\Peer\Client("some_realm", $loop);
// $client->on('open', function (\Thruway\ClientSession $session, $transport, $details) use ($client, $loop, $router) {
// $session->register('internal_echo', function ($args) {
// return $args;
// })->then(function () use ($client, $loop, $router) {
// $loop->addTimer(0.001, function () use ($client, $router) {
// $router->removeInternalClient($client);
// });
// });
// });
// $client->on('close', function () use ($router) {
// $this->_result = "Client closed";
// $router->stop();
// });
// $router->addInternalClient($client);
// // setup a real listening thing
// $router->addTransportProvider(new \Thruway\Transport\RatchetTransportProvider());
// $router->start();
// $this->assertEquals("Client closed", $this->_result);
// }
}
2 changes: 1 addition & 1 deletion tests/WAMP/WampErrorExceptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ function testWampErrorException() {

$router = new \Thruway\Peer\Router($loop);

$router->addTransportProvider(new \Thruway\Transport\RawSocketTransportProvider());
//$router->addTransportProvider(new \Thruway\Transport\RawSocketTransportProvider());

$client = new \Thruway\Peer\Client("realm1", $loop);
$client->setAttemptRetry(false);
Expand Down

0 comments on commit 40d2691

Please sign in to comment.