From 81172f344d4f1c8277910de2639914e1e8a010cd Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 23 Jan 2024 14:41:42 +0400 Subject: [PATCH 1/4] Added Cancellable and Destroyable interfaces --- src/Application.php | 33 +++++++++- src/Cancellable.php | 15 +++++ src/Destroyable.php | 15 +++++ src/Logger.php | 4 +- src/Socket/Client.php | 34 ++++++---- src/Socket/Exception/ClientDisconnected.php | 12 ++++ ...DisconnectClient.php => ServerStopped.php} | 2 +- src/Socket/Server.php | 65 ++++++++++++++----- 8 files changed, 146 insertions(+), 34 deletions(-) create mode 100644 src/Cancellable.php create mode 100644 src/Destroyable.php create mode 100644 src/Socket/Exception/ClientDisconnected.php rename src/Socket/Exception/{DisconnectClient.php => ServerStopped.php} (67%) diff --git a/src/Application.php b/src/Application.php index 3cd407cf..3afc51ae 100644 --- a/src/Application.php +++ b/src/Application.php @@ -18,7 +18,7 @@ /** * @internal */ -final class Application implements Processable +final class Application implements Processable, Cancellable, Destroyable { /** @var Processable[] */ private array $processors = []; @@ -115,6 +115,32 @@ public function process(array $senders = []): void } } + public function destroy(): void + { + foreach ([...$this->servers, ...$this->processors] as $instance) { + if ($instance instanceof Destroyable) { + $instance->destroy(); + } + } + + $this->servers = []; + $this->processors = []; + $this->fibers = []; + } + + public function cancel(): void + { + foreach ($this->servers as $server) { + $server->cancel(); + } + + while ($this->fibers !== [] || $this->processors !== []) { + echo '.'; + $this->process(); + Fiber::getCurrent() === null or Fiber::suspend(); + } + } + /** * @param Sender[] $senders */ @@ -131,8 +157,9 @@ private function sendBuffer(array $senders = []): void private function createServer(SocketServer $config, Inspector $inspector): Server { - $clientInflector = static function (Client $client, int $id) use ($inspector): Client { - // Logger::debug('New client connected %d', $id); + $logger = $this->logger; + $clientInflector = static function (Client $client, int $id) use ($inspector, $logger): Client { + $logger->debug('Client %d connected', $id); $inspector->addStream(SocketStream::create($client, $id)); return $client; }; diff --git a/src/Cancellable.php b/src/Cancellable.php new file mode 100644 index 00000000..bfdcbdfb --- /dev/null +++ b/src/Cancellable.php @@ -0,0 +1,15 @@ +getFile() . ':' . $e->getLine() . "\033[0m\n"; // Print stack trace using gray $r .= "Stack trace:\n"; - $r .= "\033[90m" . $e->getTraceAsString() . "\033[0m\n"; + // Limit stacktrace to 5 lines + $stack = \explode("\n", $e->getTraceAsString()); + $r .= "\033[90m" . implode("\n", \array_slice($stack, 0, \min(5, \count($stack)))) . "\033[0m\n"; $r .= "\n"; $this->echo($r, !$important); } diff --git a/src/Socket/Client.php b/src/Socket/Client.php index 064c3db6..389d3db2 100644 --- a/src/Socket/Client.php +++ b/src/Socket/Client.php @@ -4,7 +4,9 @@ namespace Buggregator\Trap\Socket; -use Buggregator\Trap\Socket\Exception\DisconnectClient; +use Buggregator\Trap\Destroyable; +use Buggregator\Trap\Socket\Exception\ClientDisconnected; +use Buggregator\Trap\Support\Timer; use Fiber; /** @@ -12,7 +14,7 @@ * * @internal */ -final class Client +final class Client implements Destroyable { /** @var string[] */ private array $writeQueue = []; @@ -33,24 +35,24 @@ private function __construct( $this->setOnClose(fn() => null); } - public function __destruct() + public function destroy(): void { + /** @psalm-suppress RedundantPropertyInitializationCheck */ + isset($this->onClose) and ($this->onClose)(); try { \socket_close($this->socket); } catch (\Throwable) { - // Do nothing. - } finally { - /** @psalm-suppress RedundantPropertyInitializationCheck */ - isset($this->onClose) and ($this->onClose)(); + // do nothing } + // Unlink all closures and free resources. + unset($this->onClose, $this->onPayload); + $this->writeQueue = []; + $this->readBuffer = ''; } - public function close(): void + public function __destruct() { - /** @psalm-suppress RedundantPropertyInitializationCheck */ - isset($this->onClose) and ($this->onClose)(); - // Unlink all closures and free resources. - unset($this->onClose, $this->onPayload, $this->readBuffer); + $this->destroy(); } public function disconnect(): void @@ -96,7 +98,7 @@ public function process(): void } if ($this->toDisconnect && $this->writeQueue === []) { - throw new DisconnectClient(); + throw new ClientDisconnected(); } Fiber::suspend(); } while (true); @@ -126,6 +128,10 @@ public function setOnClose(callable $callable): void public function send(string $payload): void { + if ($this->toDisconnect) { + return; + } + $this->writeQueue[] = $payload; } @@ -147,7 +153,7 @@ private function writeQueue(): void $this->writeQueue = []; - $this->toDisconnect and throw new DisconnectClient(); + $this->toDisconnect and throw new ClientDisconnected(); } private function readMessage(): void diff --git a/src/Socket/Exception/ClientDisconnected.php b/src/Socket/Exception/ClientDisconnected.php new file mode 100644 index 00000000..7690d944 --- /dev/null +++ b/src/Socket/Exception/ClientDisconnected.php @@ -0,0 +1,12 @@ + */ private array $fibers = []; + private bool $cancelled = false; + /** * @param null|Closure(Client, int $id): void $clientInflector * @param positive-int $payloadSize Max payload size. @@ -48,16 +53,27 @@ private function __construct( $logger->status('Application', 'Server started on 127.0.0.1:%s', $port); } - public function __destruct() + public function destroy(): void { + /** @psalm-suppress all */ + foreach ($this->clients ?? [] as $client) { + $client->destroy(); + } + try { - \socket_close($this->socket); - } finally { - foreach ($this->clients as $client) { - $client->close(); + /** @psalm-suppress all */ + if (isset($this->socket)) { + \socket_close($this->socket); } - unset($this->socket, $this->clients, $this->fibers); + } catch (\Throwable) { + // do nothing } + unset($this->socket, $this->clients, $this->fibers); + } + + public function __destruct() + { + $this->destroy(); } /** @@ -76,7 +92,7 @@ public static function init( public function process(): void { - while (false !== ($socket = \socket_accept($this->socket))) { + while (!$this->cancelled and false !== ($socket = \socket_accept($this->socket))) { $client = null; try { $client = Client::init($socket, $this->payloadSize); @@ -84,8 +100,13 @@ public function process(): void $this->clients[$key] = $client; $this->clientInflector !== null and ($this->clientInflector)($client, $key); $this->fibers[$key] = new Fiber($client->process(...)); + /** + * The {@see self::$cancelled} may be changed because of fibers + * @psalm-suppress all + */ + $this->cancelled and $client->disconnect(); } catch (\Throwable) { - $client?->close(); + $client?->destroy(); unset($client); if (isset($key)) { unset($this->clients[$key], $this->fibers[$key]); @@ -98,16 +119,30 @@ public function process(): void $fiber->isStarted() ? $fiber->resume() : $fiber->start(); if ($fiber->isTerminated()) { - throw new RuntimeException('Client terminated.'); + throw new RuntimeException("Client $key terminated."); } } catch (\Throwable $e) { - if ($e instanceof DisconnectClient) { - $this->logger->info('Custom disconnect.'); + if ($e instanceof ClientDisconnected) { + $this->logger->debug('Client %s disconnected', $key); + } else { + $this->logger->exception($e, "Client $key fiber."); } - $this->clients[$key]->close(); - // Logger::exception($e, 'Client fiber.'); + + $this->clients[$key]->destroy(); unset($this->clients[$key], $this->fibers[$key]); } } + + if ($this->cancelled && $this->fibers === []) { + throw new ServerStopped(); + } + } + + public function cancel(): void + { + $this->cancelled = true; + foreach ($this->clients as $client) { + $client->disconnect(); + } } } From 7f71a5f1586beb17a1e76d49c3c8c0a962b7e15f Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 23 Jan 2024 14:42:58 +0400 Subject: [PATCH 2/4] Add waiting for the socket buffer to be flushed before disconnecting --- src/Socket/Client.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Socket/Client.php b/src/Socket/Client.php index 389d3db2..493170dd 100644 --- a/src/Socket/Client.php +++ b/src/Socket/Client.php @@ -98,6 +98,8 @@ public function process(): void } if ($this->toDisconnect && $this->writeQueue === []) { + // Wait for the socket buffer to be flushed. + (new Timer(0.1))->wait(); throw new ClientDisconnected(); } Fiber::suspend(); From 55bebf946447741e9a946df7ec671119e26ab0d4 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 23 Jan 2024 14:43:56 +0400 Subject: [PATCH 3/4] Run command: add signals handling --- src/Command/Run.php | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/src/Command/Run.php b/src/Command/Run.php index 59c82539..30ba4c7b 100644 --- a/src/Command/Run.php +++ b/src/Command/Run.php @@ -11,6 +11,7 @@ use Buggregator\Trap\Sender; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Command\SignalableCommandInterface; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; @@ -24,8 +25,10 @@ name: 'run', description: 'Run application', )] -final class Run extends Command +final class Run extends Command implements SignalableCommandInterface { + private ?Application $app = null; + public function configure(): void { $this->addOption('port', 'p', InputOption::VALUE_OPTIONAL, 'Port to listen', 9912); @@ -54,14 +57,14 @@ protected function execute( $registry = $this->createRegistry($output); - $app = new Application( + $this->app = new Application( [new SocketServer($port)], new Logger($output), senders: $registry->getSenders($senders), withFrontend: $input->getOption('ui') !== false, ); - $app->run(); + $this->app->run(); } catch (\Throwable $e) { if ($output->isVerbose()) { // Write colorful exception (title, message, stacktrace) @@ -93,4 +96,28 @@ public function createRegistry(OutputInterface $output): Sender\SenderRegistry return $registry; } + + public function getSubscribedSignals(): array + { + $result = []; + \defined('SIGINT') and $result[] = \SIGINT; + \defined('SIGTERM') and $result[] = \SIGTERM; + + return $result; + } + + public function handleSignal(int $signal): void + { + if (\defined('SIGINT') && $signal === \SIGINT) { + // todo may be uncommented if it's possible to switch fibers when signal is received + // todo Error: Cannot switch fibers in current execution context + // $this->app?->cancel(); + + $this->app?->destroy(); + } + + if (\defined('SIGTERM') && $signal === \SIGTERM) { + $this->app?->destroy(); + } + } } From e5a6f1a2b431b35d16606ed9dd33bbb384a27f1e Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 23 Jan 2024 14:50:47 +0400 Subject: [PATCH 4/4] Fix psalm issues --- src/Client/TrapHandle/Dumper.php | 2 ++ src/Sender/Console/Renderer/Sentry/Exceptions.php | 3 +++ src/Sender/Console/Renderer/VarDumper.php | 3 +++ src/Socket/Server.php | 1 + 4 files changed, 9 insertions(+) diff --git a/src/Client/TrapHandle/Dumper.php b/src/Client/TrapHandle/Dumper.php index ab9f6650..4f689837 100644 --- a/src/Client/TrapHandle/Dumper.php +++ b/src/Client/TrapHandle/Dumper.php @@ -32,6 +32,7 @@ final class Dumper public static function dump(mixed $var, string|int|null $label = null, int $depth = 0): mixed { + /** @psalm-suppress RiskyTruthyFalsyComparison */ return (self::$handler ??= self::registerHandler())($var, empty($label) ? null : (string)$label, $depth); } @@ -72,6 +73,7 @@ public static function setDumper(?DataDumperInterface $dumper = null): Closure * @return Closure(mixed, string|null, int): mixed * * @author Nicolas Grekas + * @psalm-suppress RiskyTruthyFalsyComparison */ private static function registerHandler(): Closure { diff --git a/src/Sender/Console/Renderer/Sentry/Exceptions.php b/src/Sender/Console/Renderer/Sentry/Exceptions.php index 2586f80e..89e46285 100644 --- a/src/Sender/Console/Renderer/Sentry/Exceptions.php +++ b/src/Sender/Console/Renderer/Sentry/Exceptions.php @@ -58,6 +58,8 @@ public static function render(OutputInterface $output, mixed $exceptions): void /** * Renders the trace of the exception. + * + * @psalm-suppress RiskyTruthyFalsyComparison */ private static function renderTrace(OutputInterface $output, array $frames, bool $verbose = false): void { @@ -82,6 +84,7 @@ private static function renderTrace(OutputInterface $output, array $frames, bool $file = $getValue($frame, 'filename'); $line = $getValue($frame, 'lineno', null); $class = $getValue($frame, 'class'); + /** @psalm-suppress RiskyTruthyFalsyComparison */ $class = empty($class) ? '' : $class . '::'; $function = $getValue($frame, 'function'); diff --git a/src/Sender/Console/Renderer/VarDumper.php b/src/Sender/Console/Renderer/VarDumper.php index 983b5b54..dd6b8e06 100644 --- a/src/Sender/Console/Renderer/VarDumper.php +++ b/src/Sender/Console/Renderer/VarDumper.php @@ -57,6 +57,9 @@ public function __construct( ) { } + /** + * @psalm-suppress RiskyTruthyFalsyComparison + */ public function describe(OutputInterface $output, Data $data, array $context, int $clientId): void { Common::renderHeader1($output, 'DUMP'); diff --git a/src/Socket/Server.php b/src/Socket/Server.php index ca8d2c87..7cb8daeb 100644 --- a/src/Socket/Server.php +++ b/src/Socket/Server.php @@ -95,6 +95,7 @@ public function process(): void while (!$this->cancelled and false !== ($socket = \socket_accept($this->socket))) { $client = null; try { + /** @psalm-suppress MixedArgument */ $client = Client::init($socket, $this->payloadSize); $key = (int)\array_key_last($this->clients) + 1; $this->clients[$key] = $client;