From ea65f68d59a77d1f03319069c8fdd48d0a0fb2b4 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Fri, 15 Dec 2023 17:34:27 +0400 Subject: [PATCH] Refactor RPC and ConnectionPool --- src/Sender/Websocket/ConnectionPool.php | 42 ++++++++++---- src/Sender/Websocket/RPC.php | 57 ++++++------------- .../RPC/{Connected.php => Connect.php} | 18 +++--- src/Sender/Websocket/RPC/Response.php | 30 ++++++++++ src/Sender/Websocket/RPC/Rpc.php | 25 ++++++++ src/Sender/Websocket/RPC/Success.php | 14 ++--- src/Sender/Websocket/Service.php | 36 ++++++++++++ src/Support/Json.php | 13 +++++ 8 files changed, 163 insertions(+), 72 deletions(-) rename src/Sender/Websocket/RPC/{Connected.php => Connect.php} (51%) create mode 100644 src/Sender/Websocket/RPC/Response.php create mode 100644 src/Sender/Websocket/RPC/Rpc.php create mode 100644 src/Sender/Websocket/Service.php diff --git a/src/Sender/Websocket/ConnectionPool.php b/src/Sender/Websocket/ConnectionPool.php index 9d6263e2..637957d4 100644 --- a/src/Sender/Websocket/ConnectionPool.php +++ b/src/Sender/Websocket/ConnectionPool.php @@ -6,14 +6,18 @@ use Buggregator\Trap\Logger; use Buggregator\Trap\Processable; +use Buggregator\Trap\Sender\Websocket\RPC\Connect; +use Buggregator\Trap\Sender\Websocket\RPC\Response; use Buggregator\Trap\Support\Json; use Buggregator\Trap\Support\Timer; +use Buggregator\Trap\Support\Uuid; use Buggregator\Trap\Traffic\StreamClient; use Buggregator\Trap\Traffic\Websocket\Frame; use Buggregator\Trap\Traffic\Websocket\Opcode; use Buggregator\Trap\Traffic\Websocket\StreamReader; use Fiber; use IteratorAggregate; +use JsonSerializable; use Traversable; /** @@ -91,28 +95,46 @@ private function processSocket(StreamClient $stream): void } // Ping-pong - $frame->opcode === Opcode::Ping and $stream->sendData(Frame::pong($frame->content)->__toString()); + $frame->opcode === Opcode::Ping and $stream->sendData($this->packPayload('', Opcode::Pong)); - // RPC - $response = $this->rpc->handleMessage($frame->content); + // Pong using `{}` message + if ($frame->content === '{}') { + $stream->sendData($this->packPayload('{}')); + continue; + } + + // Message must be JSON only + $payload = Json::decode($frame->content); + $response = new Response($payload['id'] ?? 0); - // On connected ping using `{}` message - if ($response instanceof RPC\Connected && $response->ping > 0){ - $pingTimer = new Timer($response->ping); + // On connected start periodic ping using `{}` message + if (isset($payload['connect'])){ + $response->connect = new Connect(Uuid::uuid4()); + + $pingTimer = new Timer($response->connect->ping); $this->fibers[] = new Fiber( function () use ($stream, $pingTimer): void { while ($pingTimer->wait() && !$stream->isDisconnected()) { - $stream->sendData(Frame::text('{}')->__toString()); + $stream->sendData($this->packPayload('{}')); $pingTimer->reset(); } } ); } - if (null !== $response) { - $stream->sendData(Frame::text(Json::encode($response))->__toString()); - $pingTimer?->reset(); + // RPC + if (isset($payload['rpc'])) { + $response->rpc = $this->rpc->handleMessage($payload['rpc']); } + + $stream->sendData($this->packPayload(Json::encode($response))); + // Reset ping timer on any message + $pingTimer?->reset(); } } + + private function packPayload(string|JsonSerializable $payload, Opcode $type = Opcode::Text): string + { + return (new Frame(\is_string($payload) ? $payload : Json::encode($payload), $type, true))->__toString(); + } } diff --git a/src/Sender/Websocket/RPC.php b/src/Sender/Websocket/RPC.php index 8e5f3035..8587bcab 100644 --- a/src/Sender/Websocket/RPC.php +++ b/src/Sender/Websocket/RPC.php @@ -4,13 +4,9 @@ namespace Buggregator\Trap\Sender\Websocket; -use Buggregator\Trap\Handler\Router\Attribute\RegexpRoute; -use Buggregator\Trap\Handler\Router\Attribute\StaticRoute; use Buggregator\Trap\Handler\Router\Method; use Buggregator\Trap\Handler\Router\Router; use Buggregator\Trap\Logger; -use Buggregator\Trap\Support\Uuid; -use JsonSerializable; /** * @internal @@ -21,32 +17,27 @@ final class RPC public function __construct( private readonly Logger $logger, - private readonly EventsStorage $eventsStorage, + EventsStorage $eventsStorage, ) { - $this->router = Router::new($this); + $this->router = Router::new(new Service($logger, $eventsStorage)); } - public function handleMessage(string $message): ?object + /** + * @param array{ + * method?: non-empty-string, + * } $message + */ + public function handleMessage(mixed $message): ?RPC\Rpc { try { - if ($message === '') { - return (object)[]; - } - - $json = \json_decode($message, true, 512, \JSON_THROW_ON_ERROR); - if (!\is_array($json)) { + if (!\is_array($message)) { return null; } - $id = $json['id'] ?? 1; - if (isset($json['connect'])) { - return new RPC\Connected(id: $id, client: Uuid::uuid4()); - } + if (isset($message['method'])) { + $method = $message['method']; - if (isset($json['rpc']['method'])) { - $method = $json['rpc']['method']; - - return $this->callMethod($id, $method); + return $this->callMethod($method); } } catch (\Throwable $e) { $this->logger->exception($e); @@ -54,34 +45,18 @@ public function handleMessage(string $message): ?object return null; } - #[RegexpRoute(Method::Delete, '#^/api/events/(?[a-f0-9-]++)#i')] - public function eventDelete(string $uuid): bool - { - $this->eventsStorage->delete($uuid); - return true; - } - - #[StaticRoute(Method::Delete, 'api/events')] - public function eventsDelete(): bool - { - $this->eventsStorage->clear(); - return true; - } - - private function callMethod(int|string $id, string $initMethod): ?JsonSerializable + private function callMethod(string $initMethod): ?RPC\Rpc { [$method, $path] = \explode(':', $initMethod, 2); $route = $this->router->match(Method::fromString($method), $path ?? ''); if ($route === null) { - // todo: Error message + // todo: Error message? return null; } - $result = $route(id: $id); - return $result === true - ? new RPC\Success(id: $id, code: 200, status: true) - : $result; + $result = $route(); + return $result === null ? null : new RPC\Rpc(data: $result); } } diff --git a/src/Sender/Websocket/RPC/Connected.php b/src/Sender/Websocket/RPC/Connect.php similarity index 51% rename from src/Sender/Websocket/RPC/Connected.php rename to src/Sender/Websocket/RPC/Connect.php index 10b4c49e..21f7d399 100644 --- a/src/Sender/Websocket/RPC/Connected.php +++ b/src/Sender/Websocket/RPC/Connect.php @@ -10,10 +10,9 @@ /** * @internal */ -final class Connected implements JsonSerializable +final class Connect implements JsonSerializable { public function __construct( - public readonly string|int $id, public readonly string $client, public readonly int $ping = 25, public readonly bool $pong = true, @@ -23,16 +22,13 @@ public function __construct( public function jsonSerialize(): array { return [ - 'id' => $this->id, - 'connect' => [ - 'client' => $this->client, - 'version' => Info::VERSION, - 'subs' => [ - 'events' => (object)[], - ], - 'ping' => $this->ping, - 'pong' => $this->pong, + 'client' => $this->client, + 'version' => Info::VERSION, + 'subs' => [ + 'events' => (object)[], ], + 'ping' => $this->ping, + 'pong' => $this->pong, ]; } } diff --git a/src/Sender/Websocket/RPC/Response.php b/src/Sender/Websocket/RPC/Response.php new file mode 100644 index 00000000..fcbb0f08 --- /dev/null +++ b/src/Sender/Websocket/RPC/Response.php @@ -0,0 +1,30 @@ + $this->id]; + + $this->rpc and $data['rpc'] = $this->rpc; + $this->connect and $data['connect'] = $this->connect; + + return $data; + } +} diff --git a/src/Sender/Websocket/RPC/Rpc.php b/src/Sender/Websocket/RPC/Rpc.php new file mode 100644 index 00000000..e5596c6a --- /dev/null +++ b/src/Sender/Websocket/RPC/Rpc.php @@ -0,0 +1,25 @@ + $this->data, + ]; + } +} diff --git a/src/Sender/Websocket/RPC/Success.php b/src/Sender/Websocket/RPC/Success.php index 83c69f21..f5057341 100644 --- a/src/Sender/Websocket/RPC/Success.php +++ b/src/Sender/Websocket/RPC/Success.php @@ -12,22 +12,16 @@ final class Success implements JsonSerializable { public function __construct( - public readonly string|int $id, - public readonly int $code, - public readonly bool $status, + public readonly int $code = 200, + public readonly bool $status = true, ) { } public function jsonSerialize(): array { return [ - 'id' => $this->id, - 'rpc' => [ - 'data' => [ - 'code' => $this->code, - 'status' => $this->status, - ], - ], + 'code' => $this->code, + 'status' => $this->status, ]; } } diff --git a/src/Sender/Websocket/Service.php b/src/Sender/Websocket/Service.php new file mode 100644 index 00000000..f77d687d --- /dev/null +++ b/src/Sender/Websocket/Service.php @@ -0,0 +1,36 @@ +[a-f0-9-]++)#i')] + public function eventDelete(string $uuid): bool + { + $this->eventsStorage->delete($uuid); + return true; + } + + #[StaticRoute(Method::Delete, 'api/events')] + public function eventsDelete(): bool + { + $this->eventsStorage->clear(); + return true; + } +} diff --git a/src/Support/Json.php b/src/Support/Json.php index 77f96e29..af0933db 100644 --- a/src/Support/Json.php +++ b/src/Support/Json.php @@ -4,12 +4,17 @@ namespace Buggregator\Trap\Support; +use JsonException; + /** * @internal * @psalm-internal Buggregator\Trap */ final class Json { + /** + * @throws JsonException + */ public static function encode(mixed $value): string { return \json_encode( @@ -17,4 +22,12 @@ public static function encode(mixed $value): string \JSON_THROW_ON_ERROR | \JSON_INVALID_UTF8_SUBSTITUTE | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE, ); } + + /** + * @throws JsonException + */ + public static function decode(string $content): mixed + { + return \json_decode($content, true, 512, \JSON_THROW_ON_ERROR); + } }