Skip to content

Commit

Permalink
fix: fixed bug with stream clean up and added concurrency tests
Browse files Browse the repository at this point in the history
* using a stop-gap measure where we add and remove data to the finish frame message.
* Likely there is a bug in the `quiche` code where this packet is not being sent due to it being a 0-length message. It was fixed before in their code, but they may have missed an edge case where a large volume of data is being processed.
* Fixed race condition with stream pulling data throwing error and cleaning up after stream completes. Somehow despite the code having no awaits, the readable stream was getting pulled and errored out the stream before cleaning up after the finish frame.
* Added tests for concurrent servers and clients.
* Cleaned up logging for streams.

* Related #14

[ci skip]
  • Loading branch information
tegefaulkes committed May 16, 2023
1 parent af0bfbc commit 645d8dd
Show file tree
Hide file tree
Showing 8 changed files with 804 additions and 145 deletions.
48 changes: 30 additions & 18 deletions src/QUICConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class QUICConnection extends EventTarget {
protected codeToReason: StreamCodeToReason;
protected maxReadableStreamBytes: number | undefined;
protected maxWritableStreamBytes: number | undefined;
protected destroyingMap: Map<StreamId, QUICStream> = new Map();

// This basically allows one to await this promise
// once resolved, always resolved...
Expand Down Expand Up @@ -421,7 +420,7 @@ class QUICConnection extends EventTarget {
};
try {
this.conn.recv(data, recvInfo);
this.logger.debug(`RECEIVED ${data.byteLength} of data`);
this.logger.info(`RECEIVED ${data.byteLength} of data`);
} catch (e) {
this.logger.error(`recv error ${e.message}`);
// Depending on the exception, the `this.conn.recv`
Expand Down Expand Up @@ -460,18 +459,15 @@ class QUICConnection extends EventTarget {
if (this.resolveCloseP != null) this.resolveCloseP();
return;
}
if (
!this.conn.isDraining() &&
(this.conn.isInEarlyData() || this.conn.isEstablished())
) {
if (this.conn.isInEarlyData() || this.conn.isEstablished()) {
const readIds: Array<number> = [];
for (const streamId of this.conn.readable() as Iterable<StreamId>) {
let quicStream = this.streamMap.get(streamId);
if (quicStream == null) {
// The creation will set itself to the stream map
quicStream = await QUICStream.createQUICStream({
streamId,
connection: this,
destroyingMap: this.destroyingMap,
codeToReason: this.codeToReason,
reasonToCode: this.reasonToCode,
maxReadableStreamBytes: this.maxReadableStreamBytes,
Expand All @@ -482,9 +478,14 @@ class QUICConnection extends EventTarget {
new events.QUICConnectionStreamEvent({ detail: quicStream }),
);
}
readIds.push(quicStream.streamId);
quicStream.read();
quicStream.dispatchEvent(new events.QUICStreamReadableEvent());
}
if (readIds.length > 0) {
this.logger.info(`processed reads for ${readIds}`);
}
const writeIds: Array<number> = [];
for (const streamId of this.conn.writable() as Iterable<StreamId>) {
let quicStream = this.streamMap.get(streamId);
if (quicStream == null) {
Expand All @@ -494,7 +495,6 @@ class QUICConnection extends EventTarget {
connection: this,
codeToReason: this.codeToReason,
reasonToCode: this.reasonToCode,
destroyingMap: this.destroyingMap,
maxReadableStreamBytes: this.maxReadableStreamBytes,
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
});
Expand All @@ -503,18 +503,15 @@ class QUICConnection extends EventTarget {
);
}
quicStream.dispatchEvent(new events.QUICStreamWritableEvent());
writeIds.push(quicStream.streamId);
quicStream.write();
}
// Checking shortlist if streams have finished.
for (const [streamId, stream] of this.destroyingMap) {
if (stream.isFinished()) {
// If it has finished, it will trigger its own clean up.
// Remove the stream from the shortlist.
this.destroyingMap.delete(streamId);
}
if (writeIds.length > 0) {
this.logger.info(`processed writes for ${writeIds}`);
}
}
} finally {
this.garbageCollectStreams('recv');
this.logger.debug('RECV FINALLY');
// Set the timeout
this.checkTimeout();
Expand All @@ -527,7 +524,7 @@ class QUICConnection extends EventTarget {
) {
this.logger.debug('CALLING DESTROY 2');
// Destroy in the background, we still need to process packets
void this.destroy();
void this.destroy().catch(() => {});
}
}
}
Expand Down Expand Up @@ -558,6 +555,7 @@ class QUICConnection extends EventTarget {
} else if (this.conn.isDraining()) {
return;
}
let numSent = 0;
try {
const sendBuffer = new Uint8Array(quiche.MAX_DATAGRAM_SIZE);
let sendLength: number;
Expand Down Expand Up @@ -630,8 +628,10 @@ class QUICConnection extends EventTarget {
return;
}
this.dispatchEvent(new events.QUICConnectionSendEvent());
numSent += 1;
}
} finally {
if (numSent > 0) this.garbageCollectStreams('send');
this.logger.debug('SEND FINALLY');
this.checkTimeout();
if (
Expand Down Expand Up @@ -689,7 +689,6 @@ class QUICConnection extends EventTarget {
connection: this,
codeToReason: this.codeToReason,
reasonToCode: this.reasonToCode,
destroyingMap: this.destroyingMap,
maxReadableStreamBytes: this.maxReadableStreamBytes,
maxWritableStreamBytes: this.maxWritableStreamBytes,
logger: this.logger.getChild(`${QUICStream.name} ${streamId!}`),
Expand Down Expand Up @@ -743,7 +742,7 @@ class QUICConnection extends EventTarget {
) {
this.logger.debug('CALLING DESTROY 3');
// Destroy in the background, we still need to process packets
void this.destroy();
void this.destroy().catch(() => {});
}
this.checkTimeout();
};
Expand Down Expand Up @@ -795,6 +794,19 @@ class QUICConnection extends EventTarget {
}
}
};

protected garbageCollectStreams(where: string) {
const nums: Array<number> = [];
// Only check if packets were sent
for (const [streamId, quicStream] of this.streamMap) {
// Stream sending can finish after a packet is sent
nums.push(streamId);
quicStream.read();
}
if (nums.length > 0) {
this.logger.info(`checking read finally ${where} for ${nums}`);
}
}
}

export default QUICConnection;
3 changes: 2 additions & 1 deletion src/QUICServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class QUICServer extends EventTarget {
this.logger.debug(
`Accepting new connection from QUIC packet from ${remoteInfo.host}:${remoteInfo.port}`,
);
const clientConnRef = Buffer.from(header.scid).toString('hex').slice(32);
const connection = await QUICConnection.acceptQUICConnection({
scid,
dcid: dcidOriginal,
Expand All @@ -301,7 +302,7 @@ class QUICServer extends EventTarget {
maxReadableStreamBytes: this.maxReadableStreamBytes,
maxWritableStreamBytes: this.maxWritableStreamBytes,
logger: this.logger.getChild(
`${QUICConnection.name} ${scid.toString().slice(32)}`,
`${QUICConnection.name} ${scid.toString().slice(32)}-${clientConnRef}`,
),
});
connection.setKeepAlive(this.keepaliveIntervalTime);
Expand Down
Loading

0 comments on commit 645d8dd

Please sign in to comment.