Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ public boolean initializeMessageStore() {
}
if (messageStoreConfig.isTransRocksDBEnable()) {
this.transMessageRocksDBStore = new TransMessageRocksDBStore(messageStore, brokerStatsManager, new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
this.messageStore.setTransRocksDBStore(transMessageRocksDBStore);
this.messageStore.setTransMessageRocksDBStore(transMessageRocksDBStore);
}
} catch (Exception e) {
result = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private void deletePrepareMessage(OperationResult result) {
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(halfTopic)) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(prepareMessage);
} else if (this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && TopicValidator.RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC.equals(halfTopic)) {
this.brokerController.getMessageStore().getTransRocksDBStore().deletePrepareMessage(prepareMessage);
this.brokerController.getMessageStore().getTransMessageRocksDBStore().deletePrepareMessage(prepareMessage);
} else {
LOGGER.warn("deletePrepareMessage error, topic of half message is: {}, transRocksDBEnable: {}", halfTopic, this.brokerController.getMessageStoreConfig().isTransRocksDBEnable());
}
Expand Down Expand Up @@ -287,8 +287,8 @@ private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
String checkTimes = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
if (StringUtils.isEmpty(checkTimes) && this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null != this.brokerController.getMessageStore().getTransRocksDBStore()) {
Integer checkTimesRocksDB = this.brokerController.getMessageStore().getTransRocksDBStore().getCheckTimes(msgInner.getTopic(), msgInner.getTransactionId(), msgExt.getCommitLogOffset());
if (StringUtils.isEmpty(checkTimes) && this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null != this.brokerController.getMessageStore().getTransMessageRocksDBStore()) {
Integer checkTimesRocksDB = this.brokerController.getMessageStore().getTransMessageRocksDBStore().getCheckTimes(msgInner.getTopic(), msgInner.getTransactionId(), msgExt.getCommitLogOffset());
if (null != checkTimesRocksDB && checkTimesRocksDB >= 0) {
msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTimesRocksDB));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class TransactionalMessageRocksDBService {

public TransactionalMessageRocksDBService(final MessageStore messageStore, final BrokerController brokerController) {
this.messageStore = messageStore;
this.transMessageRocksDBStore = messageStore.getTransRocksDBStore();
this.transMessageRocksDBStore = messageStore.getTransMessageRocksDBStore();
this.messageRocksDBStorage = transMessageRocksDBStore.getMessageRocksDBStorage();
this.brokerController = brokerController;
}
Expand Down
8 changes: 4 additions & 4 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public class CommitLog implements Swappable {

public CommitLog(final DefaultMessageStore messageStore) {
String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
RunningFlags runningFlags = messageStore.getMessageStoreConfig().isEnableRunningFlagsInFlush()
RunningFlags runningFlags = messageStore.getMessageStoreConfig().isEnableRunningFlagsInFlush()
? messageStore.getRunningFlags() : null;

if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
Expand Down Expand Up @@ -927,8 +927,8 @@ private boolean isMappedFileMatchedRecover(final MappedFile mappedFile,

private boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp, boolean recoverNormally) throws RocksDBException {
boolean result = this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally);
if (null != this.defaultMessageStore.getTransRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
result = result && this.defaultMessageStore.getTransRocksDBStore().isMappedFileMatchedRecover(phyOffset);
if (null != this.defaultMessageStore.getTransMessageRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && !defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable()) {
result = result && this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset);
}
if (null != this.defaultMessageStore.getIndexRocksDBStore() && defaultMessageStore.getMessageStoreConfig().isIndexRocksDBEnable()) {
result = result && this.defaultMessageStore.getIndexRocksDBStore().isMappedFileMatchedRecover(phyOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,12 +1080,12 @@ public TimerMessageStore getTimerMessageStore() {
}

@Override
public TimerMessageRocksDBStore getTimerRocksDBStore() {
public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
return this.timerMessageRocksDBStore;
}

@Override
public TransMessageRocksDBStore getTransRocksDBStore() {
public TransMessageRocksDBStore getTransMessageRocksDBStore() {
return this.transMessageRocksDBStore;
}

Expand All @@ -1100,7 +1100,7 @@ public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRoc
}

@Override
public void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
public void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
this.transMessageRocksDBStore = transMessageRocksDBStore;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ CompletableFuture<GetMessageResult> getMessageAsync(final String group, final St

TimerMessageStore getTimerMessageStore();

TimerMessageRocksDBStore getTimerRocksDBStore();
TimerMessageRocksDBStore getTimerMessageRocksDBStore();

TransMessageRocksDBStore getTransRocksDBStore();
TransMessageRocksDBStore getTransMessageRocksDBStore();

void setTimerMessageStore(TimerMessageStore timerMessageStore);

void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRocksDBStore);

void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore);
void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore);

/**
* Get the offset of the message in the commit log, which is also known as physical offset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void recover() {
}
currQueueOffset = Math.min(currQueueOffset, timerCheckpoint.getMasterTimerQueueOffset());
if (storeConfig.isTimerRocksDBEnable()) {
long commitOffsetInRocksDB = messageStore.getTimerRocksDBStore().getCommitOffsetInRocksDB();
long commitOffsetInRocksDB = messageStore.getTimerMessageRocksDBStore().getCommitOffsetInRocksDB();
LOGGER.info("recover time wheel, currQueueOffset: {}, commitOffsetInRocksDB: {}", currQueueOffset, commitOffsetInRocksDB);
currQueueOffset = Math.max(currQueueOffset, commitOffsetInRocksDB);
}
Expand Down Expand Up @@ -2087,12 +2087,12 @@ private void recallToTimeline(long delayTime, long offsetPy, int sizePy, Message
LOGGER.error("recallToTimeline param error, delayTime: {}, offsetPy: {}, sizePy: {}, messageExt: {}", delayTime, offsetPy, sizePy, messageExt);
return;
}
if (null == messageStore.getTimerRocksDBStore() || null == messageStore.getTimerRocksDBStore().getTimeline()) {
if (null == messageStore.getTimerMessageRocksDBStore() || null == messageStore.getTimerMessageRocksDBStore().getTimeline()) {
LOGGER.error("recallToTimeline error, timerRocksDBStore is null or timeline is null");
return;
}
try {
messageStore.getTimerRocksDBStore().getTimeline().putDeleteRecord(delayTime, messageExt.getMsgId(), offsetPy, sizePy, messageExt.getQueueOffset(), messageExt);
messageStore.getTimerMessageRocksDBStore().getTimeline().putDeleteRecord(delayTime, messageExt.getMsgId(), offsetPy, sizePy, messageExt.getQueueOffset(), messageExt);
} catch (Exception e) {
LOGGER.error("recallToTimeline error: {}", e.getMessage());
}
Expand All @@ -2109,7 +2109,7 @@ public boolean restart() {
LOGGER.info("restart TimerMessageStore has been running");
return true;
}
long commitOffsetRocksDB = this.messageStore.getTimerRocksDBStore().getCommitOffsetInRocksDB();
long commitOffsetRocksDB = this.messageStore.getTimerMessageRocksDBStore().getCommitOffsetInRocksDB();
long commitOffsetFile = this.messageStore.getTimerMessageStore().getCommitQueueOffset();
long maxCommitOffset = Math.max(commitOffsetFile, commitOffsetRocksDB);
currQueueOffset = maxCommitOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,12 @@ public long getMinOffsetInQueue(String topic, int queueId) {
}

@Override
public TimerMessageRocksDBStore getTimerRocksDBStore() {
public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
return timerMessageRocksDBStore;
}

@Override
public TransMessageRocksDBStore getTransRocksDBStore() {
public TransMessageRocksDBStore getTransMessageRocksDBStore() {
return transMessageRocksDBStore;
}

Expand All @@ -329,7 +329,7 @@ public void setTimerMessageRocksDBStore(TimerMessageRocksDBStore timerMessageRoc
}

@Override
public void setTransRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
public void setTransMessageRocksDBStore(TransMessageRocksDBStore transMessageRocksDBStore) {
this.transMessageRocksDBStore = transMessageRocksDBStore;
}

Expand Down
Loading