Skip to content

Commit 217b3af

Browse files
committed
feat: add session_pool_zh.md
1 parent e0934c2 commit 217b3af

File tree

4 files changed

+133
-0
lines changed

4 files changed

+133
-0
lines changed

docs/assets/1.png

120 KB
Loading

docs/assets/2.png

117 KB
Loading

docs/assets/3.png

115 KB
Loading

docs/session_pool_zh.md

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# IoTDB-CSharp客户端的定制化SessionPool机制
2+
3+
### 主要参数
4+
5+
- `_username`:用户名
6+
- `_password`:密码
7+
- `_zoneId`:时区
8+
- `_host`:主机ip
9+
- `_port`:端口号
10+
- `_fetchSize`:单次请求数据大小
11+
- `_poolSize`:线程池大小(默认为4)
12+
13+
### 数据结构
14+
15+
- #### Client
16+
17+
该数据结构对**客户端连接**进行了封装,维护一个客户端实例**TSIService.Client、**线程码**SessionId**与状态码**StatementId**以及分帧传输流**TFramedTransport。**
18+
19+
```c#
20+
public Client(TSIService.Client client, long sessionId, long statementId, TFramedTransport transport)
21+
{
22+
ServiceClient = client;
23+
SessionId = sessionId;
24+
StatementId = statementId;
25+
Transport = transport;
26+
}
27+
```
28+
29+
- #### ConcurrentClientQueue
30+
31+
该数据结构封装了一个**高效的线程安全队列**,用于维护客户端与服务器的多个Client连接。
32+
33+
### Client-Server交互全流程
34+
35+
- #### 建立Client连接
36+
- 用户创建一个**SessionPool**并调用**Open()**函数后,系统会创建一个**ConcurrentClientQueue**实例,并向其中创建并添加 _poolSize 个**Client**连接(客户端连接)。
37+
- 创建**Client**连接时,首先建立Tcp连接并获得**TSIService.Client**的实例,然后通过**openSessionAsync()**函数为该客户端开启一个新的线程,开启成功后获得线程码**SessionId**与状态码**StatementId**,进而创建一个**Client**连接。
38+
- 添加**Client**连接时,调用**ConcurrentClientQueue****Add()**函数,代码如下:
39+
40+
```c#
41+
public void Add(Client client)
42+
{
43+
Monitor.Enter(ClientQueue);
44+
ClientQueue.Enqueue(client);
45+
Monitor.Pulse(ClientQueue);
46+
Monitor.Exit(ClientQueue);
47+
}
48+
```
49+
50+
> 通过System.Threading.**Monitor**类实现多线程对**ConcurrentQueue**的同步访问,以确保数据的安全性。
51+
52+
- #### 获取空闲连接
53+
54+
当请求发生时,系统会在**ConcurrentClientQueue**中寻找一个空闲的**Client**连接,即调用 **ConcurrentClientQueue****Take()**函数,代码如下:
55+
56+
```c#
57+
public Client Take()
58+
{
59+
Monitor.Enter(ClientQueue);
60+
if (ClientQueue.IsEmpty)
61+
{
62+
Monitor.Wait(ClientQueue);
63+
}
64+
ClientQueue.TryDequeue(out var client);
65+
Monitor.Exit(ClientQueue);
66+
return client;
67+
}
68+
```
69+
70+
如果请求时**ConcurrentClientQueue**中没有空闲**Client**连接时,系统会调用 Monitor 类中的 **Wait()** 方法让线程等待,直到队列不为空时,弹出空闲**Client**连接。
71+
72+
- #### 执行操作
73+
74+
获取到空闲Client连接后,系统便在此连接上进行数据操作,示例如下:
75+
76+
```c#
77+
public async Task<int> InsertRecordAsync(string deviceId, RowRecord record)
78+
{
79+
var client = _clients.Take(); // 获取空闲的Client连接
80+
var req = new TSInsertRecordReq(client.SessionId, deviceId, record.Measurements, record.ToBytes(),
81+
record.Timestamps);
82+
try
83+
{
84+
var status = await client.ServiceClient.insertRecordAsync(req);
85+
if (_debugMode)
86+
{
87+
_logger.Info("insert one record to device {0}, server message: {1}", deviceId, status.Message);
88+
}
89+
return _utilFunctions.verify_success(status, SuccessCode);
90+
}
91+
92+
catch (TException e)
93+
{
94+
throw new TException("Record insertion failed", e);
95+
}
96+
97+
finally
98+
{
99+
_clients.Add(client);
100+
}
101+
}
102+
```
103+
104+
- #### 回收Client连接
105+
106+
当操作结束后,系统会回收该空闲连接,通过 **ConcurrentClientQueue.Add()** 函数将该连接重新加入队列,并在在添加后会通过 **Pulse()** 方法通知其他处于等待状态的线程。考虑到操作过程中可能出现异常,所有操作都被放在try-catch块中,即使捕获到了异常也会将该Client连接放回队列中,防止连接丢失。
107+
108+
### 对比评测
109+
110+
#### 本地测试
111+
112+
> ##### 测试环境:
113+
>
114+
> - 操作系统:macOS
115+
> - 处理器:2.3GHz 八核 Intel Core i9
116+
> - IoTDB版本:0.12.0
117+
118+
<img src="assets/1.png" alt="1" style="zoom:67%;" />
119+
120+
#### 远端测试
121+
122+
> ##### 测试环境:
123+
>
124+
> - 本地:
125+
> - 操作系统:macOS
126+
> - 处理器:2.3GHz 八核 Intel Core i9
127+
> - 服务器:
128+
> - IoTDB版本:0.12.1
129+
130+
<img src="assets/2.png" alt="2" style="zoom:67%;" />
131+
132+
<img src="assets/3.png" alt="3" style="zoom:67%;" />
133+

0 commit comments

Comments
 (0)