Skip to content

Commit

Permalink
Merge pull request #42: fix fast connection closing
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk authored Jan 23, 2024
2 parents 929ce1f + e5a6f1a commit 62763e1
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 37 deletions.
33 changes: 30 additions & 3 deletions src/Application.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
/**
* @internal
*/
final class Application implements Processable
final class Application implements Processable, Cancellable, Destroyable
{
/** @var Processable[] */
private array $processors = [];
Expand Down Expand Up @@ -115,6 +115,32 @@ public function process(array $senders = []): void
}
}

public function destroy(): void
{
foreach ([...$this->servers, ...$this->processors] as $instance) {
if ($instance instanceof Destroyable) {
$instance->destroy();
}
}

$this->servers = [];
$this->processors = [];
$this->fibers = [];
}

public function cancel(): void
{
foreach ($this->servers as $server) {
$server->cancel();
}

while ($this->fibers !== [] || $this->processors !== []) {
echo '.';
$this->process();
Fiber::getCurrent() === null or Fiber::suspend();
}
}

/**
* @param Sender[] $senders
*/
Expand All @@ -131,8 +157,9 @@ private function sendBuffer(array $senders = []): void

private function createServer(SocketServer $config, Inspector $inspector): Server
{
$clientInflector = static function (Client $client, int $id) use ($inspector): Client {
// Logger::debug('New client connected %d', $id);
$logger = $this->logger;
$clientInflector = static function (Client $client, int $id) use ($inspector, $logger): Client {
$logger->debug('Client %d connected', $id);
$inspector->addStream(SocketStream::create($client, $id));
return $client;
};
Expand Down
15 changes: 15 additions & 0 deletions src/Cancellable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap;

/**
* The implementation can be canceled safely.
*
* @internal
*/
interface Cancellable
{
public function cancel(): void;
}
2 changes: 2 additions & 0 deletions src/Client/TrapHandle/Dumper.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ final class Dumper

public static function dump(mixed $var, string|int|null $label = null, int $depth = 0): mixed
{
/** @psalm-suppress RiskyTruthyFalsyComparison */
return (self::$handler ??= self::registerHandler())($var, empty($label) ? null : (string)$label, $depth);
}

Expand Down Expand Up @@ -72,6 +73,7 @@ public static function setDumper(?DataDumperInterface $dumper = null): Closure
* @return Closure(mixed, string|null, int): mixed
*
* @author Nicolas Grekas <p@tchwork.com>
* @psalm-suppress RiskyTruthyFalsyComparison
*/
private static function registerHandler(): Closure
{
Expand Down
33 changes: 30 additions & 3 deletions src/Command/Run.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Buggregator\Trap\Sender;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\SignalableCommandInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
Expand All @@ -24,8 +25,10 @@
name: 'run',
description: 'Run application',
)]
final class Run extends Command
final class Run extends Command implements SignalableCommandInterface
{
private ?Application $app = null;

public function configure(): void
{
$this->addOption('port', 'p', InputOption::VALUE_OPTIONAL, 'Port to listen', 9912);
Expand Down Expand Up @@ -54,14 +57,14 @@ protected function execute(

$registry = $this->createRegistry($output);

$app = new Application(
$this->app = new Application(
[new SocketServer($port)],
new Logger($output),
senders: $registry->getSenders($senders),
withFrontend: $input->getOption('ui') !== false,
);

$app->run();
$this->app->run();
} catch (\Throwable $e) {
if ($output->isVerbose()) {
// Write colorful exception (title, message, stacktrace)
Expand Down Expand Up @@ -93,4 +96,28 @@ public function createRegistry(OutputInterface $output): Sender\SenderRegistry

return $registry;
}

public function getSubscribedSignals(): array
{
$result = [];
\defined('SIGINT') and $result[] = \SIGINT;
\defined('SIGTERM') and $result[] = \SIGTERM;

return $result;
}

public function handleSignal(int $signal): void
{
if (\defined('SIGINT') && $signal === \SIGINT) {
// todo may be uncommented if it's possible to switch fibers when signal is received
// todo Error: Cannot switch fibers in current execution context
// $this->app?->cancel();

$this->app?->destroy();
}

if (\defined('SIGTERM') && $signal === \SIGTERM) {
$this->app?->destroy();
}
}
}
15 changes: 15 additions & 0 deletions src/Destroyable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap;

/**
* Should be used to destroy objects and free resources.
*
* @internal
*/
interface Destroyable
{
public function destroy(): void;
}
4 changes: 3 additions & 1 deletion src/Logger.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public function exception(Throwable $e, ?string $header = null, bool $important
$r .= "In \033[3;32m" . $e->getFile() . ':' . $e->getLine() . "\033[0m\n";
// Print stack trace using gray
$r .= "Stack trace:\n";
$r .= "\033[90m" . $e->getTraceAsString() . "\033[0m\n";
// Limit stacktrace to 5 lines
$stack = \explode("\n", $e->getTraceAsString());
$r .= "\033[90m" . implode("\n", \array_slice($stack, 0, \min(5, \count($stack)))) . "\033[0m\n";
$r .= "\n";
$this->echo($r, !$important);
}
Expand Down
3 changes: 3 additions & 0 deletions src/Sender/Console/Renderer/Sentry/Exceptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public static function render(OutputInterface $output, mixed $exceptions): void

/**
* Renders the trace of the exception.
*
* @psalm-suppress RiskyTruthyFalsyComparison
*/
private static function renderTrace(OutputInterface $output, array $frames, bool $verbose = false): void
{
Expand All @@ -82,6 +84,7 @@ private static function renderTrace(OutputInterface $output, array $frames, bool
$file = $getValue($frame, 'filename');
$line = $getValue($frame, 'lineno', null);
$class = $getValue($frame, 'class');
/** @psalm-suppress RiskyTruthyFalsyComparison */
$class = empty($class) ? '' : $class . '::';
$function = $getValue($frame, 'function');

Expand Down
3 changes: 3 additions & 0 deletions src/Sender/Console/Renderer/VarDumper.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public function __construct(
) {
}

/**
* @psalm-suppress RiskyTruthyFalsyComparison
*/
public function describe(OutputInterface $output, Data $data, array $context, int $clientId): void
{
Common::renderHeader1($output, 'DUMP');
Expand Down
36 changes: 22 additions & 14 deletions src/Socket/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@

namespace Buggregator\Trap\Socket;

use Buggregator\Trap\Socket\Exception\DisconnectClient;
use Buggregator\Trap\Destroyable;
use Buggregator\Trap\Socket\Exception\ClientDisconnected;
use Buggregator\Trap\Support\Timer;
use Fiber;

/**
* Client state on the server side.
*
* @internal
*/
final class Client
final class Client implements Destroyable
{
/** @var string[] */
private array $writeQueue = [];
Expand All @@ -33,24 +35,24 @@ private function __construct(
$this->setOnClose(fn() => null);
}

public function __destruct()
public function destroy(): void
{
/** @psalm-suppress RedundantPropertyInitializationCheck */
isset($this->onClose) and ($this->onClose)();
try {
\socket_close($this->socket);
} catch (\Throwable) {
// Do nothing.
} finally {
/** @psalm-suppress RedundantPropertyInitializationCheck */
isset($this->onClose) and ($this->onClose)();
// do nothing
}
// Unlink all closures and free resources.
unset($this->onClose, $this->onPayload);
$this->writeQueue = [];
$this->readBuffer = '';
}

public function close(): void
public function __destruct()
{
/** @psalm-suppress RedundantPropertyInitializationCheck */
isset($this->onClose) and ($this->onClose)();
// Unlink all closures and free resources.
unset($this->onClose, $this->onPayload, $this->readBuffer);
$this->destroy();
}

public function disconnect(): void
Expand Down Expand Up @@ -96,7 +98,9 @@ public function process(): void
}

if ($this->toDisconnect && $this->writeQueue === []) {
throw new DisconnectClient();
// Wait for the socket buffer to be flushed.
(new Timer(0.1))->wait();
throw new ClientDisconnected();
}
Fiber::suspend();
} while (true);
Expand Down Expand Up @@ -126,6 +130,10 @@ public function setOnClose(callable $callable): void

public function send(string $payload): void
{
if ($this->toDisconnect) {
return;
}

$this->writeQueue[] = $payload;
}

Expand All @@ -147,7 +155,7 @@ private function writeQueue(): void

$this->writeQueue = [];

$this->toDisconnect and throw new DisconnectClient();
$this->toDisconnect and throw new ClientDisconnected();
}

private function readMessage(): void
Expand Down
12 changes: 12 additions & 0 deletions src/Socket/Exception/ClientDisconnected.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Buggregator\Trap\Socket\Exception;

/**
* @internal
*/
class ClientDisconnected extends \RuntimeException
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
/**
* @internal
*/
class DisconnectClient extends \RuntimeException
class ServerStopped extends \RuntimeException
{
}
Loading

0 comments on commit 62763e1

Please sign in to comment.