Skip to content

Commit

Permalink
Heartbeat doesn't work properly
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey.karakhtanov committed Jun 23, 2021
1 parent b362ee6 commit aca0804
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
4 changes: 3 additions & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ function () {
$this->properties->tune($maxChannel, $maxFrame);

if ($heartbeatInterval > 0) {
$this->connection->heartbeat($heartbeatInterval);
$this->connection->heartbeat($heartbeatInterval, function (){
$this->state = self::STATE_NOT_CONNECTED;
});
}
}
);
Expand Down
34 changes: 23 additions & 11 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ final class Connection
private $lastWrite = 0;

/**
* @var string|null
* @var int
*/
private $lastRead = 0;

/**
* @var string
*/
private $heartbeatWatcherId;

Expand Down Expand Up @@ -126,6 +131,7 @@ function () use ($timeout, $maxAttempts, $noDelay) {
}

$this->socket = yield connect($this->uri, $context);
$this->lastRead = Loop::now();

asyncCall(
function () {
Expand All @@ -140,6 +146,7 @@ function () {
/** @var AbstractFrame $frame */

$class = \get_class($frame);
$this->lastRead = Loop::now();

/**
* @psalm-var int $i
Expand All @@ -160,18 +167,14 @@ function () {
);
}

public function heartbeat(int $interval): void
public function heartbeat(int $interval, ?callable $connectionLost = null): void
{
$this->heartbeatWatcherId = Loop::repeat(
$interval,
function (string $watcherId) use ($interval) {
if ($this->socket === null) {
Loop::cancel($watcherId);

return;
}
function ($watcherId) use ($interval, $connectionLost){
$currentTime = Loop::now();

$currentTime = Loop::now();
if (null !== $this->socket) {
$lastWrite = $this->lastWrite ?: $currentTime;

$nextHeartbeat = $lastWrite + $interval;
Expand All @@ -185,9 +188,18 @@ function (string $watcherId) use ($interval) {
);
}

unset($currentTime, $lastWrite, $nextHeartbeat);
unset($lastWrite, $nextHeartbeat);
}
);

if (null !== $connectionLost && 0 !== $this->lastRead) {
if ($currentTime > ($this->lastRead + $interval + 1000)) {
$connectionLost();
Loop::cancel($watcherId);
}
}

unset($currentTime);
});
}

public function close(): void
Expand Down

0 comments on commit aca0804

Please sign in to comment.