Skip to content

Commit

Permalink
adopt new upstream interface
Browse files Browse the repository at this point in the history
  • Loading branch information
source-c committed May 22, 2023
1 parent 529277a commit af4d311
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 8 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
:description "A Clojure wrapper to the Moquette Broker library"
:url "https://github.com/dkdhub/clj-mqtt-broker"
:license {:name "Built In Project License"}
:dependencies [[io.moquette/moquette-broker "0.17-SNAPSHOT-1"
:dependencies [[io.moquette/moquette-broker "0.17-SNAPSHOT"
:exclusions [com.bugsnag/bugsnag
org.slf4j/slf4j-api
org.slf4j/slf4j-log4j12]]])
9 changes: 6 additions & 3 deletions src-clj/clj_mqtt_broker/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
(close [o])
(send [o from to data qos retain?])
(clients [o])
(disconnect [o ^String client]))
(disconnect
[o ^String client]
[o ^String client ^Boolean flush?]))

(deftype Broker [^IBroker instance]
CljBroker
Expand All @@ -66,9 +68,10 @@
(if (instance? AdvancedBroker instance)
(map #(->> % (into {}) clojure.walk/keywordize-keys) (.clients instance))
nil))
(disconnect [_ client]
(disconnect [_ client] (disconnect _ client false))
(disconnect [_ client flush?]
(if (instance? AdvancedBroker instance)
(.disconnect instance client)
(.disconnect instance client (boolean flush?))
false)))

(defmulti create-broker (fn [x] (class x)))
Expand Down
6 changes: 3 additions & 3 deletions src-java/com/dkdhub/mqtt_broker/AdvancedBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public void send(String from, String topic, byte[] data, MqttQoS qos, Boolean re
}
}

public boolean disconnect(String client) {
public boolean disconnect(String client, boolean flush) {
if (client == null) return false;

LOG.debug("Will disconnect client {}", client);
boolean res = m_server.disconnectClient(client);
LOG.debug("Will disconnect client {}, flush state: {}", client, flush);
boolean res = flush ? m_server.disconnectClient(client) : m_server.disconnectAndPurgeClientState(client);
LOG.debug("Disconnect result: {}", res);
return res;
}
Expand Down
30 changes: 29 additions & 1 deletion test/basic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@

(deftest check-advanced-disconnects
(testing "Checking advanced client for disconnects")
(log/info "--------- MQTT Advanced Broker check disconnect ---------")
(log/info "--------- MQTT Advanced Broker check disconnect (keep state) ---------")
(is (let [b (Broker. (AdvancedBroker. advanced-config))
results (atom [])]
(start b (TraceHandlers. "3456"))
Expand All @@ -145,6 +145,34 @@
:when client-id]
(swap! results conj (disconnect b client-id)))

;; TODO: add here check state test part

(println "======> MQTT Advanced Broker goes down .........")
(println "======> MQTT Clients:" (clients b))
(Thread/sleep 1000)

(stop b)
(println "======> MQTT Results:" @results)
(or (empty? @results)
(every? true? @results))))

(log/info "--------- MQTT Advanced Broker check disconnect (state flush) ---------")
(is (let [b (Broker. (AdvancedBroker. advanced-config))
results (atom [])]
(start b (TraceHandlers. "3456"))

(println "======> MQTT Advanced Broker wait .........")
(Thread/sleep 10000)
(println "======> MQTT Clients:" (clients b))
(println "======> MQTT Advanced Broker disconnecting .........")

(doseq [client (clients b)
:let [client-id (:id client)]
:when client-id]
(swap! results conj (disconnect b client-id true)))

;; TODO: add here check state test part

(println "======> MQTT Advanced Broker goes down .........")
(println "======> MQTT Clients:" (clients b))
(Thread/sleep 1000)
Expand Down

0 comments on commit af4d311

Please sign in to comment.