Skip to content

Commit

Permalink
[ISSUE #8653] Fix index service upload last file when broker shutdown…
Browse files Browse the repository at this point in the history
… and fetcher check in tiered storage
  • Loading branch information
lizhimins committed Sep 6, 2024
1 parent 97b318f commit ce1988f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,15 @@ public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int
}

// determine whether tiered storage path conditions are met
if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)
&& !next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
return true;
if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)) {
// return true to read from tiered storage if the CommitLog is empty
if (next != null && next.getCommitLog() != null &&
next.getCommitLog().getMinOffset() < 0L) {
return true;
}
if (!next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
return true;
}
}

if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_MEM)
Expand All @@ -208,10 +214,10 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
}

if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
log.trace("GetMessageAsync from current store, " +
log.trace("GetMessageAsync from remote store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums);
} else {
log.trace("GetMessageAsync from remote store, " +
log.trace("GetMessageAsync from next store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums);
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void forceUpload() {
log.error("IndexStoreService force upload error", e);
throw new RuntimeException(e);
} finally {
readWriteLock.writeLock().lock();
readWriteLock.writeLock().unlock();
}
}

Expand Down Expand Up @@ -398,7 +398,7 @@ public void shutdown() {
for (Map.Entry<Long /* timestamp */, IndexFile> entry : timeStoreTable.entrySet()) {
entry.getValue().shutdown();
}
if (!autoCreateNewFile) {
if (autoCreateNewFile) {
this.forceUpload();
}
this.timeStoreTable.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void doConvertOldFormatTest() throws IOException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
ConcurrentSkipListMap<Long, IndexFile> timeStoreTable = indexService.getTimeStoreTable();
Assert.assertEquals(1, timeStoreTable.size());
Assert.assertEquals(2, timeStoreTable.size());
Assert.assertEquals(Long.valueOf(timestamp), timeStoreTable.firstKey());
mappedFile.destroy(10 * 1000);
}
Expand Down Expand Up @@ -232,7 +232,7 @@ public void restartServiceTest() throws InterruptedException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
Assert.assertEquals(2, indexService.getTimeStoreTable().size());
Assert.assertEquals(3, indexService.getTimeStoreTable().size());
Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
}
Expand Down

0 comments on commit ce1988f

Please sign in to comment.