Skip to content

Commit

Permalink
Implement basic RPC via websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk committed Dec 10, 2023
1 parent f47b3ce commit 2866627
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 14 deletions.
11 changes: 6 additions & 5 deletions src/Sender/Websocket/ConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ final class ConnectionPool implements IteratorAggregate, Processable
private array $fibers = [];

public function __construct(
private readonly Logger $logger
private readonly Logger $logger,
private RPC $rpc,
) {
}

Expand Down Expand Up @@ -82,11 +83,11 @@ private function processSocket(StreamClient $stream): void
foreach ($stream as $chunk) {
// \error_log('Read chunk: ' . $chunk);
$frame = Frame::read($chunk);
// \trap($frame);
$response = $this->rpc->handleMessage($frame->content);

// \error_log('Websocket encoded data: ' .\gzdecode($frame->content));
// \trap();
// $data = $stream->();
if (null !== $response) {
$stream->sendData(Frame::text($response)->__toString());
}
}

$stream->waitData();
Expand Down
2 changes: 1 addition & 1 deletion src/Sender/Websocket/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public function __construct(
public readonly string $type,
public readonly array $payload,
public readonly float $timestamp,
public readonly ?string $project_id = null,
public readonly ?string $projectId = null,
) {
}
}
5 changes: 5 additions & 0 deletions src/Sender/Websocket/EventsStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@ public function getIterator(): \Traversable
{
return new \ArrayIterator($this->events);
}

public function delete(string $key): void
{
unset($this->events[$key]);
}
}
2 changes: 1 addition & 1 deletion src/Sender/Websocket/FrameHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function handle(Frame $frame): void
new Push(
event: 'event.received',
channel: 'events',
data: [$event],
data: $event,
),
JSON_THROW_ON_ERROR | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE,
),
Expand Down
82 changes: 82 additions & 0 deletions src/Sender/Websocket/RPC.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap\Sender\Websocket;

use Buggregator\Trap\Info;
use Buggregator\Trap\Logger;
use Buggregator\Trap\Support\Uuid;
use JsonSerializable;

/**
* @internal
*/
final class RPC
{
public function __construct(
private readonly Logger $logger,
private readonly EventsStorage $eventsStorage,
) {
}

private static function json_encode(array|JsonSerializable $array)
{
return \json_encode($array, \JSON_THROW_ON_ERROR | \JSON_UNESCAPED_UNICODE | \JSON_INVALID_UTF8_SUBSTITUTE);
}

/**
* @return non-empty-string|null $response
*/
public function handleMessage(string $message): ?string
{
try {
if ($message === '') {
return '';
}

$json = \json_decode($message, true, 512, \JSON_THROW_ON_ERROR);
if (!\is_array($json)) {
return null;
}
$id = $json['id'] ?? 1;

if (isset($json['connect'])) {
return self::json_encode(new RPC\Connected(id: $id, client: Uuid::uuid4()),);
}

if (isset($json['rpc']['method'])) {
$method = $json['rpc']['method'];

$response = $this->callMethod($id, $method);
return $response === null ? null : self::json_encode($response);
}
} catch (\Throwable $e) {
$this->logger->exception($e);
}
return null;
}

private function callMethod(int|string $id, string $initMethod): ?JsonSerializable
{
[$method, $path] = \explode(':', $initMethod, 2);

switch ($method) {
case 'delete':
if (\str_starts_with($path, 'api/event/')) {
$uuid = \substr($path, 10);
$this->eventsStorage->delete($uuid);
return new RPC\Success(id: $id, code: 200, status: true);
}
if (\str_starts_with($path, 'api/events')) {
$this->eventsStorage->clear();
return new RPC\Success(id: $id, code: 200, status: true);
}
break;
default:
$this->logger->error('Unknown RPC method: ' . $initMethod);
}

return null;
}
}
38 changes: 38 additions & 0 deletions src/Sender/Websocket/RPC/Connected.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap\Sender\Websocket\RPC;

use Buggregator\Trap\Info;
use JsonSerializable;

/**
* @internal
*/
final class Connected implements JsonSerializable
{
public function __construct(
public readonly string|int $id,
public readonly string $client,
public readonly int $ping = 25,
public readonly bool $pong = true,
) {
}

public function jsonSerialize(): array
{
return [
'connect' => [
'client' => $this->client,
'ping' => $this->ping,
'pong' => $this->pong,
'subs' => [
'events' => (object)[],
],
'version' => Info::VERSION,
],
'id' => $this->id,
];
}
}
12 changes: 7 additions & 5 deletions src/Sender/Websocket/RPC/Push.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ public function __construct(
public function jsonSerialize(): array
{
return [
'channel' => $this->channel,
'pub' => [
'data' => [
'data' => $this->data,
'event' => $this->event,
'push' => [
'channel' => $this->channel,
'pub' => [
'data' => [
'event' => $this->event,
'data' => $this->data,
],
],
],
];
Expand Down
33 changes: 33 additions & 0 deletions src/Sender/Websocket/RPC/Success.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap\Sender\Websocket\RPC;

use JsonSerializable;

/**
* @internal
*/
final class Success implements JsonSerializable
{
public function __construct(
public readonly string|int $id,
public readonly int $code,
public readonly bool $status,
) {
}

public function jsonSerialize(): array
{
return [
'id' => $this->id,
'rpc' => [
'data' => [
'code' => $this->code,
'status' => $this->status,
],
],
];
}
}
2 changes: 1 addition & 1 deletion src/Sender/WebsocketSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ public static function create(
?Websocket\ConnectionPool $connectionPool = null,
?Websocket\EventsStorage $eventStorage = null,
): self {
$connectionPool ??= new Websocket\ConnectionPool($logger);
$eventStorage ??= new Websocket\EventsStorage();
$connectionPool ??= new Websocket\ConnectionPool($logger, new Websocket\RPC($logger, $eventStorage));
return new self(
$connectionPool,
$eventStorage,
Expand Down
2 changes: 1 addition & 1 deletion src/Traffic/Websocket/Frame.php
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public function __toString(): string

return \sprintf(
'%s%s%s',
\chr(128 | $this->opcode->value),
\chr(128 | ($this->rsv1 ? 64 : 0) | $this->opcode->value),
match (true) {
$len < 126 => \chr($len),
$len < 65536 => \pack('Cn', 126, $len),
Expand Down

0 comments on commit 2866627

Please sign in to comment.