Skip to content

Commit

Permalink
Improve restart behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Mar 27, 2024
1 parent 4761b11 commit d67f2f4
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 56 deletions.
117 changes: 67 additions & 50 deletions src/main/scala/alpakka/mqtt/MqttEcho.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.mqtt.streaming._
import org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.{ActorMqttClientSession, Mqtt}
import org.apache.pekko.stream.scaladsl.{Keep, RestartFlow, Sink, Source, SourceQueueWithComplete, Tcp}
import org.apache.pekko.stream.{OverflowStrategy, RestartSettings, ThrottleMode}
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete, Tcp}
import org.apache.pekko.stream.{OverflowStrategy, ThrottleMode}
import org.apache.pekko.util.ByteString
import org.slf4j.{Logger, LoggerFactory}

Expand All @@ -21,13 +21,15 @@ import scala.util.{Failure, Success, Try}
* Doc:
* https://pekko.apache.org/docs/pekko-connectors/current/mqtt-streaming.html
*
* Works, but still has restart-issue on lost tcp connection, see:
* Works, with (partially) resolved restart-issue on startup as well as during operation, see:
* https://discuss.lightbend.com/t/alpakka-mqtt-streaming-client-does-not-complain-when-there-is-no-tcp-connection/7113/4
*
* TODO Remaining issue: clientPublisher flow restarts during operation lead to:
* - a new source is starting re-sending all elements (Idea: save the pointer from the last msg in atomic param)
* - the current source continues to run (Idea: stop with kill switch)
*
* Additional inspirations:
* TODO Handling ConnAck is key to success
* https://github.com/michalstutzmann/scala-util/tree/master/src/main/scala/com/github/mwegrz/scalautil/mqtt
* https://github.com/ASSIST-IoT-SRIPAS/scala-mqtt-wrapper/blob/main/examples/PekkoMain.scala
*
* Prerequisites:
* Start the docker MQTT broker from: /docker/docker-compose.yml
Expand All @@ -48,54 +50,75 @@ object MqttEcho extends App {
implicit val ec = system.dispatcher

val topic = "myTopic"
val clientId = s"pub-$id"
val pubClient = client(clientId, sys, host, port)
val clientId = s"Pub-$id"
val connAckPromise = Promise[Unit]

val pubClient = client(clientId, sys, host, port, connAckPromise)

// A received ConAck confirms that we are connected
connAckPromise.future.onComplete { _: Try[Unit] =>
logger.info(s"$clientId bound to: $host:$port")

Source(1 to 100)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.map(each => s"$id-$each")
.wireTap(each => logger.info(s"$clientId sending payload: $each"))
.map {
msg =>
// On the server each new retained message overwrites the previous one
val publish = Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString(msg.toString))
pubClient.session ! Command(publish, None)
}.runWith(Sink.ignore)
}

// TODO Handle ConAck
pubClient.done.onComplete {
case Success(value) => logger.info(s"Client: $clientId stopped with: $value. Probably lost tcp connection")
case Failure(exception) => logger.error(s"Client: $clientId has no tcp connection on startup: ", exception)
case Success(value) =>
logger.info(s"$clientId stopped with: $value. Probably lost tcp connection. Restarting...")
clientPublisher(id, system, host, port)
case Failure(exception) => logger.error(s"$clientId has no tcp connection on startup. Ex: ${exception.getMessage}. Restarting...")
Thread.sleep(1000)
clientPublisher(id, system, host, port)
}

logger.info(s"Client: $clientId bound to: $host:$port")

Source(1 to 100)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.wireTap(each => logger.info(s"Client: $clientId sending: $each"))
.map {
msg =>
val promise = Promise[None.type]()
// On the server each new retained message overwrites the previous one
val publish = Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString(msg.toString))
// WIP
// https://github.com/akka/alpakka/issues/1581
pubClient.session ! Command(publish, () => promise.complete(Try(None)))
promise.future
}.runWith(Sink.ignore)
}

def clientSubscriber(id: Int, system: ActorSystem, host: String, port: Int): Unit = {
implicit val sys = system
implicit val ec = system.dispatcher

val topic = "myTopic"
val clientId = s"sub-$id"
val subClient = client(clientId, sys, host, port)

// Delay the subscription to get a "last known good value" eg 6
Thread.sleep(5000)
val topicFilters: Seq[(String, ControlPacketFlags)] = List((topic, ControlPacketFlags.QoSAtMostOnceDelivery))
logger.info(s"Client: $clientId send Subscribe for topic: $topic")
subClient.commands.offer(Command(Subscribe(topicFilters)))
val clientId = s"Sub-$id"
val connAckPromise = Promise[Unit]
val subClient = client(clientId, sys, host, port, connAckPromise)

// A received ConAck confirms that we are connected
connAckPromise.future.onComplete { _: Try[Unit] =>
logger.info(s"$clientId bound to: $host:$port")

// Delay the subscription to get a "last known good value" eg 6
Thread.sleep(5000)
val topicFilters: Seq[(String, ControlPacketFlags)] = List((topic, ControlPacketFlags.QoSAtMostOnceDelivery))
logger.info(s"$clientId send Subscribe for topic: $topic")
subClient.commands.offer(Command(Subscribe(topicFilters)))
}

subClient.done.onComplete {
case Success(value) =>
logger.info(s"$clientId stopped with: $value. Probably lost tcp connection. Restarting...")
Thread.sleep(2000)
clientSubscriber(id, system, host, port)
case Failure(exception) => logger.error(s"$clientId has no tcp connection on startup. Ex: ${exception.getMessage}. Restarting...")
Thread.sleep(2000)
clientSubscriber(id, system, host, port)
}
}


// Common client for Publisher/Subscriber
private def client(clientId: String, system: ActorSystem, host: String, port: Int): MqttClient = {
private def client(clientId: String, system: ActorSystem, host: String, port: Int, connAckPromise: Promise[Unit]): MqttClient = {
implicit val sys = system
implicit val ec: ExecutionContextExecutor = system.dispatcher

logger.info(s"Client: $clientId starting...")
logger.info(s"$clientId starting...")

val settings = MqttSessionSettings()
val clientSession = ActorMqttClientSession(settings)
Expand All @@ -107,40 +130,34 @@ object MqttEcho extends App {
.clientSessionFlow(clientSession, ByteString(clientId))
.join(connection)

val restartSettings = RestartSettings(1.second, 10.seconds, 0.2).withMaxRestarts(10, 1.minute)
val restartFlow = RestartFlow.onFailuresWithBackoff(restartSettings)(() => mqttFlow)

val (commands, done) = {
Source
.queue(10, OverflowStrategy.backpressure, 10)
.via(restartFlow)
.via(mqttFlow)
// Filter the Ack events
.filter {
case Right(Event(_: ConnAck, _)) =>
logger.info(s"Client: $clientId received ConnAck")
logger.info(s"$clientId received ConnAck")
connAckPromise.complete(Success(()))
false
case Right(Event(_: SubAck, _)) =>
logger.info(s"Client: $clientId received SubAck")
logger.info(s"$clientId received SubAck")
false
case Right(Event(pa: PubAck, Some(carry))) =>
logger.info(s"Client: $clientId received PubAck for: ${pa.packetId}")
// WIP carry is of type Nothing
// https://github.com/akka/alpakka/pull/1908
// carry.asInstanceOf[Promise[Done]].success(Done)
case Right(Event(pa: PubAck, _)) =>
logger.info(s"$clientId received PubAck for: ${pa.packetId}")
false
case _ => true
}

// Only the Publish events are interesting for the subscriber
.collect { case Right(Event(p: Publish, _)) => p }
.wireTap(event => logger.info(s"Client: $clientId received: ${event.payload.utf8String}"))
.wireTap(event => logger.info(s"$clientId received payload: ${event.payload.utf8String}"))
.toMat(Sink.ignore)(Keep.both)
.run()
}

logger.info(s"$clientId. About to send connect cmd...")
val connectCommand = Command(Connect(clientId, ConnectFlags.CleanSession))

logger.info("About to connect...")
commands.offer(connectCommand)

MqttClient(session = clientSession, commands = commands, done = done)
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/alpakka/mqtt/MqttPahoEcho.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ object MqttPahoEcho extends App {
val messages = (0 to 100).flatMap(i => Seq(MqttMessage(topic, ByteString(s"$clientId-$i"))))

val publisherSink = wrapWithRestartSink(
MqttSink(connectionSettings.withClientId(s"Pub: $clientId"), MqttQoS.AtLeastOnce))
MqttSink(connectionSettings.withClientId(s"Pub-$clientId"), MqttQoS.AtLeastOnce))

Source(messages)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.wireTap(each => logger.info(s"Pub: $clientId sending payload: ${each.payload.utf8String}"))
.wireTap(each => logger.info(s"Pub-$clientId sending payload: ${each.payload.utf8String}"))
.runWith(publisherSink)
}

Expand All @@ -78,15 +78,15 @@ object MqttPahoEcho extends App {
val subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce)

val subscriberSource = wrapWithRestartSource(
MqttSource.atMostOnce(connectionSettings.withClientId(s"Sub: $clientId"), subscriptions, 8))
MqttSource.atMostOnce(connectionSettings.withClientId(s"Sub-$clientId"), subscriptions, 8))

val (subscribed, streamCompletion) = subscriberSource
.wireTap(msg => logger.info(s"Sub: $clientId received payload: ${msg.payload.utf8String}"))
.wireTap(msg => logger.debug(s"Sub: $clientId received payload: ${msg.payload.utf8String}. Details: ${msg.toString()}"))
.wireTap(msg => logger.info(s"Sub-$clientId received payload: ${msg.payload.utf8String}"))
.wireTap(msg => logger.debug(s"Sub-$clientId received payload: ${msg.payload.utf8String}. Details: ${msg.toString()}"))
.toMat(Sink.ignore)(Keep.both)
.run()

subscribed.onComplete(each => logger.info(s"Sub: $clientId subscribed: $each"))
subscribed.onComplete(each => logger.info(s"Sub-$clientId subscribed: $each"))
}

private def wrapWithRestartSource[M](source: => Source[M, Future[Done]]): Source[M, Future[Done]] = {
Expand Down

0 comments on commit d67f2f4

Please sign in to comment.