Skip to content

Commit

Permalink
Fix reference's leak in fast publish of Qos0 and slow subscriber use …
Browse files Browse the repository at this point in the history
…case (#649)

Handling PUB message is a multistep process, it regards forwarding the payload/message to appropriate target sessions event queues and notify the registered interceptors.
When the front processing of the message is terminated, i has to free appropriately the message, decrementing the reference count. This PR fixes a missed decrement.
  • Loading branch information
andsel authored Feb 5, 2022
1 parent a8dfb1b commit 9ed6ca9
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 17 deletions.
3 changes: 1 addition & 2 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,8 @@ Future<? extends Object> processPublish(MqttPublishMessage msg) {
switch (qos) {
case AT_MOST_ONCE:
return postOffice.receivedPublishQos0(topic, username, clientId, msg);
case AT_LEAST_ONCE: {
case AT_LEAST_ONCE:
return postOffice.receivedPublishQos1(this, topic, username, messageID, msg);
}
case EXACTLY_ONCE: {
final CompletableFuture<PostOffice.RouteResult> firstStepFuture = postOffice.routeCommand(clientId, () -> {
bindedSession.receivedPublishQos2(messageID, msg);
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String
}

interceptor.notifyTopicPublished(msg, clientID, username);
ReferenceCountUtil.release(msg);
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.moquette.integration;

import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Path;

abstract class AbstractIntegration {

String dbPath;
MqttAsyncClient publisher;

protected MqttAsyncClient createClient(String clientName, Path tempFolder) throws IOException, MqttException {
final String dataPath = IntegrationUtils.newFolder(tempFolder, clientName).getAbsolutePath();
MqttClientPersistence clientDataStore = new MqttDefaultFilePersistence(dataPath);
return new MqttAsyncClient("tcp://localhost:1883", clientName, clientDataStore);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package io.moquette.integration;

import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import jdk.nashorn.internal.ir.annotations.Ignore;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
import java.security.SecureRandom;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Fast publisher sending to slow ACK subscriber which alternates between clean session true/false
* Inspired by issue https://github.com/moquette-io/moquette/issues/608.
* */
public class FastPublisherSlowSubscriberTest extends AbstractIntegration {

private static final Logger LOG = LoggerFactory.getLogger(FastPublisherSlowSubscriberTest.class);

@TempDir
Path tempFolder;
private String dbPath;
private Server broker;
private MqttAsyncClient subscriber;
private ScheduledExecutorService publisherPool;
private final SecureRandom random = new SecureRandom();

protected void startServer(String dbPath) throws IOException {
broker = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
broker.startServer(new MemoryConfig(configProps));
}

@BeforeEach
public void setUp() throws Exception {
dbPath = IntegrationUtils.tempH2Path(tempFolder);
startServer(dbPath);

publisher = createClient("publisher", tempFolder);
publisher.connect().waitForCompletion(1_000);

subscriber = createClient("slow_subscriber", tempFolder);
subscriber.connect().waitForCompletion(1_000);
publisherPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() * 2);
}

@Ignore
@Test
public void publisherAtFixedRate() throws MqttException, InterruptedException {
CountDownLatch stopTest = new CountDownLatch(1);
publisherPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
final int delta = random.nextInt(10);
int temp = 15 + delta;
try {
publisher.publish("/temperature", (temp + "°C").getBytes(UTF_8), 0, false);
} catch (MqttException e) {
e.printStackTrace();
stopTest.countDown();
throw new RuntimeException(e);
}
}
}, 1000, 100, TimeUnit.MILLISECONDS);

slowSubscribe(1);

stopTest.await();
}

@Ignore
@Test
public void asFastAsItCan() throws MqttException, InterruptedException {
final Thread publisherTask = new Thread() {
@Override
public void run() {
while(!isInterrupted()) {
final int delta = random.nextInt(10);
int temp = 15 + delta;
try {
publisher.publish("/temperature", (temp + "°C").getBytes(UTF_8), 0, false);
} catch (MqttException e) {
e.printStackTrace();
interrupt();
}
}
}
};
publisherTask.start();

slowSubscribe(2);

publisherTask.join();
}

private void slowSubscribe(int qos) throws MqttException {
subscriber.subscribe("/temperature", qos, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Thread.currentThread().sleep(500);
final String temp = new String(message.getPayload(), UTF_8);
LOG.info("Received temp: {}", temp);
}
});
}

@AfterEach
public void tearDown() throws Exception {
IntegrationUtils.disconnectClient(this.publisher);
IntegrationUtils.disconnectClient(this.subscriber);

// this.broker.stopServer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -38,7 +36,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

// inspired by ServerIntegrationPahoTest
public class PublishToManySubscribersUseCaseTest {
public class PublishToManySubscribersUseCaseTest extends AbstractIntegration {

private static final Logger LOG = LoggerFactory.getLogger(PublishToManySubscribersUseCaseTest.class);

Expand All @@ -47,19 +45,16 @@ public class PublishToManySubscribersUseCaseTest {
private static final int EVENT_LOOPS = Runtime.getRuntime().availableProcessors();
public static final int NUM_SUBSCRIBERS = COMMAND_QUEUE_SIZE * EVENT_LOOPS * 4;
private Server broker;
private IConfig brokerConfig;

@TempDir
Path tempFolder;
private String dbPath;
private MqttAsyncClient publisher;
private List<IMqttAsyncClient> subscribers;

protected void startServer(String dbPath) throws IOException {
broker = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
configProps.put(BrokerConstants.SESSION_QUEUE_SIZE, Integer.toString(COMMAND_QUEUE_SIZE));
brokerConfig = new MemoryConfig(configProps);
IConfig brokerConfig = new MemoryConfig(configProps);
broker.startServer(brokerConfig);
}

Expand All @@ -73,7 +68,7 @@ public void setUp() throws Exception {
dbPath = IntegrationUtils.tempH2Path(tempFolder);
startServer(dbPath);

publisher = createClient("publisher");
publisher = createClient("publisher", tempFolder);
publisher.connect().waitForCompletion(1_000);

subscribers = createSubscribers(NUM_SUBSCRIBERS);
Expand Down Expand Up @@ -120,17 +115,11 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
private List<IMqttAsyncClient> createSubscribers(int numSubscribers) throws MqttException, IOException {
List<IMqttAsyncClient> clients = new ArrayList<>(numSubscribers);
for (int i = 0; i < numSubscribers; i++) {
clients.add(createClient("subscriber_" + i));
clients.add(createClient("subscriber_" + i, tempFolder));
}
return clients;
}

private MqttAsyncClient createClient(String clientName) throws IOException, MqttException {
final String dataPath = IntegrationUtils.newFolder(tempFolder, clientName).getAbsolutePath();
MqttClientPersistence clientDataStore = new MqttDefaultFilePersistence(dataPath);
return new MqttAsyncClient("tcp://localhost:1883", clientName, clientDataStore);
}

@AfterEach
public void tearDown() throws Exception {
IntegrationUtils.disconnectClient(this.publisher);
Expand Down

0 comments on commit 9ed6ca9

Please sign in to comment.