From e3682c9158e1e0b6aecfc7a7cf10e14f35a1f95d Mon Sep 17 00:00:00 2001 From: duncte123 Date: Thu, 8 Feb 2024 14:47:33 +0100 Subject: [PATCH] Transfer orphaned players when possible --- .../arbjerg/lavalink/client/LavalinkClient.kt | 42 ++++++++++++++++++- .../arbjerg/lavalink/client/LavalinkNode.kt | 5 +++ .../arbjerg/lavalink/client/LavalinkPlayer.kt | 10 +++++ .../dev/arbjerg/lavalink/client/Link.kt | 3 ++ .../lavalink/internal/LavalinkSocket.kt | 16 +++---- .../main/java/me/duncte123/testbot/Main.java | 4 +- 6 files changed, 68 insertions(+), 12 deletions(-) diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt index b035085..0327804 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt @@ -125,6 +125,32 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable { */ fun getLinkIfCached(guildId: Long): Link? = linkMap[guildId] + /** + * Finds all players on unavailable nodes and transfers them to [node]. + */ + internal fun transferOrphansTo(node: LavalinkNode) { + // This *should* never happen, but just in case... + if (!node.available) { + return + } + + val orphans = findOrphanedPlayers() + + orphans.mapNotNull { linkMap[it.guildId] } + .forEach { link -> + link.transferNode(node) + } + } + + /** + * Finds all players that are on unavailable nodes. + */ + private fun findOrphanedPlayers(): List { + val unavailableNodes = nodes.filter { !it.available } + + return unavailableNodes.flatMap { it.playerCache.values } + } + internal fun onNodeDisconnected(node: LavalinkNode) { // Don't do anything if we are shutting down. if (!clientOpen) { @@ -138,9 +164,21 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable { return } + // If we have no nodes available, don't attempt to load-balance. + if (nodes.all { !it.available }) { + linkMap.filter { (_, link) -> link.node == node } + .forEach { (_, link) -> + link.state = LinkState.DISCONNECTED + } + return + } + linkMap.forEach { (_, link) -> - if (link.node == node) { - link.transferNode(loadBalancer.selectNode(region = null)) + if (link.node == node) { + val voiceRegion = link.cachedPlayer?.voiceRegion + + link.state = LinkState.CONNECTING + link.transferNode(loadBalancer.selectNode(region = voiceRegion)) } } } diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt index 64fcecd..d2adf27 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt @@ -134,6 +134,7 @@ class LavalinkNode( return rest.getPlayer(guildId) .map { it.toLavalinkPlayer(this) } + // TODO: check for 404 status code, if not 404, don't create .onErrorResume { createOrUpdatePlayer(guildId) } .doOnSuccess { // Update the player internally upon retrieving it. @@ -399,6 +400,10 @@ class LavalinkNode( */ fun getCachedPlayer(guildId: Long): LavalinkPlayer? = playerCache[guildId] + internal fun transferOrphansToSelf() { + lavalink.transferOrphansTo(this) + } + override fun equals(other: Any?): Boolean { if (this === other) return true if (javaClass != other?.javaClass) return false diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkPlayer.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkPlayer.kt index b80cd2c..e16c287 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkPlayer.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkPlayer.kt @@ -1,5 +1,6 @@ package dev.arbjerg.lavalink.client +import dev.arbjerg.lavalink.client.loadbalancing.VoiceRegion import dev.arbjerg.lavalink.client.protocol.Track import dev.arbjerg.lavalink.client.protocol.toCustom import dev.arbjerg.lavalink.protocol.v4.* @@ -44,6 +45,15 @@ class LavalinkPlayer(private val node: LavalinkNode, protocolPlayer: Player) : I } } + val voiceRegion: VoiceRegion? + get() { + if (voiceState.endpoint.isBlank()) { + return null + } + + return VoiceRegion.fromEndpoint(voiceState.endpoint) + } + override fun setTrack(track: Track?) = PlayerUpdateBuilder(node, guildId) .setTrack(track) diff --git a/src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt b/src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt index 3d7e7b5..6bf57a4 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt @@ -1,6 +1,7 @@ package dev.arbjerg.lavalink.client import dev.arbjerg.lavalink.protocol.v4.VoiceState +import java.time.Duration import java.util.function.Consumer /** @@ -67,6 +68,8 @@ class Link( node.removeCachedPlayer(guildId) newNode.createOrUpdatePlayer(guildId) .applyBuilder(player.stateToBuilder()) + // Delay by 500ms to hopefully prevent a race-condition from triggering + .delayElement(Duration.ofMillis(500)) .subscribe() } diff --git a/src/main/kotlin/dev/arbjerg/lavalink/internal/LavalinkSocket.kt b/src/main/kotlin/dev/arbjerg/lavalink/internal/LavalinkSocket.kt index acbfad5..3a5313c 100644 --- a/src/main/kotlin/dev/arbjerg/lavalink/internal/LavalinkSocket.kt +++ b/src/main/kotlin/dev/arbjerg/lavalink/internal/LavalinkSocket.kt @@ -60,6 +60,9 @@ class LavalinkSocket(private val node: LavalinkNode) : WebSocketListener(), Clos .setNoReplace(false) .subscribe() } + + // Move players from older, unavailable nodes to ourselves. + node.transferOrphansToSelf() } Message.Op.Stats -> { @@ -126,19 +129,13 @@ class LavalinkSocket(private val node: LavalinkNode) : WebSocketListener(), Clos } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { - node.lavalink.onNodeDisconnected(node) - when(t) { is EOFException -> { logger.debug("Got disconnected from ${node.name}, trying to reconnect", t) - node.available = false - open = false } is SocketTimeoutException -> { logger.debug("Got disconnected from ${node.name} (timeout), trying to reconnect", t) - node.available = false - open = false } is ConnectException -> { @@ -153,14 +150,17 @@ class LavalinkSocket(private val node: LavalinkNode) : WebSocketListener(), Clos } logger.warnOrTrace("Socket error on ${node.name}, reconnecting in ${reconnectInterval / 1000} seconds", t) - node.available = false - open = false } else -> { logger.error("Unknown error on ${node.name}", t) } } + + node.available = false + open = false + + node.lavalink.onNodeDisconnected(node) } override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { diff --git a/testbot/src/main/java/me/duncte123/testbot/Main.java b/testbot/src/main/java/me/duncte123/testbot/Main.java index 6b49c54..69bd5f8 100644 --- a/testbot/src/main/java/me/duncte123/testbot/Main.java +++ b/testbot/src/main/java/me/duncte123/testbot/Main.java @@ -38,12 +38,12 @@ public static void main(String[] args) throws InterruptedException { private static void registerLavalinkNodes(LavalinkClient client) { List.of( - /*client.addNode( + client.addNode( "Testnode", URI.create("ws://localhost:2333"), "youshallnotpass", RegionGroup.EUROPE - )*/ + ), client.addNode( "Pi-local",