@@ -13,13 +13,19 @@ import com.sunnychung.application.multiplatform.hellohttp.network.apache.Http2Fr
13
13
import com.sunnychung.application.multiplatform.hellohttp.network.util.CallDataUserResponseUtil
14
14
import com.sunnychung.application.multiplatform.hellohttp.util.log
15
15
import com.sunnychung.lib.multiplatform.kdatetime.KInstant
16
+ import com.sunnychung.lib.multiplatform.kdatetime.extension.seconds
17
+ import kotlinx.coroutines.CancellationException
16
18
import kotlinx.coroutines.CoroutineScope
17
19
import kotlinx.coroutines.Dispatchers
18
20
import kotlinx.coroutines.ExperimentalCoroutinesApi
21
+ import kotlinx.coroutines.cancel
22
+ import kotlinx.coroutines.delay
19
23
import kotlinx.coroutines.flow.MutableSharedFlow
20
24
import kotlinx.coroutines.launch
21
25
import kotlinx.coroutines.runBlocking
22
26
import kotlinx.coroutines.suspendCancellableCoroutine
27
+ import kotlinx.coroutines.sync.Mutex
28
+ import kotlinx.coroutines.sync.withLock
23
29
import org.apache.hc.client5.http.SystemDefaultDnsResolver
24
30
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse
25
31
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer
@@ -44,6 +50,7 @@ import org.apache.hc.core5.http.nio.RequestChannel
44
50
import org.apache.hc.core5.http.protocol.HttpContext
45
51
import org.apache.hc.core5.http2.HttpVersionPolicy
46
52
import org.apache.hc.core5.http2.config.H2Config
53
+ import org.apache.hc.core5.http2.frame.FrameFlag
47
54
import org.apache.hc.core5.http2.frame.FrameType
48
55
import org.apache.hc.core5.http2.frame.RawFrame
49
56
import org.apache.hc.core5.http2.hpack.HPackInspectHeader
@@ -54,8 +61,11 @@ import java.net.InetAddress
54
61
import java.nio.ByteBuffer
55
62
import java.security.Principal
56
63
import java.security.cert.Certificate
64
+ import java.util.Collections
57
65
import java.util.concurrent.ConcurrentHashMap
66
+ import java.util.concurrent.atomic.AtomicBoolean
58
67
import java.util.concurrent.atomic.AtomicInteger
68
+ import kotlin.coroutines.CoroutineContext
59
69
60
70
class ApacheHttpTransportClient (networkClientManager : NetworkClientManager ) : AbstractTransportClient(networkClientManager) {
61
71
@@ -172,6 +182,9 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
172
182
},
173
183
object : H2InspectListener {
174
184
val suspendedHeaderFrames = ConcurrentHashMap <Int , H2HeaderFrame >()
185
+ val openedStreamIds = Collections .newSetFromMap(ConcurrentHashMap <Int , Boolean >())
186
+ val lock = Mutex ()
187
+ val isCancelled = AtomicBoolean (false )
175
188
176
189
override fun onHeaderInputDecoded (connection : HttpConnection , streamId : Int? , headers : MutableList <HPackInspectHeader >) {
177
190
val serialized = http2FrameSerializer.serializeHeaders(headers)
@@ -211,6 +224,14 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
211
224
val type = FrameType .valueOf(frame.type)
212
225
log.d { " processFrame $streamId $type " }
213
226
if (type == FrameType .HEADERS ) {
227
+ if (frame.flags and FrameFlag .END_STREAM .value == 0 ) {
228
+ openedStreamIds + = streamId
229
+ } else {
230
+ openedStreamIds - = streamId
231
+ if (openedStreamIds.isEmpty()) {
232
+ checkForHangConnectionLater()
233
+ }
234
+ }
214
235
val frame = suspendedHeaderFrames.getOrPut(streamId) { H2HeaderFrame (streamId = streamId) }
215
236
frame.frameHeader = serialized
216
237
if (frame.isComplete()) {
@@ -232,6 +253,20 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
232
253
}
233
254
}
234
255
256
+ fun checkForHangConnectionLater () {
257
+ CoroutineScope (Dispatchers .IO ).launch {
258
+ delay(3 .seconds().toMilliseconds())
259
+ lock.withLock {
260
+ if (openedStreamIds.isEmpty() && ! isCancelled.get()) {
261
+ val message = " The connection has no active stream for some seconds, it appears to be hanging. Cancelling the connection."
262
+ emitEvent(callId, message)
263
+ callData.cancel(Exception (message))
264
+ isCancelled.set(true )
265
+ }
266
+ }
267
+ }
268
+ }
269
+
235
270
}
236
271
)
237
272
@@ -304,7 +339,7 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
304
339
}
305
340
}
306
341
307
- CoroutineScope (Dispatchers .IO ).launch {
342
+ CoroutineScope (Dispatchers .IO ).launch(coroutineExceptionHandler()) {
308
343
val callData = callData[callId]!!
309
344
callData.waitForPreparation()
310
345
log.d { " Call $callId is prepared" }
@@ -407,11 +442,15 @@ class ApacheHttpTransportClient(networkClientManager: NetworkClientManager) : Ab
407
442
408
443
})
409
444
410
- data.cancel = {
445
+ data.cancel = { error ->
411
446
log.d { " Request to cancel the call" }
412
447
val cancelResult = call.cancel() // no use at all
413
448
log.d { " Cancel result = $cancelResult " }
414
449
httpClient.close(CloseMode .IMMEDIATE )
450
+
451
+ // httpClient.close is buggy. Do not rely on it
452
+ data.status = ConnectionStatus .DISCONNECTED
453
+ this .cancel(error?.let { CancellationException (it.message, it) })
415
454
}
416
455
}
417
456
0 commit comments