Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
Expand All @@ -37,7 +38,6 @@
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand All @@ -52,6 +52,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
private final PersistentSubscription subscription;
private final PersistentMessageFinder finder;

private static final int FALSE = 0;
private static final int TRUE = 1;
Expand All @@ -70,6 +71,10 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription
this.subscription = subscription;
this.msgExpired = new Rate();
this.totalMsgExpired = new LongAdder();
int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = topic.getBrokerService().pulsar()
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis();
this.finder = new PersistentMessageFinder(topicName, cursor,
managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis);
}

@VisibleForTesting
Expand All @@ -81,31 +86,21 @@ public boolean isAutoSkipNonRecoverableData() {

@Override
public boolean expireMessages(int messageTTLInSeconds) {
if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,
messageTTLInSeconds);
// First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew
checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);
// Some part of entries in active Ledger may have reached TTL, so we need to continue searching.
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
try {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
} finally {
entry.release();
}
return false;
}, this, null);
return true;
} else {
if (!expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName,
subName);
}
return false;
}
log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,
messageTTLInSeconds);
// First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew
checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);
// Some part of entries in active Ledger may have reached TTL, so we need to continue searching.
long expiredMessageTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(messageTTLInSeconds);
finder.findMessages(expiredMessageTimestamp, this);
return true;
}

private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTLInSeconds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@
import static org.mockito.Mockito.when;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;

import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -47,6 +48,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -78,8 +80,10 @@ public void setup() throws Exception {
var mockManagedLedger = mock(ManagedLedger.class);
when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig());
var persistentTopic = new PersistentTopic(topicName, mockManagedLedger, pulsarTestContext.getBrokerService());
ManagedCursor cursor = mock(ManagedCursor.class);
doReturn(Codec.encode("sub-1")).when(cursor).getName();
sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
mock(ManagedCursorImpl.class), false));
cursor, false));
doNothing().when(sub).acknowledgeMessage(any(), any(), any());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -60,6 +61,8 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -236,9 +239,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
});
assertTrue(ex.get());

PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
PersistentTopic mock = mockPersistentTopic("topicname");

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
Expand Down Expand Up @@ -421,9 +422,7 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
bkc.deleteLedger(ledgers.get(1).getLedgerId());
bkc.deleteLedger(ledgers.get(2).getLedgerId());

PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
PersistentTopic mock = mockPersistentTopic("topicname");

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
assertTrue(monitor.expireMessages(ttlSeconds));
Expand Down Expand Up @@ -459,15 +458,26 @@ public void testIncorrectClientClock() throws Exception {
// The number of ledgers should be (entriesNum / MaxEntriesPerLedger) + 1
// Please refer to: https://github.com/apache/pulsar/pull/22034
assertEquals(ledger.getLedgersInfoAsList().size(), entriesNum + 1);
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
PersistentTopic mock = mockPersistentTopic("topicname");
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
monitor.expireMessages(maxTTLSeconds);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
}

private PersistentTopic mockPersistentTopic(String topicName) throws Exception {
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
BrokerService brokerService = mock(BrokerService.class);
doReturn(brokerService).when(mock).getBrokerService();
PulsarService pulsarService = mock(PulsarService.class);
doReturn(pulsarService).when(brokerService).pulsar();
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
doReturn(serviceConfiguration).when(pulsarService).getConfig();
return mock;
}

@Test
public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable {
final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
Expand All @@ -482,9 +492,7 @@ public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Thr
ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp));
}
assertEquals(ledger.getLedgersInfoAsList().size(), 2);
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
PersistentTopic mock = mockPersistentTopic("topicname");
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
AsyncCallbacks.MarkDeleteCallback markDeleteCallback =
(AsyncCallbacks.MarkDeleteCallback) spy(
Expand Down Expand Up @@ -523,9 +531,8 @@ void testMessageExpiryWithPosition() throws Exception {
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

PersistentSubscription subscription = mock(PersistentSubscription.class);
PersistentTopic topic = mock(PersistentTopic.class);
PersistentTopic topic = mockPersistentTopic("topicname");
when(subscription.getTopic()).thenReturn(topic);
when(topic.getName()).thenReturn("topicname");

for (int i = 0; i < totalEntries; i++) {
positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
Expand Down Expand Up @@ -571,6 +578,7 @@ void testMessageExpiryWithPosition() throws Exception {
clearInvocations(monitor);

ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
doReturn("cursor").when(mockCursor).getName();
PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic,
cursor.getName(), mockCursor, subscription));
// Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void testMarkerDeleteTimes() throws Exception {
ServiceConfiguration configuration = mock(ServiceConfiguration.class);
doReturn(brokerService).when(topic).getBrokerService();
doReturn(pulsarService).when(brokerService).getPulsar();
doReturn(pulsarService).when(brokerService).pulsar();
doReturn(configuration).when(pulsarService).getConfig();
doReturn(false).when(configuration).isTransactionCoordinatorEnabled();
doReturn(managedLedger).when(topic).getManagedLedger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
Expand Down Expand Up @@ -1549,6 +1550,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
when(topic.getName()).thenReturn("topic-a");
// Mock cursor for subscription.
ManagedCursor cursor_subscription = mock(ManagedCursor.class);
doReturn(Codec.encode("sub-a")).when(cursor_subscription).getName();
doThrow(new RuntimeException("1")).when(cursor_subscription).updateLastActive();
// Create subscription.
String subscriptionName = "sub-a";
Expand Down
Loading