diff --git a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java
index f95721af..9f622984 100644
--- a/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java
+++ b/application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java
@@ -11,6 +11,7 @@
import javasabr.mqtt.network.MqttConnectionFactory;
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler;
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
+import javasabr.mqtt.network.message.in.ConnectMqttInMessage;
import javasabr.mqtt.network.message.in.PublishMqttInMessage;
import javasabr.mqtt.network.user.NetworkMqttUserFactory;
import javasabr.mqtt.service.AuthenticationService;
@@ -30,12 +31,12 @@
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
-import javasabr.mqtt.service.impl.InMemoryRetainMessageService;
import javasabr.mqtt.service.impl.DefaultTopicService;
import javasabr.mqtt.service.impl.DisabledAuthorizationService;
import javasabr.mqtt.service.impl.ExternalNetworkMqttUserFactory;
import javasabr.mqtt.service.impl.FileCredentialsSource;
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
+import javasabr.mqtt.service.impl.InMemoryRetainMessageService;
import javasabr.mqtt.service.impl.InMemorySubscriptionService;
import javasabr.mqtt.service.impl.SimpleAuthenticationService;
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
@@ -51,6 +52,7 @@
import javasabr.mqtt.service.message.out.factory.Mqtt311MessageOutFactory;
import javasabr.mqtt.service.message.out.factory.Mqtt5MessageOutFactory;
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;
+import javasabr.mqtt.service.message.validator.ClientIdMqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.PublishMessageExpiryIntervalMqttInMessageFieldValidator;
import javasabr.mqtt.service.message.validator.PublishPayloadMqttInMessageFieldValidator;
@@ -163,19 +165,27 @@ TopicService topicService() {
return new DefaultTopicService();
}
+ @Bean
+ ClientIdMqttInMessageFieldValidator clientIdMqttInMessageFieldValidator(
+ MessageOutFactoryService messageOutFactoryService) {
+ return new ClientIdMqttInMessageFieldValidator(messageOutFactoryService);
+ }
+
@Bean
MqttInMessageHandler connectInMqttInMessageHandler(
ClientIdRegistry clientIdRegistry,
AuthenticationService authenticationService,
MqttSessionService sessionService,
SubscriptionService subscriptionService,
- MessageOutFactoryService messageOutFactoryService) {
+ MessageOutFactoryService messageOutFactoryService,
+ List extends MqttInMessageFieldValidator super ExternalNetworkMqttUser, ConnectMqttInMessage>> fieldValidators) {
return new ConnectInMqttInMessageHandler(
clientIdRegistry,
authenticationService,
sessionService,
subscriptionService,
- messageOutFactoryService);
+ messageOutFactoryService,
+ fieldValidators);
}
@Bean
@@ -391,10 +401,6 @@ MqttServerConnectionConfig externalConnectionConfig(Environment env) {
"mqtt.external.connection.topic.alias.maximum",
int.class,
0),
- env.getProperty(
- "mqtt.external.connection.default.session.expiration.time",
- long.class,
- MqttProperties.SESSION_EXPIRY_INTERVAL_DEFAULT),
env.getProperty(
"mqtt.external.connection.keep.alive.enabled",
boolean.class,
diff --git a/application/src/test/groovy/javasabr/mqtt/broker/application/ExternalConnectionTest.groovy b/application/src/test/groovy/javasabr/mqtt/broker/application/ExternalConnectionTest.groovy
index 47ad60af..b5e28f0a 100644
--- a/application/src/test/groovy/javasabr/mqtt/broker/application/ExternalConnectionTest.groovy
+++ b/application/src/test/groovy/javasabr/mqtt/broker/application/ExternalConnectionTest.groovy
@@ -17,7 +17,7 @@ import java.util.concurrent.CompletionException
class ExternalConnectionTest extends IntegrationSpecification {
- def "client should connect to broker without user and pass using mqtt 3.1.1"() {
+ def "client should connect to broker without user and pass using MQTT 3.1.1"() {
given:
def client = buildExternalMqtt311Client()
when:
@@ -29,15 +29,14 @@ class ExternalConnectionTest extends IntegrationSpecification {
client.disconnect().join()
}
- def "client should connect to broker without user and pass using mqtt 5"() {
+ def "client should connect to broker without user and pass using MQTT 5"() {
given:
def client = buildExternalMqtt5Client()
when:
def result = client.connect().join()
then:
result.reasonCode == Mqtt5ConnAckReasonCode.SUCCESS
- result.sessionExpiryInterval.present
- result.sessionExpiryInterval.getAsLong() == MqttProperties.SESSION_EXPIRY_INTERVAL_DEFAULT
+ !result.sessionExpiryInterval.present
result.serverKeepAlive.present
result.serverKeepAlive.getAsInt() == MqttProperties.SERVER_KEEP_ALIVE_DISABLED
!result.serverReference.present
@@ -48,7 +47,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
client.disconnect().join()
}
- def "client should connect to broker with user and pass using mqtt 3.1.1"() {
+ def "client should connect to broker with user and pass using MQTT 3.1.1"() {
given:
def client = buildExternalMqtt311Client()
when:
@@ -60,15 +59,14 @@ class ExternalConnectionTest extends IntegrationSpecification {
client.disconnect().join()
}
- def "client should connect to broker with user and pass using mqtt 5"() {
+ def "client should connect to broker with user and pass using MQTT 5"() {
given:
def client = buildExternalMqtt5Client()
when:
def result = connectWith(client, 'user1', 'password')
then:
result.reasonCode == Mqtt5ConnAckReasonCode.SUCCESS
- result.sessionExpiryInterval.present
- result.sessionExpiryInterval.getAsLong() == MqttProperties.SESSION_EXPIRY_INTERVAL_DEFAULT
+ !result.sessionExpiryInterval.present
result.serverKeepAlive.present
result.serverKeepAlive.getAsInt() == MqttProperties.SERVER_KEEP_ALIVE_DISABLED
!result.serverReference.present
@@ -79,7 +77,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
client.disconnect().join()
}
- def "client should not connect to broker without providing a client id using mqtt 3.1.1"() {
+ def "client should not connect to broker without providing a client id using MQTT 3.1.1"() {
given:
def client = buildExternalMqtt311Client("")
when:
@@ -90,7 +88,8 @@ class ExternalConnectionTest extends IntegrationSpecification {
cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.IDENTIFIER_REJECTED
}
- def "client should connect to broker without providing a client id using mqtt 5"() {
+ @Ignore("until finalizing clientId validation")
+ def "client should connect to broker without providing a client id using MQTT 5"() {
given:
def client = buildExternalMqtt5Client("")
when:
@@ -103,7 +102,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
client.disconnect().join()
}
- def "client should not connect to broker with invalid client id using mqtt 3.1.1"(String clientId) {
+ def "client should not connect to broker with invalid client id using MQTT 3.1.1"(String clientId) {
given:
def client = buildExternalMqtt311Client(clientId)
when:
@@ -116,7 +115,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
clientId << ["!@#!@*()^&"]
}
- def "client should not connect to broker with invalid client id using mqtt 5"(String clientId) {
+ def "client should not connect to broker with invalid client id using MQTT 5"(String clientId) {
given:
def client = buildExternalMqtt5Client(clientId)
when:
@@ -129,7 +128,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
clientId << ["!@#!@*()^&"]
}
- def "client should not connect to broker with wrong pass using mqtt 3.1.1"() {
+ def "client should not connect to broker with wrong pass using MQTT 3.1.1"() {
given:
def client = buildExternalMqtt311Client()
when:
@@ -141,7 +140,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
}
@Ignore
- def "client should not connect to broker without username and with pass using mqtt 3.1.1"() {
+ def "client should not connect to broker without username and with pass using MQTT 3.1.1"() {
given:
def client = buildMqtt311MockClient()
def clientId = generateClientId()
@@ -166,7 +165,7 @@ class ExternalConnectionTest extends IntegrationSpecification {
connectAck.reasonCode == ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD
}
- def "client should not connect to broker with wrong pass using mqtt 5"() {
+ def "client should not connect to broker with wrong pass using MQTT 5"() {
given:
def client = buildExternalMqtt5Client()
when:
diff --git a/application/src/test/groovy/javasabr/mqtt/broker/application/IntegrationSpecification.groovy b/application/src/test/groovy/javasabr/mqtt/broker/application/IntegrationSpecification.groovy
index 5b529382..7eeaff95 100644
--- a/application/src/test/groovy/javasabr/mqtt/broker/application/IntegrationSpecification.groovy
+++ b/application/src/test/groovy/javasabr/mqtt/broker/application/IntegrationSpecification.groovy
@@ -11,6 +11,7 @@ import javasabr.mqtt.model.MqttVersion
import javasabr.mqtt.network.MqttConnection
import javasabr.mqtt.network.MqttMockClient
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser
+import javasabr.mqtt.test.support.BaseSpecification
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
@@ -22,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference
@TestPropertySource("classpath:application-test.properties")
@SpringJUnitConfig(classes = MqttBrokerTestConfig)
-class IntegrationSpecification extends Specification {
+class IntegrationSpecification extends BaseSpecification {
public static final encoding = StandardCharsets.UTF_8
public static final topicFilter = "topic/Filter"
@@ -141,7 +142,7 @@ class IntegrationSpecification extends Specification {
serverConnConfig,
serverConnConfig.maxQos(),
MqttVersion.MQTT_5,
- MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED,
+ null,
serverConnConfig.receiveMaxPublishes(),
serverConnConfig.maxMessageSize(),
serverConnConfig.topicAliasMaxValue(),
@@ -169,7 +170,7 @@ class IntegrationSpecification extends Specification {
serverConnConfig,
serverConnConfig.maxQos(),
MqttVersion.MQTT_3_1_1,
- MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED,
+ null,
serverConnConfig.receiveMaxPublishes(),
serverConnConfig.maxMessageSize(),
serverConnConfig.topicAliasMaxValue(),
diff --git a/application/src/test/groovy/javasabr/mqtt/broker/application/PublishRetryTest.groovy b/application/src/test/groovy/javasabr/mqtt/broker/application/PublishRetryTest.groovy
index dfafd833..bfad38e8 100644
--- a/application/src/test/groovy/javasabr/mqtt/broker/application/PublishRetryTest.groovy
+++ b/application/src/test/groovy/javasabr/mqtt/broker/application/PublishRetryTest.groovy
@@ -26,6 +26,8 @@ import org.springframework.beans.factory.annotation.Autowired
class PublishRetryTest extends IntegrationSpecification {
+ private static final int testSessionExpiryIntervalInSecs = 120
+
@Autowired
MqttSessionService mqttSessionService
@@ -96,7 +98,7 @@ class PublishRetryTest extends IntegrationSpecification {
when:
publisher.connect().join()
subscriber.connect()
- subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive))
+ subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive, testSessionExpiryIntervalInSecs))
then:
with(subscriber.readNext() as ConnectAckMqttInMessage) {
reasonCode() == ConnectAckReasonCode.SUCCESS
@@ -127,7 +129,7 @@ class PublishRetryTest extends IntegrationSpecification {
when:
subscriber.disconnect()
subscriber.connect()
- subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive))
+ subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive, testSessionExpiryIntervalInSecs))
then:
with(subscriber.readNext() as ConnectAckMqttInMessage) {
reasonCode() == ConnectAckReasonCode.SUCCESS
@@ -226,7 +228,7 @@ class PublishRetryTest extends IntegrationSpecification {
when:
publisher.connect().join()
subscriber.connect()
- subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive))
+ subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive, testSessionExpiryIntervalInSecs))
then:
with(subscriber.readNext() as ConnectAckMqttInMessage) {
reasonCode() == ConnectAckReasonCode.SUCCESS
@@ -257,7 +259,7 @@ class PublishRetryTest extends IntegrationSpecification {
when:
subscriber.disconnect()
subscriber.connect()
- subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive))
+ subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive, testSessionExpiryIntervalInSecs))
then:
with(subscriber.readNext() as ConnectAckMqttInMessage) {
reasonCode() == ConnectAckReasonCode.SUCCESS
@@ -270,7 +272,7 @@ class PublishRetryTest extends IntegrationSpecification {
when:
subscriber.disconnect()
subscriber.connect()
- subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive))
+ subscriber.send(new ConnectMqtt5OutMessage(serviceId, keepAlive, testSessionExpiryIntervalInSecs))
subscriber.send(new PublishReceivedMqtt5OutMessage(
receivedPublish.messageId(),
PublishReceivedReasonCode.SUCCESS
diff --git a/application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttSessionServiceTest.groovy b/application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttSessionServiceTest.groovy
new file mode 100644
index 00000000..593caaba
--- /dev/null
+++ b/application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttSessionServiceTest.groovy
@@ -0,0 +1,136 @@
+package javasabr.mqtt.broker.application.service
+
+import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode
+import javasabr.mqtt.broker.application.IntegrationSpecification
+import javasabr.mqtt.network.session.ConfigurableNetworkMqttSession
+import javasabr.mqtt.service.ClientIdRegistry
+import javasabr.mqtt.service.session.MqttSessionService
+import org.springframework.beans.factory.annotation.Autowired
+
+import java.time.Duration
+
+class MqttSessionServiceTest extends IntegrationSpecification {
+
+ @Autowired
+ ClientIdRegistry clientIdRegistry
+
+ @Autowired
+ MqttSessionService mqttSessionService
+
+ def "should create fresh session if client request it"() {
+ given:
+ def clientId = fromAsync(clientIdRegistry.generate())
+ def client = buildExternalMqtt5Client(clientId)
+ def previousSession = fromAsync(mqttSessionService.createClean(clientId)) as ConfigurableNetworkMqttSession
+ previousSession.expiryInterval(Duration.ofHours(1))
+ waitForAsync(mqttSessionService.store(clientId, previousSession))
+ when:
+ def connectionResult = fromAsync(client.connectWith()
+ .cleanStart(true)
+ .sessionExpiryInterval(120)
+ .send())
+ then:
+ with(connectionResult) {
+ !isSessionPresent()
+ getReasonCode() == Mqtt5ConnAckReasonCode.SUCCESS
+ }
+ when:
+ waitForAsync(client.disconnect())
+ Thread.sleep(100)
+ def restored = fromAsync(mqttSessionService.restore(clientId))
+ then:
+ restored != null
+ restored !== previousSession
+ }
+
+ def "should not store session for client which doesn't require it"() {
+ given:
+ def clientId = fromAsync(clientIdRegistry.generate())
+ def client = buildExternalMqtt5Client(clientId)
+ when:
+ def connectionResult = fromAsync(client.connectWith().send())
+ then:
+ with(connectionResult) {
+ !isSessionPresent()
+ getReasonCode() == Mqtt5ConnAckReasonCode.SUCCESS
+ }
+ when:
+ waitForAsync(client.disconnect())
+ Thread.sleep(100)
+ def restored = fromAsync(mqttSessionService.restore(clientId))
+ then:
+ restored == null
+ }
+
+ def "should always store session for < MQTT 5.0 clients"() {
+ given:
+ def clientId = fromAsync(clientIdRegistry.generate())
+ def client = buildExternalMqtt311Client(clientId)
+ when:
+ def connectionResult = fromAsync(client.connect())
+ then:
+ !connectionResult.isSessionPresent()
+ when:
+ waitForAsync(client.disconnect())
+ Thread.sleep(100)
+ def restored = fromAsync(mqttSessionService.restore(clientId))
+ then:
+ restored != null
+ }
+
+ def "client should re-use MQTT session between connections"() {
+ given:
+ def clientId = fromAsync(clientIdRegistry.generate())
+ def client = buildExternalMqtt5Client(clientId)
+ when:
+ def restoredSession = mqttSessionService.restore(clientId).block()
+ then: 'there no any stored session for this client'
+ restoredSession == null
+ when:
+ def connectionResult = fromAsync(client.connectWith()
+ .cleanStart(false)
+ .sessionExpiryInterval(120)
+ .send())
+ then:
+ with(connectionResult) {
+ !isSessionPresent()
+ getReasonCode() == Mqtt5ConnAckReasonCode.SUCCESS
+ }
+ when:
+ waitForAsync(mqttSessionService.restore(clientId))
+ then: 'there is active session for this client'
+ def exception = thrown(IllegalStateException)
+ exception.message == "Client:[$clientId] already has active session"
+ when:
+ waitForAsync(client.disconnect())
+ Thread.sleep(100)
+ def restored = fromAsync(mqttSessionService.restore(clientId))
+ then:
+ restored != null
+ restored.clientId() == clientId
+ restored.expiryInterval() != null
+ when:
+ waitForAsync(mqttSessionService.restore(clientId))
+ then: 'The session was already restored'
+ exception = thrown(IllegalStateException)
+ exception.message == "Client:[$clientId] already has active session"
+ when:
+ waitForAsync(mqttSessionService.store(clientId, restored))
+ connectionResult = fromAsync(client.connectWith()
+ .cleanStart(false)
+ .sessionExpiryInterval(120)
+ .send())
+ then:
+ with(connectionResult) {
+ isSessionPresent()
+ getReasonCode() == Mqtt5ConnAckReasonCode.SUCCESS
+ }
+ when:
+ waitForAsync(client.disconnect())
+ Thread.sleep(100)
+ def restored2 = fromAsync(mqttSessionService.restore(clientId))
+ then: 'should be the same session instance'
+ restored2 != null
+ restored2 == restored
+ }
+}
diff --git a/application/src/test/groovy/javasabr/mqtt/broker/application/service/NetworkMqttSessionServiceTest.groovy b/application/src/test/groovy/javasabr/mqtt/broker/application/service/NetworkMqttSessionServiceTest.groovy
deleted file mode 100644
index cd6a4e4a..00000000
--- a/application/src/test/groovy/javasabr/mqtt/broker/application/service/NetworkMqttSessionServiceTest.groovy
+++ /dev/null
@@ -1,47 +0,0 @@
-package javasabr.mqtt.broker.application.service
-
-import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode
-import javasabr.mqtt.broker.application.IntegrationSpecification
-import javasabr.mqtt.service.ClientIdRegistry
-import javasabr.mqtt.service.session.MqttSessionService
-import org.springframework.beans.factory.annotation.Autowired
-
-class NetworkMqttSessionServiceTest extends IntegrationSpecification {
-
- @Autowired
- ClientIdRegistry clientIdRegistry
-
- @Autowired
- MqttSessionService mqttSessionService
-
- def "subscriber should create and re-use mqtt session"() {
- given:
- def clientId = clientIdRegistry.generate().block()
- def client = buildExternalMqtt5Client(clientId)
- when:
- def shouldNoSession = mqttSessionService.restore(clientId).block()
- def result = client.connect().join()
- then:
- result.reasonCode == Mqtt5ConnAckReasonCode.SUCCESS
- shouldNoSession == null
- mqttSessionService.restore(clientId).block() == null
- when:
- client.disconnect().join()
- Thread.sleep(100)
- def restored = mqttSessionService.restore(clientId).block()
- then:
- restored != null
- when:
- mqttSessionService.store(clientId, restored, externalConnectionConfig.defaultSessionExpiryInterval()).block()
- client.connect().join()
- shouldNoSession = mqttSessionService.restore(clientId).block()
- then:
- shouldNoSession == null
- when:
- client.disconnect().join()
- Thread.sleep(100)
- restored = mqttSessionService.restore(clientId).block()
- then:
- restored != null
- }
-}
diff --git a/application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy b/application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy
index 04e4306b..d672c51b 100644
--- a/application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy
+++ b/application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy
@@ -27,6 +27,7 @@ class SubscriptionServiceTest extends IntegrationSpecification {
when:
subscriber.connectWith()
.cleanStart(true)
+ .sessionExpiryInterval(120)
.send()
.join()
subscriber.subscribeWith()
diff --git a/application/src/test/resources/log4j2.xml b/application/src/test/resources/log4j2-test.xml
similarity index 89%
rename from application/src/test/resources/log4j2.xml
rename to application/src/test/resources/log4j2-test.xml
index 1c8d6108..9a30a182 100644
--- a/application/src/test/resources/log4j2.xml
+++ b/application/src/test/resources/log4j2-test.xml
@@ -24,6 +24,9 @@
+
+
+
diff --git a/base/src/main/java/javasabr/mqtt/base/util/DebugUtils.java b/base/src/main/java/javasabr/mqtt/base/util/DebugUtils.java
index 3b4ccfc4..b0f93f3c 100644
--- a/base/src/main/java/javasabr/mqtt/base/util/DebugUtils.java
+++ b/base/src/main/java/javasabr/mqtt/base/util/DebugUtils.java
@@ -58,7 +58,11 @@ public void serializeAsProperty(
}
if (fields == null || fields.contains(name)) {
- writer.serializeAsProperty(pojo, jsonGenerator, context);
+ try {
+ writer.serializeAsProperty(pojo, jsonGenerator, context);
+ } catch (IllegalAccessException ignore) {
+ // ignore
+ }
}
}
diff --git a/core-service/src/main/java/javasabr/mqtt/service/MessageOutFactoryService.java b/core-service/src/main/java/javasabr/mqtt/service/MessageOutFactoryService.java
index 9f051ad1..63149c9f 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/MessageOutFactoryService.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/MessageOutFactoryService.java
@@ -1,5 +1,6 @@
package javasabr.mqtt.service;
+import javasabr.mqtt.model.MqttVersion;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;
@@ -7,6 +8,8 @@
public interface MessageOutFactoryService {
MqttMessageOutFactory resolveFactory(NetworkMqttUser user);
+
+ MqttMessageOutFactory resolveFactory(MqttVersion mqttVersion);
MqttMessageOutFactory resolveFactory(MqttConnection connection);
}
diff --git a/core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractNetworkMqttUserReleaseHandler.java b/core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractNetworkMqttUserReleaseHandler.java
index 5ccb560f..3696a3f0 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractNetworkMqttUserReleaseHandler.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractNetworkMqttUserReleaseHandler.java
@@ -34,25 +34,24 @@ public Mono> release(ConfigurableNetworkMqttUser user) {
}
protected Mono> releaseImpl(T user) {
-
String clientId = user.clientId();
- user.clientId(StringUtils.EMPTY);
-
if (StringUtils.isEmpty(clientId)) {
- log.warning(user.clientId(), "[%s] This client is already released or rejected"::formatted);
+ log.warning(user.ipAddress(), "[%s] Client is already released or rejected"::formatted);
return Mono.empty();
}
-
+ user.clientId(StringUtils.EMPTY);
+
NetworkMqttSession session = user.session();
Mono> asyncActions = null;
-
if (session != null) {
subscriptionService.cleanSubscriptions(user, session);
MqttClientConnectionConfig connectionConfig = user.connectionConfig();
if (connectionConfig.sessionsEnabled()) {
- asyncActions = sessionService.store(clientId, session, connectionConfig.sessionExpiryInterval());
- user.session(null);
+ asyncActions = sessionService.store(clientId, session);
+ } else {
+ asyncActions = sessionService.delete(clientId, session);
}
+ user.session(null);
}
if (asyncActions != null) {
diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultMessageOutFactoryService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultMessageOutFactoryService.java
index 2e939fa9..9d364348 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultMessageOutFactoryService.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultMessageOutFactoryService.java
@@ -49,8 +49,12 @@ public MqttMessageOutFactory resolveFactory(NetworkMqttUser user) {
@Override
public MqttMessageOutFactory resolveFactory(MqttConnection connection) {
- MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig();
- MqttVersion mqttVersion = connectionConfig.mqttVersion();
+ MqttClientConnectionConfig clientConfig = connection.clientConnectionConfig();
+ return resolveFactory(clientConfig.mqttVersion());
+ }
+
+ @Override
+ public MqttMessageOutFactory resolveFactory(MqttVersion mqttVersion) {
try {
//noinspection DataFlowIssue
return messageOutFactories[mqttVersion.version()];
diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java
index e0cca0df..7e1ea215 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java
@@ -4,7 +4,6 @@
import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.model.publishing.Publish;
-import javasabr.mqtt.model.subscriber.SingleSubscriber;
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.service.PublishDeliveringService;
import javasabr.mqtt.service.publish.handler.MqttPublishOutMessageHandler;
diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java
index 9e5dedf3..66021782 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java
@@ -4,14 +4,15 @@
import static javasabr.mqtt.model.MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET;
import static javasabr.mqtt.model.MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET;
import static javasabr.mqtt.model.MqttProperties.SERVER_KEEP_ALIVE_DISABLED;
-import static javasabr.mqtt.model.MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED;
-import static javasabr.mqtt.model.MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET;
import static javasabr.mqtt.model.MqttProperties.TOPIC_ALIAS_MAXIMUM_DISABLED;
import static javasabr.mqtt.model.MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET;
import static javasabr.mqtt.model.reason.code.ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD;
-import static javasabr.mqtt.model.reason.code.ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
import javasabr.mqtt.model.MqttClientConnectionConfig;
+import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.model.MqttServerConnectionConfig;
import javasabr.mqtt.model.MqttVersion;
import javasabr.mqtt.model.exception.ConnectionRejectException;
@@ -21,13 +22,16 @@
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.network.message.in.ConnectMqttInMessage;
import javasabr.mqtt.network.message.out.MqttOutMessage;
+import javasabr.mqtt.network.session.ConfigurableNetworkMqttSession;
import javasabr.mqtt.network.session.NetworkMqttSession;
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser;
import javasabr.mqtt.service.AuthenticationService;
import javasabr.mqtt.service.ClientIdRegistry;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.SubscriptionService;
+import javasabr.mqtt.service.message.validator.MqttInMessageFieldValidator;
import javasabr.mqtt.service.session.MqttSessionService;
+import javasabr.rlib.common.util.ArrayUtils;
import javasabr.rlib.common.util.StringUtils;
import lombok.AccessLevel;
import lombok.CustomLog;
@@ -37,7 +41,7 @@
@CustomLog
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class ConnectInMqttInMessageHandler
- extends AbstractMqttInMessageHandler {
+ extends FieldsValidatedMqttInMessageHandler {
ClientIdRegistry clientIdRegistry;
AuthenticationService authenticationService;
@@ -49,8 +53,9 @@ public ConnectInMqttInMessageHandler(
AuthenticationService authenticationService,
MqttSessionService sessionService,
SubscriptionService subscriptionService,
- MessageOutFactoryService messageOutFactoryService) {
- super(ExternalNetworkMqttUser.class, ConnectMqttInMessage.class, messageOutFactoryService);
+ MessageOutFactoryService messageOutFactoryService,
+ List extends MqttInMessageFieldValidator super ExternalNetworkMqttUser, ConnectMqttInMessage>> fieldValidators) {
+ super(ExternalNetworkMqttUser.class, ConnectMqttInMessage.class, messageOutFactoryService, fieldValidators);
this.clientIdRegistry = clientIdRegistry;
this.authenticationService = authenticationService;
this.sessionService = sessionService;
@@ -73,17 +78,27 @@ protected void processValidMessage(
ExternalNetworkMqttUser user,
ConnectMqttInMessage message) {
resolveClientConnectionConfig(user, message);
+ super.processValidMessage(connection, user, message);
+ }
+
+ @Override
+ protected void processMessageWithValidFields(
+ MqttConnection connection,
+ ExternalNetworkMqttUser user,
+ ConnectMqttInMessage message) {
+ String username = Objects.requireNonNullElse(message.username(), StringUtils.EMPTY);
+ byte[] password = Objects.requireNonNullElse(message.password(), ArrayUtils.EMPTY_BYTE_ARRAY);
authenticationService
- .auth(message.username(), message.password())
+ .auth(username, password)
.flatMap(ifTrue(
user,
message, this::registerClient, BAD_USER_NAME_OR_PASSWORD, connectAckReasonCode -> reject(user, connectAckReasonCode)))
.flatMap(ifTrue(
user,
- message, this::restoreSession, CLIENT_IDENTIFIER_NOT_VALID, connectAckReasonCode -> reject(user, connectAckReasonCode)))
+ message, this::restoreSession, ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID, connectAckReasonCode -> reject(user, connectAckReasonCode)))
.subscribe();
}
-
+
private void reject(ExternalNetworkMqttUser user, ConnectAckReasonCode connectAckReasonCode) {
user.sendInBackground(messageOutFactoryService
.resolveFactory(user)
@@ -116,95 +131,113 @@ private Mono registerClient(ExternalNetworkMqttUser user, ConnectMqttIn
.map(ifTrue(newClientId, user::clientId)));
}
- private Mono restoreSession(ConfigurableNetworkMqttUser user, ConnectMqttInMessage packet) {
- if (packet.cleanStart()) {
+ private Mono restoreSession(ConfigurableNetworkMqttUser user, ConnectMqttInMessage message) {
+ if (message.cleanStart()) {
return sessionService
- .create(user.clientId())
- .flatMap(session -> onConnected(user, packet, session, false));
+ .createClean(user.clientId())
+ .flatMap(session -> onConnected(user, session, message, false));
} else {
return sessionService
.restore(user.clientId())
- .flatMap(session -> onConnected(user, packet, session, true))
+ .flatMap(session -> onConnected(user, session, message, true))
.switchIfEmpty(Mono.defer(() -> sessionService
- .create(user.clientId())
- .flatMap(session -> onConnected(user, packet, session, false))));
+ .createClean(user.clientId())
+ .flatMap(session -> onConnected(user, session, message, false))));
}
}
- private void resolveClientConnectionConfig(ConfigurableNetworkMqttUser user, ConnectMqttInMessage packet) {
+ private void resolveClientConnectionConfig(
+ ConfigurableNetworkMqttUser user,
+ ConnectMqttInMessage message) {
MqttConnection connection = user.connection();
MqttServerConnectionConfig serverConfig = connection.serverConnectionConfig();
// select result keep alive time
- int minimalKeepAliveTime = Math.max(serverConfig.minKeepAliveTime(), packet.keepAlive());
+ int minimalKeepAliveTime = Math.max(serverConfig.minKeepAliveTime(), message.keepAlive());
int keepAlive = serverConfig.keepAliveEnabled() ? minimalKeepAliveTime : SERVER_KEEP_ALIVE_DISABLED;
-
- // select result session expiry interval
- long sessionExpiryInterval = serverConfig.sessionsEnabled()
- ? packet.sessionExpiryInterval()
- : SESSION_EXPIRY_INTERVAL_DISABLED;
-
- if (sessionExpiryInterval == SESSION_EXPIRY_INTERVAL_IS_NOT_SET) {
- sessionExpiryInterval = serverConfig.defaultSessionExpiryInterval();
- }
-
+
// select result receive max
- int receiveMaxPublishes = packet.receiveMaxPublishes() == RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET
+ int receiveMaxPublishes = message.receiveMaxPublishes() == RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET
? serverConfig.receiveMaxPublishes()
- : Math.min(packet.receiveMaxPublishes(), serverConfig.receiveMaxPublishes());
+ : Math.min(message.receiveMaxPublishes(), serverConfig.receiveMaxPublishes());
// select result maximum packet size
- var maximumPacketSize = packet.maxPacketSize() == MAXIMUM_MESSAGE_SIZE_IS_NOT_SET
+ var maximumPacketSize = message.maxMessageSize() == MAXIMUM_MESSAGE_SIZE_IS_NOT_SET
? serverConfig.maxMessageSize()
- : Math.min(packet.maxPacketSize(), serverConfig.maxMessageSize());
+ : Math.min(message.maxMessageSize(), serverConfig.maxMessageSize());
// select result topic alias maximum
- var topicAliasMaxValue = packet.topicAliasMaxValue() == TOPIC_ALIAS_MAXIMUM_IS_NOT_SET
+ var topicAliasMaxValue = message.topicAliasMaxValue() == TOPIC_ALIAS_MAXIMUM_IS_NOT_SET
? TOPIC_ALIAS_MAXIMUM_DISABLED
- : Math.min(packet.topicAliasMaxValue(), serverConfig.topicAliasMaxValue());
+ : Math.min(message.topicAliasMaxValue(), serverConfig.topicAliasMaxValue());
connection.configure(new MqttClientConnectionConfig(
serverConfig,
serverConfig.maxQos(),
- packet.mqttVersion(),
- sessionExpiryInterval,
+ message.mqttVersion(),
+ resolveSessionExpiryInterval(message, serverConfig),
receiveMaxPublishes,
maximumPacketSize,
topicAliasMaxValue,
keepAlive,
- packet.requestResponseInformation(),
- packet.requestProblemInformation()));
+ message.requestResponseInformation(),
+ message.requestProblemInformation()));
+ }
+
+ private Duration resolveSessionExpiryInterval(
+ ConnectMqttInMessage message,
+ MqttServerConnectionConfig serverConfig) {
+ // do not store such sessions after closing connection
+ if (!serverConfig.sessionsEnabled()) {
+ return MqttProperties.SESSION_EXPIRY_DURATION_DISABLED;
+ }
+ long expiryInterval = message.sessionExpiryInterval();
+ return switch (expiryInterval) {
+ case MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY -> MqttProperties.SESSION_EXPIRY_DURATION_INFINITY;
+ case MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED -> MqttProperties.SESSION_EXPIRY_DURATION_DISABLED;
+ default -> Duration.ofSeconds(expiryInterval);
+ };
}
private Mono onConnected(
ConfigurableNetworkMqttUser user,
- ConnectMqttInMessage message,
NetworkMqttSession session,
+ ConnectMqttInMessage message,
boolean sessionRestored) {
MqttConnection connection = user.connection();
MqttServerConnectionConfig serverConfig = connection.serverConnectionConfig();
MqttClientConnectionConfig clientConfig = connection.clientConnectionConfig();
+ if (session instanceof ConfigurableNetworkMqttSession configurableSession) {
+ configurableSession.expiryInterval(clientConfig.sessionExpiryInterval());
+ }
+
// if it was closed in parallel
if (connection.closed() && serverConfig.sessionsEnabled()) {
// store the session again
- return sessionService.store(user.clientId(), session, clientConfig.sessionExpiryInterval());
+ return sessionService.store(user.clientId(), session);
}
user.session(session);
+ // already validated
+ String requestedClientId = Objects.requireNonNull(message.clientId());
+ long requestedSessionExpiryInterval = message.sessionExpiryInterval();
+ int requestedKeepAlive = message.keepAlive();
+ int requestedReceiveMaxPublishes = message.receiveMaxPublishes();
+
var connectAck = messageOutFactoryService
.resolveFactory(user)
.newConnectAck(
user,
ConnectAckReasonCode.SUCCESS,
sessionRestored,
- message.clientId(),
- message.sessionExpiryInterval(),
- message.keepAlive(),
- message.receiveMaxPublishes());
+ requestedClientId,
+ requestedSessionExpiryInterval,
+ requestedKeepAlive,
+ requestedReceiveMaxPublishes);
subscriptionService.restoreSubscriptions(user, session);
@@ -214,12 +247,10 @@ private Mono onConnected(
}
private boolean onSentConnAck(ConfigurableNetworkMqttUser user, NetworkMqttSession session, boolean result) {
-
if (!result) {
log.warning(user.clientId(), "Was issue with sending conn ack packet to client:[%s]"::formatted);
return false;
}
-
session.resendNotConfirmedPublishesTo(user);
return true;
}
diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/FieldsValidatedMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/FieldsValidatedMqttInMessageHandler.java
index 967628b9..137faebb 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/FieldsValidatedMqttInMessageHandler.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/FieldsValidatedMqttInMessageHandler.java
@@ -33,7 +33,7 @@ protected FieldsValidatedMqttInMessageHandler(
}
@Override
- protected final void processValidMessage(MqttConnection connection, U user, M message) {
+ protected void processValidMessage(MqttConnection connection, U user, M message) {
for (MqttInMessageFieldValidator super U, M> fieldValidator : fieldValidators) {
if (fieldValidator.isNotValid(connection, user, message)) {
return;
diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java
index 97e61c25..80472671 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java
@@ -45,7 +45,7 @@ public MqttOutMessage newConnectAck(
String requestedClientId,
long requestedSessionExpiryInterval,
int requestedKeepAlive,
- int requestedReceiveMax,
+ int requestedReceiveMaxPublishes,
String reason,
String serverReference,
String responseInformation,
diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java
index f367f19c..61030f43 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java
@@ -135,7 +135,7 @@ public MqttOutMessage newDisconnect(
userProperties,
reason,
serverReference,
- connectionConfig.sessionExpiryInterval());
+ connectionConfig.sessionExpiryIntervalInSecs());
}
@Override
diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java
index 97e6972b..c29291fa 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java
@@ -20,7 +20,6 @@
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.rlib.collections.array.Array;
-import javasabr.rlib.collections.array.MutableArray;
import javasabr.rlib.common.util.ArrayUtils;
import javasabr.rlib.common.util.StringUtils;
import org.jspecify.annotations.Nullable;
@@ -36,7 +35,7 @@ public abstract MqttOutMessage newConnectAck(
String requestedClientId,
long requestedSessionExpiryInterval,
int requestedKeepAlive,
- int requestedReceiveMax,
+ int requestedReceiveMaxPublishes,
String reason,
String serverReference,
String responseInformation,
@@ -51,7 +50,7 @@ public MqttOutMessage newConnectAck(
String requestedClientId,
long requestedSessionExpiryInterval,
int requestedKeepAlive,
- int requestedReceiveMax) {
+ int requestedReceiveMaxPublishes) {
return newConnectAck(
user,
reasonCode,
@@ -59,13 +58,13 @@ public MqttOutMessage newConnectAck(
requestedClientId,
requestedSessionExpiryInterval,
requestedKeepAlive,
- requestedReceiveMax,
+ requestedReceiveMaxPublishes,
StringUtils.EMPTY,
StringUtils.EMPTY,
StringUtils.EMPTY,
StringUtils.EMPTY,
ArrayUtils.EMPTY_BYTE_ARRAY,
- MutableArray.ofType(StringPair.class));
+ MqttMessage.EMPTY_USER_PROPERTIES);
}
public MqttOutMessage newConnectAck(NetworkMqttUser user, ConnectAckReasonCode reasonCode) {
@@ -75,7 +74,7 @@ public MqttOutMessage newConnectAck(NetworkMqttUser user, ConnectAckReasonCode r
reasonCode,
false,
StringUtils.EMPTY,
- connectionConfig.sessionExpiryInterval(),
+ connectionConfig.sessionExpiryIntervalInSecs(),
connectionConfig.keepAlive(),
connectionConfig.receiveMaxPublishes(),
StringUtils.EMPTY,
@@ -83,7 +82,7 @@ public MqttOutMessage newConnectAck(NetworkMqttUser user, ConnectAckReasonCode r
StringUtils.EMPTY,
StringUtils.EMPTY,
ArrayUtils.EMPTY_BYTE_ARRAY,
- MutableArray.ofType(StringPair.class));
+ MqttMessage.EMPTY_USER_PROPERTIES);
}
public MqttOutMessage newPublish(
@@ -104,7 +103,7 @@ public MqttOutMessage newPublish(
PayloadFormat.UNDEFINED,
null,
null,
- MutableArray.ofType(StringPair.class));
+ MqttMessage.EMPTY_USER_PROPERTIES);
}
public abstract MqttOutMessage newPublish(
diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/validator/ClientIdMqttInMessageFieldValidator.java b/core-service/src/main/java/javasabr/mqtt/service/message/validator/ClientIdMqttInMessageFieldValidator.java
new file mode 100644
index 00000000..6a0e0763
--- /dev/null
+++ b/core-service/src/main/java/javasabr/mqtt/service/message/validator/ClientIdMqttInMessageFieldValidator.java
@@ -0,0 +1,44 @@
+package javasabr.mqtt.service.message.validator;
+
+import javasabr.mqtt.model.reason.code.ConnectAckReasonCode;
+import javasabr.mqtt.network.MqttConnection;
+import javasabr.mqtt.network.message.in.ConnectMqttInMessage;
+import javasabr.mqtt.network.user.NetworkMqttUser;
+import javasabr.mqtt.service.MessageOutFactoryService;
+import lombok.AccessLevel;
+import lombok.CustomLog;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.FieldDefaults;
+
+@CustomLog
+@RequiredArgsConstructor
+@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
+public class ClientIdMqttInMessageFieldValidator extends
+ MqttInMessageFieldValidator {
+
+ public static final int ORDER = 10;
+
+ MessageOutFactoryService messageOutFactoryService;
+
+ @Override
+ public boolean isNotValid(MqttConnection connection, NetworkMqttUser user, ConnectMqttInMessage message) {
+ String clientId = message.clientId();
+ if (clientId == null || clientId.isBlank()) {
+ handleNotValidClientId(user);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int order() {
+ return ORDER;
+ }
+
+ private void handleNotValidClientId(NetworkMqttUser user) {
+ user.closeWithReason(messageOutFactoryService
+ .resolveFactory(user)
+ .newConnectAck(user, ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID));
+ }
+}
diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishOutMessageHandler.java
index 5e28bc5f..c63b3ba8 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishOutMessageHandler.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishOutMessageHandler.java
@@ -3,7 +3,6 @@
import javasabr.mqtt.model.MqttUser;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.model.publishing.Publish;
-import javasabr.mqtt.model.subscriber.SingleSubscriber;
public interface MqttPublishOutMessageHandler {
diff --git a/core-service/src/main/java/javasabr/mqtt/service/session/MqttSessionService.java b/core-service/src/main/java/javasabr/mqtt/service/session/MqttSessionService.java
index 1b14db7c..758f0d2c 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/session/MqttSessionService.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/session/MqttSessionService.java
@@ -5,9 +5,14 @@
public interface MqttSessionService {
+ Mono createClean(String clientId);
+
Mono restore(String clientId);
- Mono create(String clientId);
-
- Mono store(String clientId, NetworkMqttSession session, long expiryInterval);
+ /**
+ * @return async result 'true' if session was stored
+ */
+ Mono store(String clientId, NetworkMqttSession session);
+
+ Mono delete(String clientId, NetworkMqttSession session);
}
diff --git a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java
index 117b25ab..b3958372 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java
@@ -1,26 +1,29 @@
package javasabr.mqtt.service.session.impl;
import java.io.Closeable;
-import javasabr.mqtt.network.session.ConfigurableNetworkMqttSession;
+import java.time.Duration;
+import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.network.session.NetworkMqttSession;
import javasabr.mqtt.service.session.MqttSessionService;
import javasabr.rlib.collections.array.ArrayFactory;
import javasabr.rlib.collections.array.MutableArray;
-import javasabr.rlib.collections.dictionary.Dictionary;
import javasabr.rlib.collections.dictionary.DictionaryFactory;
import javasabr.rlib.collections.dictionary.LockableRefToRefDictionary;
-import javasabr.rlib.collections.dictionary.MutableRefToRefDictionary;
import javasabr.rlib.common.util.ThreadUtils;
import lombok.AccessLevel;
import lombok.CustomLog;
import lombok.experimental.FieldDefaults;
+import org.jspecify.annotations.Nullable;
import reactor.core.publisher.Mono;
@CustomLog
@FieldDefaults(level = AccessLevel.PRIVATE)
public class InMemoryMqttSessionService implements MqttSessionService, Closeable {
- final LockableRefToRefDictionary storedSession;
+ final LockableRefToRefDictionary activeSessions;
+ final LockableRefToRefDictionary storedSessions;
+ final LockableRefToRefDictionary storedExpirableSessions;
+
final Thread cleanThread;
final int cleanIntervalInMs;
@@ -28,7 +31,9 @@ public class InMemoryMqttSessionService implements MqttSessionService, Closeable
public InMemoryMqttSessionService(int cleanIntervalInMs) {
this.cleanIntervalInMs = cleanIntervalInMs;
- this.storedSession = DictionaryFactory.stampedLockBasedRefToRefDictionary();
+ this.activeSessions = DictionaryFactory.stampedLockBasedRefToRefDictionary();
+ this.storedExpirableSessions = DictionaryFactory.stampedLockBasedRefToRefDictionary();
+ this.storedSessions = DictionaryFactory.stampedLockBasedRefToRefDictionary();
this.cleanThread = new Thread(this::cleanup, "InMemoryMqttSessionService-Cleanup");
this.cleanThread.setPriority(Thread.MIN_PRIORITY);
this.cleanThread.setDaemon(true);
@@ -36,111 +41,235 @@ public InMemoryMqttSessionService(int cleanIntervalInMs) {
}
@Override
- public Mono restore(String clientId) {
-
- InMemoryNetworkMqttSession session = storedSession
- .operations()
- .getInWriteLock(clientId, MutableRefToRefDictionary::remove);
+ public Mono createClean(String clientId) {
+ discardStoredSession(clientId);
+ // check if we already have an active session
+ long stamp = activeSessions.writeLock();
+ try {
+ InMemoryNetworkMqttSession currentActiveSession = activeSessions.get(clientId);
+ if (currentActiveSession != null) {
+ throw new IllegalStateException("Client:[%s] already has active session".formatted(clientId));
+ }
+ InMemoryNetworkMqttSession newCleanSession = new InMemoryNetworkMqttSession(clientId);
+ activeSessions.put(clientId, newCleanSession);
+ log.debug(clientId, "[%s] Created new clean session"::formatted);
+ return Mono.just(newCleanSession);
+ } finally {
+ activeSessions.writeUnlock(stamp);
+ }
+ }
- if (session != null) {
- log.debug(clientId, "[%s] Restored session"::formatted);
- } else {
+ @Override
+ public Mono restore(String clientId) {
+ // check if we already have an active session
+ long stamp = activeSessions.readLock();
+ try {
+ InMemoryNetworkMqttSession currentActiveSession = activeSessions.get(clientId);
+ if (currentActiveSession != null) {
+ throw new IllegalStateException("Client:[%s] already has active session".formatted(clientId));
+ }
+ } finally {
+ activeSessions.readUnlock(stamp);
+ }
+ InMemoryNetworkMqttSession restoredSession = tryToRestoreSession(clientId);
+ if (restoredSession == null) {
log.debug(clientId, "[%s] No any stored session"::formatted);
+ return Mono.empty();
}
-
- return Mono.justOrEmpty(session);
+ stamp = activeSessions.writeLock();
+ try {
+ InMemoryNetworkMqttSession currentActiveSession = activeSessions.get(clientId);
+ if (currentActiveSession != null) {
+ throw new IllegalStateException("Client:[%s] already has active session".formatted(clientId));
+ }
+ activeSessions.put(clientId, restoredSession);
+ } finally {
+ activeSessions.writeUnlock(stamp);
+ }
+ return Mono.just(restoredSession);
}
@Override
- public Mono create(String clientId) {
-
- InMemoryNetworkMqttSession session = storedSession
- .operations()
- .getInWriteLock(clientId, MutableRefToRefDictionary::remove);
-
- if (session != null) {
- log.debug(clientId, "Removed old session for client:[%s]"::formatted);
+ public Mono store(String clientId, NetworkMqttSession session) {
+ // check if we already have an active session
+ long stamp = activeSessions.writeLock();
+ try {
+ InMemoryNetworkMqttSession currentActiveSession = activeSessions.get(clientId);
+ if (currentActiveSession != session) {
+ throw new IllegalStateException("Client:[%s] has another active session".formatted(clientId));
+ }
+ activeSessions.remove(clientId);
+ Duration expiryInterval = currentActiveSession.expiryInterval();
+ if (expiryInterval == MqttProperties.SESSION_EXPIRY_DURATION_DISABLED) {
+ return Mono.just(false);
+ } else if (expiryInterval == MqttProperties.SESSION_EXPIRY_DURATION_INFINITY) {
+ storeNotExpirableSession(clientId, currentActiveSession);
+ } else {
+ storeExpirableSession(clientId, expiryInterval, currentActiveSession);
+ }
+ return Mono.just(true);
+ } finally {
+ activeSessions.writeUnlock(stamp);
}
-
- log.debug(clientId, "Created new session for client:[%s]"::formatted);
-
- return Mono.just(new InMemoryNetworkMqttSession(clientId));
}
@Override
- public Mono store(String clientId, NetworkMqttSession session, long expiryInterval) {
-
- var configurable = (InMemoryNetworkMqttSession) session;
- configurable.expirationTime(System.currentTimeMillis() + (expiryInterval * 1000));
+ public Mono delete(String clientId, NetworkMqttSession session) {
+ long stamp = activeSessions.writeLock();
+ try {
+ InMemoryNetworkMqttSession currentActiveSession = activeSessions.get(clientId);
+ if (currentActiveSession != session) {
+ throw new IllegalStateException("Client:[%s] has another active session".formatted(clientId));
+ }
+ activeSessions.remove(clientId);
+ currentActiveSession.clear();
+ return Mono.just(true);
+ } finally {
+ activeSessions.writeUnlock(stamp);
+ }
+ }
- storedSession
- .operations()
- .inWriteLock(clientId, configurable, MutableRefToRefDictionary::put);
+ @Nullable
+ private InMemoryNetworkMqttSession tryToRestoreSession(String clientId) {
+ // try to find stored expirable session to restore
+ long stamp = storedExpirableSessions.writeLock();
+ try {
+ ExpirableSession expirableSession = storedExpirableSessions.remove(clientId);
+ if (expirableSession != null) {
+ log.debug(clientId, "[%s] Restored expirable session"::formatted);
+ return expirableSession.session();
+ }
+ } finally {
+ storedExpirableSessions.writeUnlock(stamp);
+ }
+ // try to find stored not expirable session to restore
+ stamp = storedSessions.writeLock();
+ try {
+ InMemoryNetworkMqttSession notExpirableSession = storedSessions.remove(clientId);
+ if (notExpirableSession != null) {
+ log.debug(clientId, "[%s] Restored not expirable session"::formatted);
+ return notExpirableSession;
+ }
+ } finally {
+ storedSessions.writeUnlock(stamp);
+ }
+ return null;
+ }
+
+ private void storeNotExpirableSession(String clientId, InMemoryNetworkMqttSession activeSession) {
+ long stamp = storedSessions.writeLock();
+ try {
+ InMemoryNetworkMqttSession previous = storedSessions.get(clientId);
+ if (previous != null) {
+ throw new IllegalStateException("Client:[%s] already has stored not expirable session".formatted(clientId));
+ }
+ storedSessions.put(clientId, activeSession);
+ log.info(clientId, "[%s] Stored not expirable session"::formatted);
+ } finally {
+ storedSessions.writeUnlock(stamp);
+ }
+ }
- log.debug(clientId, "Stored session for client:[%s]"::formatted);
+ private void storeExpirableSession(
+ String clientId,
+ Duration expiryInterval,
+ InMemoryNetworkMqttSession currentActiveSession) {
+ long stamp = storedExpirableSessions.writeLock();
+ try {
+ ExpirableSession previous = storedExpirableSessions.get(clientId);
+ if (previous != null) {
+ throw new IllegalStateException("Client:[%s] already has stored expirable session".formatted(clientId));
+ }
+ storedExpirableSessions.put(clientId, ExpirableSession.of(expiryInterval, currentActiveSession));
+ log.info(clientId, expiryInterval, "[%s] Stored expirable session with expiration:[%s]"::formatted);
+ } finally {
+ storedExpirableSessions.writeUnlock(stamp);
+ }
+ }
- return Mono.just(Boolean.TRUE);
+ private void discardStoredSession(String clientId) {
+ // try to find stored expirable session to discard
+ long stamp = storedExpirableSessions.writeLock();
+ try {
+ ExpirableSession expirableSession = storedExpirableSessions.remove(clientId);
+ if (expirableSession != null) {
+ log.debug(clientId, "[%s] Discard expirable session"::formatted);
+ expirableSession.session().clear();
+ return;
+ }
+ } finally {
+ storedExpirableSessions.writeUnlock(stamp);
+ }
+ // try to find stored not expirable session to discard
+ stamp = storedSessions.writeLock();
+ try {
+ InMemoryNetworkMqttSession storedSession = storedSessions.remove(clientId);
+ if (storedSession != null) {
+ log.debug(clientId, "[%s] Discard not expirable session"::formatted);
+ storedSession.clear();
+ return;
+ }
+ } finally {
+ storedSessions.writeUnlock(stamp);
+ }
+ log.debug(clientId, "[%s] No any stored session to discard"::formatted);
}
private void cleanup() {
-
- var toCheck = ArrayFactory.mutableArray(InMemoryNetworkMqttSession.class);
- var toRemove = ArrayFactory.mutableArray(InMemoryNetworkMqttSession.class);
-
+ var sessionsToCheck = ArrayFactory.mutableArray(ExpirableSession.class);
+ var expiredSessions = ArrayFactory.mutableArray(ExpirableSession.class);
while (!closed) {
ThreadUtils.sleep(cleanIntervalInMs);
-
- toCheck.clear();
- toRemove.clear();
-
- storedSession
- .operations()
- .inReadLock(toCheck, Dictionary::values);
-
- if (findToRemove(toCheck, toRemove)) {
+ if (storedExpirableSessions.isEmpty()) {
continue;
}
-
- storedSession
- .operations()
- .inWriteLock(toRemove, InMemoryMqttSessionService::removeExpiredSessions);
- }
- }
-
- private static void removeExpiredSessions(
- LockableRefToRefDictionary sessions,
- MutableArray expired) {
- long time = System.currentTimeMillis();
- for (ConfigurableNetworkMqttSession session : expired) {
- if (session.expirationTime() <= time) {
+ long stamp = storedExpirableSessions.readLock();
+ try {
+ storedExpirableSessions.values(sessionsToCheck);
+ } finally {
+ storedExpirableSessions.readUnlock(stamp);
+ }
+ if (sessionsToCheck.isEmpty()) {
continue;
}
-
- InMemoryNetworkMqttSession removed = sessions.remove(session.clientId());
- log.debug(session.clientId(), "Removed expired session for client:[%]"::formatted);
-
- // if we already have new session under the same client id
- if (removed != null && removed != session) {
- sessions.put(session.clientId(), removed);
- } else if (removed != null) {
- removed.clear();
+ collectExpiredSessions(sessionsToCheck, expiredSessions);
+ if (!expiredSessions.isEmpty()) {
+ deleteExpiredSessions(expiredSessions);
+ expiredSessions.clear();
}
+ sessionsToCheck.clear();
}
}
- private boolean findToRemove(
- MutableArray toCheck,
- MutableArray toRemove) {
-
- var currentTime = System.currentTimeMillis();
-
- for (InMemoryNetworkMqttSession session : toCheck) {
- if (session.expirationTime() > currentTime) {
- toRemove.add(session);
+ private void collectExpiredSessions(
+ MutableArray sessionsToCheck,
+ MutableArray expiredSessions) {
+ long currentTime = System.currentTimeMillis();
+ for (ExpirableSession expirableSession : sessionsToCheck) {
+ if (expirableSession.expireAfter() < currentTime) {
+ expiredSessions.add(expirableSession);
}
}
+ }
- return toRemove.isEmpty();
+ private void deleteExpiredSessions(MutableArray expiredSessions) {
+ long stamp = storedExpirableSessions.writeLock();
+ try {
+ for (ExpirableSession expirableSession : expiredSessions) {
+ InMemoryNetworkMqttSession session = expirableSession.session();
+ ExpirableSession currentlyStored = storedExpirableSessions.remove(session.clientId());
+ if (expirableSession == currentlyStored) {
+ // nothing was changed during this iteration
+ log.info(session.clientId(), "[%] Removed expired session"::formatted);
+ session.clear();
+ } else if (currentlyStored != null) {
+ // return back the other instance of stored session for the same client id
+ storedExpirableSessions.put(session.clientId(), currentlyStored);
+ }
+ }
+ } finally {
+ storedExpirableSessions.writeUnlock(stamp);
+ }
}
@Override
@@ -148,4 +277,11 @@ public void close() {
closed = true;
cleanThread.interrupt();
}
+
+ private record ExpirableSession(long expireAfter, InMemoryNetworkMqttSession session) {
+ private static ExpirableSession of(Duration expiryInterval, InMemoryNetworkMqttSession session) {
+ long expireAfter = System.currentTimeMillis() + expiryInterval.toMillis();
+ return new ExpirableSession(expireAfter, session);
+ }
+ }
}
diff --git a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java
index 080d12ca..c20feb66 100644
--- a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java
+++ b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java
@@ -1,5 +1,6 @@
package javasabr.mqtt.service.session.impl;
+import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.network.session.ConfigurableNetworkMqttSession;
@@ -9,17 +10,16 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
-import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
@CustomLog
@Accessors
-@ToString(of = "clientId")
-@EqualsAndHashCode(of = "clientId")
+@EqualsAndHashCode(onlyExplicitlyIncluded = true)
@FieldDefaults(level = AccessLevel.PRIVATE)
public class InMemoryNetworkMqttSession implements ConfigurableNetworkMqttSession {
+ @EqualsAndHashCode.Include
final String clientId;
final AtomicInteger messageIdGenerator;
@@ -38,7 +38,7 @@ public class InMemoryNetworkMqttSession implements ConfigurableNetworkMqttSessio
@Getter
@Setter
- volatile long expirationTime = -1;
+ volatile Duration expiryInterval;
public InMemoryNetworkMqttSession(String clientId) {
this.clientId = clientId;
@@ -49,6 +49,7 @@ public InMemoryNetworkMqttSession(String clientId) {
this.outProcessingPublishes = new InMemoryProcessingPublishes(this);
this.activeSubscriptions = new InMemoryActiveSubscriptions();
this.topicNameMapping = new InMemoryTopicNameMapping();
+ this.expiryInterval = MqttProperties.SESSION_EXPIRY_DURATION_DISABLED;
}
@Override
diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy
index 8b64ea7e..1d24371d 100644
--- a/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy
+++ b/core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy
@@ -12,9 +12,9 @@ import javasabr.mqtt.network.user.NetworkMqttUser
import javasabr.mqtt.service.impl.DefaultMessageOutFactoryService
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService
import javasabr.mqtt.service.impl.DefaultPublishReceivingService
-import javasabr.mqtt.service.impl.InMemoryRetainMessageService
import javasabr.mqtt.service.impl.DefaultTopicService
import javasabr.mqtt.service.impl.DisabledAuthorizationService
+import javasabr.mqtt.service.impl.InMemoryRetainMessageService
import javasabr.mqtt.service.impl.InMemorySubscriptionService
import javasabr.mqtt.service.message.handler.impl.PublishReleaseMqttInMessageHandler
import javasabr.mqtt.service.message.out.factory.Mqtt311MessageOutFactory
@@ -41,6 +41,7 @@ import spock.lang.Specification
import java.nio.channels.AsynchronousSocketChannel
import java.nio.charset.StandardCharsets
+import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
abstract class IntegrationServiceSpecification extends Specification {
@@ -127,7 +128,6 @@ abstract class IntegrationServiceSpecification extends Specification {
MqttProperties.SERVER_KEEP_ALIVE_DEFAULT,
MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_DEFAULT,
MqttProperties.TOPIC_ALIAS_MAX_DEFAULT,
- 0,
true,
true,
true,
@@ -149,7 +149,7 @@ abstract class IntegrationServiceSpecification extends Specification {
serverConnectionConfig,
{ MqttConnection ownedConnection ->
def generatedClientId = "mockedClient_${clientIdGenerator.incrementAndGet()}"
- def createdSession = defaultMqttSessionService.create(generatedClientId).block()
+ def createdSession = defaultMqttSessionService.createClean(generatedClientId).block()
def user = new TestExternalNetworkMqttUser(ownedConnection, Mock(NetworkMqttUserReleaseHandler))
user.session(createdSession)
user.clientId(generatedClientId)
@@ -160,7 +160,7 @@ abstract class IntegrationServiceSpecification extends Specification {
serverConnectionConfig,
serverConnectionConfig.maxQos(),
mqttVersion,
- MqttProperties.SESSION_EXPIRY_INTERVAL_DEFAULT,
+ MqttProperties.SESSION_EXPIRY_DURATION_DISABLED,
serverConnectionConfig.receiveMaxPublishes(),
serverConnectionConfig.maxMessageSize(),
serverConnectionConfig.topicAliasMaxValue(),
diff --git a/model/src/main/java/javasabr/mqtt/model/MqttClientConnectionConfig.java b/model/src/main/java/javasabr/mqtt/model/MqttClientConnectionConfig.java
index 3379fb84..216e032e 100644
--- a/model/src/main/java/javasabr/mqtt/model/MqttClientConnectionConfig.java
+++ b/model/src/main/java/javasabr/mqtt/model/MqttClientConnectionConfig.java
@@ -1,10 +1,13 @@
package javasabr.mqtt.model;
+import java.time.Duration;
+import org.jspecify.annotations.Nullable;
+
public record MqttClientConnectionConfig(
MqttServerConnectionConfig server,
QoS maxQos,
MqttVersion mqttVersion,
- long sessionExpiryInterval,
+ Duration sessionExpiryInterval,
int receiveMaxPublishes,
int maxMessageSize,
int topicAliasMaxValue,
@@ -43,4 +46,8 @@ public int maxStringLength() {
public int maxBinarySize() {
return server.maxBinarySize();
}
+
+ public long sessionExpiryIntervalInSecs() {
+ return sessionExpiryInterval.toSeconds();
+ }
}
diff --git a/model/src/main/java/javasabr/mqtt/model/MqttProperties.java b/model/src/main/java/javasabr/mqtt/model/MqttProperties.java
index 774c498b..8b98d85e 100644
--- a/model/src/main/java/javasabr/mqtt/model/MqttProperties.java
+++ b/model/src/main/java/javasabr/mqtt/model/MqttProperties.java
@@ -1,5 +1,7 @@
package javasabr.mqtt.model;
+import java.time.Duration;
+
public interface MqttProperties {
QoS MAXIMUM_QOS_DEFAULT = QoS.EXACTLY_ONCE;
@@ -8,11 +10,13 @@ public interface MqttProperties {
int MAXIMUM_PACKET_ID = 0xFFFF;
long SESSION_EXPIRY_INTERVAL_DISABLED = 0;
- long SESSION_EXPIRY_INTERVAL_DEFAULT = 120;
long SESSION_EXPIRY_INTERVAL_MIN = 0;
long SESSION_EXPIRY_INTERVAL_INFINITY = 0xFFFFFFFFL;
long SESSION_EXPIRY_INTERVAL_IS_NOT_SET = -1;
+ Duration SESSION_EXPIRY_DURATION_INFINITY = Duration.ofSeconds(0xFFFFFFFFL);
+ Duration SESSION_EXPIRY_DURATION_DISABLED = Duration.ZERO;
+
int RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET = 0;
int RECEIVE_MAXIMUM_PUBLISHES_MIN = 1;
int RECEIVE_MAXIMUM_PUBLISHES_MAX = 0xFFFF;
@@ -30,9 +34,12 @@ public interface MqttProperties {
long MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET = -1;
long MESSAGE_EXPIRY_INTERVAL_INFINITY = 0;
long MESSAGE_EXPIRY_INTERVAL_MIN = 0;
+ long MESSAGE_EXPIRY_INTERVAL_MAX = 268_435_455;
- int TOPIC_ALIAS_MAXIMUM_IS_NOT_SET = 0;
+ int TOPIC_ALIAS_MAXIMUM_IS_NOT_SET = -1;
int TOPIC_ALIAS_MAXIMUM_DISABLED = 0;
+ int TOPIC_ALIAS_MAXIMUM_MIN = 0;
+ int TOPIC_ALIAS_MAXIMUM_MAX = 0xFFFF;
int SERVER_KEEP_ALIVE_IS_NOT_SET = -1;
int SERVER_KEEP_ALIVE_DISABLED = 0;
@@ -66,4 +73,9 @@ public interface MqttProperties {
boolean SESSIONS_ENABLED_DEFAULT = true;
boolean KEEP_ALIVE_ENABLED_DEFAULT = false;
+
+ int WILL_DELAY_INTERVAL_IS_NOT_SET = -1;
+ int WILL_DELAY_INTERVAL_DEFAULT = 0;
+ int WILL_DELAY_INTERVAL_MIN = 0;
+ int WILL_DELAY_INTERVAL_MAX = 268_435_455;
}
diff --git a/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java b/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java
index f66d1dc0..37dcfeaa 100644
--- a/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java
+++ b/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java
@@ -18,6 +18,9 @@ public interface MqttProtocolErrors {
String PROVIDED_INVALID_SHARED_SUBSCRIPTION_AVAILABLE = "Provided invalid 'Shared Subscription Available'";
String PROVIDED_INVALID_SERVER_KEEP_ALIVE = "Provided invalid 'Server Keep Alive'";
String PROVIDED_INVALID_RESPONSE_TOPIC = "Provided invalid 'Response Topic'";
+ String PROVIDED_INVALID_REQUEST_RESPONSE_INFORMATION = "Provided invalid 'Request Response Information'";
+ String PROVIDED_INVALID_REQUEST_PROBLEM_INFORMATION = "Provided invalid 'Request Problem Information'";
+ String PROVIDED_INVALID_WILL_DELAY_INTERVAL = "Provided invalid 'Will Delay Interval'";
String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Provided unsupported 'QoS' or 'RetainHandling'";
String MISSED_REQUIRED_MESSAGE_ID = "'Packet Identifier' must be presented'";
diff --git a/model/src/main/java/javasabr/mqtt/model/MqttServerConnectionConfig.java b/model/src/main/java/javasabr/mqtt/model/MqttServerConnectionConfig.java
index 46ae8745..ef317e9c 100644
--- a/model/src/main/java/javasabr/mqtt/model/MqttServerConnectionConfig.java
+++ b/model/src/main/java/javasabr/mqtt/model/MqttServerConnectionConfig.java
@@ -13,7 +13,6 @@ public record MqttServerConnectionConfig(
int minKeepAliveTime,
int receiveMaxPublishes,
int topicAliasMaxValue,
- long defaultSessionExpiryInterval,
boolean keepAliveEnabled,
boolean sessionsEnabled,
boolean retainAvailable,
@@ -30,7 +29,6 @@ public MqttServerConnectionConfig(
int minKeepAliveTime,
int receiveMaxPublishes,
int topicAliasMaxValue,
- long defaultSessionExpiryInterval,
boolean keepAliveEnabled,
boolean sessionsEnabled,
boolean retainAvailable,
@@ -63,7 +61,6 @@ public MqttServerConnectionConfig(
topicAliasMaxValue,
MqttProperties.TOPIC_ALIAS_MAXIMUM_DISABLED,
MqttProperties.TOPIC_ALIAS_MAX);
- this.defaultSessionExpiryInterval = defaultSessionExpiryInterval;
this.keepAliveEnabled = keepAliveEnabled;
this.sessionsEnabled = sessionsEnabled;
this.retainAvailable = retainAvailable;
diff --git a/model/src/main/java/javasabr/mqtt/model/MqttVersion.java b/model/src/main/java/javasabr/mqtt/model/MqttVersion.java
index df1d4189..55adc7c1 100644
--- a/model/src/main/java/javasabr/mqtt/model/MqttVersion.java
+++ b/model/src/main/java/javasabr/mqtt/model/MqttVersion.java
@@ -21,9 +21,7 @@ public enum MqttVersion {
private static final Map NAME_LEVEL_VERSIONS;
static {
-
var map = new HashMap();
-
for (MqttVersion mqttVersion : values()) {
if (mqttVersion.version < 0) {
continue;
@@ -62,19 +60,20 @@ public boolean isLowerThan(MqttVersion version) {
return ordinal() < version.ordinal();
}
+ public boolean isEqualOrHigherThan(MqttVersion version) {
+ return ordinal() >= version.ordinal();
+ }
+
public static MqttVersion of(String name, byte level) {
-
if (level < 0) {
return MqttVersion.UNKNOWN;
}
-
@Nullable MqttVersion[] availableVersions = NAME_LEVEL_VERSIONS.get(name);
if (availableVersions == null) {
return MqttVersion.UNKNOWN;
} else if (availableVersions.length <= level || availableVersions[level] == null) {
return MqttVersion.UNKNOWN;
}
-
//noinspection DataFlowIssue
return availableVersions[level];
}
diff --git a/model/src/main/java/javasabr/mqtt/model/exception/ConnectionRejectException.java b/model/src/main/java/javasabr/mqtt/model/exception/ConnectionRejectException.java
index d84e5ca1..401fe410 100644
--- a/model/src/main/java/javasabr/mqtt/model/exception/ConnectionRejectException.java
+++ b/model/src/main/java/javasabr/mqtt/model/exception/ConnectionRejectException.java
@@ -1,5 +1,6 @@
package javasabr.mqtt.model.exception;
+import javasabr.mqtt.base.util.DebugUtils;
import javasabr.mqtt.model.reason.code.ConnectAckReasonCode;
import lombok.AccessLevel;
import lombok.Getter;
@@ -19,4 +20,9 @@ public ConnectionRejectException(Throwable cause, ConnectAckReasonCode reasonCod
super(cause);
this.reasonCode = reasonCode;
}
+
+ @Override
+ public String toString() {
+ return DebugUtils.toJsonString(this);
+ }
}
diff --git a/model/src/main/java/javasabr/mqtt/model/session/MqttSession.java b/model/src/main/java/javasabr/mqtt/model/session/MqttSession.java
index 273428d6..7237cedc 100644
--- a/model/src/main/java/javasabr/mqtt/model/session/MqttSession.java
+++ b/model/src/main/java/javasabr/mqtt/model/session/MqttSession.java
@@ -1,15 +1,16 @@
package javasabr.mqtt.model.session;
+import java.time.Duration;
+import org.jspecify.annotations.Nullable;
+
public interface MqttSession {
String clientId();
int generateMessageId();
-
- /**
- * @return the expiration time in ms or -1 if it should not be expired now.
- */
- long expirationTime();
+
+ @Nullable
+ Duration expiryInterval();
MessageTacker inMessageTracker();
MessageTacker outMessageTracker();
diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java
index d41e86bb..a3174739 100644
--- a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java
+++ b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java
@@ -6,31 +6,45 @@
import javasabr.mqtt.base.util.DebugUtils;
import javasabr.mqtt.model.MqttMessageProperty;
import javasabr.mqtt.model.MqttProperties;
+import javasabr.mqtt.model.MqttProtocolErrors;
+import javasabr.mqtt.model.MqttServerConnectionConfig;
import javasabr.mqtt.model.MqttVersion;
+import javasabr.mqtt.model.PayloadFormat;
+import javasabr.mqtt.model.QoS;
+import javasabr.mqtt.model.data.type.StringPair;
import javasabr.mqtt.model.exception.ConnectionRejectException;
+import javasabr.mqtt.model.exception.MalformedProtocolMqttException;
import javasabr.mqtt.model.message.MqttMessageType;
import javasabr.mqtt.model.reason.code.ConnectAckReasonCode;
import javasabr.mqtt.network.MqttConnection;
-import javasabr.rlib.common.util.ArrayUtils;
+import javasabr.mqtt.network.util.MqttDataUtils;
+import javasabr.rlib.collections.array.Array;
+import javasabr.rlib.collections.array.MutableArray;
import javasabr.rlib.common.util.NumberUtils;
-import javasabr.rlib.common.util.StringUtils;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
+import org.jspecify.annotations.Nullable;
/**
* Connection request.
*/
@Getter
-@Accessors(fluent = true, chain = false)
+@Accessors
@FieldDefaults(level = AccessLevel.PRIVATE)
public class ConnectMqttInMessage extends MqttInMessage {
+ public static final byte MESSAGE_FLAGS = 0b0000_0000;
private static final byte MESSAGE_TYPE = (byte) MqttMessageType.CONNECT.ordinal();
static {
- DebugUtils.registerIncludedFields("clientId", "keepAlive", "cleanStart", "mqttVersion");
+ DebugUtils.registerIncludedFields(
+ "clientId",
+ "keepAlive",
+ "cleanStart",
+ "mqttVersion",
+ "sessionExpiryInterval");
}
private static final Set AVAILABLE_PROPERTIES = EnumSet.of(
@@ -194,35 +208,67 @@ public class ConnectMqttInMessage extends MqttInMessage {
MqttVersion mqttVersion = MqttVersion.MQTT_3_1_1;
- String clientId = StringUtils.EMPTY;
- String willTopic = StringUtils.EMPTY;
- byte[] willPayload = ArrayUtils.EMPTY_BYTE_ARRAY;
-
- String username = StringUtils.EMPTY;
- byte[] password = ArrayUtils.EMPTY_BYTE_ARRAY;
-
+ @Nullable
+ String clientId;
int keepAlive;
- int willQos;
- boolean willRetain;
boolean cleanStart;
boolean hasUserName;
+ @Nullable
+ String username;
boolean hasPassword;
+ byte @Nullable [] password;
+
boolean willFlag;
+ boolean willRetain;
+ @Nullable
+ QoS willQos;
+ @Nullable
+ String willTopic;
+ byte @Nullable [] willPayload;
// properties
- String authenticationMethod = StringUtils.EMPTY;
- byte[] authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY;
-
- long sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET;
- int receiveMaxPublishes = MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET;
- int maxPacketSize = MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET;
- int topicAliasMaxValue = MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET;
- boolean requestResponseInformation = false;
- boolean requestProblemInformation = false;
+ @Nullable
+ String authenticationMethod;
+ byte @Nullable [] authenticationData;
+
+ long sessionExpiryInterval;
+ int receiveMaxPublishes;
+ int maxMessageSize;
+ int topicAliasMaxValue;
+
+ @Nullable
+ Boolean requestResponseInformation;
+ @Nullable
+ Boolean requestProblemInformation;
+
+ // will properties
+ @Nullable
+ PayloadFormat willPayloadFormat;
+
+ @Nullable
+ String willContentType;
+ @Nullable
+ String willResponseTopic;
+
+ byte @Nullable [] willCorrelationData;
+
+ int willDelayInterval;
+ long willMessageExpiryInterval;
+
+ @Nullable
+ MutableArray willUserProperties;
+
+ boolean readingWillProperties;
public ConnectMqttInMessage(byte messageFlags) {
super(messageFlags);
+ sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET;
+ receiveMaxPublishes = MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET;
+ maxMessageSize = MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET;
+ topicAliasMaxValue = MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET;
+ willDelayInterval = MqttProperties.WILL_DELAY_INTERVAL_IS_NOT_SET;
+ willMessageExpiryInterval = MqttProperties.MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET;
}
@Override
@@ -235,6 +281,11 @@ public MqttMessageType messageType() {
return MqttMessageType.CONNECT;
}
+ @Override
+ protected boolean validMessageFlags(byte messageFlags) {
+ return messageFlags == MESSAGE_FLAGS;
+ }
+
@Override
protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) {
var connectionConfig = connection.serverConnectionConfig();
@@ -248,59 +299,77 @@ protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer)
throw new ConnectionRejectException(ConnectAckReasonCode.UNSUPPORTED_PROTOCOL_VERSION);
}
- int flags = readByteUnsigned(buffer);
+ int connectFlags = readByteUnsigned(buffer);
+
/*
- Used to indicate whether the will message is a retained message.
+ Used to indicate whether the current connection is a new session or
+ a continuation of an existing session, which determines whether the server will directly
+ create a new session or attempt to reuse an existing session.
*/
- willRetain = NumberUtils.isSetBit(flags, 5);
+ cleanStart = (connectFlags & 0b0000_0010) != 0;
/*
- Used to indicate the QoS of the will message.
+ Used to indicate whether the Payload contains relevant fields of the will message.
*/
- willQos = (flags & 0x18) >> 3;
+ willFlag = (connectFlags & 0b0000_0100) != 0;
+
/*
- Used to indicate whether the current connection is a new session or
- a continuation of an existing session, which determines whether the server will directly
- create a new session or attempt to reuse an existing session.
+ Used to indicate the QoS of the will message.
*/
- cleanStart = NumberUtils.isSetBit(flags, 1);
-
+ int qosLevel = (connectFlags & 0x18) >> 3;
+ qosLevel = (connectFlags & 0b0001_1000) >> 3;
+ // If the Will Flag is set to 0, then the Will QoS MUST be set to 0 (0x00)
+ if (willFlag) {
+ willQos = QoS.ofCode(qosLevel);
+ } else if (qosLevel != 0) {
+ throw new ConnectionRejectException(ConnectAckReasonCode.MALFORMED_PACKET);
+ }
+
+ /*
+ Used to indicate whether the will message is a retained message.
+ */
+ willRetain = (connectFlags & 0b0010_0000) != 0;
+ // If the Will Flag is set to 0, then the Will Retain Flag MUST be set to 0
+ if (!willFlag && willRetain) {
+ throw new ConnectionRejectException(ConnectAckReasonCode.MALFORMED_PACKET);
+ }
+
// for mqtt 3.1.1+
- if (mqttVersion.ordinal() >= MqttVersion.MQTT_3_1_1.ordinal()) {
- boolean zeroReservedFlag = NumberUtils.isNotSetBit(flags, 0);
- if (!zeroReservedFlag) {
+ if (mqttVersion.isEqualOrHigherThan(MqttVersion.MQTT_3_1_1)) {
+ boolean zeroReservedFlag = (connectFlags & 0b0000_0001) != 0;
+ if (zeroReservedFlag) {
/*
The Server MUST validate that the reserved flag in the CONNECT packet is set to 0 [MQTT-3.1.2-3]. If
- the reserved flag is not 0 it is a Malformed Packet. Refer to section 4.13 for information about
- handling
- errors.
+ the reserved flag is not 0 it is a Malformed Packet.
*/
throw new ConnectionRejectException(ConnectAckReasonCode.MALFORMED_PACKET);
}
}
- hasUserName = NumberUtils.isSetBit(flags, 7);
- hasPassword = NumberUtils.isSetBit(flags, 6);
+ hasUserName = (connectFlags & 0b1000_0000) != 0;
+ hasPassword = (connectFlags & 0b0100_0000) != 0;
// for mqtt < 5 we cannot have password without user
- if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal() && !hasUserName && hasPassword) {
+ if (mqttVersion.isLowerThan(MqttVersion.MQTT_5) && !hasUserName && hasPassword) {
throw new ConnectionRejectException(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
}
- /*
- Used to indicate whether the Payload contains relevant fields of the will message.
- */
- willFlag = NumberUtils.isSetBit(flags, 2);
-
/*
This is a double-byte length unsigned integer used to indicate the maximum
time interval between two adjacent control packets sent by the client.
*/
keepAlive = readShortUnsigned(buffer);
+
+ // before MQTT 5 we can't configure session expiry interval so it's always infinity
+ if (mqttVersion.isLowerThan(MqttVersion.MQTT_5)) {
+ sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY;
+ }
}
@Override
protected void readPayload(MqttConnection connection, ByteBuffer buffer) {
- var connectionConfig = connection.serverConnectionConfig();
+ MqttServerConnectionConfig serverConfig = connection.serverConnectionConfig();
+ int maxBinarySize = serverConfig.maxBinarySize();
+ int maxStringLength = serverConfig.maxStringLength();
/*
The ClientID MUST be present and is the first field in the CONNECT packet Payload
The ClientID MUST be a UTF-8 Encoded String as defined in
@@ -319,26 +388,28 @@ protected void readPayload(MqttConnection connection, ByteBuffer buffer) {
If the Server rejects the ClientID it MAY respond to the CONNECT packet with a CONNACK using
Reason Code 0x85 (Client Identifier not valid) and then it MUST close the Network Connection
*/
- clientId = readString(buffer, Integer.MAX_VALUE);
-
- if (willFlag && mqttVersion.ordinal() >= MqttVersion.MQTT_5.ordinal()) {
- readProperties(connection, buffer, WILL_PROPERTIES);
- }
-
- if (willFlag) {
- willTopic = readString(buffer, Integer.MAX_VALUE);
+ clientId = readString(buffer, maxStringLength);
+
+ if (willFlag && mqttVersion.include(MqttVersion.MQTT_5)) {
+ readingWillProperties = true;
+ try {
+ readProperties(connection, buffer, WILL_PROPERTIES);
+ } finally {
+ readingWillProperties = false;
+ }
}
if (willFlag) {
- willPayload = readBytes(buffer, connectionConfig.maxBinarySize());
+ willTopic = readString(buffer, maxStringLength);
+ willPayload = readBytes(buffer, maxBinarySize);
}
-
+
if (hasUserName) {
- username = readString(buffer, Integer.MAX_VALUE);
+ username = readString(buffer, maxStringLength);
}
if (hasPassword) {
- password = readBytes(buffer, connectionConfig.maxBinarySize());
+ password = readBytes(buffer, maxBinarySize);
}
}
@@ -352,43 +423,208 @@ protected Set availableProperties() {
return AVAILABLE_PROPERTIES;
}
+ public long sessionExpiryInterval() {
+ return sessionExpiryInterval == MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET
+ ? MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
+ : sessionExpiryInterval;
+ }
+
+ public Array willUserProperties() {
+ return willUserProperties == null ? EMPTY_USER_PROPERTIES : willUserProperties;
+ }
+
+ public boolean requestResponseInformation() {
+ return requestResponseInformation == null || requestResponseInformation;
+ }
+
+ public boolean requestProblemInformation() {
+ return requestProblemInformation == null || requestProblemInformation;
+ }
+
+ @Override
+ protected void readProperties(MqttConnection connection, ByteBuffer buffer) {
+ MqttServerConnectionConfig serverConfig = connection.serverConnectionConfig();
+ int maxBinarySize = serverConfig.maxBinarySize();
+ int maxStringLength = serverConfig.maxStringLength();
+ // for this message client connection config is not available
+ super.readProperties(buffer, availableProperties(), maxStringLength, maxBinarySize);
+ }
+
@Override
protected void applyProperty(MqttMessageProperty property, byte[] value) {
+ if (readingWillProperties) {
+ applyWillProperty(property, value);
+ return;
+ }
switch (property) {
- case AUTHENTICATION_DATA -> authenticationData = value;
+ case AUTHENTICATION_DATA -> {
+ if (authenticationData != null) {
+ alreadyPresentedProperty(property);
+ }
+ authenticationData = value;
+ }
default -> unsupportedProperty(property);
}
}
@Override
protected void applyProperty(MqttMessageProperty property, String value) {
+ if (readingWillProperties) {
+ applyWillProperty(property, value);
+ return;
+ }
switch (property) {
- case AUTHENTICATION_METHOD -> authenticationMethod = value;
+ case AUTHENTICATION_METHOD -> {
+ if (authenticationMethod != null) {
+ alreadyPresentedProperty(property);
+ }
+ authenticationMethod = value;
+ }
default -> unsupportedProperty(property);
}
}
@Override
protected void applyProperty(MqttMessageProperty property, long value) {
+ if (readingWillProperties) {
+ applyWillProperty(property, value);
+ return;
+ }
+ switch (property) {
+ case REQUEST_RESPONSE_INFORMATION -> {
+ if (requestResponseInformation != null) {
+ alreadyPresentedProperty(property);
+ } else if (!MqttDataUtils.isValidBoolean(value)) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_REQUEST_RESPONSE_INFORMATION);
+ }
+ requestResponseInformation = NumberUtils.toBoolean(value);
+ }
+ case REQUEST_PROBLEM_INFORMATION -> {
+ if (requestProblemInformation != null) {
+ alreadyPresentedProperty(property);
+ } else if (!MqttDataUtils.isValidBoolean(value)) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_REQUEST_PROBLEM_INFORMATION);
+ }
+ requestProblemInformation = NumberUtils.toBoolean(value);
+ }
+ case RECEIVE_MAXIMUM_PUBLISHES -> {
+ if (receiveMaxPublishes != MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET) {
+ alreadyPresentedProperty(property);
+ } else if (value < MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MIN
+ || value > MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MAX) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_RECEIVED_MAX_PUBLISHES);
+ }
+ receiveMaxPublishes = (int) value;
+ }
+ case TOPIC_ALIAS_MAXIMUM -> {
+ if (topicAliasMaxValue != MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET) {
+ alreadyPresentedProperty(property);
+ } else if (value < MqttProperties.TOPIC_ALIAS_MAXIMUM_MIN || value > MqttProperties.TOPIC_ALIAS_MAXIMUM_MAX) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_TOPIC_ALIAS_MAX);
+ }
+ topicAliasMaxValue = (int) value;
+ }
+ case SESSION_EXPIRY_INTERVAL -> {
+ if (sessionExpiryInterval != MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET) {
+ alreadyPresentedProperty(property);
+ } else if (value < MqttProperties.SESSION_EXPIRY_INTERVAL_MIN
+ || value > MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_SESSION_EXPIRY_INTERVAL);
+ }
+ sessionExpiryInterval = value;
+ }
+ case MAXIMUM_MESSAGE_SIZE -> {
+ if (maxMessageSize != MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET) {
+ alreadyPresentedProperty(property);
+ } else if (value < MqttProperties.MAXIMUM_MESSAGE_SIZE_MIN || value > MqttProperties.MAXIMUM_MESSAGE_SIZE_MAX) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_MAX_MESSAGE_SIZE);
+ }
+ maxMessageSize = (int) value;
+ }
+ default -> unsupportedProperty(property);
+ }
+ }
+
+ @Override
+ protected void applyProperty(MqttMessageProperty property, StringPair value) {
+ if (readingWillProperties) {
+ applyWillProperty(property, value);
+ return;
+ }
+ super.applyProperty(property, value);
+ }
+
+ protected void applyWillProperty(MqttMessageProperty property, StringPair value) {
switch (property) {
- case REQUEST_RESPONSE_INFORMATION -> requestResponseInformation = NumberUtils.toBoolean(value);
- case REQUEST_PROBLEM_INFORMATION -> requestProblemInformation = NumberUtils.toBoolean(value);
- case RECEIVE_MAXIMUM_PUBLISHES -> receiveMaxPublishes = NumberUtils.validate(
- (int) value,
- MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MIN,
- MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MAX);
- case TOPIC_ALIAS_MAXIMUM -> topicAliasMaxValue = NumberUtils.validate(
- (int) value,
- MqttProperties.TOPIC_ALIAS_MIN,
- MqttProperties.TOPIC_ALIAS_MAX);
- case SESSION_EXPIRY_INTERVAL -> sessionExpiryInterval = NumberUtils.validate(
- value,
- MqttProperties.SESSION_EXPIRY_INTERVAL_MIN,
- MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY);
- case MAXIMUM_MESSAGE_SIZE -> maxPacketSize = NumberUtils.validate(
- (int) value,
- MqttProperties.MAXIMUM_MESSAGE_SIZE_MIN,
- MqttProperties.MAXIMUM_MESSAGE_SIZE_MAX);
+ case USER_PROPERTY: {
+ if (willUserProperties == null) {
+ willUserProperties = MutableArray.ofType(StringPair.class);
+ }
+ willUserProperties.add(value);
+ break;
+ }
+ }
+ }
+
+ private void applyWillProperty(MqttMessageProperty property, String value) {
+ switch (property) {
+ case CONTENT_TYPE -> {
+ if (willContentType != null) {
+ alreadyPresentedProperty(property);
+ }
+ willContentType = value;
+ }
+ case RESPONSE_TOPIC -> {
+ if (willResponseTopic != null) {
+ alreadyPresentedProperty(property);
+ }
+ willResponseTopic = value;
+ }
+ default -> unsupportedProperty(property);
+ }
+ }
+
+ private void applyWillProperty(MqttMessageProperty property, byte[] value) {
+ switch (property) {
+ case CORRELATION_DATA -> {
+ if (willCorrelationData != null) {
+ alreadyPresentedProperty(property);
+ }
+ willCorrelationData = value;
+ }
+ default -> unsupportedProperty(property);
+ }
+ }
+
+ private void applyWillProperty(MqttMessageProperty property, long value) {
+ switch (property) {
+ case WILL_DELAY_INTERVAL -> {
+ if (willDelayInterval != MqttProperties.WILL_DELAY_INTERVAL_IS_NOT_SET) {
+ alreadyPresentedProperty(property);
+ } else if (value < MqttProperties.WILL_DELAY_INTERVAL_MIN || value > MqttProperties.WILL_DELAY_INTERVAL_MAX) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_WILL_DELAY_INTERVAL);
+ }
+ willDelayInterval = (int) value;
+ }
+ case PAYLOAD_FORMAT_INDICATOR -> {
+ if (willPayloadFormat != null) {
+ alreadyPresentedProperty(property);
+ }
+ PayloadFormat payloadFormat = PayloadFormat.fromCode(value);
+ if (payloadFormat == PayloadFormat.UNDEFINED) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_PAYLOAD_FORMAT);
+ }
+ willPayloadFormat = payloadFormat;
+ }
+ case MESSAGE_EXPIRY_INTERVAL -> {
+ if (willMessageExpiryInterval != MqttProperties.WILL_DELAY_INTERVAL_IS_NOT_SET) {
+ alreadyPresentedProperty(property);
+ } else if (value < MqttProperties.MESSAGE_EXPIRY_INTERVAL_MIN
+ || value > MqttProperties.MESSAGE_EXPIRY_INTERVAL_MAX) {
+ throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL);
+ }
+ willMessageExpiryInterval = (int) value;
+ }
default -> unsupportedProperty(property);
}
}
diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java
index de7225bd..4d28ecbc 100644
--- a/network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java
+++ b/network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java
@@ -75,7 +75,7 @@ public DisconnectMqttInMessage(byte messageFlags) {
this.reasonCode = DisconnectReasonCode.NORMAL_DISCONNECTION;
this.reason = StringUtils.EMPTY;
this.serverReference = StringUtils.EMPTY;
- this.sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_DEFAULT;
+ this.sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED;
}
@Override
@@ -92,7 +92,7 @@ public MqttMessageType messageType() {
protected void readImpl(MqttConnection connection, ByteBuffer buffer) {
this.sessionExpiryInterval = connection
.clientConnectionConfig()
- .sessionExpiryInterval();
+ .sessionExpiryIntervalInSecs();
super.readImpl(connection, buffer);
}
diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java
index ecb16f8d..d385706c 100644
--- a/network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java
+++ b/network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java
@@ -128,7 +128,21 @@ protected void readProperties(MqttConnection connection, ByteBuffer buffer) {
protected void readPayload(MqttConnection connection, ByteBuffer buffer) {}
- protected void readProperties(MqttConnection connection, ByteBuffer buffer, Set availableProperties) {
+ protected void readProperties(
+ MqttConnection connection,
+ ByteBuffer buffer,
+ Set availableProperties) {
+ MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig();
+ int maxStringLength = connectionConfig.maxStringLength();
+ int maxBinarySize = connectionConfig.maxBinarySize();
+ readProperties(buffer, availableProperties, maxStringLength, maxBinarySize);
+ }
+
+ protected void readProperties(
+ ByteBuffer buffer,
+ Set availableProperties,
+ int maxStringLength,
+ int maxBinarySize) {
int propertiesLength = MqttDataUtils.readMbi(buffer);
if (propertiesLength == MqttDataUtils.UNKNOWN_LENGTH) {
@@ -138,8 +152,6 @@ protected void readProperties(MqttConnection connection, ByteBuffer buffer, Set<
}
int lastPositionInBuffer = buffer.position() + propertiesLength;
- MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig();
-
while (buffer.position() < lastPositionInBuffer) {
MqttMessageProperty property = MqttMessageProperty.byId(readByteUnsigned(buffer));
if (!availableProperties.contains(property)) {
@@ -164,17 +176,17 @@ protected void readProperties(MqttConnection connection, ByteBuffer buffer, Set<
break;
}
case UTF_8_STRING: {
- applyProperty(property, readString(buffer, connectionConfig.maxStringLength()));
+ applyProperty(property, readString(buffer, maxStringLength));
break;
}
case UTF_8_STRING_PAIR: {
- String name = readString(buffer, connectionConfig.maxStringLength());
- String value = readString(buffer, connectionConfig.maxStringLength());
+ String name = readString(buffer, maxStringLength);
+ String value = readString(buffer, maxStringLength);
applyProperty(property, new StringPair(name, value));
break;
}
case BINARY: {
- applyProperty(property, readBytes(buffer, connectionConfig.maxBinarySize()));
+ applyProperty(property, readBytes(buffer, maxBinarySize));
break;
}
default: {
@@ -196,12 +208,13 @@ protected void applyProperty(MqttMessageProperty property, byte[] value) {}
protected void applyProperty(MqttMessageProperty property, StringPair value) {
switch (property) {
- case USER_PROPERTY:
+ case USER_PROPERTY: {
if (userProperties == null) {
userProperties = MutableArray.ofType(StringPair.class);
}
userProperties.add(value);
break;
+ }
}
}
@@ -270,7 +283,7 @@ protected void alreadyPresentedProperty(MqttMessageProperty property) {
throw new MalformedProtocolMqttException(
"Property:[%s] is already presented in message:[%s]".formatted(property, name()));
}
-
+
@Override
public String name() {
return messageType().name();
diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessage.java
index f53180a7..5cca9c81 100644
--- a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessage.java
+++ b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessage.java
@@ -1,6 +1,7 @@
package javasabr.mqtt.network.message.out;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.EnumSet;
import java.util.Set;
import javasabr.mqtt.base.util.DebugUtils;
@@ -315,7 +316,7 @@ protected void writeProperties(MqttConnection connection, ByteBuffer buffer) {
writeProperty(
buffer,
MqttMessageProperty.SESSION_EXPIRY_INTERVAL,
- connectionConfig.sessionExpiryInterval(),
+ connectionConfig.sessionExpiryIntervalInSecs(),
requestedSessionExpiryInterval);
if (requestedClientId != null) {
writeProperty(buffer, MqttMessageProperty.ASSIGNED_CLIENT_IDENTIFIER, clientId, requestedClientId);
diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessage.java
index aece8567..721b681b 100644
--- a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessage.java
+++ b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessage.java
@@ -204,7 +204,7 @@ public ConnectMqtt5OutMessage(String clientId, int keepAlive) {
keepAlive,
false,
false,
- Array.empty(StringPair.class),
+ EMPTY_USER_PROPERTIES,
StringUtils.EMPTY,
ArrayUtils.EMPTY_BYTE_ARRAY,
MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET,
@@ -215,6 +215,28 @@ public ConnectMqtt5OutMessage(String clientId, int keepAlive) {
false);
}
+ public ConnectMqtt5OutMessage(String clientId, int keepAlive, long sessionExpiryInterval) {
+ this(
+ StringUtils.EMPTY,
+ StringUtils.EMPTY,
+ clientId,
+ ArrayUtils.EMPTY_BYTE_ARRAY,
+ ArrayUtils.EMPTY_BYTE_ARRAY,
+ QoS.AT_MOST_ONCE,
+ keepAlive,
+ false,
+ false,
+ EMPTY_USER_PROPERTIES,
+ StringUtils.EMPTY,
+ ArrayUtils.EMPTY_BYTE_ARRAY,
+ sessionExpiryInterval,
+ MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET,
+ MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET,
+ MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET,
+ false,
+ false);
+ }
+
public ConnectMqtt5OutMessage(
String username,
String willTopic,
diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt5OutMessage.java
index 2390bd48..aeab1f7b 100644
--- a/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt5OutMessage.java
+++ b/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt5OutMessage.java
@@ -89,7 +89,7 @@ protected void writeProperties(MqttConnection connection, ByteBuffer buffer) {
buffer,
MqttMessageProperty.SESSION_EXPIRY_INTERVAL,
sessionExpiryInterval,
- MqttProperties.SESSION_EXPIRY_INTERVAL_DEFAULT);
+ MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED);
}
}
}
diff --git a/network/src/main/java/javasabr/mqtt/network/session/ConfigurableNetworkMqttSession.java b/network/src/main/java/javasabr/mqtt/network/session/ConfigurableNetworkMqttSession.java
index dab637a8..7de455ca 100644
--- a/network/src/main/java/javasabr/mqtt/network/session/ConfigurableNetworkMqttSession.java
+++ b/network/src/main/java/javasabr/mqtt/network/session/ConfigurableNetworkMqttSession.java
@@ -1,6 +1,8 @@
package javasabr.mqtt.network.session;
+import java.time.Duration;
+
public interface ConfigurableNetworkMqttSession extends NetworkMqttSession {
- void expirationTime(long expirationTime);
+ void expiryInterval(Duration expiryInterval);
}
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy
index 302558ab..3c088969 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy
@@ -11,8 +11,8 @@ class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest {
def "should read message correctly as MQTT 5.0"() {
given:
def propertiesBuffer = BufferUtils.prepareBuffer(512) {
- it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)
- it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, testAuthMethod)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, testAuthData)
it.putProperty(MqttMessageProperty.REASON_STRING, reasonString)
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
}
@@ -27,16 +27,16 @@ class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest {
then:
result
inMessage.reasonCode() == AuthenticateReasonCode.SUCCESS
- inMessage.authenticationMethod() == authMethod
- inMessage.authenticationData() == authData
+ inMessage.authenticationMethod() == testAuthMethod
+ inMessage.authenticationData() == testAuthData
inMessage.reason() == reasonString
inMessage.userProperties() == testUserProperties
when:
def propertiesBuffer2 = BufferUtils.prepareBuffer(512) {
- it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, testAuthMethod)
it.putProperty(MqttMessageProperty.REASON_STRING, reasonString)
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
- it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, testAuthData)
}
def dataBuffer2 = BufferUtils.prepareBuffer(512) {
it.put(AuthenticateReasonCode.CONTINUE_AUTHENTICATION)
@@ -48,14 +48,14 @@ class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest {
then:
result2
inMessage2.reasonCode() == AuthenticateReasonCode.CONTINUE_AUTHENTICATION
- inMessage2.authenticationMethod() == authMethod
- inMessage2.authenticationData() == authData
+ inMessage2.authenticationMethod() == testAuthMethod
+ inMessage2.authenticationData() == testAuthData
inMessage2.reason() == reasonString
inMessage2.userProperties() == testUserProperties
when:
def propertiesBuffer3 = BufferUtils.prepareBuffer(512) {
- it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)
- it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, testAuthMethod)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, testAuthData)
}
def dataBuffer3 = BufferUtils.prepareBuffer(512) {
it.put(AuthenticateReasonCode.CONTINUE_AUTHENTICATION)
@@ -67,8 +67,8 @@ class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest {
then:
result3
inMessage3.reasonCode() == AuthenticateReasonCode.CONTINUE_AUTHENTICATION
- inMessage3.authenticationMethod() == authMethod
- inMessage3.authenticationData() == authData
+ inMessage3.authenticationMethod() == testAuthMethod
+ inMessage3.authenticationData() == testAuthData
inMessage3.reason() == null
inMessage3.userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES
when:
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy
index 854c312d..c9adf804 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy
@@ -50,19 +50,19 @@ class ConnectAckMqttInMessageTest extends BaseMqttInMessageTest {
it.putProperty(MqttMessageProperty.REASON_STRING, reasonString)
it.putProperty(MqttMessageProperty.SERVER_REFERENCE, serverReference)
it.putProperty(MqttMessageProperty.ASSIGNED_CLIENT_IDENTIFIER, mqtt311ClientId)
- it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData)
- it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)
- it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, maxMessageSize)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, testAuthData)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, testAuthMethod)
+ it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, testMaxMessageSize)
it.putProperty(MqttMessageProperty.MAXIMUM_QOS, QoS.AT_LEAST_ONCE.ordinal())
- it.putProperty(MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, receiveMaxPublishes)
+ it.putProperty(MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, testReceiveMaxPublishes)
it.putProperty(MqttMessageProperty.RETAIN_AVAILABLE, retainAvailable)
it.putProperty(MqttMessageProperty.RESPONSE_INFORMATION, responseInformation)
it.putProperty(MqttMessageProperty.SERVER_KEEP_ALIVE, serverKeepAlive)
- it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval)
+ it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, testSessionExpiryInterval)
it.putProperty(MqttMessageProperty.SHARED_SUBSCRIPTION_AVAILABLE, sharedSubscriptionAvailable)
it.putProperty(MqttMessageProperty.WILDCARD_SUBSCRIPTION_AVAILABLE, wildcardSubscriptionAvailable)
it.putProperty(MqttMessageProperty.SUBSCRIPTION_IDENTIFIER_AVAILABLE, subscriptionIdAvailable)
- it.putProperty(MqttMessageProperty.TOPIC_ALIAS_MAXIMUM, topicAliasMaxValue)
+ it.putProperty(MqttMessageProperty.TOPIC_ALIAS_MAXIMUM, testTopicAliasMaxValue)
}
def dataBuffer = BufferUtils.prepareBuffer(512) {
it.putBoolean(sessionPresent)
@@ -80,14 +80,14 @@ class ConnectAckMqttInMessageTest extends BaseMqttInMessageTest {
inMessage.serverReference() == serverReference
inMessage.reason() == reasonString
inMessage.assignedClientId() == mqtt311ClientId
- inMessage.authenticationData() == authData
- inMessage.authenticationMethod() == authMethod
- inMessage.maxMessageSize() == maxMessageSize
+ inMessage.authenticationData() == testAuthData
+ inMessage.authenticationMethod() == testAuthMethod
+ inMessage.maxMessageSize() == testMaxMessageSize
inMessage.maxQos() == QoS.AT_LEAST_ONCE
- inMessage.receiveMaxPublishes() == receiveMaxPublishes
+ inMessage.receiveMaxPublishes() == testReceiveMaxPublishes
inMessage.responseInformation() == responseInformation
inMessage.serverKeepAlive() == serverKeepAlive
- inMessage.sessionExpiryInterval() == sessionExpiryInterval
+ inMessage.sessionExpiryInterval() == testSessionExpiryInterval
NumberUtils.toBoolean(inMessage.sharedSubscriptionAvailable()) == sharedSubscriptionAvailable
NumberUtils.toBoolean(inMessage.wildcardSubscriptionAvailable()) == wildcardSubscriptionAvailable
NumberUtils.toBoolean(inMessage.subscriptionIdAvailable()) == subscriptionIdAvailable
@@ -135,22 +135,22 @@ class ConnectAckMqttInMessageTest extends BaseMqttInMessageTest {
inMessage.exception().message == "Property:[$property] is already presented in message:[$MqttMessageType.CONNECT_ACK]"
where:
property | value
- MqttMessageProperty.AUTHENTICATION_DATA | authData
+ MqttMessageProperty.AUTHENTICATION_DATA | testAuthData
MqttMessageProperty.ASSIGNED_CLIENT_IDENTIFIER | mqtt5ClientId
MqttMessageProperty.REASON_STRING | reasonString
MqttMessageProperty.RESPONSE_INFORMATION | responseInformation
- MqttMessageProperty.AUTHENTICATION_METHOD | authMethod
+ MqttMessageProperty.AUTHENTICATION_METHOD | testAuthMethod
MqttMessageProperty.SERVER_REFERENCE | serverReference
MqttMessageProperty.WILDCARD_SUBSCRIPTION_AVAILABLE | wildcardSubscriptionAvailable
MqttMessageProperty.SHARED_SUBSCRIPTION_AVAILABLE | sharedSubscriptionAvailable
MqttMessageProperty.SUBSCRIPTION_IDENTIFIER_AVAILABLE | subscriptionIdAvailable
MqttMessageProperty.RETAIN_AVAILABLE | retainAvailable
- MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES | receiveMaxPublishes
+ MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES | testReceiveMaxPublishes
MqttMessageProperty.MAXIMUM_QOS | QoS.EXACTLY_ONCE.level()
MqttMessageProperty.SERVER_KEEP_ALIVE | serverKeepAlive
- MqttMessageProperty.TOPIC_ALIAS_MAXIMUM | topicAliasMaxValue
- MqttMessageProperty.SESSION_EXPIRY_INTERVAL | sessionExpiryInterval
- MqttMessageProperty.MAXIMUM_MESSAGE_SIZE | maxMessageSize
+ MqttMessageProperty.TOPIC_ALIAS_MAXIMUM | testTopicAliasMaxValue
+ MqttMessageProperty.SESSION_EXPIRY_INTERVAL | testSessionExpiryInterval
+ MqttMessageProperty.MAXIMUM_MESSAGE_SIZE | testMaxMessageSize
}
def "should validate invalid properties in message"(MqttMessageProperty property, Object value, String expectedMessage) {
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy
index a73d4d66..9f826b8b 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy
@@ -1,102 +1,301 @@
package javasabr.mqtt.network.message.in
import javasabr.mqtt.model.MqttMessageProperty
+import javasabr.mqtt.model.MqttProperties
+import javasabr.mqtt.model.MqttProtocolErrors
import javasabr.mqtt.model.MqttVersion
+import javasabr.mqtt.model.QoS
+import javasabr.mqtt.model.exception.ConnectionRejectException
import javasabr.mqtt.model.exception.MalformedProtocolMqttException
-import javasabr.rlib.common.util.ArrayUtils
+import javasabr.mqtt.model.message.MqttMessageType
+import javasabr.mqtt.model.reason.code.ConnectAckReasonCode
import javasabr.rlib.common.util.BufferUtils
class ConnectMqttInMessageTest extends BaseMqttInMessageTest {
- def "should read packet correctly as mqtt 3.1.1"() {
+ def "should read message correctly as MQTT 3.1.1"() {
given:
def dataBuffer = BufferUtils.prepareBuffer(512) {
it.putString("MQTT")
it.put(4 as byte)
it.put(0b11000010 as byte)
- it.putShort(keepAlive as short)
+ it.putShort(testKeepAlive as short)
it.putString(mqtt311ClientId)
- it.putString(userName)
- it.putBytes(userPassword)
+ it.putString(testUserName)
+ it.putBytes(testUserPassword)
}
when:
- def packet = new ConnectMqttInMessage(0b0001_0000 as byte)
- def result = packet.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit())
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit())
then:
result
- packet.clientId() == mqtt311ClientId
- packet.mqttVersion() == MqttVersion.MQTT_3_1_1
- packet.password() == userPassword
- packet.username() == userName
- packet.willTopic() == ""
- packet.willQos() == 0
- packet.willPayload() == ArrayUtils.EMPTY_BYTE_ARRAY
+ with(inMessage) {
+ clientId() == mqtt311ClientId
+ mqttVersion() == MqttVersion.MQTT_3_1_1
+ password() == testUserPassword
+ username() == testUserName
+ cleanStart()
+ willTopic() == null
+ willQos() == null
+ willPayload() == null
+ sessionExpiryInterval() == MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY
+ }
+ }
+
+ def "should read message correctly with Will Topic as MQTT 3.1.1"() {
+ given:
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString("MQTT")
+ it.put(4 as byte)
+ it.put(0b11110100 as byte)
+ it.putShort(testKeepAlive as short)
+ it.putString(mqtt311ClientId)
+ it.putString(testWillTopic.rawTopic())
+ it.putBytes(testWillPayload)
+ it.putString(testUserName)
+ it.putBytes(testUserPassword)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit())
+ then:
+ result
+ with(inMessage) {
+ clientId() == mqtt311ClientId
+ mqttVersion() == MqttVersion.MQTT_3_1_1
+ password() == testUserPassword
+ username() == testUserName
+ !cleanStart()
+ willTopic() == testWillTopic.rawTopic()
+ willQos() == QoS.EXACTLY_ONCE
+ willRetain()
+ willPayload() == testWillPayload
+ sessionExpiryInterval() == MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY
+ }
}
- def "should read packet correctly as mqtt 5.0"() {
+ def "should read message correctly as MQTT 5.0"() {
given:
def propertiesBuffer = BufferUtils.prepareBuffer(512) {
- it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval)
- it.putProperty(MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, receiveMaxPublishes)
- it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, maxMessageSize)
- it.putProperty(MqttMessageProperty.TOPIC_ALIAS_MAXIMUM, topicAliasMaxValue)
+ it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, testSessionExpiryInterval)
+ it.putProperty(MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, testReceiveMaxPublishes)
+ it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, testMaxMessageSize)
+ it.putProperty(MqttMessageProperty.TOPIC_ALIAS_MAXIMUM, testTopicAliasMaxValue)
it.putProperty(MqttMessageProperty.REQUEST_RESPONSE_INFORMATION, requestResponseInformation ? 1 : 0)
it.putProperty(MqttMessageProperty.REQUEST_PROBLEM_INFORMATION, requestProblemInformation ? 1 : 0)
- it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod)
- it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, testAuthMethod)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, testAuthData)
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
}
def dataBuffer = BufferUtils.prepareBuffer(512) {
it.putString("MQTT")
it.put(5 as byte)
it.put(0b11000010 as byte)
- it.putShort(keepAlive as short)
+ it.putShort(testKeepAlive as short)
+ it.putMbi(propertiesBuffer.limit())
+ it.put(propertiesBuffer)
+ it.putString(mqtt5ClientId)
+ it.putString(testUserName)
+ it.putBytes(testUserPassword)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
+ then:
+ result
+ with(inMessage) {
+ keepAlive() == testKeepAlive
+ authenticationMethod() == testAuthMethod
+ authenticationData() == testAuthData
+ clientId() == mqtt5ClientId
+ mqttVersion() == MqttVersion.MQTT_5
+ maxMessageSize() == testMaxMessageSize
+ password() == testUserPassword
+ username() == testUserName
+ topicAliasMaxValue() == testTopicAliasMaxValue
+ sessionExpiryInterval() == testSessionExpiryInterval
+ receiveMaxPublishes() == testReceiveMaxPublishes
+ willTopic() == null
+ willQos() == null
+ willPayload() == null
+ userProperties() == testUserProperties
+ }
+ }
+
+ def "should read message correctly with Will Topic as MQTT 5.0"() {
+ given:
+ def propertiesBuffer = BufferUtils.prepareBuffer(512) {
+ it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, testSessionExpiryInterval)
+ it.putProperty(MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, testReceiveMaxPublishes)
+ it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, testMaxMessageSize)
+ it.putProperty(MqttMessageProperty.TOPIC_ALIAS_MAXIMUM, testTopicAliasMaxValue)
+ it.putProperty(MqttMessageProperty.REQUEST_RESPONSE_INFORMATION, requestResponseInformation ? 1 : 0)
+ it.putProperty(MqttMessageProperty.REQUEST_PROBLEM_INFORMATION, requestProblemInformation ? 1 : 0)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, testAuthMethod)
+ it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, testAuthData)
+ it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
+ }
+ def willPropertiesBuffer = BufferUtils.prepareBuffer(512) {
+ it.putProperty(MqttMessageProperty.WILL_DELAY_INTERVAL, testWillDelayInterval)
+ it.putProperty(MqttMessageProperty.PAYLOAD_FORMAT_INDICATOR, testWillPayloadFormat)
+ it.putProperty(MqttMessageProperty.MESSAGE_EXPIRY_INTERVAL, testWillMessageExpiryInterval)
+ it.putProperty(MqttMessageProperty.CONTENT_TYPE, testWillContentType)
+ it.putProperty(MqttMessageProperty.RESPONSE_TOPIC, testWillResponseTopic.rawTopic())
+ it.putProperty(MqttMessageProperty.CORRELATION_DATA, testWillCorrelationData)
+ it.putProperty(MqttMessageProperty.USER_PROPERTY, testWillUserProperties)
+ }
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString("MQTT")
+ it.put(5 as byte)
+ it.put(0b11110100 as byte)
+ it.putShort(testKeepAlive as short)
it.putMbi(propertiesBuffer.limit())
it.put(propertiesBuffer)
+ it.putString(mqtt5ClientId)
+ it.putMbi(willPropertiesBuffer.limit())
+ it.put(willPropertiesBuffer)
+ it.putString(testWillTopic.rawTopic())
+ it.putBytes(testWillPayload)
+ it.putString(testUserName)
+ it.putBytes(testUserPassword)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit())
+ then:
+ result
+ with(inMessage) {
+ clientId() == mqtt5ClientId
+ mqttVersion() == MqttVersion.MQTT_5
+ password() == testUserPassword
+ username() == testUserName
+ !cleanStart()
+ willTopic() == testWillTopic.rawTopic()
+ willQos() == QoS.EXACTLY_ONCE
+ willRetain()
+ willPayload() == testWillPayload
+ sessionExpiryInterval() == testSessionExpiryInterval
+ willContentType() == testWillContentType
+ willCorrelationData() == testWillCorrelationData
+ willResponseTopic() == testWillResponseTopic.rawTopic()
+ willDelayInterval() == testWillDelayInterval
+ willPayload() == testWillPayload
+ willMessageExpiryInterval() == testWillMessageExpiryInterval
+ willDelayInterval() == testWillDelayInterval
+ willUserProperties() == testWillUserProperties
+ }
+ }
+
+ def "should correctly validate header"(
+ String protocolName,
+ byte protocolVersion,
+ byte headerVariables,
+ ConnectAckReasonCode expectedError) {
+ given:
+ def testClientId = "someClientId"
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString(protocolName)
+ it.put(protocolVersion)
+ it.put(headerVariables)
+ it.putShort(testKeepAlive as short)
+ it.putString(testClientId)
+ it.putString(testUserName)
+ it.putBytes(testUserPassword)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit())
+ then:
+ !result
+ with(inMessage.exception() as ConnectionRejectException) {
+ reasonCode() == expectedError
+ }
+ where:
+ protocolName | protocolVersion | headerVariables | expectedError
+ // invalid protocols
+ "Invalid" | 4 | 0b11000010 | ConnectAckReasonCode.UNSUPPORTED_PROTOCOL_VERSION
+ "MQTT" | 2 | 0b11000010 | ConnectAckReasonCode.UNSUPPORTED_PROTOCOL_VERSION
+ "MQTT" | 9 | 0b11000010 | ConnectAckReasonCode.UNSUPPORTED_PROTOCOL_VERSION
+ // not zero reserved bit
+ "MQTT" | 4 | 0b11000011 | ConnectAckReasonCode.MALFORMED_PACKET
+ // password is presented without username for MQTT 3.1.1
+ "MQTT" | 4 | 0b01000010 | ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD
+ // invalid will QoS
+ "MQTT" | 4 | 0b11011000 | ConnectAckReasonCode.MALFORMED_PACKET
+ // not zero will QoS without will flag
+ "MQTT" | 4 | 0b11101000 | ConnectAckReasonCode.MALFORMED_PACKET
+ "MQTT" | 4 | 0b11110000 | ConnectAckReasonCode.MALFORMED_PACKET
+ // not zero will retain without will flag
+ "MQTT" | 4 | 0b11100000 | ConnectAckReasonCode.MALFORMED_PACKET
+ }
+
+ def "should return infinity session expiry interval for MQTT 3.1.1"() {
+ given:
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString("MQTT")
+ it.put(4 as byte)
+ it.put(0b00000010 as byte)
+ it.putShort(testKeepAlive as short)
it.putString(mqtt311ClientId)
- it.putString(userName)
- it.putBytes(userPassword)
}
when:
- def packet = new ConnectMqttInMessage(0b0001_0000 as byte)
- def result = packet.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
+ then:
+ result
+ with(inMessage) {
+ keepAlive() == testKeepAlive
+ clientId() == mqtt311ClientId
+ mqttVersion() == MqttVersion.MQTT_3_1_1
+ sessionExpiryInterval() == MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY
+ }
+ }
+
+ def "should return disabled session expiry interval when it's not set in MQTT 5.0"() {
+ given:
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString("MQTT")
+ it.put(5 as byte)
+ it.put(0b11000010 as byte)
+ it.putShort(testKeepAlive as short)
+ it.putMbi(0)
+ it.putString(mqtt5ClientId)
+ it.putString(testUserName)
+ it.putBytes(testUserPassword)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
then:
result
- packet.keepAlive() == keepAlive
- packet.authenticationMethod() == authMethod
- packet.authenticationData() == authData
- packet.clientId() == mqtt311ClientId
- packet.mqttVersion() == MqttVersion.MQTT_5
- packet.maxPacketSize() == maxMessageSize
- packet.password() == userPassword
- packet.username() == userName
- packet.topicAliasMaxValue() == topicAliasMaxValue
- packet.sessionExpiryInterval() == sessionExpiryInterval
- packet.receiveMaxPublishes() == receiveMaxPublishes
- packet.willTopic() == ""
- packet.willQos() == 0
- packet.willPayload() == ArrayUtils.EMPTY_BYTE_ARRAY
- packet.userProperties() == testUserProperties
+ with(inMessage) {
+ keepAlive() == testKeepAlive
+ clientId() == mqtt5ClientId
+ mqttVersion() == MqttVersion.MQTT_5
+ password() == testUserPassword
+ username() == testUserName
+ sessionExpiryInterval() == MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED
+ }
}
- def "should not read packet correctly with invalid UTF8 strings"(byte[] stringBytes) {
+ def "should not read message correctly with invalid UTF8 strings"(byte[] stringBytes) {
given:
def dataBuffer = BufferUtils.prepareBuffer(512) {
it.putString("MQTT")
it.put(5 as byte)
it.put(0b11000010 as byte)
- it.putShort(keepAlive as short)
+ it.putShort(testKeepAlive as short)
it.putMbi(0)
it.putBytes(stringBytes)
- it.putString(userName)
- it.putBytes(userPassword)
+ it.putString(testUserName)
+ it.putBytes(testUserPassword)
}
when:
- def packet = new ConnectMqttInMessage(0b0001_0000 as byte)
+ def packet = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
def result = packet.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
then:
!result
- packet.exception() instanceof MalformedProtocolMqttException
+ MalformedProtocolMqttException.isCase(packet.exception())
where:
stringBytes << [
// https://www.cl.cam.ac.uk/~mgk25/ucs/examples/UTF-8-test.txt
@@ -106,4 +305,140 @@ class ConnectMqttInMessageTest extends BaseMqttInMessageTest {
[0xED, 0xAF, 0xBF, 0xED, 0xBF, 0xBF] as byte[],
]
}
+
+ def "should not allow duplicated properties in message"(MqttMessageProperty property, Object value) {
+ given:
+ def propertiesBuffer = BufferUtils.prepareBuffer(512) {
+ it.putProperty(property, value)
+ it.putProperty(property, value)
+ }
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString("MQTT")
+ it.put(5 as byte)
+ it.put(0b00000010 as byte)
+ it.putShort(testKeepAlive as short)
+ it.putMbi(propertiesBuffer.limit())
+ it.put(propertiesBuffer)
+ it.putString(mqtt5ClientId)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
+ then:
+ !result
+ with(inMessage) {
+ exception() instanceof MalformedProtocolMqttException
+ exception().message == "Property:[$property] is already presented in message:[$MqttMessageType.CONNECT]"
+ }
+ where:
+ property | value
+ MqttMessageProperty.AUTHENTICATION_METHOD | testAuthMethod
+ MqttMessageProperty.AUTHENTICATION_DATA | testAuthData
+ MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES | testReceiveMaxPublishes
+ MqttMessageProperty.MAXIMUM_MESSAGE_SIZE | testMaxMessageSize
+ MqttMessageProperty.TOPIC_ALIAS_MAXIMUM | testTopicAliasMaxValue
+ MqttMessageProperty.REQUEST_RESPONSE_INFORMATION | false
+ MqttMessageProperty.REQUEST_PROBLEM_INFORMATION | false
+ }
+
+ def "should not allow duplicated will properties in message"(MqttMessageProperty property, Object value) {
+ given:
+ def willPropertiesBuffer = BufferUtils.prepareBuffer(512) {
+ it.putProperty(property, value)
+ it.putProperty(property, value)
+ }
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString("MQTT")
+ it.put(5 as byte)
+ it.put(0b0011_0100 as byte)
+ it.putShort(testKeepAlive as short)
+ it.putMbi(0)
+ it.putString(mqtt5ClientId)
+ it.putMbi(willPropertiesBuffer.limit())
+ it.put(willPropertiesBuffer)
+ it.putString(testWillTopic.rawTopic())
+ it.putBytes(testWillPayload)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
+ then:
+ !result
+ with(inMessage) {
+ exception() instanceof MalformedProtocolMqttException
+ exception().message == "Property:[$property] is already presented in message:[$MqttMessageType.CONNECT]"
+ }
+ where:
+ property | value
+ MqttMessageProperty.WILL_DELAY_INTERVAL | testWillDelayInterval
+ MqttMessageProperty.PAYLOAD_FORMAT_INDICATOR | testWillPayloadFormat
+ MqttMessageProperty.MESSAGE_EXPIRY_INTERVAL | testWillMessageExpiryInterval
+ MqttMessageProperty.CONTENT_TYPE | testWillContentType
+ MqttMessageProperty.RESPONSE_TOPIC | testWillResponseTopic.rawTopic()
+ MqttMessageProperty.CORRELATION_DATA | testWillCorrelationData
+ }
+
+ def "should validate properties in message"(MqttMessageProperty property, Object value, String expectedError) {
+ given:
+ def propertiesBuffer = BufferUtils.prepareBuffer(512) {
+ it.putProperty(property, value)
+ }
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString("MQTT")
+ it.put(5 as byte)
+ it.put(0b00000010 as byte)
+ it.putShort(testKeepAlive as short)
+ it.putMbi(propertiesBuffer.limit())
+ it.put(propertiesBuffer)
+ it.putString(mqtt5ClientId)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
+ then:
+ !result
+ with(inMessage) {
+ exception() instanceof MalformedProtocolMqttException
+ exception().message == expectedError
+ }
+ where:
+ property | value | expectedError
+ MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES | 0 | MqttProtocolErrors.PROVIDED_INVALID_RECEIVED_MAX_PUBLISHES
+ MqttMessageProperty.MAXIMUM_MESSAGE_SIZE | 0 | MqttProtocolErrors.PROVIDED_INVALID_MAX_MESSAGE_SIZE
+ MqttMessageProperty.MAXIMUM_MESSAGE_SIZE | -1 | MqttProtocolErrors.PROVIDED_INVALID_MAX_MESSAGE_SIZE
+ MqttMessageProperty.REQUEST_RESPONSE_INFORMATION | (3 as byte) | MqttProtocolErrors.PROVIDED_INVALID_REQUEST_RESPONSE_INFORMATION
+ MqttMessageProperty.REQUEST_PROBLEM_INFORMATION | (3 as byte) | MqttProtocolErrors.PROVIDED_INVALID_REQUEST_PROBLEM_INFORMATION
+ }
+
+ def "should validate will properties in message"(MqttMessageProperty property, Object value, String expectedError) {
+ given:
+ def willPropertiesBuffer = BufferUtils.prepareBuffer(512) {
+ it.putProperty(property, value)
+ }
+ def dataBuffer = BufferUtils.prepareBuffer(512) {
+ it.putString("MQTT")
+ it.put(5 as byte)
+ it.put(0b0011_0100 as byte)
+ it.putShort(testKeepAlive as short)
+ it.putMbi(0)
+ it.putString(mqtt5ClientId)
+ it.putMbi(willPropertiesBuffer.limit())
+ it.put(willPropertiesBuffer)
+ it.putString(testWillTopic.rawTopic())
+ it.putBytes(testWillPayload)
+ }
+ when:
+ def inMessage = new ConnectMqttInMessage(ConnectMqttInMessage.MESSAGE_FLAGS)
+ def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
+ then:
+ !result
+ with(inMessage) {
+ exception() instanceof MalformedProtocolMqttException
+ exception().message == expectedError
+ }
+ where:
+ property | value | expectedError
+ MqttMessageProperty.WILL_DELAY_INTERVAL | -1 | MqttProtocolErrors.PROVIDED_INVALID_WILL_DELAY_INTERVAL
+ MqttMessageProperty.PAYLOAD_FORMAT_INDICATOR | 5 | MqttProtocolErrors.PROVIDED_INVALID_PAYLOAD_FORMAT
+ }
}
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/DisconnectMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/DisconnectMqttInMessageTest.groovy
index 362fb29a..a2a24d59 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/in/DisconnectMqttInMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/DisconnectMqttInMessageTest.groovy
@@ -9,7 +9,7 @@ class DisconnectMqttInMessageTest extends BaseMqttInMessageTest {
def 'should read message correctly as mqtt 5.0'() {
given:
def propertiesBuffer = BufferUtils.prepareBuffer(512) {
- it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval)
+ it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, testSessionExpiryInterval)
it.putProperty(MqttMessageProperty.REASON_STRING, reasonString)
it.putProperty(MqttMessageProperty.SERVER_REFERENCE, serverReference)
it.putProperty(MqttMessageProperty.USER_PROPERTY, testUserProperties)
@@ -27,11 +27,11 @@ class DisconnectMqttInMessageTest extends BaseMqttInMessageTest {
inMessage.reason() == reasonString
inMessage.serverReference() == serverReference
inMessage.reasonCode() == DisconnectReasonCode.QUOTA_EXCEEDED
- inMessage.sessionExpiryInterval() == sessionExpiryInterval
+ inMessage.sessionExpiryInterval() == testSessionExpiryInterval
inMessage.userProperties() == testUserProperties
when:
propertiesBuffer = BufferUtils.prepareBuffer(512) {
- it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval)
+ it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, testSessionExpiryInterval)
it.putProperty(MqttMessageProperty.SERVER_REFERENCE, serverReference)
}
dataBuffer = BufferUtils.prepareBuffer(512) {
@@ -46,7 +46,7 @@ class DisconnectMqttInMessageTest extends BaseMqttInMessageTest {
inMessage.reason() == ""
inMessage.serverReference() == serverReference
inMessage.reasonCode() == DisconnectReasonCode.PACKET_TOO_LARGE
- inMessage.sessionExpiryInterval() == sessionExpiryInterval
+ inMessage.sessionExpiryInterval() == testSessionExpiryInterval
inMessage.userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES
}
}
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessageTest.groovy
index 0999a9e7..b6fa6ca7 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessageTest.groovy
@@ -13,8 +13,8 @@ class AuthenticationMqtt5OutMessageTest extends BaseMqttOutMessageTest {
def outMessage = new AuthenticationMqtt5OutMessage(
AuthenticateReasonCode.CONTINUE_AUTHENTICATION,
reasonString,
- authMethod,
- authData,
+ testAuthMethod,
+ testAuthData,
testUserProperties)
when:
def typeAndFlags = outMessage.messageTypeAndFlags()
@@ -33,8 +33,8 @@ class AuthenticationMqtt5OutMessageTest extends BaseMqttOutMessageTest {
result
reader.reasonCode() == AuthenticateReasonCode.CONTINUE_AUTHENTICATION
reader.reason() == reasonString
- reader.authenticationMethod() == authMethod
- reader.authenticationData() == authData
+ reader.authenticationMethod() == testAuthMethod
+ reader.authenticationData() == testAuthData
reader.userProperties() == testUserProperties
}
}
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessageTest.groovy
index 4f1250bc..749f115d 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessageTest.groovy
@@ -25,7 +25,7 @@ class ConnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest {
MqttVersion.MQTT_5,
240,
250,
- maxMessageSize,
+ testMaxMessageSize,
300,
30,
false,
@@ -49,8 +49,8 @@ class ConnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest {
reasonString,
serverReference,
responseInformation,
- authMethod,
- authData,
+ testAuthMethod,
+ testAuthData,
testUserProperties)
when:
def typeAndFlags = outMessage.messageTypeAndFlags()
@@ -71,7 +71,7 @@ class ConnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest {
inMessage.sessionPresent() == sessionPresent
inMessage.sessionExpiryInterval() == 240
inMessage.receiveMaxPublishes() == 250
- inMessage.maxMessageSize() == maxMessageSize
+ inMessage.maxMessageSize() == testMaxMessageSize
inMessage.assignedClientId() == mqtt5ClientId
inMessage.topicAliasMaxValue() == 300
inMessage.reason() == reasonString
@@ -79,8 +79,8 @@ class ConnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest {
inMessage.serverKeepAlive() == 30
inMessage.responseInformation() == responseInformation
inMessage.serverReference() == serverReference
- inMessage.authenticationData() == authData
- inMessage.authenticationMethod() == authMethod
+ inMessage.authenticationData() == testAuthData
+ inMessage.authenticationMethod() == testAuthMethod
!NumberUtils.toBoolean(inMessage.wildcardSubscriptionAvailable())
!NumberUtils.toBoolean(inMessage.subscriptionIdAvailable())
!NumberUtils.toBoolean(inMessage.sharedSubscriptionAvailable())
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt311OutMessageTest.groovy
index 86dc3ae4..b5a973d4 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt311OutMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt311OutMessageTest.groovy
@@ -1,38 +1,46 @@
package javasabr.mqtt.network.message.out
import javasabr.mqtt.model.QoS
+import javasabr.mqtt.model.message.MqttMessageType
import javasabr.mqtt.network.message.in.ConnectMqttInMessage
import javasabr.rlib.common.util.ArrayUtils
import javasabr.rlib.common.util.BufferUtils
+import javasabr.rlib.common.util.NumberUtils
class ConnectMqtt311OutMessageTest extends BaseMqttOutMessageTest {
- def "should write packet correctly"() {
+ def "should write message correctly"() {
given:
- def packet = new ConnectMqtt311OutMessage(
- userName,
+ def outMessage = new ConnectMqtt311OutMessage(
+ testUserName,
"",
mqtt311ClientId,
- userPassword,
+ testUserPassword,
ArrayUtils.EMPTY_BYTE_ARRAY,
QoS.AT_MOST_ONCE,
- keepAlive,
+ testKeepAlive,
willRetain,
cleanStart)
+ when:
+ def typeAndFlags = outMessage.messageTypeAndFlags()
+ byte type = NumberUtils.getHighByteBits(typeAndFlags);
+ byte info = NumberUtils.getLowByteBits(typeAndFlags);
+ then:
+ MqttMessageType.fromByte(type) == MqttMessageType.CONNECT
when:
def dataBuffer = BufferUtils.prepareBuffer(512) {
- packet.write(defaultMqtt311Connection, it)
+ outMessage.write(defaultMqtt311Connection, it)
}
- def reader = new ConnectMqttInMessage(0b0001_0000 as byte)
+ def reader = new ConnectMqttInMessage(info)
def result = reader.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit())
then:
result
- reader.username() == userName
+ reader.username() == testUserName
reader.clientId() == mqtt311ClientId
- reader.password() == userPassword
- reader.keepAlive() == keepAlive
+ reader.password() == testUserPassword
+ reader.keepAlive() == testKeepAlive
reader.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES
reader.cleanStart() == cleanStart
- reader.willRetain == willRetain
+ reader.willRetain() == willRetain
}
}
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessageTest.groovy
index 1e1c9a67..9d3f7086 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessageTest.groovy
@@ -1,54 +1,62 @@
package javasabr.mqtt.network.message.out
import javasabr.mqtt.model.QoS
+import javasabr.mqtt.model.message.MqttMessageType
import javasabr.mqtt.network.message.in.ConnectMqttInMessage
import javasabr.rlib.common.util.ArrayUtils
import javasabr.rlib.common.util.BufferUtils
+import javasabr.rlib.common.util.NumberUtils
class ConnectMqtt5OutMessageTest extends BaseMqttOutMessageTest {
def "should write message correctly"() {
given:
def outMessage = new ConnectMqtt5OutMessage(
- userName,
+ testUserName,
"",
mqtt311ClientId,
- userPassword,
+ testUserPassword,
ArrayUtils.EMPTY_BYTE_ARRAY,
QoS.AT_MOST_ONCE,
- keepAlive,
+ testKeepAlive,
willRetain,
cleanStart,
testUserProperties,
- authMethod,
- authData,
- sessionExpiryInterval,
- receiveMaxPublishes,
- maxMessageSize,
- topicAliasMaxValue,
+ testAuthMethod,
+ testAuthData,
+ testSessionExpiryInterval,
+ testReceiveMaxPublishes,
+ testMaxMessageSize,
+ testTopicAliasMaxValue,
requestResponseInformation,
requestProblemInformation)
+ when:
+ def typeAndFlags = outMessage.messageTypeAndFlags()
+ byte type = NumberUtils.getHighByteBits(typeAndFlags);
+ byte info = NumberUtils.getLowByteBits(typeAndFlags);
+ then:
+ MqttMessageType.fromByte(type) == MqttMessageType.CONNECT
when:
def dataBuffer = BufferUtils.prepareBuffer(512) {
outMessage.write(defaultMqtt5Connection, it)
}
- def reader = new ConnectMqttInMessage(0b0001_0000 as byte)
+ def reader = new ConnectMqttInMessage(info)
def result = reader.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit())
then:
result
- reader.username() == userName
+ reader.username() == testUserName
reader.clientId() == mqtt311ClientId
- reader.password() == userPassword
- reader.keepAlive() == keepAlive
+ reader.password() == testUserPassword
+ reader.keepAlive() == testKeepAlive
reader.userProperties() == testUserProperties
reader.cleanStart() == cleanStart
reader.willRetain() == willRetain
- reader.authenticationMethod() == authMethod
- reader.authenticationData() == authData
- reader.sessionExpiryInterval() == sessionExpiryInterval
- reader.receiveMaxPublishes() == receiveMaxPublishes
- reader.maxPacketSize() == maxMessageSize
- reader.topicAliasMaxValue() == topicAliasMaxValue
+ reader.authenticationMethod() == testAuthMethod
+ reader.authenticationData() == testAuthData
+ reader.sessionExpiryInterval() == testSessionExpiryInterval
+ reader.receiveMaxPublishes() == testReceiveMaxPublishes
+ reader.maxMessageSize() == testMaxMessageSize
+ reader.topicAliasMaxValue() == testTopicAliasMaxValue
reader.requestResponseInformation() == requestResponseInformation
reader.requestProblemInformation() == requestProblemInformation
}
diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/DisconnectAckMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/DisconnectAckMqtt5OutMessageTest.groovy
index fe176d22..3a2c72ce 100644
--- a/network/src/test/groovy/javasabr/mqtt/network/message/out/DisconnectAckMqtt5OutMessageTest.groovy
+++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/DisconnectAckMqtt5OutMessageTest.groovy
@@ -13,7 +13,7 @@ class DisconnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest {
testUserProperties,
reasonString,
serverReference,
- sessionExpiryInterval)
+ testSessionExpiryInterval)
when:
def dataBuffer = BufferUtils.prepareBuffer(512) {
packet.write(defaultMqtt5Connection, it)
@@ -26,6 +26,6 @@ class DisconnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest {
reader.userProperties() == testUserProperties
reader.reason == reasonString
reader.serverReference == serverReference
- reader.sessionExpiryInterval == sessionExpiryInterval
+ reader.sessionExpiryInterval == testSessionExpiryInterval
}
}
diff --git a/network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy b/network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy
index 9d99d42a..edd662b7 100644
--- a/network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy
+++ b/network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy
@@ -1,9 +1,9 @@
package javasabr.mqtt.network
-
import javasabr.mqtt.model.MqttClientConnectionConfig
import javasabr.mqtt.model.MqttServerConnectionConfig
import javasabr.mqtt.model.MqttVersion
+import javasabr.mqtt.model.PayloadFormat
import javasabr.mqtt.model.QoS
import javasabr.mqtt.model.SubscribeRetainHandling
import javasabr.mqtt.model.data.type.StringPair
@@ -20,6 +20,7 @@ import javasabr.rlib.collections.array.IntArray
import spock.lang.Shared
import java.nio.charset.StandardCharsets
+import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
class NetworkUnitSpecification extends UnitSpecification {
@@ -38,29 +39,33 @@ class NetworkUnitSpecification extends UnitSpecification {
public static final mqtt311ClientId = "testMqtt311ClientId"
public static final mqtt5ClientId = "testMqtt5ClientId"
public static final testMessageId = 1234 as short
- public static final userName = "testUser"
- public static final userPassword = "testPassword".getBytes(StandardCharsets.UTF_8)
- public static final keepAlive = 120
- public static final sessionExpiryInterval = 300
+ public static final testUserName = "testUser"
+ public static final testUserPassword = "testPassword".getBytes(StandardCharsets.UTF_8)
+ public static final testKeepAlive = 120
+ public static final testSessionExpiryInterval = 300
public static final testMessageExpiryInterval = 60
+ public static final testWillMessageExpiryInterval = 90
+ public static final testWillDelayInterval = 10
public static final testTopicAlias = 252
- public static final receiveMaxPublishes = 10
- public static final maxMessageSize = 1024
+ public static final testReceiveMaxPublishes = 10
+ public static final testMaxMessageSize = 1024
public static final maxStringLength = 256
public static final maxBinarySize = 1024
public static final maxTopicLevels = 10
- public static final topicAliasMaxValue = 32
+ public static final testTopicAliasMaxValue = 32
public static final subscriptionId = 637
public static final subscriptionId2 = 623
public static final serverKeepAlive = 1200
public static final requestResponseInformation = true
public static final requestProblemInformation = true
public static final responseInformation = "responseInformation"
- public static final authMethod = "testAuthMethod"
- public static final authData = "testAuthData".getBytes(StandardCharsets.UTF_8)
+ public static final testAuthMethod = "testAuthMethod"
+ public static final testAuthData = "testAuthData".getBytes(StandardCharsets.UTF_8)
public static final reasonString = "reasonString"
public static final publishTopic = TopicName.valueOf("publish/Topic")
public static final testResponseTopic = TopicName.valueOf("response/Topic")
+ public static final testWillTopic = TopicName.valueOf("will/Topic")
+ public static final testWillResponseTopic = TopicName.valueOf("will/response/Topic")
public static final topicFilter = "topic/Filter"
public static final topicFilter1Obj311 = Subscription.minimal(TopicFilter.valueOf(topicFilter), QoS.AT_LEAST_ONCE)
public static final topicFilter1Obj5 = new Subscription(
@@ -85,6 +90,7 @@ class NetworkUnitSpecification extends UnitSpecification {
public static final serverReference = "serverReference"
public static final testContentType = "application/json"
+ public static final testWillContentType = "application/json"
public static final subscribeAckReasonCodes = Array.typed(
SubscribeAckReasonCode,
SubscribeAckReasonCode.GRANTED_QOS_1,
@@ -103,12 +109,20 @@ class NetworkUnitSpecification extends UnitSpecification {
new StringPair("key2", "val2"),
new StringPair("key3", "val3"))
+ public static final testWillUserProperties = Array.typed(
+ StringPair,
+ new StringPair("will_key1", "val1"),
+ new StringPair("will_key2", "val2"))
+
public static final testSubscriptionIds = IntArray.of(subscriptionId, subscriptionId2)
public static final topicFilters = Array.of(topicFilter, topicFilter2)
public static final subscriptionsObj311 = Array.of(topicFilter1Obj311, topicFilter2Obj311)
public static final topicFiltersObj5 = Array.of(topicFilter1Obj5, topicFilter2Obj5)
public static final publishPayload = "publishPayload".getBytes(StandardCharsets.UTF_8)
public static final testCorrelationData = "correlationData".getBytes(StandardCharsets.UTF_8)
+ public static final testWillPayload = "willPayload".getBytes(StandardCharsets.UTF_8)
+ public static final testWillPayloadFormat = PayloadFormat.UTF8_STRING.code() as byte
+ public static final testWillCorrelationData = "willCorrelationData".getBytes(StandardCharsets.UTF_8)
public static final clientIdGenerator = new AtomicInteger(1)
@Shared
@@ -129,14 +143,13 @@ class NetworkUnitSpecification extends UnitSpecification {
MqttServerConnectionConfig defaultServerConnectionConfig() {
return serverConnectionConfig(
maxQos,
- maxMessageSize,
+ testMaxMessageSize,
maxStringLength,
maxBinarySize,
maxTopicLevels,
serverKeepAlive,
- receiveMaxPublishes,
- topicAliasMaxValue,
- sessionExpiryInterval,
+ testReceiveMaxPublishes,
+ testTopicAliasMaxValue,
keepAliveEnabled,
sessionsEnabled,
retainAvailable,
@@ -150,11 +163,11 @@ class NetworkUnitSpecification extends UnitSpecification {
defaultServerConnectionConfig,
maxQos,
MqttVersion.MQTT_3_1_1,
- sessionExpiryInterval,
- receiveMaxPublishes,
- maxMessageSize,
- topicAliasMaxValue,
- keepAlive,
+ testSessionExpiryInterval,
+ testReceiveMaxPublishes,
+ testMaxMessageSize,
+ testTopicAliasMaxValue,
+ testKeepAlive,
false,
false)
}
@@ -164,11 +177,11 @@ class NetworkUnitSpecification extends UnitSpecification {
defaultServerConnectionConfig,
maxQos,
MqttVersion.MQTT_5,
- sessionExpiryInterval,
- receiveMaxPublishes,
- maxMessageSize,
- topicAliasMaxValue,
- keepAlive,
+ testSessionExpiryInterval,
+ testReceiveMaxPublishes,
+ testMaxMessageSize,
+ testTopicAliasMaxValue,
+ testKeepAlive,
false,
false)
}
@@ -196,7 +209,6 @@ class NetworkUnitSpecification extends UnitSpecification {
int serverKeepAlive,
int receiveMaxPublishes,
int topicAliasMaxValue,
- long sessionExpiryInterval,
boolean keepAliveEnabled,
boolean sessionsEnabled,
boolean retainAvailable,
@@ -212,7 +224,6 @@ class NetworkUnitSpecification extends UnitSpecification {
serverKeepAlive,
receiveMaxPublishes,
topicAliasMaxValue,
- sessionExpiryInterval,
keepAliveEnabled,
sessionsEnabled,
retainAvailable,
@@ -236,7 +247,7 @@ class NetworkUnitSpecification extends UnitSpecification {
serverConnectionConfig,
maxQos,
mqttVersion,
- sessionExpiryInterval,
+ Duration.ofSeconds(sessionExpiryInterval),
receiveMaxPublishes,
maxPacketSize,
topicAliasMaxValue,
diff --git a/test-support/build.gradle b/test-support/build.gradle
index 3762e91c..565024ab 100644
--- a/test-support/build.gradle
+++ b/test-support/build.gradle
@@ -3,7 +3,7 @@ plugins {
id("groovy")
}
-description = "Provide all nesessary dependencies for writing tests"
+description = "Provide all necessary dependencies for writing tests"
dependencies {
api libs.rlib.logger.impl
@@ -15,4 +15,5 @@ dependencies {
api libs.groovy.all
api libs.byte.buddy.dep
api libs.objenesis
-}
\ No newline at end of file
+ api libs.project.reactor.core
+}
diff --git a/test-support/src/main/groovy/javasabr/mqtt/test/support/BaseSpecification.groovy b/test-support/src/main/groovy/javasabr/mqtt/test/support/BaseSpecification.groovy
new file mode 100644
index 00000000..e671d4e7
--- /dev/null
+++ b/test-support/src/main/groovy/javasabr/mqtt/test/support/BaseSpecification.groovy
@@ -0,0 +1,25 @@
+package javasabr.mqtt.test.support
+
+import reactor.core.publisher.Mono
+import spock.lang.Specification
+
+import java.util.concurrent.CompletionStage
+
+class BaseSpecification extends Specification {
+
+ protected R fromAsync(Mono mono) {
+ return fromAsync(mono.toFuture())
+ }
+
+ protected R fromAsync(CompletionStage completionStage) {
+ return completionStage.toCompletableFuture().join()
+ }
+
+ protected void waitForAsync(Mono> mono) {
+ waitForAsync(mono.toFuture())
+ }
+
+ protected void waitForAsync(CompletionStage> completionStage) {
+ completionStage.toCompletableFuture().join()
+ }
+}
diff --git a/test-support/src/main/groovy/javasabr/mqtt/test/support/UnitSpecification.groovy b/test-support/src/main/groovy/javasabr/mqtt/test/support/UnitSpecification.groovy
index fa481c41..97d05862 100644
--- a/test-support/src/main/groovy/javasabr/mqtt/test/support/UnitSpecification.groovy
+++ b/test-support/src/main/groovy/javasabr/mqtt/test/support/UnitSpecification.groovy
@@ -1,6 +1,4 @@
package javasabr.mqtt.test.support
-import spock.lang.Specification
-
-class UnitSpecification extends Specification {
+class UnitSpecification extends BaseSpecification {
}