Skip to content

Commit

Permalink
Transfer orphaned players when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
duncte123 committed Feb 8, 2024
1 parent 4cfbb1e commit e3682c9
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 12 deletions.
42 changes: 40 additions & 2 deletions src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,32 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
*/
fun getLinkIfCached(guildId: Long): Link? = linkMap[guildId]

/**
* Finds all players on unavailable nodes and transfers them to [node].
*/
internal fun transferOrphansTo(node: LavalinkNode) {
// This *should* never happen, but just in case...
if (!node.available) {
return
}

val orphans = findOrphanedPlayers()

orphans.mapNotNull { linkMap[it.guildId] }
.forEach { link ->
link.transferNode(node)
}
}

/**
* Finds all players that are on unavailable nodes.
*/
private fun findOrphanedPlayers(): List<LavalinkPlayer> {
val unavailableNodes = nodes.filter { !it.available }

return unavailableNodes.flatMap { it.playerCache.values }
}

internal fun onNodeDisconnected(node: LavalinkNode) {
// Don't do anything if we are shutting down.
if (!clientOpen) {
Expand All @@ -138,9 +164,21 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
return
}

// If we have no nodes available, don't attempt to load-balance.
if (nodes.all { !it.available }) {
linkMap.filter { (_, link) -> link.node == node }
.forEach { (_, link) ->
link.state = LinkState.DISCONNECTED
}
return
}

linkMap.forEach { (_, link) ->
if (link.node == node) {
link.transferNode(loadBalancer.selectNode(region = null))
if (link.node == node) {
val voiceRegion = link.cachedPlayer?.voiceRegion

link.state = LinkState.CONNECTING
link.transferNode(loadBalancer.selectNode(region = voiceRegion))
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class LavalinkNode(

return rest.getPlayer(guildId)
.map { it.toLavalinkPlayer(this) }
// TODO: check for 404 status code, if not 404, don't create
.onErrorResume { createOrUpdatePlayer(guildId) }
.doOnSuccess {
// Update the player internally upon retrieving it.
Expand Down Expand Up @@ -399,6 +400,10 @@ class LavalinkNode(
*/
fun getCachedPlayer(guildId: Long): LavalinkPlayer? = playerCache[guildId]

internal fun transferOrphansToSelf() {
lavalink.transferOrphansTo(this)
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
Expand Down
10 changes: 10 additions & 0 deletions src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkPlayer.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.arbjerg.lavalink.client

import dev.arbjerg.lavalink.client.loadbalancing.VoiceRegion
import dev.arbjerg.lavalink.client.protocol.Track
import dev.arbjerg.lavalink.client.protocol.toCustom
import dev.arbjerg.lavalink.protocol.v4.*
Expand Down Expand Up @@ -44,6 +45,15 @@ class LavalinkPlayer(private val node: LavalinkNode, protocolPlayer: Player) : I
}
}

val voiceRegion: VoiceRegion?
get() {
if (voiceState.endpoint.isBlank()) {
return null
}

return VoiceRegion.fromEndpoint(voiceState.endpoint)
}

override fun setTrack(track: Track?) = PlayerUpdateBuilder(node, guildId)
.setTrack(track)

Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.arbjerg.lavalink.client

import dev.arbjerg.lavalink.protocol.v4.VoiceState
import java.time.Duration
import java.util.function.Consumer

/**
Expand Down Expand Up @@ -67,6 +68,8 @@ class Link(
node.removeCachedPlayer(guildId)
newNode.createOrUpdatePlayer(guildId)
.applyBuilder(player.stateToBuilder())
// Delay by 500ms to hopefully prevent a race-condition from triggering
.delayElement(Duration.ofMillis(500))
.subscribe()
}

Expand Down
16 changes: 8 additions & 8 deletions src/main/kotlin/dev/arbjerg/lavalink/internal/LavalinkSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class LavalinkSocket(private val node: LavalinkNode) : WebSocketListener(), Clos
.setNoReplace(false)
.subscribe()
}

// Move players from older, unavailable nodes to ourselves.
node.transferOrphansToSelf()
}

Message.Op.Stats -> {
Expand Down Expand Up @@ -126,19 +129,13 @@ class LavalinkSocket(private val node: LavalinkNode) : WebSocketListener(), Clos
}

override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
node.lavalink.onNodeDisconnected(node)

when(t) {
is EOFException -> {
logger.debug("Got disconnected from ${node.name}, trying to reconnect", t)
node.available = false
open = false
}

is SocketTimeoutException -> {
logger.debug("Got disconnected from ${node.name} (timeout), trying to reconnect", t)
node.available = false
open = false
}

is ConnectException -> {
Expand All @@ -153,14 +150,17 @@ class LavalinkSocket(private val node: LavalinkNode) : WebSocketListener(), Clos
}

logger.warnOrTrace("Socket error on ${node.name}, reconnecting in ${reconnectInterval / 1000} seconds", t)
node.available = false
open = false
}

else -> {
logger.error("Unknown error on ${node.name}", t)
}
}

node.available = false
open = false

node.lavalink.onNodeDisconnected(node)
}

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
Expand Down
4 changes: 2 additions & 2 deletions testbot/src/main/java/me/duncte123/testbot/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ public static void main(String[] args) throws InterruptedException {

private static void registerLavalinkNodes(LavalinkClient client) {
List.of(
/*client.addNode(
client.addNode(
"Testnode",
URI.create("ws://localhost:2333"),
"youshallnotpass",
RegionGroup.EUROPE
)*/
),

client.addNode(
"Pi-local",
Expand Down

0 comments on commit e3682c9

Please sign in to comment.