From bfd572b93add024c02370bd43a203a19e96343a5 Mon Sep 17 00:00:00 2001 From: dmitry krokhin Date: Mon, 11 Mar 2024 20:10:45 +0300 Subject: [PATCH] refactor connection management --- .github/workflows/tests.yml | 2 + README.md | 42 ++-- phpunit.xml.dist | 2 + src/Client.php | 330 ++------------------------ src/Connection.php | 271 +++++++++++++++++++++ src/Consumer/Consumer.php | 2 +- src/Message/Factory.php | 47 ++-- src/Message/Info.php | 35 +-- src/Message/Msg.php | 5 +- src/Message/Ok.php | 13 + src/Message/Payload.php | 2 +- src/Message/Prototype.php | 9 +- tests/Functional/ClientTest.php | 14 +- tests/Functional/StreamTest.php | 6 +- tests/Functional/SubjectTest.php | 2 +- tests/FunctionalTestCase.php | 2 + tests/Performance/PerformanceTest.php | 4 +- 17 files changed, 411 insertions(+), 377 deletions(-) create mode 100644 src/Connection.php create mode 100644 src/Message/Ok.php diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a7cda00..95239cd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,11 +9,13 @@ jobs: strategy: matrix: php: ["8.1", "8.2", "8.3"] + verbose: ['0', '1'] steps: - uses: actions/checkout@v2 - uses: shivammathur/setup-php@v2 with: php-version: ${{ matrix.php }} + NATS_CLIENT_VERBOSE: ${{ matrix.verbose }} - uses: php-actions/composer@v6 - run: docker compose up -d working-directory: docker diff --git a/README.md b/README.md index 252aa4c..b0ac6c7 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ The recommended way to install the library is through [Composer](http://getcompo $ composer require basis-company/nats ``` -The NKeys functionality requires Ed25519, which is provided in `libsodium` extension or `sodium_compat` package. +The NKeys functionality requires Ed25519, which is provided in `libsodium` extension or `sodium_compat` package. ## Connection ```php @@ -63,9 +63,9 @@ $client->ping(); // true ``` ### Connecting to a cluster with TLS enabled -Typically, when connecting to a cluster with TLS enabled the connection settings do not change. The client lib will automatically switch over to TLS 1.2. However, if you're using a self-signed certificate you may have to point to your local CA file using the tlsCaFile setting. +Typically, when connecting to a cluster with TLS enabled the connection settings do not change. The client lib will automatically switch over to TLS 1.2. However, if you're using a self-signed certificate you may have to point to your local CA file using the tlsCaFile setting. -When connecting to a nats cluster that requires the client to provide TLS certificates use the tlsCertFile and tlsKeyFile to point at your local TLS certificate and private key file. +When connecting to a nats cluster that requires the client to provide TLS certificates use the tlsCertFile and tlsKeyFile to point at your local TLS certificate and private key file. Nats Server documentation for: - [Enabling TLS](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/tls) @@ -191,7 +191,7 @@ $goodbyer // batch will be processed to the end and the handling would be stopped // $goodbyer->interrupt(); }); - + // you also can create ephemeral consumer // the only thing that ephemeral consumer is created as soon as object is created // you have to create full consumer configuration first @@ -201,7 +201,7 @@ use Basis\Nats\Consumer\DeliverPolicy; $configuration = (new ConsumerConfiguration($stream->getName())) ->setDeliverPolicy(DeliverPolicy::NEW) ->setSubjectFilter('mailer.greet'); - + $ephemeralConsumer = $stream->createEphemeralConsumer($configuration); // now you can use ephemeral consumer in the same way as durable consumer @@ -210,12 +210,12 @@ $ephemeralConsumer->handle(function ($address) { }); // the only difference - you don't have to remove it manually, it will be deleted by NATS when socket connection is closed -// be aware that NATS will not remove that consumer immediately, process can take few seconds +// be aware that NATS will not remove that consumer immediately, process can take few seconds var_dump( $ephemeralConsumer->getName(), $ephemeralConsumer->info(), ); - + // if you need to append some headers, construct payload manually use Basis\Nats\Message\Payload; @@ -224,7 +224,7 @@ $payload = new Payload('nekufa@gmail.com', [ ]); $stream->put('mailer.bye', $payload); - + ``` ## Key Value Storage @@ -292,32 +292,34 @@ Installing dependencies from lock file (including require-dev) % export NATS_PORT=4222 % export NATS_CLIENT_LOG=1 % composer run perf-test -PHPUnit 9.5.10 by Sebastian Bergmann and contributors. +PHPUnit 9.5.28 by Sebastian Bergmann and contributors. -Runtime: PHP 8.1.1 +Runtime: PHP 8.3.3-1+ubuntu22.04.1+deb.sury.org+1 Configuration: /home/nekufa/software/github/nats.php/phpunit.xml.dist Warning: No code coverage driver available -[2022-01-19T10:42:14.008230+00:00] SubjectTest.testPerformance.INFO: start performance test [] [] -[2022-01-19T10:42:14.246606+00:00] SubjectTest.testPerformance.INFO: publishing {"rps":421871.0,"length":100000,"time":0.23703885078430176} [] -[2022-01-19T10:42:14.530670+00:00] SubjectTest.testPerformance.INFO: processing {"rps":355120.0,"length":100000,"time":0.2839939594268799} [] +[2024-03-11T17:06:30.907277+00:00] PerformanceTest.testPerformance.DEBUG: send CONNECT {"headers":true,"pedantic":false,"verbose":false,"lang":"php","version":"dev"} [] [] +[2024-03-11T17:06:30.907312+00:00] PerformanceTest.testPerformance.INFO: start performance test [] [] +[2024-03-11T17:06:32.135674+00:00] PerformanceTest.testPerformance.INFO: publishing {"rps":407072.0,"length":500000,"time":1.228281021118164} [] +[2024-03-11T17:06:33.412134+00:00] PerformanceTest.testPerformance.INFO: processing {"rps":391723.0,"length":500000,"time":1.2764089107513428} [] % export NATS_CLIENT_VERBOSE=1 % composer run perf-test -PHPUnit 9.5.10 by Sebastian Bergmann and contributors. +PHPUnit 9.5.28 by Sebastian Bergmann and contributors. -Runtime: PHP 8.1.1 +Runtime: PHP 8.3.3-1+ubuntu22.04.1+deb.sury.org+1 Configuration: /home/nekufa/software/github/nats.php/phpunit.xml.dist Warning: No code coverage driver available -[2022-01-19T10:42:21.319838+00:00] SubjectTest.testPerformance.INFO: start performance test [] [] -[2022-01-19T10:42:21.766501+00:00] SubjectTest.testPerformance.INFO: publishing {"rps":224640.0,"length":100000,"time":0.4451560974121094} [] -[2022-01-19T10:42:21.922010+00:00] SubjectTest.testPerformance.INFO: processing {"rps":353317.0,"length":100000,"time":0.15544414520263672} [] +[2024-03-11T17:07:12.829135+00:00] PerformanceTest.testPerformance.DEBUG: send CONNECT {"headers":true,"pedantic":false,"verbose":true,"lang":"php","version":"dev"} [] [] +[2024-03-11T17:07:12.829172+00:00] PerformanceTest.testPerformance.INFO: start performance test [] [] +[2024-03-11T17:07:14.069321+00:00] PerformanceTest.testPerformance.INFO: publishing {"rps":403207.0,"length":500000,"time":1.2400550842285156} [] +[2024-03-11T17:07:15.548207+00:00] PerformanceTest.testPerformance.INFO: processing {"rps":338104.0,"length":500000,"time":1.4788329601287842} [] . 1 / 1 (100%) -nekufa@fasiga ~ % cat /proc/cpuinfo | grep i5 -model name : Intel(R) Core(TM) i5-4670K CPU @ 3.40GHz +nekufa@fasiga ~ % cat /proc/cpuinfo | grep AMD +model name : AMD Ryzen 5 3600X 6-Core Processor ``` ## Configuration Options diff --git a/phpunit.xml.dist b/phpunit.xml.dist index c8c1c7e..7e1e052 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -15,6 +15,8 @@ + + diff --git a/src/Client.php b/src/Client.php index f33060d..aae3527 100644 --- a/src/Client.php +++ b/src/Client.php @@ -4,14 +4,8 @@ namespace Basis\Nats; -use Basis\Nats\Message\Connect; -use Basis\Nats\Message\Factory; -use Basis\Nats\Message\Info; use Basis\Nats\Message\Msg; use Basis\Nats\Message\Payload; -use Basis\Nats\Message\Ping; -use Basis\Nats\Message\Pong; -use Basis\Nats\Message\Prototype; use Basis\Nats\Message\Publish; use Basis\Nats\Message\Subscribe; use Basis\Nats\Message\Unsubscribe; @@ -19,23 +13,14 @@ use Exception; use LogicException; use Psr\Log\LoggerInterface; -use Throwable; class Client { - public Connect $connect; - public Info $info; public readonly Api $api; - private readonly ?Authenticator $authenticator; + private string $name = ''; - private $socket; - private $context; private array $handlers = []; - private float $ping = 0; - private float $pong = 0; - private ?float $lastDataReadFailureAt = null; - private string $name = ''; private array $subscriptions = []; private bool $skipInvalidMessages = false; @@ -43,10 +28,12 @@ class Client public function __construct( public readonly Configuration $configuration = new Configuration(), public ?LoggerInterface $logger = null, + public ?Connection $connection = null, ) { $this->api = new Api($this); - - $this->authenticator = Authenticator::create($this->configuration); + if (!$connection) { + $this->connection = new Connection(client: $this, logger: $logger); + } } public function api($command, array $args = [], ?Closure $callback = null): ?object @@ -71,46 +58,6 @@ public function api($command, array $args = [], ?Closure $callback = null): ?obj return $result; } - /** - * @return $this - * @throws Throwable - */ - public function connect(): self - { - if ($this->socket) { - return $this; - } - - $config = $this->configuration; - - $dsn = "$config->host:$config->port"; - $flags = STREAM_CLIENT_CONNECT; - $this->context = stream_context_create(); - $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context); - - if ($error || !$this->socket) { - throw new Exception($errorMessage ?: "Connection error", $error); - } - - $this->setTimeout($config->timeout); - - $this->connect = new Connect($config->getOptions()); - - if ($this->name) { - $this->connect->name = $this->name; - } - - $this->info = $this->process($config->timeout); - if (isset($this->info->nonce) && $this->authenticator) { - $this->connect->sig = $this->authenticator->sign($this->info->nonce); - $this->connect->nkey = $this->authenticator->getPublicKey(); - } - - $this->send($this->connect); - - return $this; - } - public function dispatch(string $name, mixed $payload, ?float $timeout = null) { if ($timeout === null) { @@ -146,22 +93,18 @@ public function getApi(): Api public function ping(): bool { - $this->ping = microtime(true); - $this->send(new Ping([])); - $this->process($this->configuration->timeout); - $result = $this->ping <= $this->pong; - $this->ping = 0; - - return $result; + return $this->connection->ping(); } public function publish(string $name, mixed $payload, ?string $replyTo = null): self { - return $this->send(new Publish([ + $this->connection->sendMessage(new Publish([ 'payload' => Payload::parse($payload), 'replyTo' => $replyTo, 'subject' => $name, ])); + + return $this; } public function request(string $name, mixed $payload, Closure $handler): self @@ -194,7 +137,7 @@ public function unsubscribe(string $name): self foreach ($this->subscriptions as $i => $subscription) { if ($subscription['name'] == $name) { unset($this->subscriptions[$i]); - $this->send(new Unsubscribe(['sid' => $subscription['sid']])); + $this->connection->sendMessage(new Unsubscribe(['sid' => $subscription['sid']])); unset($this->handlers[$subscription['sid']]); } } @@ -202,6 +145,11 @@ public function unsubscribe(string $name): self return $this; } + public function getSubscriptions(): array + { + return $this->subscriptions; + } + public function setDelay(float $delay, string $mode = Configuration::DELAY_CONSTANT): self { $this->configuration->setDelay($delay, $mode); @@ -214,126 +162,9 @@ public function setLogger(?LoggerInterface $logger): self return $this; } - public function setTimeout(float $value): self - { - $this->connect(); - $seconds = (int) floor($value); - $milliseconds = (int) (1000 * ($value - $seconds)); - - stream_set_timeout($this->socket, $seconds, $milliseconds); - - return $this; - } - - /** - * @throws Throwable - */ - public function process(null|int|float $timeout = 0, bool $reply = true, bool $checkTimeout = true) - { - $this->lastDataReadFailureAt = null; - $max = microtime(true) + $timeout; - $ping = time() + $this->configuration->pingInterval; - - $iteration = 0; - while (true) { - try { - $line = $this->readLine(1024, "\r\n", $checkTimeout); - - if ($line && ($this->ping || trim($line) != 'PONG') && ($this->pong || trim($line) != 'PING')) { - break; - } - if ($line === false && $ping < time()) { - try { - $this->send(new Ping([])); - $line = $this->readLine(1024, "\r\n"); - $ping = time() + $this->configuration->pingInterval; - if ($line && ($this->ping || trim($line) != 'PONG') && ($this->pong || trim($line) != 'PING')) { - break; - } - } catch (Throwable $e) { - if ($this->ping) { - return; - } - $this->processSocketException($e); - } - } - $now = microtime(true); - if ($now >= $max) { - return null; - } - $this->logger?->debug('sleep', compact('max', 'now')); - $this->configuration->delay($iteration++); - } catch (Throwable $e) { - $this->processSocketException($e); - } - } - - switch (trim($line)) { - case 'PING': - $this->logger?->debug('receive ' . $line); - $this->send(new Pong([])); - $now = microtime(true); - if ($now >= $max) { - return null; - } - return $this->process($max - $now, $reply, $checkTimeout); - - case 'PONG': - $this->logger?->debug('receive ' . $line); - return $this->pong = microtime(true); - - case '+OK': - return $this->logger?->debug('receive ' . $line); - } - - try { - $message = Factory::create(trim($line)); - } catch (Throwable $exception) { - $this->logger?->debug($line); - throw $exception; - } - - $payload = ''; - if ($message instanceof Msg) { - if ($message->length) { - $iteration = 0; - while (strlen($payload) < $message->length) { - $payloadLine = $this->readLine($message->length, '', false); - if (!$payloadLine) { - if ($iteration > 16) { - $this->processSocketException( - new LogicException("No payload for message $message->sid") - ); - break; - } - $this->configuration->delay($iteration++); - continue; - } - if (strlen($payloadLine) != $message->length) { - $this->logger?->debug( - 'got ' . strlen($payloadLine) . '/' . $message->length . ': ' . $payloadLine - ); - } - $payload .= $payloadLine; - } - } - $message->parse($payload); - } - - $this->logger?->debug('receive ' . $line . $payload); - return $this->onMessage($message, $reply); - } - - protected function onMessage(Prototype $message, bool $reply) + public function process(null|int|float $timeout = 0, bool $reply = true) { - if ($message instanceof Info) { - if (isset($message->tls_verify) && $message->tls_verify) { - $this->enableTls(true); - } elseif (isset($message->tls_required) && $message->tls_required) { - $this->enableTls(false); - } - return $message; - } + $message = $this->connection->getMessage($timeout); if ($message instanceof Msg) { if (!array_key_exists($message->sid, $this->handlers)) { @@ -347,53 +178,17 @@ protected function onMessage(Prototype $message, bool $reply) $this->publish($message->replyTo, $result); } return $result; - } - - return null; - } - - /** - * - * - * @throws Exception - */ - private function enableTls(bool $requireClientCert): void - { - if ($requireClientCert) { - if (!empty($this->configuration->tlsKeyFile)) { - if (!file_exists($this->configuration->tlsKeyFile)) { - throw new Exception("tlsKeyFile file does not exist: " . $this->configuration->tlsKeyFile); - } - stream_context_set_option($this->context, 'ssl', 'local_pk', $this->configuration->tlsKeyFile); - } - if (!empty($this->configuration->tlsCertFile)) { - if (!file_exists($this->configuration->tlsCertFile)) { - throw new Exception("tlsCertFile file does not exist: " . $this->configuration->tlsCertFile); - } - stream_context_set_option($this->context, 'ssl', 'local_cert', $this->configuration->tlsCertFile); - } - } - - if (!empty($this->configuration->tlsCaFile)) { - if (!file_exists($this->configuration->tlsCaFile)) { - throw new Exception("tlsCaFile file does not exist: " . $this->configuration->tlsCaFile); - } - stream_context_set_option($this->context, 'ssl', 'cafile', $this->configuration->tlsCaFile); - } - - if (!stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT)) { - throw new Exception('Failed to connect: Error enabling TLS'); + } else { + return $message; } } - private function doSubscribe(string $subject, ?string $group, Closure $handler): self { $sid = bin2hex(random_bytes(4)); - $this->handlers[$sid] = $handler; - $this->send(new Subscribe([ + $this->connection->sendMessage(new Subscribe([ 'sid' => $sid, 'subject' => $subject, 'group' => $group, @@ -407,74 +202,20 @@ private function doSubscribe(string $subject, ?string $group, Closure $handler): return $this; } - private function processSocketException(Throwable $e): self + public function getName(): string { - if (!$this->configuration->reconnect) { - $this->logger?->error($e->getMessage()); - throw $e; - } - - $iteration = 0; - - while (true) { - try { - $this->socket = null; - $this->connect(); - } catch (Throwable $e) { - $this->configuration->delay($iteration++); - continue; - } - break; - } - - foreach ($this->subscriptions as $i => $subscription) { - $this->send(new Subscribe([ - 'sid' => $subscription['sid'], - 'subject' => $subscription['name'], - ])); - } - return $this; + return $this->name; } - private function send(Prototype $message): self + public function setName(string $name): self { - $this->connect(); - - $line = $message->render() . "\r\n"; - $length = strlen($line); - - $this->logger?->debug('send ' . $line); - - while (strlen($line)) { - try { - $written = @fwrite($this->socket, $line, 1024); - if ($written === false) { - throw new LogicException('Error sending data'); - } - if ($written === 0) { - throw new LogicException('Broken pipe or closed connection'); - } - if ($length == $written) { - break; - } - $line = substr($line, $written); - } catch (Throwable $e) { - $this->processSocketException($e); - $line = $message->render() . "\r\n"; - } - } - - if ($this->configuration->verbose && $line !== "PING\r\n") { - // get feedback - $this->process($this->configuration->timeout); - } - + $this->name = $name; return $this; } - public function setName(string $name): self + public function setTimeout(float $value): self { - $this->name = $name; + $this->connection->setTimeout($value); return $this; } @@ -483,23 +224,4 @@ public function skipInvalidMessages(bool $skipInvalidMessages): self $this->skipInvalidMessages = $skipInvalidMessages; return $this; } - - private function readLine(int $length, string $ending = '', bool $checkTimeout = true): string|bool - { - $line = stream_get_line($this->socket, $length, $ending); - if ($line || !$checkTimeout) { - $this->lastDataReadFailureAt = null; - return $line; - } - - $now = microtime(true); - $this->lastDataReadFailureAt = $this->lastDataReadFailureAt ?? $now; - $timeWithoutDataRead = $now - $this->lastDataReadFailureAt; - - if ($timeWithoutDataRead > $this->configuration->timeout) { - throw new LogicException('Socket read timeout'); - } - - return false; - } } diff --git a/src/Connection.php b/src/Connection.php new file mode 100644 index 0000000..8853721 --- /dev/null +++ b/src/Connection.php @@ -0,0 +1,271 @@ +authenticator = Authenticator::create($client->configuration); + $this->config = $client->configuration; + } + + public function getConnectMessage(): Connect + { + return $this->connectMessage; + } + + public function getInfoMessage(): Info + { + return $this->infoMessage; + } + + public function getMessage(null|int|float $timeout = 0): ?Message + { + $now = microtime(true); + $max = $now + $timeout; + $iteration = 0; + + while ($now <= $max) { + $line = stream_get_line($this->socket, 1024, "\r\n"); + $now = microtime(true); + if ($line) { + $message = Factory::create($line); + $this->activityAt = microtime(true); + if ($message instanceof Msg) { + $payload = $this->getPayload($message->length); + $message->parse($payload); + $this->logger?->debug('receive ' . $line . $payload); + return $message; + } + $this->logger?->debug('receive ' . $line); + if ($message instanceof Ok) { + continue; + } elseif ($message instanceof Ping) { + $this->sendMessage(new Pong([])); + } elseif ($message instanceof Pong) { + $this->pongAt = microtime(true); + } elseif ($message instanceof Info) { + if (isset($message->tls_verify) && $message->tls_verify) { + $this->enableTls(true); + } elseif (isset($message->tls_required) && $message->tls_required) { + $this->enableTls(false); + } + return $message; + } + } elseif ($this->activityAt && $this->activityAt + $this->config->timeout < microtime(true)) { + if ($this->pingAt + $this->config->pingInterval < microtime(true)) { + $this->sendMessage(new Ping()); + } + } elseif ($now < $max) { + $this->logger?->debug('sleep', compact('max', 'now')); + $this->config->delay($iteration++); + } + } + + if ($this->activityAt + $this->config->timeout < microtime(true)) { + $this->processException(new LogicException('Socket read timeout')); + } + + return null; + } + + public function ping(): bool + { + $this->sendMessage(new Ping()); + $this->getMessage($this->config->timeout); + + return $this->pingAt <= $this->pongAt; + } + + public function sendMessage(Message $message) + { + $this->init(); + + $line = $message->render() . "\r\n"; + $length = strlen($line); + + $this->logger?->debug('send ' . $line); + + while (strlen($line)) { + try { + $written = @fwrite($this->socket, $line, 1024); + if ($written === false) { + throw new LogicException('Error sending data'); + } + if ($written === 0) { + throw new LogicException('Broken pipe or closed connection'); + } + if ($length == $written) { + break; + } + $line = substr($line, $written); + } catch (Throwable $e) { + $this->processException($e); + $line = $message->render() . "\r\n"; + } + } + + if ($message instanceof Ping) { + $this->pingAt = microtime(true); + } + } + + public function setLogger(?LoggerInterface $logger) + { + $this->logger = $logger; + } + + public function setTimeout(float $value) + { + $this->init(); + $seconds = (int) floor($value); + $milliseconds = (int) (1000 * ($value - $seconds)); + + stream_set_timeout($this->socket, $seconds, $milliseconds); + } + + protected function init() + { + if ($this->socket) { + return $this; + } + + $config = $this->config; + $dsn = "$config->host:$config->port"; + $flags = STREAM_CLIENT_CONNECT; + $this->context = stream_context_create(); + $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context); + + if ($error || !$this->socket) { + throw new Exception($errorMessage ?: "Connection error", $error); + } + + $this->setTimeout($config->timeout); + + $this->connectMessage = new Connect($config->getOptions()); + + if ($this->client->getName()) { + $this->connectMessage->name = $this->client->getName(); + } + + $this->infoMessage = $this->getMessage($config->timeout); + if (isset($this->infoMessage->nonce) && $this->authenticator) { + $this->connectMessage->sig = $this->authenticator->sign($this->infoMessage->nonce); + $this->connectMessage->nkey = $this->authenticator->getPublicKey(); + } + + $this->sendMessage($this->connectMessage); + } + + protected function enableTls(bool $requireClientCert): void + { + if ($requireClientCert) { + if (!empty($this->config->tlsKeyFile)) { + if (!file_exists($this->config->tlsKeyFile)) { + throw new Exception("tlsKeyFile file does not exist: " . $this->config->tlsKeyFile); + } + stream_context_set_option($this->context, 'ssl', 'local_pk', $this->config->tlsKeyFile); + } + if (!empty($this->config->tlsCertFile)) { + if (!file_exists($this->config->tlsCertFile)) { + throw new Exception("tlsCertFile file does not exist: " . $this->config->tlsCertFile); + } + stream_context_set_option($this->context, 'ssl', 'local_cert', $this->config->tlsCertFile); + } + } + + if (!empty($this->config->tlsCaFile)) { + if (!file_exists($this->config->tlsCaFile)) { + throw new Exception("tlsCaFile file does not exist: " . $this->config->tlsCaFile); + } + stream_context_set_option($this->context, 'ssl', 'cafile', $this->config->tlsCaFile); + } + + if (!stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT)) { + throw new Exception('Failed to connect: Error enabling TLS'); + } + } + + protected function getPayload(int $length): string + { + $payload = ''; + $iteration = 0; + while (strlen($payload) < $length) { + $payloadLine = stream_get_line($this->socket, $length, ''); + if (!$payloadLine) { + if ($iteration > 16) { + break; + } + $this->config->delay($iteration++); + continue; + } + if (strlen($payloadLine) != $length) { + $this->logger?->debug( + 'got ' . strlen($payloadLine) . '/' . $length . ': ' . $payloadLine + ); + } + $payload .= $payloadLine; + } + return $payload; + } + + private function processException(Throwable $e) + { + $this->logger?->error($e->getMessage()); + + if (!$this->config->reconnect) { + throw $e; + } + + $iteration = 0; + + while (true) { + try { + $this->socket = null; + $this->init(); + } catch (Throwable $e) { + $this->config->delay($iteration++); + continue; + } + break; + } + + foreach ($this->client->getSubscriptions() as $subscription) { + $this->sendMessage(new Subscribe([ + 'sid' => $subscription['sid'], + 'subject' => $subscription['name'], + ])); + } + } +} diff --git a/src/Consumer/Consumer.php b/src/Consumer/Consumer.php index 3e182fd..4dc4ec0 100644 --- a/src/Consumer/Consumer.php +++ b/src/Consumer/Consumer.php @@ -144,7 +144,7 @@ public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack $runtime->empty = true; // expires request means that we should receive answer from stream // consumer timeout can be more that client connection timeout - $this->client->process($this->expires ? PHP_INT_MAX : null, $ack, false); + $this->client->process($this->expires ? PHP_INT_MAX : null, $ack); if ($runtime->empty) { if ($emptyHandler) { diff --git a/src/Message/Factory.php b/src/Message/Factory.php index fdeb305..02670ff 100644 --- a/src/Message/Factory.php +++ b/src/Message/Factory.php @@ -14,25 +14,36 @@ private function __construct() public static function create(string $line): Prototype { - if (!str_contains($line, ' ')) { - throw new LogicException("Parse message failure: $line"); + $message = match ($line) { + '+OK' => new Ok(), + 'PING' => new Ping(), + 'PONG' => new Pong(), + default => null, + }; + + if ($message == null) { + if (!str_contains($line, ' ')) { + throw new LogicException("Parse message failure: $line"); + } + + [$type, $body] = explode(' ', $line, 2); + + if ($type == '-ERR') { + $message = trim($body, "'"); + throw new LogicException($message); + } + + $message = match ($type) { + 'CONNECT' => Connect::create($body), + 'INFO' => Info::create($body), + 'PUBLISH' => Publish::create($body), + 'SUBSCRIBE' => Subscribe::create($body), + 'UNSUBSCRIBE' => Unsubscribe::create($body), + 'HMSG' => Msg::create($body), + 'MSG' => Msg::create($body), + }; } - [$type, $body] = explode(' ', $line, 2); - - $nick = ucfirst(strtolower($type)); - - if ($nick == '-err') { - $message = trim($body, "'"); - throw new LogicException($message); - } - - if ($nick == 'Hmsg') { - $nick = 'Msg'; - } - - $class = 'Basis\\Nats\\Message\\' . $nick; - - return call_user_func_array([$class, 'create'], [$body]); + return $message; } } diff --git a/src/Message/Info.php b/src/Message/Info.php index 3eadf92..db6595d 100644 --- a/src/Message/Info.php +++ b/src/Message/Info.php @@ -6,33 +6,34 @@ class Info extends Prototype { - public string $server_id; - public string $server_name; - public string $version; + public bool $headers; + public int $max_payload; + public int $port; public int $proto; - public ?string $git_commit; public string $go; public string $host; - public int $port; - public bool $headers; + public string $server_id; + public string $server_name; + public string $version; + + /** @var string[]|null */ + public ?array $connect_urls; + /** @var string[]|null */ + public ?array $ws_connect_urls; public ?bool $auth_required; + public ?bool $cluster_dynamic; + public ?bool $jetstream; + public ?bool $ldm; + public ?bool $tls_available; public ?bool $tls_required; public ?bool $tls_verify; - public ?bool $tls_available; - public int $max_payload; - public ?bool $jetstream; - public ?string $ip; public ?int $client_id; public ?string $client_ip; - public ?string $nonce; public ?string $cluster; - public ?bool $cluster_dynamic; public ?string $domain; - /** @var string[]|null */ - public ?array $connect_urls; - /** @var string[]|null */ - public ?array $ws_connect_urls; - public ?bool $ldm; + public ?string $git_commit; + public ?string $ip; + public ?string $nonce; public ?string $xkey; public function render(): string diff --git a/src/Message/Msg.php b/src/Message/Msg.php index a27622e..1cc1bd5 100644 --- a/src/Message/Msg.php +++ b/src/Message/Msg.php @@ -8,13 +8,14 @@ class Msg extends Prototype { - public ?int $hlength = null; - public ?string $replyTo = null; public int $length; public Payload $payload; public string $sid; public string $subject; + + public ?int $hlength = null; public ?int $timestampNanos = null; + public ?string $replyTo = null; public static function create(string $data): self { diff --git a/src/Message/Ok.php b/src/Message/Ok.php new file mode 100644 index 0000000..5aac6c4 --- /dev/null +++ b/src/Message/Ok.php @@ -0,0 +1,13 @@ +getValues(); + if ($values === null) { + return; + } foreach ($values as $k => $v) { if (!property_exists($this, $k)) { diff --git a/tests/Functional/ClientTest.php b/tests/Functional/ClientTest.php index f2b5d86..ec8af51 100644 --- a/tests/Functional/ClientTest.php +++ b/tests/Functional/ClientTest.php @@ -4,7 +4,7 @@ namespace Tests\Functional; -use Basis\Nats\Client; +use Basis\Nats\Connection; use ReflectionProperty; use Tests\FunctionalTestCase; @@ -33,10 +33,10 @@ public function testReconnect() $client = $this->getClient(); $this->assertTrue($client->ping()); - $property = new ReflectionProperty(Client::class, 'socket'); + $property = new ReflectionProperty(Connection::class, 'socket'); $property->setAccessible(true); - fclose($property->getValue($client)); + fclose($property->getValue($client->connection)); $this->assertTrue($client->ping()); } @@ -51,8 +51,8 @@ public function testName() { $client = $this->createClient(); $client->setName('name-test'); - $client->connect(); - $this->assertSame($client->connect->name, 'name-test'); + $client->ping(); + $this->assertSame($client->connection->getConnectMessage()->name, 'name-test'); } public function testInvalidConnection() @@ -72,8 +72,8 @@ public function testTLSConnection() $this->assertTrue($client->ping()); - $this->assertTrue($client->info->tls_required); - $this->assertTrue($client->info->tls_verify); + $this->assertTrue($client->connection->getInfoMessage()->tls_required); + $this->assertTrue($client->connection->getInfoMessage()->tls_verify); } diff --git a/tests/Functional/StreamTest.php b/tests/Functional/StreamTest.php index ac5b4ab..77b7e61 100644 --- a/tests/Functional/StreamTest.php +++ b/tests/Functional/StreamTest.php @@ -19,7 +19,7 @@ class StreamTest extends FunctionalTestCase public function testConsumerExpiration() { - $client = $this->createClient(['timeout' => 0.1, 'delay' => 0.1]); + $client = $this->createClient(['timeout' => 0.2, 'delay' => 0.1]); $stream = $client->getApi()->getStream('empty'); $stream->getConfiguration() ->setSubjects(['empty']); @@ -28,11 +28,11 @@ public function testConsumerExpiration() $consumer = $stream->getConsumer('empty')->create(); $consumer->getConfiguration()->setSubjectFilter('empty'); - $info = $client->info; + $info = $client->connection->getInfoMessage(); $consumer->setIterations(1)->setExpires(3)->handle(function () { }); - $this->assertSame($info, $client->info); + $this->assertSame($info, $client->connection->getInfoMessage()); } public function testDeduplication() diff --git a/tests/Functional/SubjectTest.php b/tests/Functional/SubjectTest.php index 6ce1369..7ac693c 100644 --- a/tests/Functional/SubjectTest.php +++ b/tests/Functional/SubjectTest.php @@ -40,7 +40,7 @@ public function testSubscribeQueue() $setter = function ($socket) { $this->socket = $socket; }; - $setter->call($client, $memoryStream); + $setter->call($client->connection, $memoryStream); $client->subscribeQueue('subject', 'group', function () { }); diff --git a/tests/FunctionalTestCase.php b/tests/FunctionalTestCase.php index 8983901..51b9e8b 100644 --- a/tests/FunctionalTestCase.php +++ b/tests/FunctionalTestCase.php @@ -36,6 +36,8 @@ public function getConfiguration(array ...$options): Configuration return new Configuration([ 'host' => getenv('NATS_HOST'), 'port' => +getenv('NATS_PORT'), + 'delay' => 0.05, + 'delayMode' => Configuration::DELAY_LINEAR, 'timeout' => 0.5, 'verbose' => getenv('NATS_CLIENT_VERBOSE') == '1', ], ...$options); diff --git a/tests/Performance/PerformanceTest.php b/tests/Performance/PerformanceTest.php index caf2db0..74f6d1f 100644 --- a/tests/Performance/PerformanceTest.php +++ b/tests/Performance/PerformanceTest.php @@ -8,13 +8,13 @@ class PerformanceTest extends FunctionalTestCase { - private int $limit = 100_000; + private int $limit = 500_000; private int $counter = 0; public function testPerformance() { $client = $this->createClient()->setTimeout(0.1)->setDelay(0); - $client->setLogger(null); + $client->connection->setLogger(null); $this->logger?->info('start performance test');