Skip to content

Commit

Permalink
Refactor RPC and ConnectionPool
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk committed Dec 15, 2023
1 parent 04c331a commit ea65f68
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 72 deletions.
42 changes: 32 additions & 10 deletions src/Sender/Websocket/ConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@

use Buggregator\Trap\Logger;
use Buggregator\Trap\Processable;
use Buggregator\Trap\Sender\Websocket\RPC\Connect;
use Buggregator\Trap\Sender\Websocket\RPC\Response;
use Buggregator\Trap\Support\Json;
use Buggregator\Trap\Support\Timer;
use Buggregator\Trap\Support\Uuid;
use Buggregator\Trap\Traffic\StreamClient;
use Buggregator\Trap\Traffic\Websocket\Frame;
use Buggregator\Trap\Traffic\Websocket\Opcode;
use Buggregator\Trap\Traffic\Websocket\StreamReader;
use Fiber;
use IteratorAggregate;
use JsonSerializable;
use Traversable;

/**
Expand Down Expand Up @@ -91,28 +95,46 @@ private function processSocket(StreamClient $stream): void
}

// Ping-pong
$frame->opcode === Opcode::Ping and $stream->sendData(Frame::pong($frame->content)->__toString());
$frame->opcode === Opcode::Ping and $stream->sendData($this->packPayload('', Opcode::Pong));

// RPC
$response = $this->rpc->handleMessage($frame->content);
// Pong using `{}` message
if ($frame->content === '{}') {
$stream->sendData($this->packPayload('{}'));
continue;
}

// Message must be JSON only
$payload = Json::decode($frame->content);
$response = new Response($payload['id'] ?? 0);

// On connected ping using `{}` message
if ($response instanceof RPC\Connected && $response->ping > 0){
$pingTimer = new Timer($response->ping);
// On connected start periodic ping using `{}` message
if (isset($payload['connect'])){
$response->connect = new Connect(Uuid::uuid4());

$pingTimer = new Timer($response->connect->ping);
$this->fibers[] = new Fiber(
function () use ($stream, $pingTimer): void {
while ($pingTimer->wait() && !$stream->isDisconnected()) {
$stream->sendData(Frame::text('{}')->__toString());
$stream->sendData($this->packPayload('{}'));
$pingTimer->reset();
}
}
);
}

if (null !== $response) {
$stream->sendData(Frame::text(Json::encode($response))->__toString());
$pingTimer?->reset();
// RPC
if (isset($payload['rpc'])) {
$response->rpc = $this->rpc->handleMessage($payload['rpc']);
}

$stream->sendData($this->packPayload(Json::encode($response)));
// Reset ping timer on any message
$pingTimer?->reset();
}
}

private function packPayload(string|JsonSerializable $payload, Opcode $type = Opcode::Text): string
{
return (new Frame(\is_string($payload) ? $payload : Json::encode($payload), $type, true))->__toString();
}
}
57 changes: 16 additions & 41 deletions src/Sender/Websocket/RPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@

namespace Buggregator\Trap\Sender\Websocket;

use Buggregator\Trap\Handler\Router\Attribute\RegexpRoute;
use Buggregator\Trap\Handler\Router\Attribute\StaticRoute;
use Buggregator\Trap\Handler\Router\Method;
use Buggregator\Trap\Handler\Router\Router;
use Buggregator\Trap\Logger;
use Buggregator\Trap\Support\Uuid;
use JsonSerializable;

/**
* @internal
Expand All @@ -21,67 +17,46 @@ final class RPC

public function __construct(
private readonly Logger $logger,
private readonly EventsStorage $eventsStorage,
EventsStorage $eventsStorage,
) {
$this->router = Router::new($this);
$this->router = Router::new(new Service($logger, $eventsStorage));
}

public function handleMessage(string $message): ?object
/**
* @param array{
* method?: non-empty-string,
* } $message
*/
public function handleMessage(mixed $message): ?RPC\Rpc
{
try {
if ($message === '') {
return (object)[];
}

$json = \json_decode($message, true, 512, \JSON_THROW_ON_ERROR);
if (!\is_array($json)) {
if (!\is_array($message)) {
return null;
}
$id = $json['id'] ?? 1;

if (isset($json['connect'])) {
return new RPC\Connected(id: $id, client: Uuid::uuid4());
}
if (isset($message['method'])) {
$method = $message['method'];

if (isset($json['rpc']['method'])) {
$method = $json['rpc']['method'];

return $this->callMethod($id, $method);
return $this->callMethod($method);
}
} catch (\Throwable $e) {
$this->logger->exception($e);
}
return null;
}

#[RegexpRoute(Method::Delete, '#^/api/events/(?<uuid>[a-f0-9-]++)#i')]
public function eventDelete(string $uuid): bool
{
$this->eventsStorage->delete($uuid);
return true;
}

#[StaticRoute(Method::Delete, 'api/events')]
public function eventsDelete(): bool
{
$this->eventsStorage->clear();
return true;
}

private function callMethod(int|string $id, string $initMethod): ?JsonSerializable
private function callMethod(string $initMethod): ?RPC\Rpc
{
[$method, $path] = \explode(':', $initMethod, 2);

$route = $this->router->match(Method::fromString($method), $path ?? '');

if ($route === null) {
// todo: Error message
// todo: Error message?
return null;
}
$result = $route(id: $id);

return $result === true
? new RPC\Success(id: $id, code: 200, status: true)
: $result;
$result = $route();
return $result === null ? null : new RPC\Rpc(data: $result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
/**
* @internal
*/
final class Connected implements JsonSerializable
final class Connect implements JsonSerializable
{
public function __construct(
public readonly string|int $id,
public readonly string $client,
public readonly int $ping = 25,
public readonly bool $pong = true,
Expand All @@ -23,16 +22,13 @@ public function __construct(
public function jsonSerialize(): array
{
return [
'id' => $this->id,
'connect' => [
'client' => $this->client,
'version' => Info::VERSION,
'subs' => [
'events' => (object)[],
],
'ping' => $this->ping,
'pong' => $this->pong,
'client' => $this->client,
'version' => Info::VERSION,
'subs' => [
'events' => (object)[],
],
'ping' => $this->ping,
'pong' => $this->pong,
];
}
}
30 changes: 30 additions & 0 deletions src/Sender/Websocket/RPC/Response.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap\Sender\Websocket\RPC;

use JsonSerializable;

/**
* @internal
*/
final class Response implements JsonSerializable
{
public function __construct(
public readonly string|int $id,
public ?Rpc $rpc = null,
public ?Connect $connect = null,
) {
}

public function jsonSerialize(): array
{
$data = ['id' => $this->id];

$this->rpc and $data['rpc'] = $this->rpc;
$this->connect and $data['connect'] = $this->connect;

return $data;
}
}
25 changes: 25 additions & 0 deletions src/Sender/Websocket/RPC/Rpc.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap\Sender\Websocket\RPC;

use JsonSerializable;

/**
* @internal
*/
final class Rpc implements JsonSerializable
{
public function __construct(
public readonly mixed $data,
) {
}

public function jsonSerialize(): array
{
return [
'data' => $this->data,
];
}
}
14 changes: 4 additions & 10 deletions src/Sender/Websocket/RPC/Success.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,16 @@
final class Success implements JsonSerializable
{
public function __construct(
public readonly string|int $id,
public readonly int $code,
public readonly bool $status,
public readonly int $code = 200,
public readonly bool $status = true,
) {
}

public function jsonSerialize(): array
{
return [
'id' => $this->id,
'rpc' => [
'data' => [
'code' => $this->code,
'status' => $this->status,
],
],
'code' => $this->code,
'status' => $this->status,
];
}
}
36 changes: 36 additions & 0 deletions src/Sender/Websocket/Service.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap\Sender\Websocket;

use Buggregator\Trap\Handler\Router\Attribute\RegexpRoute;
use Buggregator\Trap\Handler\Router\Attribute\StaticRoute;
use Buggregator\Trap\Handler\Router\Method;
use Buggregator\Trap\Logger;

/**
* @internal
*/
final class Service
{
public function __construct(
private readonly Logger $logger,
private readonly EventsStorage $eventsStorage,
) {
}

#[RegexpRoute(Method::Delete, '#^/api/events/(?<uuid>[a-f0-9-]++)#i')]
public function eventDelete(string $uuid): bool
{
$this->eventsStorage->delete($uuid);
return true;
}

#[StaticRoute(Method::Delete, 'api/events')]
public function eventsDelete(): bool
{
$this->eventsStorage->clear();
return true;
}
}
13 changes: 13 additions & 0 deletions src/Support/Json.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,30 @@

namespace Buggregator\Trap\Support;

use JsonException;

/**
* @internal
* @psalm-internal Buggregator\Trap
*/
final class Json
{
/**
* @throws JsonException
*/
public static function encode(mixed $value): string
{
return \json_encode(
$value,
\JSON_THROW_ON_ERROR | \JSON_INVALID_UTF8_SUBSTITUTE | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE,
);
}

/**
* @throws JsonException
*/
public static function decode(string $content): mixed
{
return \json_decode($content, true, 512, \JSON_THROW_ON_ERROR);
}
}

0 comments on commit ea65f68

Please sign in to comment.