|
| 1 | +# IoTDB-CSharp客户端的定制化SessionPool机制 |
| 2 | + |
| 3 | +### 主要参数 |
| 4 | + |
| 5 | +- `_username`:用户名 |
| 6 | +- `_password`:密码 |
| 7 | +- `_zoneId`:时区 |
| 8 | +- `_host`:主机ip |
| 9 | +- `_port`:端口号 |
| 10 | +- `_fetchSize`:单次请求数据大小 |
| 11 | +- `_poolSize`:线程池大小(默认为4) |
| 12 | + |
| 13 | +### 数据结构 |
| 14 | + |
| 15 | +- #### Client |
| 16 | + |
| 17 | +该数据结构对**客户端连接**进行了封装,维护一个客户端实例**TSIService.Client、**线程码**SessionId**与状态码**StatementId**以及分帧传输流**TFramedTransport。** |
| 18 | + |
| 19 | +```c# |
| 20 | +public Client(TSIService.Client client, long sessionId, long statementId, TFramedTransport transport) |
| 21 | +{ |
| 22 | + ServiceClient = client; |
| 23 | + SessionId = sessionId; |
| 24 | + StatementId = statementId; |
| 25 | + Transport = transport; |
| 26 | +} |
| 27 | +``` |
| 28 | + |
| 29 | +- #### ConcurrentClientQueue |
| 30 | + |
| 31 | +该数据结构封装了一个**高效的线程安全队列**,用于维护客户端与服务器的多个Client连接。 |
| 32 | + |
| 33 | +### Client-Server交互全流程 |
| 34 | + |
| 35 | +- #### 建立Client连接 |
| 36 | + - 用户创建一个**SessionPool**并调用**Open()**函数后,系统会创建一个**ConcurrentClientQueue**实例,并向其中创建并添加 _poolSize 个**Client**连接(客户端连接)。 |
| 37 | + - 创建**Client**连接时,首先建立Tcp连接并获得**TSIService.Client**的实例,然后通过**openSessionAsync()**函数为该客户端开启一个新的线程,开启成功后获得线程码**SessionId**与状态码**StatementId**,进而创建一个**Client**连接。 |
| 38 | + - 添加**Client**连接时,调用**ConcurrentClientQueue**的**Add()**函数,代码如下: |
| 39 | + |
| 40 | +```c# |
| 41 | +public void Add(Client client) |
| 42 | +{ |
| 43 | + Monitor.Enter(ClientQueue); |
| 44 | + ClientQueue.Enqueue(client); |
| 45 | + Monitor.Pulse(ClientQueue); |
| 46 | + Monitor.Exit(ClientQueue); |
| 47 | +} |
| 48 | +``` |
| 49 | + |
| 50 | +> 通过System.Threading.**Monitor**类实现多线程对**ConcurrentQueue**的同步访问,以确保数据的安全性。 |
| 51 | +
|
| 52 | +- #### 获取空闲连接 |
| 53 | + |
| 54 | +当请求发生时,系统会在**ConcurrentClientQueue**中寻找一个空闲的**Client**连接,即调用 **ConcurrentClientQueue** 的**Take()**函数,代码如下: |
| 55 | + |
| 56 | +```c# |
| 57 | +public Client Take() |
| 58 | +{ |
| 59 | + Monitor.Enter(ClientQueue); |
| 60 | + if (ClientQueue.IsEmpty) |
| 61 | + { |
| 62 | + Monitor.Wait(ClientQueue); |
| 63 | + } |
| 64 | + ClientQueue.TryDequeue(out var client); |
| 65 | + Monitor.Exit(ClientQueue); |
| 66 | + return client; |
| 67 | +} |
| 68 | +``` |
| 69 | + |
| 70 | +如果请求时**ConcurrentClientQueue**中没有空闲**Client**连接时,系统会调用 Monitor 类中的 **Wait()** 方法让线程等待,直到队列不为空时,弹出空闲**Client**连接。 |
| 71 | + |
| 72 | +- #### 执行操作 |
| 73 | + |
| 74 | +获取到空闲Client连接后,系统便在此连接上进行数据操作,示例如下: |
| 75 | + |
| 76 | +```c# |
| 77 | +public async Task<int> InsertRecordAsync(string deviceId, RowRecord record) |
| 78 | +{ |
| 79 | + var client = _clients.Take(); // 获取空闲的Client连接 |
| 80 | + var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(), |
| 81 | + record.Timestamps); |
| 82 | + try |
| 83 | + { |
| 84 | + var status = await client.ServiceClient.insertRecordAsync(req); |
| 85 | + if (_debugMode) |
| 86 | + { |
| 87 | + _logger.Info("insert one record to device {0}, server message: {1}", deviceId, status.Message); |
| 88 | + } |
| 89 | + return _utilFunctions.verify_success(status, SuccessCode); |
| 90 | + } |
| 91 | + |
| 92 | + catch (TException e) |
| 93 | + { |
| 94 | + throw new TException("Record insertion failed", e); |
| 95 | + } |
| 96 | + |
| 97 | + finally |
| 98 | + { |
| 99 | + _clients.Add(client); |
| 100 | + } |
| 101 | +} |
| 102 | +``` |
| 103 | + |
| 104 | +- #### 回收Client连接 |
| 105 | + |
| 106 | +当操作结束后,系统会回收该空闲连接,通过 **ConcurrentClientQueue.Add()** 函数将该连接重新加入队列,并在在添加后会通过 **Pulse()** 方法通知其他处于等待状态的线程。考虑到操作过程中可能出现异常,所有操作都被放在try-catch块中,即使捕获到了异常也会将该Client连接放回队列中,防止连接丢失。 |
| 107 | + |
| 108 | +### 对比评测 |
| 109 | + |
| 110 | +#### 本地测试 |
| 111 | + |
| 112 | +> ##### 测试环境: |
| 113 | +> |
| 114 | +> - 操作系统:macOS |
| 115 | +> - 处理器:2.3GHz 八核 Intel Core i9 |
| 116 | +> - IoTDB版本:0.12.0 |
| 117 | +
|
| 118 | +<img src="assets/1.png" alt="1" style="zoom:67%;" /> |
| 119 | + |
| 120 | +#### 远端测试 |
| 121 | + |
| 122 | +> ##### 测试环境: |
| 123 | +> |
| 124 | +> - 本地: |
| 125 | +> - 操作系统:macOS |
| 126 | +> - 处理器:2.3GHz 八核 Intel Core i9 |
| 127 | +> - 服务器: |
| 128 | +> - IoTDB版本:0.12.1 |
| 129 | +
|
| 130 | +<img src="assets/2.png" alt="2" style="zoom:67%;" /> |
| 131 | + |
| 132 | +<img src="assets/3.png" alt="3" style="zoom:67%;" /> |
| 133 | + |
0 commit comments