Skip to content

Commit

Permalink
feat: add support for automatic device token rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
OpenSrcerer committed Apr 16, 2024
1 parent 81b9f55 commit 986dc9b
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 15 deletions.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ dependencies {
implementation("io.quarkus:quarkus-resteasy-reactive")
implementation("io.quarkus:quarkus-config-yaml")

// HiveMQ MQTT Client
implementation("com.hivemq:hivemq-mqtt-client-reactor:1.3.3")

// Kotlin-Specific
implementation("io.quarkus:quarkus-kotlin")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
Expand Down
14 changes: 0 additions & 14 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,6 @@ networks:
driver: bridge

services:
paddy-backend:
container_name: paddy-backend
networks:
paddy-bridge:
aliases:
- paddy.backend.io
build: .
restart: on-failure
ports:
- "443:80" # Redirect https traffic to http because we are using a terminating LB
- "80:80"
env_file:
- ".env"

paddy-auth:
container_name: paddy-auth
networks:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package online.danielstefani.paddy.mqtt

import io.smallrye.config.ConfigMapping
import io.smallrye.config.WithDefault

@ConfigMapping(prefix = "mqtt")
interface MqttConfiguration {

@WithDefault("localhost")
fun host(): String

@WithDefault("1883")
fun port(): Int

@WithDefault("paddy-auth")
fun clientId(): String

@WithDefault("daemon/+/v1/reads")
fun deviceReadTopic(): String
}
141 changes: 141 additions & 0 deletions src/main/kotlin/online/danielstefani/paddy/mqtt/RxMqttClient.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package online.danielstefani.paddy.mqtt

import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient
import com.hivemq.client.mqtt.mqtt5.advanced.Mqtt5ClientAdvancedConfig
import com.hivemq.client.mqtt.mqtt5.message.auth.Mqtt5SimpleAuth
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult
import io.quarkus.logging.Log
import io.quarkus.runtime.StartupEvent
import io.reactivex.*
import io.reactivex.Observable
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.event.Observes
import online.danielstefani.paddy.jwt.JwtService
import online.danielstefani.paddy.jwt.dto.JwtType
import reactor.core.publisher.Flux
import reactor.util.retry.Retry
import java.time.Duration
import java.util.*

@ApplicationScoped
class RxMqttClient(
private val mqttConfig: MqttConfiguration,
private val jwtService: JwtService
) {
// Singleton
private var mqttClient: Mqtt5RxClient? = null
private val mqttClientId = UUID.randomUUID()

private var jwtUsername: String? = null

// Build the client on startup
fun startup(@Observes event: StartupEvent) {
// Since this is the auth service it generates its own credential
jwtUsername = jwtService.makeJwt("paddy-backend", JwtType.ADMIN, null).jwt
rebuildMqttClient()
}

fun publish(
daemonId: String,
action: String,
message: String = "",
qos: MqttQos = MqttQos.AT_MOST_ONCE,
): Flowable<Mqtt5PublishResult>? {
val topic = mqttConfig.deviceReadTopic().replace("+", daemonId) + "/$action"

return mqttClient?.publish(
Flowable.just(Mqtt5Publish.builder()
.topic(topic)
.qos(qos)
.payload(message.toByteArray())
.build()))
?.map { if (it.error.isPresent) throw it.error.get() else it }
}

/**
* Rebuilds (replaces) current singleton client.
*/
fun rebuildMqttClient(): Mqtt5RxClient {
shutdownClient().blockingAwait() // Kill current client (null checks inside)

// ---- Build Client ----
val client = Mqtt5Client.builder()
.identifier(
"${mqttConfig.clientId()}-scheduler-$mqttClientId".apply {
Log.info("[client->mqtt->reaper] // Building new MQTT client: $this")
}
)
.serverHost(mqttConfig.host())
.serverPort(mqttConfig.port())
.also {
it.simpleAuth(
Mqtt5SimpleAuth.builder()
.username(jwtUsername!!)
.build()
)
}
.advancedConfig(
Mqtt5ClientAdvancedConfig.builder()
.allowServerReAuth(true)
.build()
)
.buildRx()

return client.apply {
mqttClient = this
mqttClient!!
.connectScenario()
// Need to do this because RxJava's
// error handling mechanism is futile
// Project Reactor >>>>
.`as` { Flux.from(it.toFlowable(BackpressureStrategy.BUFFER)) }
.retryWhen(
Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
.doBeforeRetry {
Log.info("[client->mqtt] // " +
"Connecting to... ${mqttConfig.host()}:${mqttConfig.port()}")
}
)
.subscribe()
}
}

/**
* Define connection rules for MQTT Client building above.
*/
private fun Mqtt5RxClient.connectScenario(): Observable<Mqtt5ConnAck> {
return this.connectWith()
.cleanStart(true)
.applyConnect()
.doOnSubscribe { Log.info("[client->mqtt] // " +
"Connecting to... ${mqttConfig.host()}:${mqttConfig.port()}") }
.doOnSuccess { Log.info("[client->mqtt] // " +
"Connected to ${mqttConfig.host()}:${mqttConfig.port()}, ${it.reasonCode}") }
.doOnError { Log.error("[client->mqtt] // " +
"Connection failed to ${mqttConfig.host()}:${mqttConfig.port()}, ${it.message}") }
.onErrorResumeNext {
// if client is already connected, continue with a single that never
// emits to stop reconnections
if (mqttClient?.state?.isConnected == true)
return@onErrorResumeNext Single.never<Mqtt5ConnAck>()

Single.error(it)
}
.toObservable()
}

private fun shutdownClient(): Completable {
if (mqttClient == null || !mqttClient!!.state.isConnected)
return Completable.complete()
.doOnComplete { Log.info("[client->mqtt->reaper] // " +
"Old MQTT client was null or disconnected, ignoring it") }

return mqttClient!!.disconnect()
.doOnComplete { Log.info("[client->mqtt->reaper] // " +
"Reaped old MQTT client ${mqttClient!!.config.clientIdentifier.get()}") }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package online.danielstefani.paddy.security.mqtt

import com.hivemq.client.mqtt.datatypes.MqttQos
import io.quarkus.logging.Log
import jakarta.ws.rs.Consumes
import jakarta.ws.rs.POST
Expand All @@ -8,17 +9,25 @@ import jakarta.ws.rs.Produces
import jakarta.ws.rs.core.MediaType
import online.danielstefani.paddy.security.AbstractAuthorizationController
import online.danielstefani.paddy.jwt.JwtService
import online.danielstefani.paddy.jwt.dto.JwtType
import online.danielstefani.paddy.mqtt.RxMqttClient
import online.danielstefani.paddy.security.dto.AuthenticationRequestDto
import online.danielstefani.paddy.security.dto.AuthenticationResultDto
import org.jboss.resteasy.reactive.RestResponse
import java.time.Instant

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
class MqttAuthorizationController(
private val jwtService: JwtService
private val jwtService: JwtService,
private val mqttClient: RxMqttClient
) : AbstractAuthorizationController() {

companion object {
const val SECONDS_WEEK = 604800
}

@POST
@Path("/verify")
fun verifyIncomingMqttJwt(authDto: AuthenticationRequestDto): RestResponse<AuthenticationResultDto> {
Expand All @@ -28,6 +37,14 @@ class MqttAuthorizationController(
return forbid("<missing/invalid jwt>", authDto.topic!!)

val sub = jwt.getJsonObject("payload").getString("sub")
val exp = jwt.getJsonObject("payload").getString("exp")

// If JWT on the device is expiring in one week, rotate it
if (exp.toLong() <= Instant.now().epochSecond + SECONDS_WEEK) {
val newJwt = jwtService.makeJwt(sub, JwtType.DAEMON, null).jwt
mqttClient.publish(sub, "rotate", newJwt, qos = MqttQos.EXACTLY_ONCE)
?.subscribe()
}

// Special case: Check if the token is for the backend
if (sub.equals("paddy-backend")) {
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
quarkus:
native:
additional-build-args:
- '--initialize-at-run-time=com.hivemq.client.internal.mqtt.codec.encoder.MqttPingReqEncoder'
- '--initialize-at-run-time=com.hivemq.client.internal.mqtt.codec.encoder.mqtt3.Mqtt3DisconnectEncoder'

devservices:
enabled: false

http:
port: 80
root-path: /auth/v1
Expand Down

0 comments on commit 986dc9b

Please sign in to comment.