Skip to content

Commit 2eb4c27

Browse files
committed
feat(redis):优化Redis会话命令写入机制- 引入WriteBuffer依赖以支持高效数据写入
-重构RedisSession构造函数,接受WriteBuffer参数 - 新增writeCommand方法处理命令写入和并发控制 - 实现基于信号量的命令队列处理机制 - 添加Tuple内部类用于存储命令和Future的组合 - 移除旧的计数器逻辑,简化会话管理流程- 在RedisMessageProcessor中调用flush确保数据及时发送
1 parent 856e969 commit 2eb4c27

File tree

3 files changed

+55
-24
lines changed

3 files changed

+55
-24
lines changed

src/main/java/tech/smartboot/redisun/RedisMessageProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public void process0(AioSession session, RESP msg) {
9696
} else {
9797
future.complete(msg);
9898
}
99+
redisSession.flush();
99100
}
100101

101102
/**
@@ -114,7 +115,7 @@ public void stateEvent0(AioSession session, StateMachineEnum stateMachineEnum, T
114115
// 处理新建会话事件
115116
case NEW_SESSION: {
116117
// 为新会话创建并绑定Redis会话对象
117-
RedisSession redisSession = new RedisSession();
118+
RedisSession redisSession = new RedisSession(session.writeBuffer());
118119
session.setAttachment(redisSession);
119120
}
120121
break;

src/main/java/tech/smartboot/redisun/RedisSession.java

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package tech.smartboot.redisun;
22

3+
import org.smartboot.socket.transport.WriteBuffer;
34
import tech.smartboot.redisun.resp.RESP;
45

6+
import java.io.IOException;
57
import java.util.concurrent.CompletableFuture;
68
import java.util.concurrent.ConcurrentLinkedQueue;
9+
import java.util.concurrent.Semaphore;
710

811
/**
912
* Redis会话管理类
@@ -36,14 +39,14 @@ final class RedisSession {
3639

3740
private int offerCount = 0;
3841
private int pollCount = 0;
42+
private final WriteBuffer writeBuffer;
3943

40-
public int incrOfferCount() {
41-
return ++offerCount;
44+
public RedisSession(WriteBuffer writeBuffer) {
45+
this.writeBuffer = writeBuffer;
4246
}
4347

44-
public int getOfferCount() {
45-
return offerCount;
46-
}
48+
private final ConcurrentLinkedQueue<Tuple> commandQueue = new ConcurrentLinkedQueue<>();
49+
private final Semaphore semaphore = new Semaphore(1);
4750

4851
/**
4952
* 获取正在解码的响应对象
@@ -68,12 +71,43 @@ public CompletableFuture<RESP> poll() {
6871
return pipeline.poll();
6972
}
7073

71-
public void offer(CompletableFuture<RESP> future) {
72-
pipeline.offer(future);
74+
public void writeCommand(CompletableFuture<RESP> future, Command command) throws IOException {
75+
if (semaphore.tryAcquire()) {
76+
offerCount++;
77+
pipeline.offer(future);
78+
command.writeTo(writeBuffer);
79+
Tuple tuple;
80+
while ((tuple = commandQueue.poll()) != null) {
81+
pipeline.offer(tuple.future);
82+
tuple.command.writeTo(writeBuffer);
83+
}
84+
writeBuffer.flush();
85+
semaphore.release();
86+
// flush();
87+
} else {
88+
offerCount++;
89+
commandQueue.offer(new Tuple(future, command));
90+
}
7391
}
7492

75-
public int getPollCount() {
76-
return pollCount;
93+
public void flush() {
94+
if (commandQueue.isEmpty() || !semaphore.tryAcquire()) {
95+
return;
96+
}
97+
98+
try {
99+
Tuple tuple;
100+
while ((tuple = commandQueue.poll()) != null) {
101+
offerCount++;
102+
pipeline.offer(tuple.future);
103+
tuple.command.writeTo(writeBuffer);
104+
}
105+
} catch (Throwable e) {
106+
throw new RedisunException(e);
107+
} finally {
108+
semaphore.release();
109+
}
110+
77111
}
78112

79113
int load() {
@@ -82,4 +116,13 @@ int load() {
82116
return size >= 0 ? size : -size;
83117
}
84118

119+
private static class Tuple {
120+
private final CompletableFuture<RESP> future;
121+
private final Command command;
122+
123+
public Tuple(CompletableFuture<RESP> future, Command command) {
124+
this.future = future;
125+
this.command = command;
126+
}
127+
}
85128
}

src/main/java/tech/smartboot/redisun/Redisun.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -541,20 +541,7 @@ private CompletableFuture<RESP> execute(Command command) {
541541
future.thenRun(() -> multiplexClient.reuse(finalClient));
542542
}
543543
}
544-
545-
int offerCount = redisSession.incrOfferCount();
546-
int pollCount = redisSession.getPollCount();
547-
548-
synchronized (client) {
549-
// 设置当前命令的future
550-
redisSession.offer(future);
551-
command.writeTo(session.writeBuffer());
552-
}
553-
554-
// 刷新缓冲区,发送数据
555-
if (offerCount == redisSession.getOfferCount() && pollCount == redisSession.getPollCount()) {
556-
session.writeBuffer().flush();
557-
}
544+
redisSession.writeCommand(future, command);
558545
} catch (Throwable e) {
559546
// 发生异常时完成future
560547
if (client != null) {

0 commit comments

Comments
 (0)