|
18 | 18 | */
|
19 | 19 | package org.apache.pegasus.rpc.async;
|
20 | 20 |
|
21 |
| -import static org.junit.jupiter.api.Assertions.assertEquals; |
22 |
| -import static org.junit.jupiter.api.Assertions.assertNull; |
23 |
| -import static org.junit.jupiter.api.Assertions.assertTrue; |
24 |
| -import static org.junit.jupiter.api.Assertions.fail; |
| 21 | +import static org.junit.jupiter.api.Assertions.*; |
25 | 22 |
|
26 | 23 | import io.netty.channel.EventLoopGroup;
|
27 | 24 | import io.netty.channel.nio.NioEventLoopGroup;
|
28 | 25 | import java.util.ArrayList;
|
| 26 | +import java.util.Objects; |
29 | 27 | import java.util.concurrent.Callable;
|
30 | 28 | import java.util.concurrent.ExecutionException;
|
31 | 29 | import java.util.concurrent.FutureTask;
|
|
38 | 36 | import org.apache.pegasus.client.ClientOptions;
|
39 | 37 | import org.apache.pegasus.client.PegasusClient;
|
40 | 38 | import org.apache.pegasus.operator.client_operator;
|
| 39 | +import org.apache.pegasus.operator.query_cfg_operator; |
41 | 40 | import org.apache.pegasus.operator.rrdb_get_operator;
|
42 | 41 | import org.apache.pegasus.operator.rrdb_put_operator;
|
| 42 | +import org.apache.pegasus.replication.query_cfg_request; |
43 | 43 | import org.apache.pegasus.rpc.KeyHasher;
|
44 | 44 | import org.apache.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager;
|
45 | 45 | import org.apache.pegasus.tools.Toollet;
|
@@ -284,12 +284,27 @@ public void testSessionConnectTimeout() throws InterruptedException {
|
284 | 284 |
|
285 | 285 | @Test
|
286 | 286 | public void testSessionTryConnectWhenConnected() throws InterruptedException {
|
287 |
| - rpc_address addr = new rpc_address(); |
288 |
| - addr.fromString("127.0.0.1:34801"); |
289 |
| - ReplicaSession rs = manager.getReplicaSession(addr); |
| 287 | + ReplicaSession rs = |
| 288 | + manager.getReplicaSession( |
| 289 | + Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34801"))); |
290 | 290 | rs.tryConnect().awaitUninterruptibly();
|
291 | 291 | Thread.sleep(100);
|
292 |
| - assertEquals(rs.getState(), ReplicaSession.ConnState.CONNECTED); |
| 292 | + assertEquals(ReplicaSession.ConnState.CONNECTED, rs.getState()); |
293 | 293 | assertNull(rs.tryConnect()); // do not connect again
|
294 | 294 | }
|
| 295 | + |
| 296 | + @Test |
| 297 | + public void testSessionAuth() throws InterruptedException { |
| 298 | + ReplicaSession rs = |
| 299 | + manager.getReplicaSession( |
| 300 | + Objects.requireNonNull(rpc_address.fromIpPort("127.0.0.1:34601"))); |
| 301 | + rs.tryConnect().awaitUninterruptibly(); |
| 302 | + Thread.sleep(100); |
| 303 | + assertEquals(ReplicaSession.ConnState.CONNECTED, rs.getState()); |
| 304 | + |
| 305 | + query_cfg_request req = new query_cfg_request("temp", new ArrayList<Integer>()); |
| 306 | + final ReplicaSession.RequestEntry entry = new ReplicaSession.RequestEntry(); |
| 307 | + entry.op = new query_cfg_operator(new gpid(-1, -1), req); |
| 308 | + assertFalse(rs.tryPendRequest(entry)); |
| 309 | + } |
295 | 310 | }
|
0 commit comments