Skip to content

Commit

Permalink
Fix the auth data is NPE error (#1531)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Nov 14, 2024
1 parent e78bcfa commit 7d76b42
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
*/
package io.streamnative.pulsar.handlers.mqtt.broker.channel;

import static io.streamnative.pulsar.handlers.mqtt.common.Constants.AUTH_DATA_ATTRIBUTE_KEY;
import io.netty.channel.ChannelHandlerContext;
import io.streamnative.pulsar.handlers.mqtt.broker.impl.consumer.MQTTConsumer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.ServerCnx;
Expand Down Expand Up @@ -65,4 +67,9 @@ private void safelyRemoveConsumer(Consumer consumer) {
});
}
}

@Override
public AuthenticationDataSource getAuthenticationData() {
return ctx.channel().attr(AUTH_DATA_ATTRIBUTE_KEY).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static io.streamnative.pulsar.handlers.mqtt.common.Connection.ConnectionState.CONNECT_ACK;
import static io.streamnative.pulsar.handlers.mqtt.common.Connection.ConnectionState.DISCONNECTED;
import static io.streamnative.pulsar.handlers.mqtt.common.Connection.ConnectionState.ESTABLISHED;
import static io.streamnative.pulsar.handlers.mqtt.common.Constants.AUTH_DATA_ATTRIBUTE_KEY;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.getAuthMethod;
import static io.streamnative.pulsar.handlers.mqtt.common.utils.NettyUtils.ATTR_KEY_CONNECTION;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class Connection {

static ChannelException channelInactiveException = new ChannelException("Channel is inactive");


Connection(ConnectionBuilder builder) {
this.clientId = builder.clientId;
this.protocolVersion = builder.protocolVersion;
Expand All @@ -116,6 +118,7 @@ public class Connection {
this.processor = builder.processor;
this.fromProxy = builder.fromProxy;
this.authData = builder.authData;
this.channel.attr(AUTH_DATA_ATTRIBUTE_KEY).set(authData);
this.addIdleStateHandler();
this.manager.addConnection(this);
this.topicAliasManager = new TopicAliasManager(clientRestrictions.getTopicAliasMaximum());
Expand Down Expand Up @@ -162,6 +165,7 @@ public CompletableFuture<Void> sendAck(MqttAck mqttAck) {

public void updateAuthData(AuthenticationDataSource authData) {
this.authData = authData;
this.channel.attr(AUTH_DATA_ATTRIBUTE_KEY).set(authData);
}

public CompletableFuture<Void> sendAckThenClose(MqttAck mqttAck) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package io.streamnative.pulsar.handlers.mqtt.common;

import io.netty.util.AttributeKey;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;

/**
* Server constants keeper.
*/
Expand Down Expand Up @@ -47,6 +50,9 @@ public final class Constants {

public static final String MQTT_SUB_PROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1, mqttv5.0";

public static final AttributeKey<AuthenticationDataSource> AUTH_DATA_ATTRIBUTE_KEY =
AttributeKey.valueOf("authData");

private Constants() {
}
}

0 comments on commit 7d76b42

Please sign in to comment.