Skip to content

Commit e5a6e7e

Browse files
authored
Merge branch 'main' into remove-deprecated
2 parents 7a22b8c + 3047756 commit e5a6e7e

File tree

6 files changed

+51
-27
lines changed

6 files changed

+51
-27
lines changed

src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkClient.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import reactor.core.Disposable
99
import reactor.core.publisher.Flux
1010
import reactor.core.publisher.Sinks
1111
import java.io.Closeable
12+
import java.time.Duration
1213
import java.util.concurrent.ConcurrentHashMap
1314
import java.util.concurrent.CopyOnWriteArrayList
1415
import java.util.concurrent.Executors
@@ -30,8 +31,8 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
3031
get() = linkMap.values.toList()
3132

3233
// Events forwarded from all nodes.
33-
private val sink: Sinks.Many<ClientEvent<*>> = Sinks.many().multicast().onBackpressureBuffer()
34-
val flux: Flux<ClientEvent<*>> = sink.asFlux()
34+
private val sink: Sinks.Many<ClientEvent> = Sinks.many().multicast().onBackpressureBuffer()
35+
val flux: Flux<ClientEvent> = sink.asFlux()
3536
private val reference: Disposable = flux.subscribe()
3637

3738
/**
@@ -170,7 +171,8 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
170171
val voiceRegion = link.cachedPlayer?.voiceRegion
171172

172173
link.state = LinkState.CONNECTING
173-
link.transferNode(loadBalancer.selectNode(region = voiceRegion))
174+
// The delay is used to prevent a race condition in Discord, causing close code 4006
175+
link.transferNode(loadBalancer.selectNode(region = voiceRegion), delay = Duration.ofMillis(1000))
174176
}
175177
}
176178
}
@@ -183,7 +185,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
183185
*
184186
* @return a [Flux] of [ClientEvent]s
185187
*/
186-
fun <T : ClientEvent<*>> on(type: Class<T>): Flux<T> {
188+
fun <T : ClientEvent> on(type: Class<T>): Flux<T> {
187189
return flux.ofType(type)
188190
}
189191

@@ -192,7 +194,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
192194
*
193195
* @return a [Flux] of [ClientEvent]s
194196
*/
195-
inline fun <reified T : ClientEvent<*>> on() = on(T::class.java)
197+
inline fun <reified T : ClientEvent> on() = on(T::class.java)
196198

197199
/**
198200
* Close the client and disconnect all nodes.
@@ -213,7 +215,7 @@ class LavalinkClient(val userId: Long) : Closeable, Disposable {
213215
}
214216

215217
private fun listenForNodeEvent(node: LavalinkNode) {
216-
node.on<ClientEvent<Message>>()
218+
node.on<ClientEvent>()
217219
.subscribe {
218220
try {
219221
sink.tryEmitNext(it)

src/main/kotlin/dev/arbjerg/lavalink/client/LavalinkNode.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ class LavalinkNode(
4949

5050
internal val httpClient = OkHttpClient.Builder().callTimeout(nodeOptions.httpTimeout, TimeUnit.MILLISECONDS).build()
5151

52-
internal val sink: Many<ClientEvent<*>> = Sinks.many().multicast().onBackpressureBuffer()
53-
val flux: Flux<ClientEvent<*>> = sink.asFlux()
52+
internal val sink: Many<ClientEvent> = Sinks.many().multicast().onBackpressureBuffer()
53+
val flux: Flux<ClientEvent> = sink.asFlux()
5454
private val reference: Disposable = flux.subscribe()
5555

5656
internal val rest = LavalinkRestClient(this)
@@ -89,7 +89,7 @@ class LavalinkNode(
8989
*
9090
* @return a [Flux] of [ClientEvent]s
9191
*/
92-
fun <T : ClientEvent<*>> on(type: Class<T>): Flux<T> {
92+
fun <T : ClientEvent> on(type: Class<T>): Flux<T> {
9393
return flux.ofType(type)
9494
}
9595

@@ -98,7 +98,7 @@ class LavalinkNode(
9898
*
9999
* @return a [Flux] of [ClientEvent]s
100100
*/
101-
inline fun <reified T : ClientEvent<*>> on() = on(T::class.java)
101+
inline fun <reified T : ClientEvent> on() = on(T::class.java)
102102

103103
/**
104104
* Retrieves a list of all players from the lavalink node.

src/main/kotlin/dev/arbjerg/lavalink/client/Link.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,14 @@ class Link(
5252
*/
5353
fun loadItem(identifier: String) = node.loadItem(identifier)
5454

55-
internal fun transferNode(newNode: LavalinkNode) {
55+
internal fun transferNode(newNode: LavalinkNode, delay: Duration = Duration.ZERO) {
5656
val player = node.getCachedPlayer(guildId)
5757

5858
if (player != null) {
5959
node.removeCachedPlayer(guildId)
6060
newNode.createOrUpdatePlayer(guildId)
6161
.applyBuilder(player.stateToBuilder())
62-
// Delay by 500ms to hopefully prevent a race-condition from triggering
63-
.delayElement(Duration.ofMillis(500))
62+
.delaySubscription(delay)
6463
.subscribe()
6564
}
6665

src/main/kotlin/dev/arbjerg/lavalink/client/events.kt

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ internal fun Message.toClientEvent(node: LavalinkNode) = when (this) {
1717
is Message.StatsEvent -> StatsEvent(node, frameStats, players, playingPlayers, uptime, memory, cpu)
1818
}
1919

20-
sealed class ClientEvent<T : Message>(open val node: LavalinkNode)
20+
sealed class ClientEvent(open val node: LavalinkNode)
2121

2222
// Normal events
2323
data class ReadyEvent(override val node: LavalinkNode, val resumed: Boolean, val sessionId: String)
24-
: ClientEvent<Message.ReadyEvent>(node)
24+
: ClientEvent(node)
2525

2626
data class PlayerUpdateEvent(override val node: LavalinkNode, val guildId: Long, val state: PlayerState)
27-
: ClientEvent<Message.PlayerUpdateEvent>(node)
27+
: ClientEvent(node)
2828

2929
data class StatsEvent(
3030
override val node: LavalinkNode,
@@ -34,23 +34,23 @@ data class StatsEvent(
3434
val uptime: Long,
3535
val memory: Memory,
3636
val cpu: Cpu
37-
) : ClientEvent<Message.StatsEvent>(node)
37+
) : ClientEvent(node)
3838

3939
// Player events
40-
sealed class EmittedEvent<T : Message.EmittedEvent>(override val node: LavalinkNode, open val guildId: Long)
41-
: ClientEvent<T>(node)
40+
sealed class EmittedEvent(override val node: LavalinkNode, open val guildId: Long)
41+
: ClientEvent(node)
4242

4343
data class TrackStartEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track)
44-
: EmittedEvent<Message.EmittedEvent.TrackStartEvent>(node, guildId)
44+
: EmittedEvent(node, guildId)
4545

4646
data class TrackEndEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val endReason: AudioTrackEndReason)
47-
: EmittedEvent<Message.EmittedEvent.TrackEndEvent>(node, guildId)
47+
: EmittedEvent(node, guildId)
4848

4949
data class TrackExceptionEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val exception: TrackException)
50-
: EmittedEvent<Message.EmittedEvent.TrackExceptionEvent>(node, guildId)
50+
: EmittedEvent(node, guildId)
5151

5252
data class TrackStuckEvent(override val node: LavalinkNode, override val guildId: Long, val track: Track, val thresholdMs: Long)
53-
: EmittedEvent<Message.EmittedEvent.TrackStuckEvent>(node, guildId)
53+
: EmittedEvent(node, guildId)
5454

5555
data class WebSocketClosedEvent(override val node: LavalinkNode, override val guildId: Long, val code: Int, val reason: String, val byRemote: Boolean)
56-
: EmittedEvent<Message.EmittedEvent.WebSocketClosedEvent>(node, guildId)
56+
: EmittedEvent(node, guildId)

src/test/kotlin/testScript.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ fun main() {
4848
println("[event 2] Node '${event.node.name}' has stats, current players: ${event.playingPlayers}/${event.players}")
4949
}
5050

51-
client.on<EmittedEvent<*>>()
51+
client.on<EmittedEvent>()
5252
.subscribe { event ->
5353
if (event is TrackStartEvent) {
5454
println("Is a track start event!")

testbot/src/main/java/me/duncte123/testbot/Main.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import dev.arbjerg.lavalink.client.loadbalancing.builtin.VoiceRegionPenaltyProvider;
66
import dev.arbjerg.lavalink.libraries.jda.JDAVoiceUpdateListener;
77
import net.dv8tion.jda.api.JDABuilder;
8+
import net.dv8tion.jda.api.entities.channel.unions.AudioChannelUnion;
89
import net.dv8tion.jda.api.requests.GatewayIntent;
910
import net.dv8tion.jda.api.utils.cache.CacheFlag;
1011
import org.slf4j.Logger;
@@ -15,6 +16,8 @@
1516
public class Main {
1617
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
1718

19+
private static final int SESSION_INVALID = 4006;
20+
1821
public static void main(String[] args) throws InterruptedException {
1922
final var token = System.getenv("BOT_TOKEN");
2023
final LavalinkClient client = new LavalinkClient(Helpers.getUserIdFromToken(token));
@@ -24,15 +27,35 @@ public static void main(String[] args) throws InterruptedException {
2427
registerLavalinkListeners(client);
2528
registerLavalinkNodes(client);
2629

27-
JDABuilder.createDefault(token)
30+
final var jda = JDABuilder.createDefault(token)
2831
.setVoiceDispatchInterceptor(new JDAVoiceUpdateListener(client))
2932
.enableIntents(GatewayIntent.GUILD_VOICE_STATES)
3033
.enableCache(CacheFlag.VOICE_STATE)
3134
.addEventListeners(new JDAListener(client))
3235
.build()
3336
.awaitReady();
34-
}
3537

38+
// Got a lot of 4006 closecodes? Try this "fix"
39+
client.on(WebSocketClosedEvent.class).subscribe((event) -> {
40+
if (event.getCode() == SESSION_INVALID) {
41+
final var guildId = event.getGuildId();
42+
final var guild = jda.getGuildById(guildId);
43+
44+
if (guild == null) {
45+
return;
46+
}
47+
48+
final var connectedChannel = guild.getSelfMember().getVoiceState().getChannel();
49+
50+
// somehow
51+
if (connectedChannel == null) {
52+
return;
53+
}
54+
55+
jda.getDirectAudioController().reconnect(connectedChannel);
56+
}
57+
});
58+
}
3659

3760

3861
private static void registerLavalinkNodes(LavalinkClient client) {

0 commit comments

Comments
 (0)