Skip to content

Commit

Permalink
Correctly increment the reference count of entries in the dynamic tab…
Browse files Browse the repository at this point in the history
…le (#266)

Motivation:

We did not correctly update all reference counts and so could end up
with envict things that are actual still in use.

Modifications:

Increment reference count for everything that is used and keep track of
all of the indices that needs to be handled on ack

Result:

Correct dynamic table impl and handling
  • Loading branch information
normanmaurer authored Dec 1, 2023
1 parent 13a2787 commit 35f5f7c
Showing 1 changed file with 58 additions and 29 deletions.
87 changes: 58 additions & 29 deletions src/main/java/io/netty/incubator/codec/http3/QpackEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import io.netty.util.collection.LongObjectHashMap;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;

import static io.netty.incubator.codec.http3.Http3CodecUtils.closeOnFailure;
import static io.netty.incubator.codec.http3.QpackHeaderField.sizeOf;
Expand All @@ -42,7 +44,7 @@ final class QpackEncoder {
private final QpackEncoderDynamicTable dynamicTable;
private int maxBlockedStreams;
private int blockedStreams;
private LongObjectHashMap<StreamTracker> streamTrackers;
private LongObjectHashMap<Queue<Indices>> streamSectionTrackers;

QpackEncoder() {
this(new QpackEncoderDynamicTable());
Expand All @@ -66,24 +68,32 @@ void encodeHeaders(QpackAttributes qpackAttributes, ByteBuf out, ByteBufAllocato
ByteBuf tmp = allocator.buffer();
try {
int maxDynamicTblIdx = -1;
Map.Entry<CharSequence, CharSequence> maxDynamicTblIdxHeader = null;
int requiredInsertCount = 0;
Indices dynamicTableIndices = null;
for (Map.Entry<CharSequence, CharSequence> header : headers) {
CharSequence name = header.getKey();
CharSequence value = header.getValue();
int dynamicTblIdx = encodeHeader(qpackAttributes, tmp, base, name, value);
if (dynamicTblIdx > maxDynamicTblIdx) {
maxDynamicTblIdx = dynamicTblIdx;
maxDynamicTblIdxHeader = header;
if (dynamicTblIdx >= 0) {
int req = dynamicTable.addReferenceToEntry(name, value, dynamicTblIdx);
if (dynamicTblIdx > maxDynamicTblIdx) {
maxDynamicTblIdx = dynamicTblIdx;
requiredInsertCount = req;
}
if (dynamicTableIndices == null) {
dynamicTableIndices = new Indices();
}
dynamicTableIndices.add(dynamicTblIdx);
}
}
int requiredInsertCount = 0;
if (maxDynamicTblIdx >= 0) {
requiredInsertCount = dynamicTable.addReferenceToEntry(maxDynamicTblIdxHeader.getKey(),
maxDynamicTblIdxHeader.getValue(), maxDynamicTblIdx);
assert streamTrackers != null;
streamTrackers.computeIfAbsent(streamId, __ -> new StreamTracker())
.add(maxDynamicTblIdx);

// Track all the indices that we need to ack later.
if (dynamicTableIndices != null) {
assert streamSectionTrackers != null;
streamSectionTrackers.computeIfAbsent(streamId, __ -> new ArrayDeque<>())
.add(dynamicTableIndices);
}

// https://www.rfc-editor.org/rfc/rfc9204.html#name-encoded-field-section-prefi
// 0 1 2 3 4 5 6 7
// +---+---+---+---+---+---+---+---+
Expand Down Expand Up @@ -118,7 +128,7 @@ void configureDynamicTable(QpackAttributes attributes, long maxTableCapacity, in
encodePrefixedInteger(tableCapacity, (byte) 0b0010_0000, 5, maxTableCapacity);
closeOnFailure(encoderStream.writeAndFlush(tableCapacity));

streamTrackers = new LongObjectHashMap<>();
streamSectionTrackers = new LongObjectHashMap<>();
maxBlockedStreams = blockedStreams;
}
}
Expand All @@ -130,18 +140,19 @@ void configureDynamicTable(QpackAttributes attributes, long maxTableCapacity, in
* @param streamId For which the header fields section is acknowledged.
*/
void sectionAcknowledgment(long streamId) throws QpackException {
assert streamTrackers != null;
final StreamTracker tracker = streamTrackers.get(streamId);
assert streamSectionTrackers != null;
final Queue<Indices> tracker = streamSectionTrackers.get(streamId);
if (tracker == null) {
throw INVALID_SECTION_ACKNOWLEDGMENT;
}

int nextCount = tracker.takeNextInsertCount();
Indices dynamicTableIndices = tracker.poll();

if (tracker.isEmpty()) {
streamTrackers.remove(streamId);
streamSectionTrackers.remove(streamId);
}
if (nextCount >= 0) {
dynamicTable.acknowledgeInsertCount(nextCount);
if (dynamicTableIndices != null) {
dynamicTableIndices.forEach(dynamicTable::acknowledgeInsertCount);
}
}

Expand All @@ -152,12 +163,15 @@ void sectionAcknowledgment(long streamId) throws QpackException {
* @param streamId which is cancelled.
*/
void streamCancellation(long streamId) throws QpackException {
assert streamTrackers != null;
final StreamTracker tracker = streamTrackers.remove(streamId);
assert streamSectionTrackers != null;
final Queue<Indices> tracker = streamSectionTrackers.remove(streamId);
if (tracker != null) {
int nextCount;
while ((nextCount = tracker.takeNextInsertCount()) >= 0) {
dynamicTable.acknowledgeInsertCount(nextCount);
for (;;) {
Indices dynamicTableIndices = tracker.poll();
if (dynamicTableIndices == null) {
break;
}
dynamicTableIndices.forEach(dynamicTable::acknowledgeInsertCount);
}
}
}
Expand Down Expand Up @@ -495,13 +509,28 @@ private boolean mayNotBlockStream() {
return blockedStreams >= maxBlockedStreams - 1;
}

private static final class StreamTracker extends ArrayDeque<Integer> {
StreamTracker() {
super(1); // we will mostly have a single header block in a stream.
private static final class Indices {
private int idx;
// Let's just assume 4 indices for now that we will store here as max.
private int[] array = new int[4];

void add(int index) {
if (idx == array.length) {
// Double it if needed.
array = Arrays.copyOf(array, array.length << 1);
}
array[idx++] = index;
}

void forEach(IndexConsumer consumer) throws QpackException {
for (int i = 0; i < idx; i++) {
consumer.accept(array[i]);
}
}

int takeNextInsertCount() {
return isEmpty() ? -1 : poll();
@FunctionalInterface
interface IndexConsumer {
void accept(int idx) throws QpackException;
}
}
}

0 comments on commit 35f5f7c

Please sign in to comment.