diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index 7c4e5e025a7..7a7b5f84368 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -389,6 +389,9 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina int endpos = currentPos + msgLen; // alignment end position int extraAppendSize = UNSAFE_PAGE_SIZE - endpos % UNSAFE_PAGE_SIZE; + if (extraAppendSize == UNSAFE_PAGE_SIZE) { + extraAppendSize = 0; + } int actualAppendSize = msgLen + extraAppendSize; this.fileChannel.position(currentPos); diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index cd5e9f44807..390dec9f98e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -1699,9 +1699,6 @@ public String getServiceName() { public void run() { setState(AbstractStateService.START); TimerMessageStore.LOGGER.info(this.getServiceName() + " service start"); - //Mark different rounds - boolean isRound = true; - Map avoidDeleteLose = new HashMap<>(); while (!this.isStopped()) { try { setState(AbstractStateService.WAITING); @@ -1718,18 +1715,9 @@ public void run() { MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy()); if (null != msgExt) { if (needDelete(tr.getMagic()) && !needRoll(tr.getMagic())) { - //Clearing is performed once in each round. - //The deletion message is received first and the common message is received once - if (!isRound) { - isRound = true; - for (MessageExt messageExt : avoidDeleteLose.values()) { - addMetric(messageExt, 1); - } - avoidDeleteLose.clear(); - } if (msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null && tr.getDeleteList() != null) { - - avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY), msgExt); + //Execute metric plus one for messages that fail to be deleted + addMetric(msgExt, 1); tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)); } tr.idempotentRelease(); @@ -1739,13 +1727,9 @@ public void run() { if (null == uniqueKey) { LOGGER.warn("No uniqueKey for msg:{}", msgExt); } - //Mark ready for next round - if (isRound) { - isRound = false; - } - if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 - && tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey, storeConfig.isAppendTopicForTimerDeleteKey()))) { - avoidDeleteLose.remove(uniqueKey); + if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey, storeConfig.isAppendTopicForTimerDeleteKey()))) { + //Normally, it cancels out with the +1 above + addMetric(msgExt, -1); doRes = true; tr.idempotentRelease(); perfCounterTicks.getCounter("dequeue_delete").flow(1);