Skip to content

Commit

Permalink
Update libraries for more recent react work (#13)
Browse files Browse the repository at this point in the history
* Library updates

* Update README.md and add a test

* Import some classes
  • Loading branch information
mbonneau authored and davidwdan committed Sep 12, 2017
1 parent d87f61c commit 94a9243
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 52 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 2.0.0

- Updated react libraries (http/http-client)
- Changed API to allow passing of `Connector`

# 1.0.2

- End the request not the response when dispose is called ([b77c5118](https://github.com/RxPHP/RxWebsocket/commit/b77c5118c14d34e034b19383974337aec05d787a))
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ $server->subscribe(function (\Rx\Websocket\MessageSubject $cs) {

#### Server that dumps everything to the console
```php
$server = new \Rx\Websocket\Server('127.0.0.1', 9191);
$server = new \Rx\Websocket\Server('tcp://127.0.0.1:9191');

$server->subscribe(function (\Rx\Websocket\MessageSubject $cs) {
$cs->subscribe(function ($message) {
Expand Down
9 changes: 7 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@
"Rx\\Websocket\\": "src/"
}
},
"autoload-dev": {
"psr-4": {
"Rx\\Websocket\\Test\\": "tests/"
}
},
"require": {
"react/http": "^0.4.1",
"react/http-client": "^0.4.10",
"react/http": "^0.7.3",
"react/http-client": "^0.5.3",
"voryx/event-loop": "^2.0",
"ratchet/rfc6455": "^0.2.2",
"reactivex/rxphp": "^2.0"
Expand Down
37 changes: 26 additions & 11 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

namespace Rx\Websocket;

use GuzzleHttp\Psr7\Request as Psr7Request;
use GuzzleHttp\Psr7\Response as Psr7Response;
use GuzzleHttp\Psr7\Uri;
use Ratchet\RFC6455\Handshake\ClientNegotiator;
use React\Dns\Resolver\Factory;
use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use React\HttpClient\Client as HttpClient;
use React\HttpClient\Request;
use React\HttpClient\Response;
use React\Socket\ConnectorInterface;
use Rx\Disposable\CallbackDisposable;
use Rx\DisposableInterface;
use Rx\Observable;
Expand All @@ -22,25 +24,37 @@ class Client extends Observable
private $useMessageObject;
private $subProtocols;
private $loop;
private $dnsResolver;
private $connector;

public function __construct(string $url, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, Resolver $dnsResolver = null)
public function __construct(string $url, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, ConnectorInterface $connector = null)
{
$parsedUrl = parse_url($url);
if (!isset($parsedUrl['scheme']) || !in_array($parsedUrl['scheme'], ['wss', 'ws'])) {
throw new \InvalidArgumentException('url must use either ws or wss scheme');
}

if ($parsedUrl['scheme'] === 'wss') {
$url = 'https://' . substr($url, 6);
}

if ($parsedUrl['scheme'] === 'ws') {
$url = 'http://' . substr($url, 5);
}

$this->url = $url;
$this->useMessageObject = $useMessageObject;
$this->subProtocols = $subProtocols;
$this->loop = $loop ?: \EventLoop\getLoop();
$this->dnsResolver = $dnsResolver ?: (new Factory())->createCached('8.8.8.8', $this->loop);
$this->connector = $connector;
}

public function _subscribe(ObserverInterface $clientObserver): DisposableInterface
{
$factory = new \React\HttpClient\Factory();
$client = $factory->create($this->loop, $this->dnsResolver);
$client = new HttpClient($this->loop, $this->connector);

$cNegotiator = new ClientNegotiator();

/** @var \GuzzleHttp\Psr7\Request $nRequest */
/** @var Psr7Request $nRequest */
$nRequest = $cNegotiator->generateRequest(new Uri($this->url));

if (!empty($this->subProtocols)) {
Expand All @@ -67,14 +81,14 @@ public function _subscribe(ObserverInterface $clientObserver): DisposableInterfa
throw new \Exception('Unexpected response code ' . $response->getCode());
}

$psr7Response = new \GuzzleHttp\Psr7\Response(
$psr7Response = new Psr7Response(
$response->getCode(),
$response->getHeaders(),
null,
$response->getVersion()
);

$psr7Request = new \GuzzleHttp\Psr7\Request('GET', $this->url, $flatHeaders);
$psr7Request = new Psr7Request('GET', $this->url, $flatHeaders);

if (!$cNegotiator->validateResponse($psr7Request, $psr7Response)) {
throw new \Exception('Invalid response');
Expand Down Expand Up @@ -127,7 +141,8 @@ function () use ($request) {
));
});

$request->writeHead();
// empty write to force connection and header send
$request->write('');

return new CallbackDisposable(function () use ($request) {
$request->close();
Expand Down
80 changes: 45 additions & 35 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

namespace Rx\Websocket;

use GuzzleHttp\Psr7\Uri;
use GuzzleHttp\Psr7\Request;
use Psr\Http\Message\ServerRequestInterface;
use Ratchet\RFC6455\Handshake\RequestVerifier;
use Ratchet\RFC6455\Handshake\ServerNegotiator;
use React\EventLoop\LoopInterface;
use React\Http\Request;
use React\Http\Response;
use React\Http\Server as HttpServer;
use React\Socket\Server as SocketServer;
use React\Stream\CompositeStream;
use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;
use Rx\Disposable\CallbackDisposable;
use Rx\DisposableInterface;
use Rx\Observable;
Expand All @@ -23,53 +28,56 @@ class Server extends Observable
private $subProtocols;
private $loop;

public function __construct(string $bindAddress, int $port, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null)
public function __construct(string $bindAddressOrPort, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null)
{
$this->bindAddress = $bindAddress;
$this->port = $port;
$this->bindAddress = $bindAddressOrPort;
$this->useMessageObject = $useMessageObject;
$this->subProtocols = $subProtocols;
$this->loop = $loop ?: \EventLoop\getLoop();
}

public function _subscribe(ObserverInterface $observer): DisposableInterface
{
$socket = new \React\Socket\Server($this->loop);
$socket = new SocketServer($this->bindAddress, $this->loop);

$negotiator = new ServerNegotiator(new RequestVerifier());
if (!empty($this->subProtocols)) {
$negotiator->setSupportedSubProtocols($this->subProtocols);
}

$http = new \React\Http\Server($socket);
$http->on('request', function (Request $request, Response $response) use ($negotiator, $observer, &$outStream) {
$uri = new Uri($request->getPath());
if (count($request->getQuery()) > 0) {
$uri = $uri->withQuery(\GuzzleHttp\Psr7\build_query($request->getQuery()));
}
$http = new HttpServer(function (ServerRequestInterface $request) use ($negotiator, $observer) {
$uri = $request->getUri();

$psrRequest = new \GuzzleHttp\Psr7\Request(
$psrRequest = new Request(
$request->getMethod(),
$uri,
$request->getHeaders()
);

// cram the remote address into the header in our own X- header so
// the user will have access to it
$psrRequest = $psrRequest->withAddedHeader('X-RxWebsocket-Remote-Address', $request->remoteAddress);
$psrRequest = $psrRequest->withAddedHeader('X-RxWebsocket-Remote-Address', $request->getServerParams()['REMOTE_ADDR'] ?? '');

$negotiatorResponse = $negotiator->handshake($psrRequest);

$response->writeHead(
/** @var ReadableStreamInterface $requestStream */
$requestStream = new ThroughStream();
$responseStream = new ThroughStream();

$response = new Response(
$negotiatorResponse->getStatusCode(),
array_merge(
$negotiatorResponse->getHeaders(),
['Content-Length' => '0']
$negotiatorResponse->getHeaders()
),
new CompositeStream(
$responseStream,
$requestStream
)
);


if ($negotiatorResponse->getStatusCode() !== 101) {
$response->end();
$responseStream->end();
return;
}

Expand All @@ -78,38 +86,38 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface
$subProtocol = $negotiatorResponse->getHeader('Sec-WebSocket-Protocol')[0];
}

$connection = new MessageSubject(
$messageSubject = new MessageSubject(
new AnonymousObservable(
function (ObserverInterface $observer) use ($request) {
$request->on('data', function ($data) use ($observer) {
function (ObserverInterface $observer) use ($requestStream) {
$requestStream->on('data', function ($data) use ($observer) {
$observer->onNext($data);
});
$request->on('error', function ($error) use ($observer) {
$requestStream->on('error', function ($error) use ($observer) {
$observer->onError($error);
});
$request->on('close', function () use ($observer) {
$requestStream->on('close', function () use ($observer) {
$observer->onCompleted();
});
$request->on('end', function () use ($observer) {
$requestStream->on('end', function () use ($observer) {
$observer->onCompleted();
});

return new CallbackDisposable(
function () use ($request) {
$request->close();
function () use ($requestStream) {
$requestStream->close();
}
);
}
),
new CallbackObserver(
function ($x) use ($response) {
$response->write($x);
function ($x) use ($responseStream) {
$responseStream->write($x);
},
function ($error) use ($response) {
$response->close();
function ($error) use ($responseStream) {
$responseStream->close();
},
function () use ($response) {
$response->end();
function () use ($responseStream) {
$responseStream->end();
}
),
false,
Expand All @@ -119,13 +127,15 @@ function () use ($response) {
$negotiatorResponse
);

$observer->onNext($connection);
$observer->onNext($messageSubject);

return $response;
});

$socket->listen($this->port, $this->bindAddress);
$http->listen($socket);

return new CallbackDisposable(function () use ($socket) {
$socket->shutdown();
$socket->close();
});
}
}
36 changes: 36 additions & 0 deletions test/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
namespace Rx\Websocket\Test;

use React\EventLoop\Factory;
use Rx\Websocket\Client;
use Rx\Websocket\MessageSubject;
use Rx\Websocket\Server;

class ClientTest extends \PHPUnit_Framework_TestCase
{
public function testErrorBeforeRequest()
{
$loop = Factory::create();

// expecting connection error
$client = new \Rx\Websocket\Client('ws://127.0.0.1:12340/', false, [], $loop);

$errored = false;
Expand All @@ -25,4 +29,36 @@ function ($err) use (&$errored) {

$this->assertTrue($errored);
}

public function testRequestEndOnDispose()
{
$this->markTestSkipped();
$loop = Factory::create();

$server = new Server('tcp://127.0.0.1:1234', false, [], $loop);
$serverDisp = $server->subscribe(function (MessageSubject $ms) {
$ms->map('strrev')->subscribe($ms);
});

$value = null;

$client = new Client('ws://127.0.0.1:1234/', false, [], $loop);
$client
->subscribe(function (MessageSubject $ms) use ($serverDisp) {
$ms->onNext('Hello');
$ms
->finally(function () use ($serverDisp) {
$serverDisp->dispose();
})
->take(1)
->subscribe(function ($x) use (&$value) {
$this->assertNull($value);
$value = $x;
});
});

$loop->run();

$this->assertEquals('olleH', $value);
}
}
2 changes: 1 addition & 1 deletion test/ab/run_ab_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ php clientRunner.php

sleep 2

php testServer.php 60 &
php testServer.php 600 &
sleep 3
wstest -m fuzzingclient -s fuzzingclient.json
sleep 12
2 changes: 1 addition & 1 deletion test/ab/testServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
$timerObservable = Observable::timer(1000 * $argv[1]);
}

$server = new \Rx\Websocket\Server("127.0.0.1", 9001, true);
$server = new \Rx\Websocket\Server("tcp://127.0.0.1:9001", true);

$server
->takeUntil($timerObservable)
Expand Down
1 change: 0 additions & 1 deletion test/bootstrap.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
foreach ($files as $file) {
if (file_exists($file)) {
$loader = require $file;
$loader->addPsr4('Rx\\Websocket\\Test\\', __DIR__);
break;
}
}

0 comments on commit 94a9243

Please sign in to comment.