Skip to content

Commit 434ec66

Browse files
Authored by: Robin Han <hanxvdovehx@gmail.com>
Commit message: fix(network): fix potential ByteBuf LEAK in fetch (#3027)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent: daef2e2 · commit: 434ec66

File tree

3 files changed: +39 additions, −9 deletions

core/src/main/scala/kafka/network/SocketServer.scala

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,10 @@ object SocketServer {
414414

415415
val ListenerReconfigurableConfigs: Set[String] = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
416416

417+
// AutoMQ inject start
418+
val MaxInflightRequestsPerConnection = 64;
419+
// AutoMQ inject end
420+
417421
def closeSocket(
418422
channel: SocketChannel,
419423
logging: Logging
@@ -1073,7 +1077,7 @@ private[kafka] class Processor(
10731077
if (channel != null && channel.isMuted) {
10741078
val unmute = if (channelContext == null) {
10751079
true
1076-
} else if (channelContext.nextCorrelationId.size() < 8 && channelContext.clearQueueFull()) {
1080+
} else if (channelContext.nextCorrelationId.size() < MaxInflightRequestsPerConnection && channelContext.clearQueueFull()) {
10771081
true
10781082
} else {
10791083
false
@@ -1200,7 +1204,7 @@ private[kafka] class Processor(
12001204
// AutoMQ will pipeline the requests to accelerate the performance and also keep the request order.
12011205

12021206
// Mute the channel if the inflight requests exceed the threshold.
1203-
if (channelContext.nextCorrelationId.size() >= SocketServerConfigs.MAX_INFLIGHT_REQUESTS_PER_CONNECTION && !channel.isMuted) {
1207+
if (channelContext.nextCorrelationId.size() >= MaxInflightRequestsPerConnection && !channel.isMuted) {
12041208
if (isTraceEnabled) {
12051209
trace(s"Mute channel ${channel.id} because the inflight requests exceed the threshold, inflight count is ${channelContext.nextCorrelationId.size()}.")
12061210
}
@@ -1248,7 +1252,7 @@ private[kafka] class Processor(
12481252
if (channel.isMuted) {
12491253
val unmute = if (channelContext == null) {
12501254
true
1251-
} else if (channelContext.nextCorrelationId.size() < 8 && channelContext.clearQueueFull()) {
1255+
} else if (channelContext.nextCorrelationId.size() < MaxInflightRequestsPerConnection && channelContext.clearQueueFull()) {
12521256
if (isTraceEnabled) {
12531257
trace(s"Unmute channel ${send.destinationId} because the inflight requests are below the threshold.")
12541258
}
@@ -1289,7 +1293,7 @@ private[kafka] class Processor(
12891293
}
12901294
remove
12911295
})
1292-
channelContexts.remove(connectionId)
1296+
removeChannelContext(connectionId)
12931297
// the channel has been closed by the selector but the quotas still need to be updated
12941298
connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
12951299
} catch {
@@ -1330,7 +1334,7 @@ private[kafka] class Processor(
13301334
}
13311335
remove
13321336
})
1333-
channelContexts.remove(connectionId)
1337+
removeChannelContext(connectionId)
13341338
// inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
13351339
// AutoMQ for Kafka inject end
13361340
}
@@ -1442,6 +1446,26 @@ private[kafka] class Processor(
14421446
// AutoMQ for Kafka inject end
14431447
}
14441448

1449+
// AutoMQ inject start
/**
 * Removes the per-connection channel context for `connectionId` and releases
 * every buffered outgoing send it still holds, so the pooled ByteBufs backing
 * those sends are returned instead of leaking when a connection is closed.
 *
 * The context is detached from `channelContexts` first; the cleanup then runs
 * under the context's own monitor so it does not race with concurrent
 * enqueue/dequeue on the same context.
 */
private def removeChannelContext(connectionId: String): Unit = {
  val channelContext = channelContexts.remove(connectionId)
  if (channelContext != null) {
    channelContext.synchronized {
      channelContext.nextCorrelationId.clear()
      // Only SendResponse entries own a releasable send; other response kinds
      // carry no pooled buffer and are simply dropped.
      channelContext.responses.forEach((_, response) => {
        response match {
          case sendResponse: SendResponse => sendResponse.responseSend.release()
          case _ =>
        }
      })
      channelContext.responses.clear()
    }
  }
}
// AutoMQ inject end
1468+
14451469
private def dequeueResponse(): RequestChannel.Response = {
14461470
val response = responseQueue.poll()
14471471
if (response != null)

core/src/main/scala/kafka/server/FetchSession.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,16 @@ class IncrementalFetchContext(private val time: Time,
548548
if (session.epoch != expectedEpoch) {
549549
info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
550550
s"got ${session.epoch}. Possible duplicate request.")
551+
// AutoMQ inject start
552+
// The fetch will return empty data, so we need to release the fetched records.
553+
updates.forEach((_, response) => {
554+
response.records() match {
555+
case r: PooledResource =>
556+
r.release()
557+
case _ =>
558+
}
559+
})
560+
// AutoMQ inject end
551561
FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, session.id, new FetchSession.RESP_MAP)
552562
} else {
553563
// Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent

server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,6 @@ public class SocketServerConfigs {
172172
public static final int NUM_NETWORK_THREADS_DEFAULT = 3;
173173
public static final String NUM_NETWORK_THREADS_DOC = "The number of threads that the server uses for receiving requests from the network and sending responses to the network. Noted: each listener (except for controller listener) creates its own thread pool.";
174174

175-
// AutoMQ inject start
176-
public static final int MAX_INFLIGHT_REQUESTS_PER_CONNECTION = 64;
177-
// AutoMQ inject end
178-
179175
public static final ConfigDef CONFIG_DEF = new ConfigDef()
180176
.define(LISTENERS_CONFIG, STRING, LISTENERS_DEFAULT, HIGH, LISTENERS_DOC)
181177
.define(ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH, ADVERTISED_LISTENERS_DOC)

0 commit comments

Comments (0)