diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java index 878f1dab656..6b682449c55 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java @@ -271,17 +271,17 @@ public CompletableFuture> queryAsync( public void forceUpload() { try { readWriteLock.writeLock().lock(); - if (this.currentWriteFile == null) { - log.warn("IndexStoreService no need force upload current write file"); - return; - } - // note: current file has been shutdown before - IndexStoreFile lastFile = new IndexStoreFile(storeConfig, currentWriteFile.getTimestamp()); - if (this.doCompactThenUploadFile(lastFile)) { - this.setCompactTimestamp(lastFile.getTimestamp()); - } else { - throw new TieredStoreException( - TieredStoreErrorCode.UNKNOWN, "IndexStoreService force compact current file error"); + while (true) { + Map.Entry entry = + this.timeStoreTable.higherEntry(this.compactTimestamp.get()); + if (entry == null) { + break; + } + if (this.doCompactThenUploadFile(entry.getValue())) { + this.setCompactTimestamp(entry.getValue().getTimestamp()); + // The total number of files will not too much, prevent io too fast. + TimeUnit.MILLISECONDS.sleep(50); + } } } catch (Exception e) { log.error("IndexStoreService force upload error", e); @@ -393,19 +393,13 @@ protected IndexFile getNextSealedFile() { @Override public void shutdown() { super.shutdown(); - readWriteLock.writeLock().lock(); - try { - for (Map.Entry entry : timeStoreTable.entrySet()) { - entry.getValue().shutdown(); + // Wait index service upload then clear time store table + while (!this.timeStoreTable.isEmpty()) { + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - if (autoCreateNewFile) { - this.forceUpload(); - } - this.timeStoreTable.clear(); - } catch (Exception e) { - log.error("IndexStoreService shutdown error", e); - } finally { - readWriteLock.writeLock().unlock(); } } @@ -424,6 +418,18 @@ public void run() { } this.waitForRunning(TimeUnit.SECONDS.toMillis(10)); } + readWriteLock.writeLock().lock(); + try { + if (autoCreateNewFile) { + this.forceUpload(); + } + this.timeStoreTable.forEach((timestamp, file) -> file.shutdown()); + this.timeStoreTable.clear(); + } catch (Exception e) { + log.error("IndexStoreService shutdown error", e); + } finally { + readWriteLock.writeLock().unlock(); + } log.info(this.getServiceName() + " service shutdown"); } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java index 5d9c705735b..83b407e73ba 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java @@ -232,7 +232,7 @@ public void restartServiceTest() throws InterruptedException { indexService = new IndexStoreService(fileAllocator, filePath); indexService.start(); Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue()); - Assert.assertEquals(3, indexService.getTimeStoreTable().size()); + Assert.assertEquals(4, indexService.getTimeStoreTable().size()); Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD, indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus()); }