Skip to content

Commit 981f9a4

Browse files
committed
refactor(session):重构Redis会话命令写入逻辑
- 移除RedisSession中的WriteBuffer依赖 - 简化命令队列处理流程 - 移除信号量同步机制 -优化数据刷写逻辑 - 移除内部Tuple类定义 - 调整会话创建初始化参数
1 parent a943c9a commit 981f9a4

File tree

3 files changed

+24
-53
lines changed

3 files changed

+24
-53
lines changed

src/main/java/tech/smartboot/redisun/RedisMessageProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ public void process0(AioSession session, RESP msg) {
9696
} else {
9797
future.complete(msg);
9898
}
99-
redisSession.flush();
10099
}
101100

102101
/**
@@ -115,7 +114,7 @@ public void stateEvent0(AioSession session, StateMachineEnum stateMachineEnum, T
115114
// 处理新建会话事件
116115
case NEW_SESSION: {
117116
// 为新会话创建并绑定Redis会话对象
118-
RedisSession redisSession = new RedisSession(session.writeBuffer());
117+
RedisSession redisSession = new RedisSession();
119118
session.setAttachment(redisSession);
120119
}
121120
break;
Lines changed: 9 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
package tech.smartboot.redisun;
22

3-
import org.smartboot.socket.transport.WriteBuffer;
43
import tech.smartboot.redisun.resp.RESP;
54

6-
import java.io.IOException;
75
import java.util.concurrent.CompletableFuture;
86
import java.util.concurrent.ConcurrentLinkedQueue;
9-
import java.util.concurrent.Semaphore;
107

118
/**
129
* Redis会话管理类
@@ -39,14 +36,14 @@ final class RedisSession {
3936

4037
private int offerCount = 0;
4138
private int pollCount = 0;
42-
private final WriteBuffer writeBuffer;
4339

44-
public RedisSession(WriteBuffer writeBuffer) {
45-
this.writeBuffer = writeBuffer;
40+
public int incrOfferCount() {
41+
return ++offerCount;
4642
}
4743

48-
private final ConcurrentLinkedQueue<Tuple> commandQueue = new ConcurrentLinkedQueue<>();
49-
private final Semaphore semaphore = new Semaphore(1);
44+
public int getOfferCount() {
45+
return offerCount;
46+
}
5047

5148
/**
5249
* 获取正在解码的响应对象
@@ -71,41 +68,12 @@ public CompletableFuture<RESP> poll() {
7168
return pipeline.poll();
7269
}
7370

74-
public void writeCommand(CompletableFuture<RESP> future, Command command) throws IOException {
75-
offerCount++;
76-
if (semaphore.tryAcquire()) {
77-
pipeline.offer(future);
78-
command.writeTo(writeBuffer);
79-
Tuple tuple;
80-
while ((tuple = commandQueue.poll()) != null) {
81-
pipeline.offer(tuple.future);
82-
tuple.command.writeTo(writeBuffer);
83-
}
84-
writeBuffer.flush();
85-
semaphore.release();
86-
flush();
87-
} else {
88-
commandQueue.offer(new Tuple(future, command));
89-
}
71+
public void offer(CompletableFuture<RESP> future) {
72+
pipeline.offer(future);
9073
}
9174

92-
public void flush() {
93-
if (commandQueue.isEmpty() || !semaphore.tryAcquire()) {
94-
return;
95-
}
96-
97-
try {
98-
Tuple tuple;
99-
while ((tuple = commandQueue.poll()) != null) {
100-
pipeline.offer(tuple.future);
101-
tuple.command.writeTo(writeBuffer);
102-
}
103-
} catch (Throwable e) {
104-
throw new RedisunException(e);
105-
} finally {
106-
semaphore.release();
107-
}
108-
75+
public int getPollCount() {
76+
return pollCount;
10977
}
11078

11179
int load() {
@@ -114,13 +82,4 @@ int load() {
11482
return size >= 0 ? size : -size;
11583
}
11684

117-
private static class Tuple {
118-
private final CompletableFuture<RESP> future;
119-
private final Command command;
120-
121-
public Tuple(CompletableFuture<RESP> future, Command command) {
122-
this.future = future;
123-
this.command = command;
124-
}
125-
}
12685
}

src/main/java/tech/smartboot/redisun/Redisun.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,20 @@ private CompletableFuture<RESP> execute(Command command) {
541541
future.thenRun(() -> multiplexClient.reuse(finalClient));
542542
}
543543
}
544-
redisSession.writeCommand(future, command);
544+
545+
int offerCount = redisSession.incrOfferCount();
546+
int pollCount = redisSession.getPollCount();
547+
548+
synchronized (client) {
549+
// 设置当前命令的future
550+
redisSession.offer(future);
551+
command.writeTo(session.writeBuffer());
552+
}
553+
554+
// 刷新缓冲区,发送数据
555+
if (offerCount == redisSession.getOfferCount() && pollCount == redisSession.getPollCount()) {
556+
session.writeBuffer().flush();
557+
}
545558
} catch (Throwable e) {
546559
// 发生异常时完成future
547560
if (client != null) {

0 commit comments

Comments
 (0)