From ed3fb2f690cd99184ddd5597e853c6edcdd75e80 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 8 May 2025 11:58:33 +0800 Subject: [PATCH 01/10] Optimize message TTL check --- .../PersistentMessageExpiryMonitor.java | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 6404e56d59bd0..180332ac14852 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.Optional; import java.util.SortedMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; @@ -37,7 +38,6 @@ import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -81,31 +81,24 @@ public boolean isAutoSkipNonRecoverableData() { @Override public boolean expireMessages(int messageTTLInSeconds) { - if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { - log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, - messageTTLInSeconds); - // First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew - checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds); - // Some part of entries in active Ledger may have reached TTL, so we need to continue searching. - cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { - try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); - return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp); - } catch (Exception e) { - log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e); - } finally { - entry.release(); - } - return false; - }, this, null); - return true; - } else { + if (!expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName, subName); } return false; } + log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, + messageTTLInSeconds); + // First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew + checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds); + // Some part of entries in active Ledger may have reached TTL, so we need to continue searching. + long expiredMessageTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(messageTTLInSeconds); + int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = topic.getBrokerService().pulsar() + .getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis(); + new PersistentMessageFinder(topicName, cursor, managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis) + .findMessages(expiredMessageTimestamp, this); + return true; } private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTLInSeconds) { From aa86d1a53cfc06183afcf2aeae682f475fdd5d7d Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 8 May 2025 12:04:27 +0800 Subject: [PATCH 02/10] Optimize message TTL check --- .../persistent/PersistentMessageExpiryMonitor.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 180332ac14852..c0d54e7eb3453 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -52,6 +52,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag private final Rate msgExpired; private final LongAdder totalMsgExpired; private final PersistentSubscription subscription; + private final PersistentMessageFinder finder; private static final int FALSE = 0; private static final int TRUE = 1; @@ -70,6 +71,10 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription this.subscription = subscription; this.msgExpired = new Rate(); this.totalMsgExpired = new LongAdder(); + int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = topic.getBrokerService().pulsar() + .getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis(); + this.finder = new PersistentMessageFinder(topicName, cursor, + managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis); } @VisibleForTesting @@ -94,10 +99,7 @@ public boolean expireMessages(int messageTTLInSeconds) { checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds); // Some part of entries in active Ledger may have reached TTL, so we need to continue searching. long expiredMessageTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(messageTTLInSeconds); - int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = topic.getBrokerService().pulsar() - .getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis(); - new PersistentMessageFinder(topicName, cursor, managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis) - .findMessages(expiredMessageTimestamp, this); + finder.findMessages(expiredMessageTimestamp, this); return true; } From 7d4699fb17ee90e70e05c0ac73924b5bf9af6e16 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 11 May 2025 23:42:52 +0800 Subject: [PATCH 03/10] fix tests --- .../broker/service/PersistentMessageFinderTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index bab21fbc00137..e18a24a9ababe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -60,6 +61,8 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -483,6 +486,12 @@ public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Thr PersistentTopic mock = mock(PersistentTopic.class); when(mock.getName()).thenReturn("topicname"); when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + BrokerService brokerService = mock(BrokerService.class); + doReturn(brokerService).when(mock).getBrokerService(); + PulsarService pulsarService = mock(PulsarService.class); + doReturn(pulsarService).when(brokerService).pulsar(); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + doReturn(serviceConfiguration).when(pulsarService).getConfig(); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); AsyncCallbacks.MarkDeleteCallback markDeleteCallback = (AsyncCallbacks.MarkDeleteCallback) spy( From 288928e9b0300583446d1a0c4eb00286882ad21d Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 12 May 2025 11:22:02 +0800 Subject: [PATCH 04/10] fix tests --- .../pulsar/broker/service/PersistentMessageFinderTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index e18a24a9ababe..8f79420426c8f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -463,6 +463,12 @@ public void testIncorrectClientClock() throws Exception { PersistentTopic mock = mock(PersistentTopic.class); when(mock.getName()).thenReturn("topicname"); when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + BrokerService brokerService = mock(BrokerService.class); + doReturn(brokerService).when(mock).getBrokerService(); + PulsarService pulsarService = mock(PulsarService.class); + doReturn(pulsarService).when(brokerService).pulsar(); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + doReturn(serviceConfiguration).when(pulsarService).getConfig(); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); monitor.expireMessages(maxTTLSeconds); From 8721c9c5ca6d4d21c2efec922390def009ed8260 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 12 May 2025 19:12:25 +0800 Subject: [PATCH 05/10] fix tests --- .../service/PersistentMessageFinderTest.java | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 8f79420426c8f..71abc1567d008 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -237,9 +237,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional }); assertTrue(ex.get()); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + PersistentTopic mock = mockPersistentTopic("topicname"); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), @@ -422,9 +420,7 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { bkc.deleteLedger(ledgers.get(1).getLedgerId()); bkc.deleteLedger(ledgers.get(2).getLedgerId()); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); + PersistentTopic mock = mockPersistentTopic("topicname"); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); assertTrue(monitor.expireMessages(ttlSeconds)); @@ -460,6 +456,14 @@ public void testIncorrectClientClock() throws Exception { // The number of ledgers should be (entriesNum / MaxEntriesPerLedger) + 1 // Please refer to: https://github.com/apache/pulsar/pull/22034 assertEquals(ledger.getLedgersInfoAsList().size(), entriesNum + 1); + PersistentTopic mock = mockPersistentTopic("topicname"); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); + monitor.expireMessages(maxTTLSeconds); + assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); + } + + private PersistentTopic mockPersistentTopic(String topicName) throws Exception { PersistentTopic mock = mock(PersistentTopic.class); when(mock.getName()).thenReturn("topicname"); when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); @@ -469,10 +473,7 @@ public void testIncorrectClientClock() throws Exception { doReturn(pulsarService).when(brokerService).pulsar(); ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); doReturn(serviceConfiguration).when(pulsarService).getConfig(); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); - Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); - monitor.expireMessages(maxTTLSeconds); - assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); + return mock; } @Test @@ -489,15 +490,7 @@ public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Thr ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp)); } assertEquals(ledger.getLedgersInfoAsList().size(), 2); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST); - BrokerService brokerService = mock(BrokerService.class); - doReturn(brokerService).when(mock).getBrokerService(); - PulsarService pulsarService = mock(PulsarService.class); - doReturn(pulsarService).when(brokerService).pulsar(); - ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); - doReturn(serviceConfiguration).when(pulsarService).getConfig(); + PersistentTopic mock = mockPersistentTopic("topicname"); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); AsyncCallbacks.MarkDeleteCallback markDeleteCallback = (AsyncCallbacks.MarkDeleteCallback) spy( @@ -536,9 +529,8 @@ void testMessageExpiryWithPosition() throws Exception { ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); PersistentSubscription subscription = mock(PersistentSubscription.class); - PersistentTopic topic = mock(PersistentTopic.class); + PersistentTopic topic = mockPersistentTopic("topicname"); when(subscription.getTopic()).thenReturn(topic); - when(topic.getName()).thenReturn("topicname"); for (int i = 0; i < totalEntries; i++) { positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i))); @@ -584,6 +576,7 @@ void testMessageExpiryWithPosition() throws Exception { clearInvocations(monitor); ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); + doReturn("cursor").when(mockCursor).getName(); PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic, cursor.getName(), mockCursor, subscription)); // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to From c66fa32173cf2e36b5e81a00ab536d6eda607974 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 13 May 2025 17:33:16 +0800 Subject: [PATCH 06/10] fix tests --- .../pulsar/broker/service/TransactionMarkerDeleteTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index fc10d315cb14a..b945f5abcbc4d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -77,6 +77,7 @@ public void testMarkerDeleteTimes() throws Exception { ServiceConfiguration configuration = mock(ServiceConfiguration.class); doReturn(brokerService).when(topic).getBrokerService(); doReturn(pulsarService).when(brokerService).getPulsar(); + doReturn(pulsarService).when(brokerService).pulsar(); doReturn(configuration).when(pulsarService).getConfig(); doReturn(false).when(configuration).isTransactionCoordinatorEnabled(); doReturn(managedLedger).when(topic).getManagedLedger(); From 8f4c8f6b044f0cdce748377bdc1e0abec91b2514 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 13 May 2025 18:34:39 +0800 Subject: [PATCH 07/10] fix tests --- .../pulsar/broker/service/MessageCumulativeAckTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index cc4fe22962484..20ab5ec96207d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -36,6 +36,8 @@ import static org.mockito.Mockito.when; import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; + +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; @@ -78,6 +80,8 @@ public void setup() throws Exception { var mockManagedLedger = mock(ManagedLedger.class); when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig()); var persistentTopic = new PersistentTopic(topicName, mockManagedLedger, pulsarTestContext.getBrokerService()); + ManagedCursor cursor = mock(ManagedCursor.class); + doReturn("sub-1").when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", mock(ManagedCursorImpl.class), false)); doNothing().when(sub).acknowledgeMessage(any(), any(), any()); From 6ccaa7be967aa43cac86715d7cb7c6444f1df2db Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 13 May 2025 18:36:11 +0800 Subject: [PATCH 08/10] fix tests --- .../apache/pulsar/broker/service/MessageCumulativeAckTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 20ab5ec96207d..9400f1e386528 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.Codec; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -81,7 +82,7 @@ public void setup() throws Exception { when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig()); var persistentTopic = new PersistentTopic(topicName, mockManagedLedger, pulsarTestContext.getBrokerService()); ManagedCursor cursor = mock(ManagedCursor.class); - doReturn("sub-1").when(cursor).getName(); + doReturn(Codec.encode("sub-1")).when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", mock(ManagedCursorImpl.class), false)); doNothing().when(sub).acknowledgeMessage(any(), any(), any()); From 1ce887d9fa696aaa8ce7b591d766bb3d7221a6b3 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 13 May 2025 19:41:40 +0800 Subject: [PATCH 09/10] fix tests --- .../apache/pulsar/broker/service/MessageCumulativeAckTest.java | 2 +- .../org/apache/pulsar/broker/transaction/TransactionTest.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 9400f1e386528..55fd514c39749 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -84,7 +84,7 @@ public void setup() throws Exception { ManagedCursor cursor = mock(ManagedCursor.class); doReturn(Codec.encode("sub-1")).when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", - mock(ManagedCursorImpl.class), false)); + cursor, false)); doNothing().when(sub).acknowledgeMessage(any(), any(), any()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 5972c1cc19099..09f2b49472532 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -145,6 +145,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.compaction.CompactionServiceFactory; import org.apache.pulsar.compaction.PulsarCompactionServiceFactory; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; @@ -1549,6 +1550,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { when(topic.getName()).thenReturn("topic-a"); // Mock cursor for subscription. ManagedCursor cursor_subscription = mock(ManagedCursor.class); + doReturn(Codec.encode("sub-a")).when(cursor_subscription).getName(); doThrow(new RuntimeException("1")).when(cursor_subscription).updateLastActive(); // Create subscription. String subscriptionName = "sub-a"; From b6dc4162885330f26d0e3ad1bf7c126e12181a73 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 14 May 2025 01:48:40 +0800 Subject: [PATCH 10/10] fix tests --- .../apache/pulsar/broker/service/MessageCumulativeAckTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 55fd514c39749..31a9b7f95d676 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -40,7 +40,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic;