Skip to content

Commit b358339

Browse files
authored
[fix] Change read max position to earliest position (#1325)
1 parent d822217 commit b358339

File tree

2 files changed

+4
-2
lines changed

2 files changed

+4
-2
lines changed

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.bookkeeper.mledger.ManagedCursor;
3333
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3434
import org.apache.bookkeeper.mledger.Position;
35+
import org.apache.bookkeeper.mledger.PositionFactory;
3536
import org.apache.commons.collections4.CollectionUtils;
3637
import org.apache.commons.lang3.tuple.Pair;
3738
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -193,7 +194,7 @@ private void readMoreEntries() {
193194
if (log.isDebugEnabled()) {
194195
log.debug("{} Schedule read of {} messages.", name, messagesToRead);
195196
}
196-
cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, null, null);
197+
cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, null, PositionFactory.LATEST);
197198
} else {
198199
if (log.isDebugEnabled()) {
199200
log.debug("{} Not schedule read due to pending read. Messages to read {}.",

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeMessageRouter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.bookkeeper.mledger.ManagedCursor;
4545
import org.apache.bookkeeper.mledger.ManagedLedgerException;
4646
import org.apache.bookkeeper.mledger.Position;
47+
import org.apache.bookkeeper.mledger.PositionFactory;
4748
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
4849
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
4950
import org.apache.pulsar.broker.PulsarServerException;
@@ -161,7 +162,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
161162
HAVE_PENDING_READ_UPDATER.set(ExchangeMessageRouter.this, FALSE);
162163
log.error("Failed to read entries from exchange {}", exchange.getName(), exception);
163164
}
164-
}, null, null);
165+
}, null, PositionFactory.LATEST);
165166
} else {
166167
log.warn("{} Not schedule read due to pending read. Messages to read {}.",
167168
exchange.getName(), availablePermits);

0 commit comments

Comments
 (0)