Skip to content

Commit

Permalink
Merge pull request #28 from Volonda/issue-26
Browse files Browse the repository at this point in the history
Heartbeat doesn't work properly
  • Loading branch information
mmasiukevich authored Dec 4, 2021
2 parents 97d2750 + aca0804 commit 3561e05
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
4 changes: 3 additions & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ function () {
$this->properties->tune($maxChannel, $maxFrame);

if ($heartbeatInterval > 0) {
$this->connection->heartbeat($heartbeatInterval);
$this->connection->heartbeat($heartbeatInterval, function (){
$this->state = self::STATE_NOT_CONNECTED;
});
}
}
);
Expand Down
34 changes: 23 additions & 11 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ final class Connection
private $lastWrite = 0;

/**
* @var string|null
* @var int
*/
private $lastRead = 0;

/**
* @var string
*/
private $heartbeatWatcherId;

Expand Down Expand Up @@ -126,6 +131,7 @@ function () use ($timeout, $maxAttempts, $noDelay) {
}

$this->socket = yield connect($this->uri, $context);
$this->lastRead = Loop::now();

asyncCall(
function () {
Expand All @@ -138,6 +144,7 @@ function () {

while ($frame = $this->parser->parse()) {
$class = \get_class($frame);
$this->lastRead = Loop::now();

/**
* @psalm-var callable(AbstractFrame):Promise<bool> $callback
Expand All @@ -157,18 +164,14 @@ function () {
);
}

public function heartbeat(int $interval): void
public function heartbeat(int $interval, ?callable $connectionLost = null): void
{
$this->heartbeatWatcherId = Loop::repeat(
$interval,
function (string $watcherId) use ($interval) {
if ($this->socket === null) {
Loop::cancel($watcherId);

return;
}
function ($watcherId) use ($interval, $connectionLost){
$currentTime = Loop::now();

$currentTime = Loop::now();
if (null !== $this->socket) {
$lastWrite = $this->lastWrite ?: $currentTime;

$nextHeartbeat = $lastWrite + $interval;
Expand All @@ -182,9 +185,18 @@ function (string $watcherId) use ($interval) {
);
}

unset($currentTime, $lastWrite, $nextHeartbeat);
unset($lastWrite, $nextHeartbeat);
}
);

if (null !== $connectionLost && 0 !== $this->lastRead) {
if ($currentTime > ($this->lastRead + $interval + 1000)) {
$connectionLost();
Loop::cancel($watcherId);
}
}

unset($currentTime);
});
}

public function close(): void
Expand Down

0 comments on commit 3561e05

Please sign in to comment.