diff --git a/server/src/main/java/com/wuba/wlock/server/communicate/registry/handler/GetPaxosConfigHandler.java b/server/src/main/java/com/wuba/wlock/server/communicate/registry/handler/GetPaxosConfigHandler.java index 0f061a9..f704205 100644 --- a/server/src/main/java/com/wuba/wlock/server/communicate/registry/handler/GetPaxosConfigHandler.java +++ b/server/src/main/java/com/wuba/wlock/server/communicate/registry/handler/GetPaxosConfigHandler.java @@ -25,12 +25,15 @@ import com.wuba.wlock.server.communicate.retrans.RetransServerManager; import com.wuba.wlock.server.config.PaxosConfig; import com.wuba.wlock.server.config.ServerConfig; +import com.wuba.wlock.server.wpaxos.SMID; import com.wuba.wlock.server.wpaxos.WpaxosService; +import com.wuba.wpaxos.ProposeResult; import com.wuba.wpaxos.comm.NodeInfo; import com.wuba.wpaxos.comm.Options; import com.wuba.wpaxos.config.PaxosNodeFunctionRet; import com.wuba.wpaxos.config.PaxosTryCommitRet; import com.wuba.wlock.server.exception.ConfigException; +import com.wuba.wpaxos.storemachine.SMCtx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -213,6 +216,10 @@ private boolean addMember(int groupIdx, final Set addNodes) { Thread.sleep(RETRY_DELAY[Math.min(i, RETRY_TIMES)]); } catch (InterruptedException e) { } + + if (addMember == PaxosNodeFunctionRet.Paxos_MembershipOp_NoGid.getRet()) { + noGidInit(groupIdx); + } } else { break; } @@ -221,14 +228,23 @@ private boolean addMember(int groupIdx, final Set addNodes) { logger.error("add member error", e); } if (addMember != PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet() && addMember != PaxosNodeFunctionRet.Paxos_MembershipOp_Add_NodeExist.getRet()) { - logger.debug("TEST => add member false : result is " + addMember); + logger.error("add member false : result is " + addMember); result = false; } } } + return result; } + private void noGidInit(int groupIdx) { + logger.info("no gid init propose. groupId: {}", groupIdx); + ProposeResult propose = WpaxosService.getInstance().propose(new byte[]{}, groupIdx, new SMCtx(SMID.NULL.getValue(), null)); + if (propose.getResult() != PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet()) { + logger.info("no gid init propose fail. groupId: {} result: {}", groupIdx, propose.getResult()); + } + } + private boolean deleteMember(int groupIdx, final Set deleteNodes) { if (!WpaxosService.getInstance().isIMMaster(groupIdx) && !WpaxosService.getInstance().isNoMaster(groupIdx)) { return true; @@ -246,6 +262,10 @@ private boolean deleteMember(int groupIdx, final Set deleteNodes) { Thread.sleep(RETRY_DELAY[Math.min(i, RETRY_TIMES)]); } catch (InterruptedException e) { } + + if (deleteMember == PaxosNodeFunctionRet.Paxos_MembershipOp_NoGid.getRet()) { + noGidInit(groupIdx); + } } else { break; } @@ -254,6 +274,7 @@ private boolean deleteMember(int groupIdx, final Set deleteNodes) { logger.error("delete member error", e); } if (deleteMember != PaxosTryCommitRet.PaxosTryCommitRet_OK.getRet() && deleteMember != PaxosNodeFunctionRet.Paxos_MembershipOp_Remove_NodeNotExist.getRet()) { + logger.error("delete member false : result is " + deleteMember); result = false; } } diff --git a/server/src/main/java/com/wuba/wlock/server/service/impl/ReentrantLockService.java b/server/src/main/java/com/wuba/wlock/server/service/impl/ReentrantLockService.java index d481be7..68b0371 100644 --- a/server/src/main/java/com/wuba/wlock/server/service/impl/ReentrantLockService.java +++ b/server/src/main/java/com/wuba/wlock/server/service/impl/ReentrantLockService.java @@ -601,8 +601,6 @@ public boolean watchLock(LockContext lockContext, int groupId) { proposeDeleteKey(watchLockReq, groupId, lockOwnerInfo); trySnatchLock(key, groupId, version, watchLockReq.getRegistryKey()); return true; - } else if (version > watchLockReq.getFencingToken()) { - lockNotify.lockNotifyUpdate2(key, new LockOwner(lockOwnerInfo.getIp(), lockOwnerInfo.getThreadId(), lockOwnerInfo.getPid(), version), groupId); } } catch (LockException e) { LOGGER.error("{} watch lock key : {} error.", watchLockReq.getHost(), key, e); diff --git a/server/src/main/java/com/wuba/wlock/server/wpaxos/SMID.java b/server/src/main/java/com/wuba/wlock/server/wpaxos/SMID.java index 5cf9f91..496c383 100644 --- a/server/src/main/java/com/wuba/wlock/server/wpaxos/SMID.java +++ b/server/src/main/java/com/wuba/wlock/server/wpaxos/SMID.java @@ -24,7 +24,8 @@ public enum SMID { WHEEL_TICK(3), MIGRATE_COMMAND(4), MIGRATE_POINT(5), - GROUP_META(6) + GROUP_META(6), + NULL(7); ; private int value; diff --git a/server/src/main/java/com/wuba/wlock/server/wpaxos/WpaxosService.java b/server/src/main/java/com/wuba/wlock/server/wpaxos/WpaxosService.java index b80e5d8..a22d9a8 100644 --- a/server/src/main/java/com/wuba/wlock/server/wpaxos/WpaxosService.java +++ b/server/src/main/java/com/wuba/wlock/server/wpaxos/WpaxosService.java @@ -102,6 +102,7 @@ public void runPaxos() throws Exception { smInfo.getSmList().add(new MigrateCommandSM(i, SMID.MIGRATE_COMMAND.getValue(), false)); smInfo.getSmList().add(new MigrateChangePointSM(i, SMID.MIGRATE_POINT.getValue(), true)); smInfo.getSmList().add(new GroupMetaSM(i, SMID.GROUP_META.getValue(), false)); + smInfo.getSmList().add(new NullStateMachine(i, SMID.NULL.getValue(), false)); options.getGroupSMInfoList().add(smInfo); } diff --git a/server/src/main/java/com/wuba/wlock/server/wpaxos/statemachine/NullStateMachine.java b/server/src/main/java/com/wuba/wlock/server/wpaxos/statemachine/NullStateMachine.java new file mode 100644 index 0000000..f8548db --- /dev/null +++ b/server/src/main/java/com/wuba/wlock/server/wpaxos/statemachine/NullStateMachine.java @@ -0,0 +1,34 @@ +package com.wuba.wlock.server.wpaxos.statemachine; + +import com.wuba.wpaxos.storemachine.SMCtx; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * 用于做一些初始化操作,比如 生成 gid + * @author huguocai + */ +public class NullStateMachine extends AbstractStateMachine { + private static final Logger log = LoggerFactory.getLogger(NullStateMachine.class); + + public NullStateMachine(int groupIdx, int smID, boolean needCheckpoint) { + super(groupIdx, smID, needCheckpoint); + } + + @Override + public boolean execute(int groupIdx, long instanceID, byte[] paxosValue, SMCtx smCtx) { + log.info("execute NullStateMachine groupId: {}, instanceID: {}", groupIdx, instanceID); + return true; + } + + @Override + public byte[] beforePropose(int groupIdx, byte[] sValue) { + return new byte[0]; + } + + @Override + public boolean needCallBeforePropose() { + return false; + } +}