diff --git a/src/Client.php b/src/Client.php index 9d135de..634ac0f 100644 --- a/src/Client.php +++ b/src/Client.php @@ -310,7 +310,9 @@ function () { $this->properties->tune($maxChannel, $maxFrame); if ($heartbeatInterval > 0) { - $this->connection->heartbeat($heartbeatInterval); + $this->connection->heartbeat($heartbeatInterval, function (){ + $this->state = self::STATE_NOT_CONNECTED; + }); } } ); diff --git a/src/Connection.php b/src/Connection.php index 7b4b3a1..f6799de 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -49,7 +49,12 @@ final class Connection private $lastWrite = 0; /** - * @var string|null + * @var int + */ + private $lastRead = 0; + + /** + * @var string */ private $heartbeatWatcherId; @@ -126,6 +131,7 @@ function () use ($timeout, $maxAttempts, $noDelay) { } $this->socket = yield connect($this->uri, $context); + $this->lastRead = Loop::now(); asyncCall( function () { @@ -140,6 +146,7 @@ function () { /** @var AbstractFrame $frame */ $class = \get_class($frame); + $this->lastRead = Loop::now(); /** * @psalm-var int $i @@ -160,18 +167,14 @@ function () { ); } - public function heartbeat(int $interval): void + public function heartbeat(int $interval, ?callable $connectionLost = null): void { $this->heartbeatWatcherId = Loop::repeat( $interval, - function (string $watcherId) use ($interval) { - if ($this->socket === null) { - Loop::cancel($watcherId); - - return; - } + function ($watcherId) use ($interval, $connectionLost){ + $currentTime = Loop::now(); - $currentTime = Loop::now(); + if (null !== $this->socket) { $lastWrite = $this->lastWrite ?: $currentTime; $nextHeartbeat = $lastWrite + $interval; @@ -185,9 +188,18 @@ function (string $watcherId) use ($interval) { ); } - unset($currentTime, $lastWrite, $nextHeartbeat); + unset($lastWrite, $nextHeartbeat); } - ); + + if (null !== $connectionLost && 0 !== $this->lastRead) { + if ($currentTime > ($this->lastRead + $interval + 1000)) { + $connectionLost(); + Loop::cancel($watcherId); + } + } + + unset($currentTime); + }); } public function close(): void