Skip to content

Commit

Permalink
SendReliabilityLayer: do not send reliable packets outside the client…
Browse files Browse the repository at this point in the history
…'s reliable window

The client implements a window of 512 packets, outside of which packets will be dropped. Eventually, the server will resend them, but after a long delay.
To avoid this, we buffer packets whose reliable message indexes are too large for the current reliable window.
This significantly improves performance when transmitting large amounts of data.
  • Loading branch information
dktapps committed Mar 4, 2024
1 parent fd74ba2 commit ebd8b6c
Showing 1 changed file with 77 additions and 24 deletions.
101 changes: 77 additions & 24 deletions src/generic/SendReliabilityLayer.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
use raklib\protocol\PacketReliability;
use raklib\protocol\SplitPacketInfo;
use function array_fill;
use function array_push;
use function assert;
use function count;
use function str_split;
use function strlen;
Expand All @@ -41,12 +43,23 @@ final class SendReliabilityLayer{

private int $messageIndex = 0;

private int $reliableWindowStart;
private int $reliableWindowEnd;
/**
* @var bool[] message index => acked
* @phpstan-var array<int, bool>
*/
private array $reliableWindow = [];

/** @var int[] */
private array $sendOrderedIndex;
/** @var int[] */
private array $sendSequencedIndex;

/** @var ReliableCacheEntry[] */
/** @var EncapsulatedPacket[] */
private array $reliableBacklog = [];

/** @var EncapsulatedPacket[] */
private array $resendQueue = [];

/** @var ReliableCacheEntry[] */
Expand All @@ -60,18 +73,22 @@ final class SendReliabilityLayer{

/**
* @phpstan-param int<Session::MIN_MTU_SIZE, max> $mtuSize
* @phpstan-param \Closure(Datagram) : void $sendDatagramCallback
* @phpstan-param \Closure(int) : void $onACK
* @phpstan-param \Closure(Datagram) : void $sendDatagramCallback
* @phpstan-param \Closure(int) : void $onACK
*/
public function __construct(
private int $mtuSize,
private \Closure $sendDatagramCallback,
private \Closure $onACK
private \Closure $onACK,
private int $reliableWindowSize = 512,
){
$this->sendOrderedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
$this->sendSequencedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);

$this->maxDatagramPayloadSize = $this->mtuSize - self::DATAGRAM_MTU_OVERHEAD;

$this->reliableWindowStart = 0;
$this->reliableWindowEnd = $this->reliableWindowSize;
}

/**
Expand Down Expand Up @@ -102,6 +119,19 @@ public function sendQueue() : void{
}

private function addToQueue(EncapsulatedPacket $pk, bool $immediate) : void{
if(PacketReliability::isReliable($pk->reliability)){
if($pk->messageIndex === null || $pk->messageIndex < $this->reliableWindowStart){
throw new \InvalidArgumentException("Cannot send a reliable packet with message index less than the window start ($pk->messageIndex < $this->reliableWindowStart)");
}
if($pk->messageIndex >= $this->reliableWindowEnd){
//If we send this now, the client's reliable window may overflow, causing the packet to need redelivery
$this->reliableBacklog[$pk->messageIndex] = $pk;
return;
}

$this->reliableWindow[$pk->messageIndex] = false;
}

if($pk->identifierACK !== null and $pk->messageIndex !== null){
$this->needACK[$pk->identifierACK][$pk->messageIndex] = $pk->messageIndex;
}
Expand Down Expand Up @@ -172,11 +202,26 @@ public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immedia
}
}

private function updateReliableWindow() : void{
while(
isset($this->reliableWindow[$this->reliableWindowStart]) && //this messageIndex has been used
$this->reliableWindow[$this->reliableWindowStart] === true //we received an ack for this messageIndex
){
unset($this->reliableWindow[$this->reliableWindowStart]);
$this->reliableWindowStart++;
$this->reliableWindowEnd++;
}
}

public function onACK(ACK $packet) : void{
foreach($packet->packets as $seq){
if(isset($this->reliableCache[$seq])){
foreach($this->reliableCache[$seq]->getPackets() as $pk){
if($pk->identifierACK !== null and $pk->messageIndex !== null){
assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart && $pk->messageIndex < $this->reliableWindowEnd);
$this->reliableWindow[$pk->messageIndex] = true;
$this->updateReliableWindow();

if($pk->identifierACK !== null){
unset($this->needACK[$pk->identifierACK][$pk->messageIndex]);
if(count($this->needACK[$pk->identifierACK]) === 0){
unset($this->needACK[$pk->identifierACK]);
Expand All @@ -192,8 +237,9 @@ public function onACK(ACK $packet) : void{
public function onNACK(NACK $packet) : void{
foreach($packet->packets as $seq){
if(isset($this->reliableCache[$seq])){
//TODO: group resends if the resulting datagram is below the MTU
$this->resendQueue[] = $this->reliableCache[$seq];
foreach($this->reliableCache[$seq]->getPackets() as $pk){
$this->resendQueue[] = $pk;
}
unset($this->reliableCache[$seq]);
}
}
Expand All @@ -202,37 +248,44 @@ public function onNACK(NACK $packet) : void{
public function needsUpdate() : bool{
return (
count($this->sendQueue) !== 0 or
count($this->reliableBacklog) !== 0 or
count($this->resendQueue) !== 0 or
count($this->reliableCache) !== 0
);
}

public function update() : void{
if(count($this->resendQueue) > 0){
$limit = 16;
foreach($this->resendQueue as $k => $pk){
$this->sendDatagram($pk->getPackets());
unset($this->resendQueue[$k]);

if(--$limit <= 0){
break;
}
}

if(count($this->resendQueue) > ReceiveReliabilityLayer::$WINDOW_SIZE){
$this->resendQueue = [];
}
}

foreach($this->reliableCache as $seq => $pk){
if($pk->getTimestamp() < (time() - 8)){
$this->resendQueue[] = $pk;
//behave as if a NACK was received
array_push($this->resendQueue, ...$pk->getPackets());
unset($this->reliableCache[$seq]);
}else{
break;
}
}

if(count($this->resendQueue) > 0){
foreach($this->resendQueue as $pk){
//resends should always be within the reliable window
$this->addToQueue($pk, false);
}
$this->resendQueue = [];
}

if(count($this->reliableBacklog) > 0){
foreach($this->reliableBacklog as $k => $pk){
assert($pk->messageIndex !== null && $pk->messageIndex >= $this->reliableWindowStart);
if($pk->messageIndex >= $this->reliableWindowEnd){
//we can't send this packet yet, the client's reliable window will drop it
break;
}

$this->addToQueue($pk, false);
unset($this->reliableBacklog[$k]);
}
}

$this->sendQueue();
}
}

0 comments on commit ebd8b6c

Please sign in to comment.