A light paxos implement
这是对Base-Paxos\Multi-Paxos协议的简单实现,以独立基础库的形式给使用者提供了简单易用的接口,使用者可以基于litpaxos实现自定义的分布式服务。
特性:
- 提供简单易用的接口给使用者,使用者可以实现自定义的分布式服务
- 实现了Base-Paxos的完整协议流程,实现了Instance对齐
- 提供分组功能,一个节点上可以添加多个分组(Group),各个分组之间相对独立
- 实现了Multi-Paxos中同一Leader连续提交时,优化到只有accept流程
- 实现了乱序/并行提交
TODO:
- Acceptor和Prpposer状态日志的持久化,Instance日志的持久化,实现节点崩溃后恢复
- 乱序提交中使用滑动窗口机制保证Instance乱序到达情况下状态机的顺序输入
整体的设计参考微信PhxPaxos,并做了一些精简。
Acceptor,Proposer,Learner三个角色运行在一台机器的同一个进程中,Learner学习的结果输入到状态机,NetWork部分作为一个相对独立的部分运行。
使用LitPaxos只需要完成下面两个步骤:
- 实现至少一个状态机(StateMachine)
- 实现Paxos服务端
- 实现Paxos客户端
下面以一个简单的KV存储为例说明如何使用LitPaxos,完整代码在example目录下。
状态机需要继承自org.littleneko.sm.StateMachine
并实现execute
方法
一个简单的状态机如下所示:
public class KVStateMachine extends StateMachine {
private Map<String, String> kv = new HashMap<>();
@Override
public void execute(String value) {
Gson gson = new Gson();
KVMessage kvMessage = gson.fromJson(value, KVMessage.class);
switch (kvMessage.getTypeEnum()) {
case PUT:
kv.put(kvMessage.getKey(), kvMessage.getValue());
System.out.println("Put: " + kvMessage.getKey() + "-> " + kvMessage.getValue());
break;
case DEL:
kv.remove(kvMessage.getKey());
System.out.println("Del: " + kvMessage.getKey() + "-> " + kvMessage.getValue());
break;
case GET:
String v = kv.get(kvMessage.getKey());
System.out.println("Get: " + kvMessage.getKey() + "-> " + v);
break;
}
}
}
public class PaxosKVServer {
private Node node = new Node();
private String confFile;
private ServerConf serverConf;
private Set<Conn> conns = new HashSet<>();
private LightCommServer commServer;
public PaxosKVServer(String confFile) {
this.confFile = confFile;
}
/**
*
*/
public void init() {
Gson gson = new Gson();
serverConf = gson.fromJson(FileUtils.readFromFile(confFile), ServerConf.class);
ServerParam param = new ServerParam(serverConf.getRequestIP(), serverConf.getRequestPort());
param.setBacklog(128);
param.setOnAccept(conn -> conns.add(conn));
param.setOnRead((conn, msg) -> {
// 通过Node的commit方法提交请求
node.commit(0, new String(msg));
System.out.println("Recv: " + new String(msg));
});
param.setOnClose(conn -> conns.remove(conn));
param.setOnReadError((conn, err) -> System.out.println(err.getMessage()));
param.setOnWriteError((conn, err) -> System.out.println(err.getMessage()));
param.setOnAcceptError(err -> System.out.println(err.getMessage()));
commServer = new LightCommServer(param, 4);
}
/**
* start kv server
*/
public void startServer() {
// start comm server to recv client request
try {
commServer.start();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// Init options
Options options = new Options(1);
options.setPaxosConf(serverConf.getPaxosConf());
options.setNodes(serverConf.getNodes());
options.setMyNodeID(serverConf.getMyNodeID());
// add sm
GroupSMInfo groupSMInfo = new GroupSMInfo(0);
groupSMInfo.addSM(new KVStateMachine());
options.addGroupSMInfo(groupSMInfo);
// Run Node
node.runNode(options);
}
public static void main(String[] args) {
String confFile = Paths.get(System.getProperty("user.dir"),"/example/conf/server3.json").toString();
PaxosKVServer server = new PaxosKVServer(confFile);
server.init();
server.startServer();
}
}
其中类org.littleneko.node.PaxosConf
保存了下面三个信息:
// 用于Proposer的超时定时器, 毫秒
@SerializedName("commTimeOut")
private int commTimeOut;
// Learner的学习时间间隔,毫秒
@SerializedName("learnInterval")
private int learnInterval;
// 日志持久化存储的位置
@SerializedName("logFile")
private String logFile;
类org.littleneko.node.NodeInfo
保存了所有Paxos节点的ID,IP和端口。
KVClient通过TCP Socket向KVServer发送请求:
public class PaxosKVClient {
public static void main(String[] args) {
ClientParam param = new ClientParam();
param.setOnConnect(conn -> {
new Thread(() -> {
KVMessage kvMessage1 = new KVMessage(KVMessage.TypeEnum.PUT, "key1", "value1");
KVMessage kvMessage2 = new KVMessage(KVMessage.TypeEnum.PUT, "key2", "value2");
KVMessage kvMessage3 = new KVMessage(KVMessage.TypeEnum.PUT, "key3", "value3");
try {
conn.write(kvMessage1.getJson().getBytes());
conn.write(kvMessage2.getJson().getBytes());
conn.write(kvMessage3.getJson().getBytes());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}).start();
});
param.setOnRead((conn, msg) -> System.out.println("Receive " + new String(msg)));
param.setOnClose(conn -> System.out.println("Server close!"));
try {
LightCommClient client = new LightCommClient(4);
client.connect("127.0.0.1", 1201, param);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
提交请求的value为字符串,该字符串最终会作为参数传递到StateMachine
中的execute
方法中,用户可以自定义value的格式。
这里我们定义了类KVMessage
并序列化为Json作为提交的Value类型。