diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 7b63e16696e..0e3ede871c3 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -180,9 +180,15 @@ public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int } // determine whether tiered storage path conditions are met - if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK) - && !next.checkInStoreByConsumeOffset(topic, queueId, offset)) { - return true; + if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)) { + // return true to read from tiered storage if the CommitLog is empty + if (next != null && next.getCommitLog() != null && + next.getCommitLog().getMinOffset() < 0L) { + return true; + } + if (!next.checkInStoreByConsumeOffset(topic, queueId, offset)) { + return true; + } } if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_MEM) @@ -208,10 +214,10 @@ public CompletableFuture getMessageAsync(String group, String } if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) { - log.trace("GetMessageAsync from current store, " + + log.trace("GetMessageAsync from remote store, " + "topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums); } else { - log.trace("GetMessageAsync from remote store, " + + log.trace("GetMessageAsync from next store, " + "topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums); return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 020b9f3b068..878f1dab656 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -287,7 +287,7 @@ public void forceUpload() { log.error("IndexStoreService force upload error", e); throw new RuntimeException(e); } finally { - readWriteLock.writeLock().lock(); + readWriteLock.writeLock().unlock(); } } @@ -398,7 +398,7 @@ public void shutdown() { for (Map.Entry entry : timeStoreTable.entrySet()) { entry.getValue().shutdown(); } - if (!autoCreateNewFile) { + if (autoCreateNewFile) { this.forceUpload(); } this.timeStoreTable.clear(); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java index fb563f7c6c2..5d9c705735b 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java @@ -120,7 +120,7 @@ public void doConvertOldFormatTest() throws IOException { indexService = new IndexStoreService(fileAllocator, filePath); indexService.start(); ConcurrentSkipListMap timeStoreTable = indexService.getTimeStoreTable(); - Assert.assertEquals(1, timeStoreTable.size()); + Assert.assertEquals(2, timeStoreTable.size()); Assert.assertEquals(Long.valueOf(timestamp), timeStoreTable.firstKey()); mappedFile.destroy(10 * 1000); } @@ -232,7 +232,7 @@ public void restartServiceTest() throws InterruptedException { indexService = new IndexStoreService(fileAllocator, filePath); indexService.start(); Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue()); - Assert.assertEquals(2, indexService.getTimeStoreTable().size()); + Assert.assertEquals(3, indexService.getTimeStoreTable().size()); Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus()); }