Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Sep 12, 2024
1 parent 46bcbca commit 815bd7f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.mqtt.proxy;

import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqtt5ConnectMessage;
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttPublishMessage;
import static io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.createMqttSubscribeMessage;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -140,6 +141,12 @@ public void doProcessConnect(MqttAdapterMessage adapter, String userRole,
.build();
connection.sendConnAck();

if (proxyConfig.isMqttAuthorizationEnabled()) {
MqttConnectMessage connectMessage = createMqtt5ConnectMessage(msg);
msg = connectMessage;
connection.setConnectMessage(msg);
}

ConnectEvent connectEvent = ConnectEvent.builder()
.clientId(connection.getClientId())
.address(pulsarService.getAdvertisedAddress())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,12 @@ public static MqttPublishMessage createMqttWillMessage(WillMessage willMessage)
return builder.build();
}

public static MqttConnectMessage createMqttConnectMessage(MqttConnectMessage connectMessage,
String authData) {
public static MqttConnectMessage createMqtt5ConnectMessage(MqttConnectMessage connectMessage) {
final MqttConnectVariableHeader header = connectMessage.variableHeader();
MqttProperties properties = new MqttProperties();
properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData));
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
MqttVersion.MQTT_5.protocolName(), MqttVersion.MQTT_5.protocolLevel(), header.hasUserName(),
header.hasPassword(), header.isWillRetain(), header.willQos(), header.isWillFlag(),
header.isCleanSession(), header.keepAliveTimeSeconds(), properties
);
header.isCleanSession(), header.keepAliveTimeSeconds(), connectMessage.variableHeader().properties());
MqttConnectMessage newConnectMessage = new MqttConnectMessage(connectMessage.fixedHeader(), variableHeader,
connectMessage.payload());
return newConnectMessage;
Expand Down

0 comments on commit 815bd7f

Please sign in to comment.