Skip to content

Commit c9b734b

Browse files
authored
[fix][ml] Fix getNumberOfEntries may point to deleted ledger (#24852)
1 parent acad78c commit c9b734b

File tree

3 files changed

+44
-6
lines changed

3 files changed

+44
-6
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3882,11 +3882,17 @@ public long getNumberOfEntries(Range<Position> range) {
38823882
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
38833883

38843884
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
3885-
// If the 2 positions are in the same ledger
3886-
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3887-
count += fromIncluded ? 1 : 0;
3888-
count += toIncluded ? 1 : 0;
3889-
return count;
3885+
LedgerInfo li = ledgers.get(toPosition.getLedgerId());
3886+
if (li != null) {
3887+
// If the 2 positions are in the same ledger
3888+
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
3889+
count += fromIncluded ? 1 : 0;
3890+
count += toIncluded ? 1 : 0;
3891+
return count;
3892+
} else {
3893+
// if the ledgerId is not in the ledgers, it means it has been deleted
3894+
return 0;
3895+
}
38903896
} else {
38913897
long count = 0;
38923898
// If the from & to are pointing to different ledgers, then we need to :

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.testng.Assert.assertSame;
4343
import static org.testng.Assert.assertTrue;
4444
import static org.testng.Assert.fail;
45+
import com.google.common.collect.Range;
4546
import com.google.common.collect.Sets;
4647
import io.netty.buffer.ByteBuf;
4748
import io.netty.buffer.ByteBufAllocator;
@@ -2721,6 +2722,37 @@ public void testGetNumberOfEntriesInStorage() throws Exception {
27212722
assertEquals(length, numberOfEntries);
27222723
}
27232724

2725+
@Test
2726+
public void testGetNumberOfEntries() throws Exception {
2727+
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
2728+
initManagedLedgerConfig(managedLedgerConfig);
2729+
managedLedgerConfig.setMaxEntriesPerLedger(5);
2730+
ManagedLedgerImpl managedLedger =
2731+
(ManagedLedgerImpl) factory.open("testGetNumberOfEntries", managedLedgerConfig);
2732+
// open cursor to prevent ledger to be deleted when ledger rollover
2733+
ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor");
2734+
int numberOfEntries = 10;
2735+
List<Position> positions = new ArrayList<>(numberOfEntries);
2736+
for (int i = 0; i < numberOfEntries; i++) {
2737+
positions.add(managedLedger.addEntry(("entry-" + i).getBytes(Encoding)));
2738+
}
2739+
Position mdPos = positions.get(numberOfEntries - 1);
2740+
Position rdPos = PositionFactory.create(mdPos.getLedgerId(), mdPos.getEntryId() + 1);
2741+
managedCursor.delete(positions);
2742+
// trigger ledger rollover and wait for the new ledger created
2743+
Awaitility.await().untilAsserted(() -> {
2744+
assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString());
2745+
});
2746+
managedLedger.rollCurrentLedgerIfFull();
2747+
Awaitility.await().untilAsserted(() -> {
2748+
assertEquals(managedLedger.getLedgersInfo().size(), 1);
2749+
assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
2750+
});
2751+
2752+
long length = managedLedger.getNumberOfEntries(Range.closed(mdPos, rdPos));
2753+
assertEquals(length, 0);
2754+
}
2755+
27242756
@Test
27252757
public void testEstimatedBacklogSize() throws Exception {
27262758
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2297,7 +2297,7 @@ public void testAcknowledgeWithReconnection() throws Exception {
22972297

22982298
Awaitility.await().untilAsserted(() ->
22992299
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
2300-
5));
2300+
0));
23012301

23022302
// Make consumer reconnect to broker
23032303
admin.topics().unload(topicName);

0 commit comments

Comments
 (0)