From abb6e8179f74c46ead8660153f1398040e0c2c3f Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 2 Feb 2025 22:46:48 +0800 Subject: [PATCH 1/3] Fix https://github.com/apache/pulsar/issues/23910 --- .../mledger/impl/ManagedCursorImpl.java | 2 +- .../persistent/PersistentMessageFinder.java | 20 +++---------------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 203d48933f0a5..ee53a77f8782c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1336,7 +1336,7 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate } if (max <= 0) { - callback.findEntryComplete(null, ctx); + callback.findEntryFailed(new ManagedLedgerException("No entries available"), Optional.empty(), ctx); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index 5a4631cf205f1..72d53a717c156 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import com.google.common.annotations.VisibleForTesting; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -91,6 +93,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback } } + @VisibleForTesting public static Pair getFindPositionRange(Iterable ledgerInfos, Position lastConfirmedEntry, long targetTimestamp, int ledgerCloseTimestampMaxClockSkewMillis) { @@ -105,15 +108,11 @@ public static Pair getFindPositionRange(Iterable Position start = null; Position end = null; - LedgerInfo secondToLastLedgerInfo = null; - LedgerInfo lastLedgerInfo = null; for (LedgerInfo info : ledgerInfos) { if (!info.hasTimestamp()) { // unexpected case, don't set start and end return Pair.of(null, null); } - secondToLastLedgerInfo = lastLedgerInfo; - lastLedgerInfo = info; long closeTimestamp = info.getTimestamp(); // For an open ledger, closeTimestamp is 0 if (closeTimestamp == 0) { @@ -128,19 +127,6 @@ public static Pair getFindPositionRange(Iterable break; } } - // If the second-to-last ledger's close timestamp is less than the target timestamp, then start from the - // first entry of the last ledger when there are confirmed entries in the ledger - if (lastLedgerInfo != null && secondToLastLedgerInfo != null - && secondToLastLedgerInfo.getTimestamp() > 0 - && secondToLastLedgerInfo.getTimestamp() < targetTimestampMin) { - Position firstPositionInLedger = PositionFactory.create(lastLedgerInfo.getLedgerId(), 0); - if (lastConfirmedEntry != null - && lastConfirmedEntry.compareTo(firstPositionInLedger) >= 0) { - start = firstPositionInLedger; - } else { - start = lastConfirmedEntry; - } - } return Pair.of(start, end); } From ef8ecd433a6e5ff25b3465b361d1486ccac08f4b Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 4 Feb 2025 14:09:20 +0800 Subject: [PATCH 2/3] fix checkstyle & tests --- .../broker/service/persistent/PersistentMessageFinder.java | 3 +-- .../pulsar/broker/service/PersistentMessageFinderTest.java | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index 72d53a717c156..e780f1672848b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -19,10 +19,9 @@ package org.apache.pulsar.broker.service.persistent; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -import com.google.common.annotations.VisibleForTesting; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 6f2f1f3a1a2c0..62e27eaea4169 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -636,7 +636,7 @@ public void testGetFindPositionRange_LastTimestampIsZero() { assertNotNull(range); assertNotNull(range.getLeft()); assertNull(range.getRight()); - assertEquals(range.getLeft(), PositionFactory.create(3, 0)); + assertEquals(range.getLeft(), PositionFactory.create(2, 0)); } @Test @@ -654,7 +654,7 @@ public void testGetFindPositionRange_LastTimestampIsZeroWithNoEntries() { assertNotNull(range); assertNotNull(range.getLeft()); assertNull(range.getRight()); - assertEquals(range.getLeft(), PositionFactory.create(2, 9)); + assertEquals(range.getLeft(), PositionFactory.create(2, 0)); } @Test @@ -689,7 +689,7 @@ public void testGetFindPositionRange_MixedTimestamps() { assertNotNull(range); assertNotNull(range.getLeft()); assertNotNull(range.getRight()); - assertEquals(range.getLeft(), PositionFactory.create(3, 0)); + assertEquals(range.getLeft(), PositionFactory.create(2, 0)); assertEquals(range.getRight(), PositionFactory.create(3, 9)); } From 10ae74e8171cadf259758386b40ec20e35170e92 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 4 Feb 2025 15:38:41 +0800 Subject: [PATCH 3/3] fix checkstyle & tests --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index ee53a77f8782c..203d48933f0a5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1336,7 +1336,7 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate } if (max <= 0) { - callback.findEntryFailed(new ManagedLedgerException("No entries available"), Optional.empty(), ctx); + callback.findEntryComplete(null, ctx); return; }