Skip to content

Commit

Permalink
Upgrade Pulsar from 2.10.0 to 3.4.0 (#1377)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Jun 20, 2024
1 parent ab471c6 commit cbd4ea9
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 34 deletions.
2 changes: 1 addition & 1 deletion mqtt-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>2.10.0.0-rc4</version>
<version>3.4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-protocol-handler-mqtt</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -63,7 +63,7 @@ protected CompletableFuture<Optional<Topic>> getTopicReference(String mqttTopicN
});
}

protected CompletableFuture<PositionImpl> writeToPulsarTopic(Connection connection, MqttPublishMessage msg) {
protected CompletableFuture<Position> writeToPulsarTopic(Connection connection, MqttPublishMessage msg) {
return writeToPulsarTopic(connection, msg, false);
}

Expand All @@ -73,7 +73,7 @@ protected CompletableFuture<PositionImpl> writeToPulsarTopic(Connection connecti
* @param checkSubscription Check if the subscription exists, throw #{MQTTNoMatchingSubscriberException}
* if the subscription does not exist;
*/
protected CompletableFuture<PositionImpl> writeToPulsarTopic(Connection connection, MqttPublishMessage msg,
protected CompletableFuture<Position> writeToPulsarTopic(Connection connection, MqttPublishMessage msg,
boolean checkSubscription) {
TopicAliasManager topicAliasManager = connection.getTopicAliasManager();
String producerName = connection.getClientId();
Expand Down Expand Up @@ -102,7 +102,7 @@ protected CompletableFuture<PositionImpl> writeToPulsarTopic(Connection connecti
return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> {
MessageImpl<byte[]> message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(),
msg.payload().nioBuffer());
CompletableFuture<PositionImpl> ret = MessagePublishContext.publishMessages(producerName, message, topic);
CompletableFuture<Position> ret = MessagePublishContext.publishMessages(producerName, message, topic);
message.recycle();
return ret.thenApply(position -> {
if (checkSubscription && topic.getSubscriptions().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
Expand Down Expand Up @@ -158,15 +160,15 @@ public void processPubAck(MqttAdapterMessage adapter) {
int packetId = msg.variableHeader().messageId();
OutstandingPacket packet = outstandingPacketContainer.remove(packetId);
if (packet != null) {
PositionImpl position;
Position position;
if (packet.isBatch()) {
long[] ackSets = new long[packet.getBatchSize()];
for (int i = 0; i < packet.getBatchSize(); i++) {
ackSets[i] = packet.getBatchIndex() == i ? 0 : 1;
}
position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId(), ackSets);
position = AckSetStateUtil.createPositionWithAckSet(packet.getLedgerId(), packet.getEntryId(), ackSets);
} else {
position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId());
position = PositionFactory.create(packet.getLedgerId(), packet.getEntryId());
}
packet.getConsumer().getSubscription().acknowledgeMessage(Collections.singletonList(position),
CommandAck.AckType.Individual, Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.client.api.Message;
Expand All @@ -34,7 +35,7 @@ public final class MessagePublishContext implements PublishContext {
private String producerName;
private Topic topic;
private long startTimeNs;
private CompletableFuture<PositionImpl> positionFuture;
private CompletableFuture<Position> positionFuture;

/**
* Executed from managed ledger thread when the message is persisted.
Expand All @@ -50,13 +51,13 @@ public void completed(Exception exception, long ledgerId, long entryId) {
topic.getName(), ledgerId, entryId);
}
topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
positionFuture.complete(PositionImpl.get(ledgerId, entryId));
positionFuture.complete(PositionFactory.create(ledgerId, entryId));
}
recycle();
}

// recycler
public static MessagePublishContext get(CompletableFuture<PositionImpl> positionFuture, String producerName,
public static MessagePublishContext get(CompletableFuture<Position> positionFuture, String producerName,
Topic topic, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.positionFuture = positionFuture;
Expand Down Expand Up @@ -92,9 +93,9 @@ public void recycle() {
/**
* publish mqtt message to pulsar topic, no batch.
*/
public static CompletableFuture<PositionImpl> publishMessages(String producerName, Message<byte[]> message,
Topic topic) {
CompletableFuture<PositionImpl> future = new CompletableFuture<>();
public static CompletableFuture<Position> publishMessages(String producerName, Message<byte[]> message,
Topic topic) {
CompletableFuture<Position> future = new CompletableFuture<>();

ByteBuf headerAndPayload = messageToByteBuf(message);
topic.publishMessage(headerAndPayload,
Expand Down
24 changes: 20 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<version>2.10.0.0-rc4</version>
<version>3.4.0-SNAPSHOT</version>
<name>StreamNative :: Pulsar Protocol Handler :: MoP Parent</name>
<description>Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler.</description>

Expand Down Expand Up @@ -49,7 +49,7 @@
<mockito.version>2.22.0</mockito.version>
<testng.version>6.14.3</testng.version>
<awaitility.version>4.0.2</awaitility.version>
<pulsar.version>3.2.0-SNAPSHOT</pulsar.version>
<pulsar.version>3.4.0-SNAPSHOT</pulsar.version>
<mqtt.codec.version>4.1.94.Final</mqtt.codec.version>
<log4j2.version>2.18.0</log4j2.version>
<fusesource.client.version>1.16</fusesource.client.version>
Expand Down Expand Up @@ -376,12 +376,10 @@
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>

<repository>
<id>snapshot</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>

<repository>
<snapshots>
<enabled>false</enabled>
Expand All @@ -390,5 +388,23 @@
<name>bintray</name>
<url>https://yahoo.bintray.com/maven</url>
</repository>
<repository>
<id>ossrh</id>
<url>https://s01.oss.sonatype.org/service/local/repositories/0/content</url>
</repository>
<repository>
<id>nexus-snapshot</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>
</project>
2 changes: 1 addition & 1 deletion tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>2.10.0.0-rc4</version>
<version>3.4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-protocol-handler-mqtt-tests</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.netty.channel.EventLoopGroup;
import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.utils.ConfigurationUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
Expand All @@ -33,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
Expand Down Expand Up @@ -459,18 +459,19 @@ public void reallyShutdown() {
private BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {

@Override
public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties) throws IOException {
return mockBookKeeper;
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties) {
return CompletableFuture.completedFuture(mockBookKeeper);
}

@Override
public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store, EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties, StatsLogger statsLogger)
throws IOException {
return mockBookKeeper;
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
EventLoopGroup eventLoopGroup,
Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
Map<String, Object> ensemblePlacementPolicyProperties, StatsLogger statsLogger) {
return CompletableFuture.completedFuture(mockBookKeeper);
}

@Override
Expand Down

0 comments on commit cbd4ea9

Please sign in to comment.