@@ -67,8 +67,8 @@ public ClientServerInputMultiplexer(
67
67
this .source = source ;
68
68
this .isClient = isClient ;
69
69
70
- this .serverReceiver = new InternalDuplexConnection (this , source );
71
- this .clientReceiver = new InternalDuplexConnection (this , source );
70
+ this .serverReceiver = new InternalDuplexConnection (Type . SERVER , this , source );
71
+ this .clientReceiver = new InternalDuplexConnection (Type . CLIENT , this , source );
72
72
this .serverConnection = registry .initConnection (Type .SERVER , serverReceiver );
73
73
this .clientConnection = registry .initConnection (Type .CLIENT , clientReceiver );
74
74
}
@@ -221,6 +221,7 @@ public String toString() {
221
221
222
222
private static class InternalDuplexConnection extends Flux <ByteBuf >
223
223
implements Subscription , DuplexConnection {
224
+ private final Type type ;
224
225
private final ClientServerInputMultiplexer clientServerInputMultiplexer ;
225
226
private final DuplexConnection source ;
226
227
@@ -231,7 +232,10 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
231
232
CoreSubscriber <? super ByteBuf > actual ;
232
233
233
234
public InternalDuplexConnection (
234
- ClientServerInputMultiplexer clientServerInputMultiplexer , DuplexConnection source ) {
235
+ Type type ,
236
+ ClientServerInputMultiplexer clientServerInputMultiplexer ,
237
+ DuplexConnection source ) {
238
+ this .type = type ;
235
239
this .clientServerInputMultiplexer = clientServerInputMultiplexer ;
236
240
this .source = source ;
237
241
}
@@ -331,7 +335,14 @@ public double availability() {
331
335
332
336
@ Override
333
337
public String toString () {
334
- return "InternalDuplexConnection{" + ", source=" + source + ", state=" + state + '}' ;
338
+ return "InternalDuplexConnection{"
339
+ + "type="
340
+ + type
341
+ + ", source="
342
+ + source
343
+ + ", state="
344
+ + state
345
+ + '}' ;
335
346
}
336
347
}
337
348
}
0 commit comments