From 286662738c08a06de34749114446bf31097d8812 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Sun, 10 Dec 2023 19:36:03 +0400 Subject: [PATCH] Implement basic RPC via websocket --- src/Sender/Websocket/ConnectionPool.php | 11 ++-- src/Sender/Websocket/Event.php | 2 +- src/Sender/Websocket/EventsStorage.php | 5 ++ src/Sender/Websocket/FrameHandler.php | 2 +- src/Sender/Websocket/RPC.php | 82 +++++++++++++++++++++++++ src/Sender/Websocket/RPC/Connected.php | 38 ++++++++++++ src/Sender/Websocket/RPC/Push.php | 12 ++-- src/Sender/Websocket/RPC/Success.php | 33 ++++++++++ src/Sender/WebsocketSender.php | 2 +- src/Traffic/Websocket/Frame.php | 2 +- 10 files changed, 175 insertions(+), 14 deletions(-) create mode 100644 src/Sender/Websocket/RPC.php create mode 100644 src/Sender/Websocket/RPC/Connected.php create mode 100644 src/Sender/Websocket/RPC/Success.php diff --git a/src/Sender/Websocket/ConnectionPool.php b/src/Sender/Websocket/ConnectionPool.php index 90c36f8a..49cac2be 100644 --- a/src/Sender/Websocket/ConnectionPool.php +++ b/src/Sender/Websocket/ConnectionPool.php @@ -24,7 +24,8 @@ final class ConnectionPool implements IteratorAggregate, Processable private array $fibers = []; public function __construct( - private readonly Logger $logger + private readonly Logger $logger, + private RPC $rpc, ) { } @@ -82,11 +83,11 @@ private function processSocket(StreamClient $stream): void foreach ($stream as $chunk) { // \error_log('Read chunk: ' . $chunk); $frame = Frame::read($chunk); - // \trap($frame); + $response = $this->rpc->handleMessage($frame->content); - // \error_log('Websocket encoded data: ' .\gzdecode($frame->content)); - // \trap(); - // $data = $stream->(); + if (null !== $response) { + $stream->sendData(Frame::text($response)->__toString()); + } } $stream->waitData(); diff --git a/src/Sender/Websocket/Event.php b/src/Sender/Websocket/Event.php index 3e03c163..b791ee8d 100644 --- a/src/Sender/Websocket/Event.php +++ b/src/Sender/Websocket/Event.php @@ -15,7 +15,7 @@ public function __construct( public readonly string $type, public readonly array $payload, public readonly float $timestamp, - public readonly ?string $project_id = null, + public readonly ?string $projectId = null, ) { } } diff --git a/src/Sender/Websocket/EventsStorage.php b/src/Sender/Websocket/EventsStorage.php index 0aaddf4a..2f859bae 100644 --- a/src/Sender/Websocket/EventsStorage.php +++ b/src/Sender/Websocket/EventsStorage.php @@ -35,4 +35,9 @@ public function getIterator(): \Traversable { return new \ArrayIterator($this->events); } + + public function delete(string $key): void + { + unset($this->events[$key]); + } } diff --git a/src/Sender/Websocket/FrameHandler.php b/src/Sender/Websocket/FrameHandler.php index fc1551d0..cfd5339d 100644 --- a/src/Sender/Websocket/FrameHandler.php +++ b/src/Sender/Websocket/FrameHandler.php @@ -33,7 +33,7 @@ public function handle(Frame $frame): void new Push( event: 'event.received', channel: 'events', - data: [$event], + data: $event, ), JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE, ), diff --git a/src/Sender/Websocket/RPC.php b/src/Sender/Websocket/RPC.php new file mode 100644 index 00000000..96c8965e --- /dev/null +++ b/src/Sender/Websocket/RPC.php @@ -0,0 +1,82 @@ +callMethod($id, $method); + return $response === null ? null : self::json_encode($response); + } + } catch (\Throwable $e) { + $this->logger->exception($e); + } + return null; + } + + private function callMethod(int|string $id, string $initMethod): ?JsonSerializable + { + [$method, $path] = \explode(':', $initMethod, 2); + + switch ($method) { + case 'delete': + if (\str_starts_with($path, 'api/event/')) { + $uuid = \substr($path, 10); + $this->eventsStorage->delete($uuid); + return new RPC\Success(id: $id, code: 200, status: true); + } + if (\str_starts_with($path, 'api/events')) { + $this->eventsStorage->clear(); + return new RPC\Success(id: $id, code: 200, status: true); + } + break; + default: + $this->logger->error('Unknown RPC method: ' . $initMethod); + } + + return null; + } +} diff --git a/src/Sender/Websocket/RPC/Connected.php b/src/Sender/Websocket/RPC/Connected.php new file mode 100644 index 00000000..b4a10ce4 --- /dev/null +++ b/src/Sender/Websocket/RPC/Connected.php @@ -0,0 +1,38 @@ + [ + 'client' => $this->client, + 'ping' => $this->ping, + 'pong' => $this->pong, + 'subs' => [ + 'events' => (object)[], + ], + 'version' => Info::VERSION, + ], + 'id' => $this->id, + ]; + } +} diff --git a/src/Sender/Websocket/RPC/Push.php b/src/Sender/Websocket/RPC/Push.php index 05a9d0c2..7243db8d 100644 --- a/src/Sender/Websocket/RPC/Push.php +++ b/src/Sender/Websocket/RPC/Push.php @@ -21,11 +21,13 @@ public function __construct( public function jsonSerialize(): array { return [ - 'channel' => $this->channel, - 'pub' => [ - 'data' => [ - 'data' => $this->data, - 'event' => $this->event, + 'push' => [ + 'channel' => $this->channel, + 'pub' => [ + 'data' => [ + 'event' => $this->event, + 'data' => $this->data, + ], ], ], ]; diff --git a/src/Sender/Websocket/RPC/Success.php b/src/Sender/Websocket/RPC/Success.php new file mode 100644 index 00000000..83c69f21 --- /dev/null +++ b/src/Sender/Websocket/RPC/Success.php @@ -0,0 +1,33 @@ + $this->id, + 'rpc' => [ + 'data' => [ + 'code' => $this->code, + 'status' => $this->status, + ], + ], + ]; + } +} diff --git a/src/Sender/WebsocketSender.php b/src/Sender/WebsocketSender.php index 01c75aca..dc4d867a 100644 --- a/src/Sender/WebsocketSender.php +++ b/src/Sender/WebsocketSender.php @@ -19,8 +19,8 @@ public static function create( ?Websocket\ConnectionPool $connectionPool = null, ?Websocket\EventsStorage $eventStorage = null, ): self { - $connectionPool ??= new Websocket\ConnectionPool($logger); $eventStorage ??= new Websocket\EventsStorage(); + $connectionPool ??= new Websocket\ConnectionPool($logger, new Websocket\RPC($logger, $eventStorage)); return new self( $connectionPool, $eventStorage, diff --git a/src/Traffic/Websocket/Frame.php b/src/Traffic/Websocket/Frame.php index 86d776a9..acd975f6 100644 --- a/src/Traffic/Websocket/Frame.php +++ b/src/Traffic/Websocket/Frame.php @@ -184,7 +184,7 @@ public function __toString(): string return \sprintf( '%s%s%s', - \chr(128 | $this->opcode->value), + \chr(128 | ($this->rsv1 ? 64 : 0) | $this->opcode->value), match (true) { $len < 126 => \chr($len), $len < 65536 => \pack('Cn', 126, $len),