From fe5c17cb8b4196e13dbfc3230e0f7dece9c9f6a7 Mon Sep 17 00:00:00 2001 From: Drizzle <464473306@qq.com> Date: Fri, 19 Dec 2025 16:03:49 +0800 Subject: [PATCH] Optimized the function naming (#9935) * add isWakeCommitWhenPutMessage for AIO * optimzie the Function name Change-Id: Id91e3eb9c4488fb9804fb2c105082657e66c44c0 * optimized the function naming Change-Id: Ifc482f91220ff328e5c5425a57a04ac627e8d469 --------- Co-authored-by: drizzle.zk --- .../java/org/apache/rocketmq/broker/BrokerController.java | 2 +- .../broker/processor/EndTransactionProcessor.java | 6 +++--- .../rocksdb/TransactionalMessageRocksDBService.java | 2 +- .../main/java/org/apache/rocketmq/store/CommitLog.java | 8 ++++---- .../org/apache/rocketmq/store/DefaultMessageStore.java | 6 +++--- .../main/java/org/apache/rocketmq/store/MessageStore.java | 6 +++--- .../apache/rocketmq/store/timer/TimerMessageStore.java | 8 ++++---- .../apache/rocketmq/tieredstore/TieredMessageStore.java | 6 +++--- 8 files changed, 22 insertions(+), 22 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index a09e2173b66..efc2949364d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -878,7 +878,7 @@ public boolean initializeMessageStore() { } if (messageStoreConfig.isTransRocksDBEnable()) { this.transMessageRocksDBStore = new TransMessageRocksDBStore(messageStore, brokerStatsManager, new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); - this.messageStore.setTransRocksDBStore(transMessageRocksDBStore); + this.messageStore.setTransMessageRocksDBStore(transMessageRocksDBStore); } } catch (Exception e) { result = false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index 7298e5da58a..f90b5342045 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -203,7 +203,7 @@ private void deletePrepareMessage(OperationResult result) { if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(halfTopic)) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(prepareMessage); } else if (this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && TopicValidator.RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC.equals(halfTopic)) { - this.brokerController.getMessageStore().getTransRocksDBStore().deletePrepareMessage(prepareMessage); + this.brokerController.getMessageStore().getTransMessageRocksDBStore().deletePrepareMessage(prepareMessage); } else { LOGGER.warn("deletePrepareMessage error, topic of half message is: {}, transRocksDBEnable: {}", halfTopic, this.brokerController.getMessageStoreConfig().isTransRocksDBEnable()); } @@ -287,8 +287,8 @@ private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) { long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); msgInner.setTagsCode(tagsCodeValue); String checkTimes = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES); - if (StringUtils.isEmpty(checkTimes) && this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null != this.brokerController.getMessageStore().getTransRocksDBStore()) { - Integer checkTimesRocksDB = this.brokerController.getMessageStore().getTransRocksDBStore().getCheckTimes(msgInner.getTopic(), msgInner.getTransactionId(), msgExt.getCommitLogOffset()); + if (StringUtils.isEmpty(checkTimes) && this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null != this.brokerController.getMessageStore().getTransMessageRocksDBStore()) { + Integer checkTimesRocksDB = this.brokerController.getMessageStore().getTransMessageRocksDBStore().getCheckTimes(msgInner.getTopic(), msgInner.getTransactionId(), msgExt.getCommitLogOffset()); if (null != checkTimesRocksDB && checkTimesRocksDB >= 0) { msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTimesRocksDB)); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java index 389c75e4267..1fc38eb3d6d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java @@ -58,7 +58,7 @@ public class TransactionalMessageRocksDBService { public TransactionalMessageRocksDBService(final MessageStore messageStore, final BrokerController brokerController) { this.messageStore = messageStore; - this.transMessageRocksDBStore = messageStore.getTransRocksDBStore(); + this.transMessageRocksDBStore = messageStore.getTransMessageRocksDBStore(); this.messageRocksDBStorage = transMessageRocksDBStore.getMessageRocksDBStorage(); this.brokerController = brokerController; } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 459f2074b24..286f31cd4a7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -111,9 +111,9 @@ public class CommitLog implements Swappable { public CommitLog(final DefaultMessageStore messageStore) { String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog(); - RunningFlags runningFlags = messageStore.getMessageStoreConfig().isEnableRunningFlagsInFlush() + RunningFlags runningFlags = messageStore.getMessageStoreConfig().isEnableRunningFlagsInFlush() ? messageStore.getRunningFlags() : null; - + if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) { this.mappedFileQueue = new MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(), messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), @@ -927,8 +927,8 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile, private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp, boolean recoverNormally) throws RocksDBException { boolean result = this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally); - if (null != this.defaultMessageStore.getTransRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) { - result = result && this.defaultMessageStore.getTransRocksDBStore().isMappedFileMatchedRecover(phyOffset); + if (null != this.defaultMessageStore.getTransMessageRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) { + result = result && this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset); } if (null != this.defaultMessageStore.getIndexRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isIndexRocksDBEnable()) { result = result && this.defaultMessageStore.getIndexRocksDBStore().isMappedFileMatchedRecover(phyOffset); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7848b76016d..aae6d50da97 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1080,12 +1080,12 @@ public TimerMessageStore getTimerMessageStore() { } @Override - public TimerMessageRocksDBStore getTimerRocksDBStore() { + public TimerMessageRocksDBStore getTimerMessageRocksDBStore() { return this.timerMessageRocksDBStore; } @Override - public TransMessageRocksDBStore getTransRocksDBStore() { + public TransMessageRocksDBStore getTransMessageRocksDBStore() { return this.transMessageRocksDBStore; } @@ -1100,7 +1100,7 @@ public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRoc } @Override - public void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) { + public void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) { this.transMessageRocksDBStore = transMessageRocksDBStore; } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index b297ee542f3..2490bb5b2fb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -210,15 +210,15 @@ CompletableFuture getMessageAsync(final String group, final St TimerMessageStore getTimerMessageStore(); - TimerMessageRocksDBStore getTimerRocksDBStore(); + TimerMessageRocksDBStore getTimerMessageRocksDBStore(); - TransMessageRocksDBStore getTransRocksDBStore(); + TransMessageRocksDBStore getTransMessageRocksDBStore(); void setTimerMessageStore(TimerMessageStore timerMessageStore); void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRocksDBStore); - void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore); + void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore); /** * Get the offset of the message in the commit log, which is also known as physical offset. diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index 53999e72c4c..a32b4a3f217 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -318,7 +318,7 @@ public void recover() { } currQueueOffset = Math.min(currQueueOffset, timerCheckpoint.getMasterTimerQueueOffset()); if (storeConfig.isTimerRocksDBEnable()) { - long commitOffsetInRocksDB = messageStore.getTimerRocksDBStore().getCommitOffsetInRocksDB(); + long commitOffsetInRocksDB = messageStore.getTimerMessageRocksDBStore().getCommitOffsetInRocksDB(); LOGGER.info("recover time wheel, currQueueOffset: {}, commitOffsetInRocksDB: {}", currQueueOffset, commitOffsetInRocksDB); currQueueOffset = Math.max(currQueueOffset, commitOffsetInRocksDB); } @@ -2087,12 +2087,12 @@ private void recallToTimeline(long delayTime, long offsetPy, int sizePy, Message LOGGER.error("recallToTimeline param error, delayTime: {}, offsetPy: {}, sizePy: {}, messageExt: {}", delayTime, offsetPy, sizePy, messageExt); return; } - if (null == messageStore.getTimerRocksDBStore() || null == messageStore.getTimerRocksDBStore().getTimeline()) { + if (null == messageStore.getTimerMessageRocksDBStore() || null == messageStore.getTimerMessageRocksDBStore().getTimeline()) { LOGGER.error("recallToTimeline error, timerRocksDBStore is null or timeline is null"); return; } try { - messageStore.getTimerRocksDBStore().getTimeline().putDeleteRecord(delayTime, messageExt.getMsgId(), offsetPy, sizePy, messageExt.getQueueOffset(), messageExt); + messageStore.getTimerMessageRocksDBStore().getTimeline().putDeleteRecord(delayTime, messageExt.getMsgId(), offsetPy, sizePy, messageExt.getQueueOffset(), messageExt); } catch (Exception e) { LOGGER.error("recallToTimeline error: {}", e.getMessage()); } @@ -2109,7 +2109,7 @@ public boolean restart() { LOGGER.info("restart TimerMessageStore has been running"); return true; } - long commitOffsetRocksDB = this.messageStore.getTimerRocksDBStore().getCommitOffsetInRocksDB(); + long commitOffsetRocksDB = this.messageStore.getTimerMessageRocksDBStore().getCommitOffsetInRocksDB(); long commitOffsetFile = this.messageStore.getTimerMessageStore().getCommitQueueOffset(); long maxCommitOffset = Math.max(commitOffsetFile, commitOffsetRocksDB); currQueueOffset = maxCommitOffset; diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 3e84f201227..b30f868d194 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -314,12 +314,12 @@ public long getMinOffsetInQueue(String topic, int queueId) { } @Override - public TimerMessageRocksDBStore getTimerRocksDBStore() { + public TimerMessageRocksDBStore getTimerMessageRocksDBStore() { return timerMessageRocksDBStore; } @Override - public TransMessageRocksDBStore getTransRocksDBStore() { + public TransMessageRocksDBStore getTransMessageRocksDBStore() { return transMessageRocksDBStore; } @@ -329,7 +329,7 @@ public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRoc } @Override - public void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) { + public void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) { this.transMessageRocksDBStore = transMessageRocksDBStore; }