Skip to content

Conversation

@HeartLinked
Copy link

@HeartLinked HeartLinked commented Jun 16, 2025

This PR introduces a crucial option to run the RocketMQ-MQTT proxy layer in a configuration that does not depend on an external stateful meta service.

The primary motivation is to simplify deployment and reduce operational overhead for users who do not require stateful features like Retain Messages and Will Messages. By disabling this dependency, users can deploy the stateless MQTT proxy cluster without the need to set up, manage, and operate a separate, Raft-based meta cluster.

Key Changes

This feature is implemented through a series of coordinated changes across the common, ds, and cs modules:

  1. New Configuration Options:
  • enableMetaModule (in service.conf): A boolean switch (true by default) to enable or disable the entire meta module and its dependent features.
  • staticFirstTopics (in service.conf): A comma-separated list of topics, required when enableMetaModule is false.
  • localAddress (in service.conf): The address for internal RPC loopback, required when enableMetaModule is false.
  1. Conditional Logic in NotifyManager:

The NotifyManager has been refactored to operate in two modes.When meta is disabled, it subscribes to the topics listed in staticFirstTopics at startup, instead of dynamically refreshing from the meta service. The message notification logic (notifyMessage) is adjusted to perform a local RPC loopback to itself, instead of broadcasting to a cluster of nodes.

  1. Feature Gates for will and retain Messages:

To ensure system stability, features dependent on the meta module are now gated by the enableMetaModule switch.

  • Will Messages: The MqttConnectHandler now checks the switch. If a client attempts to set a Will Message while the meta module is disabled, the server will log an error and close the connection.
  • Retain Messages: The PublishProcessor now checks the switch. If a client attempts to publish a message with the retain flag set while the meta module is disabled, the request is actively rejected.

How to Configure and Test Standalone Mode

  • In service.conf, set enableMetaModule=false.
  • Configure staticFirstTopics with a comma-separated list of all your application's first-level topics (e.g., staticFirstTopics=topicA,iot/devices).
  • Configure localAddress (e.g., localAddress=127.0.0.1).
  • Start the service using only sh mqtt.sh start. The meta.sh process is not needed.
  • To Test Rejection:
    Connect a client with a Will Message set. The server should refuse the connection.
    With an active connection, publish a message with the retain flag set to true. The server should close the connection.

Related Issue

fix #305

@DongyuanPan DongyuanPan changed the title feat: Introduce Standalone Mode by Allowing Meta Module Disabling [ISSUE #305]feat: Introduce Standalone Mode by Allowing Meta Module Disabling Jun 16, 2025
enableMultiDispatch = true
```

### Deployment Modes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't be called a deployment mode.


+ Cluster-Ready Mode (Meta Enabled, Default): This is the default and full-featured mode. It relies on a meta service (based on RocketMQ's KV storage) for dynamic configuration, cluster node discovery, and advanced features like Retain Messages and Will Messages. This mode is suitable for production and high-availability environments.

+ Standalone Mode (Meta Disabled): This is a simplified mode designed for single-node deployments or scenarios where advanced features are not required. It operates without any dependency on the meta service, making configuration and deployment much simpler. In this mode, features like Retain Messages and Will Messages are disabled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mqtt proxy is still deployed in a cluster

@DongyuanPan
Copy link
Contributor

plz commit to the develop branch


private boolean enableMetaModule = true;
private String staticFirstTopics;
private String localAddress;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need staticFirstTopics and localAddress?

public CompletableFuture<StoreResult> put(MqttMessageUpContext context, MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;

if (mqttPublishMessage.fixedHeader().isRetain()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code can be simplified and less intrusive. For example

if (mqttPublishMessage.fixedHeader().isRetain() && !serviceConf.isEnableMetaModule()) {
logger.error("Client [{}] on topic [{}] tried to publish a Retain Message, but the meta module is disabled. Rejecting request.",
                        context.getClientId(), mqttPublishMessage.variableHeader().topicName());

                MqttRetainException exception = new MqttRetainException("Retain Message feature is disabled.");

                CompletableFuture<StoreResult> failedFuture = new CompletableFuture<>();
                failedFuture.completeExceptionally(exception);
                return failedFuture;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

我是单机部署的RocketMQ,想要禁用RocketMQ-mqtt中用不上的代码,比如raft,应该如何做

2 participants