From 1289f86d99102f3eba8bf05ecad97db011d1decb Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Mon, 30 Dec 2024 21:57:25 +0100 Subject: [PATCH 1/3] also route chat messages through the study batcher since chat messages are versioned, having them bypass the batcher results in broken ordering, if other versioned messages are being batched while the chat message goes through --- src/main/scala/LilaHandler.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/scala/LilaHandler.scala b/src/main/scala/LilaHandler.scala index c875df97..ecc3ef42 100644 --- a/src/main/scala/LilaHandler.scala +++ b/src/main/scala/LilaHandler.scala @@ -125,6 +125,8 @@ final class LilaHandler( _ match case t: TellRoomVersion => batcher.add(t.roomId, ClientIn.Versioned(t.json, t.version, t.troll)) + case TellRoomChat(roomId, version, troll, payload) => + batcher.add(roomId, ClientIn.Versioned(payload, version, troll)) case LilaOut.RoomIsPresent(reqId, roomId, userId) => lila.emit.study(LilaIn.ReqResponse(reqId, roomCrowd.isPresent(roomId, userId).toString)) case LilaBoot => roomBoot(_.idFilter.study, lila.emit.study) From bb0c8aaf29f537d1fc33a71b0df5ebbe6be156c6 Mon Sep 17 00:00:00 2001 From: Niklas Fiekas Date: Mon, 30 Dec 2024 22:16:02 +0100 Subject: [PATCH 2/3] keep batch locked until emit --- src/main/scala/util/Batcher.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/scala/util/Batcher.scala b/src/main/scala/util/Batcher.scala index a06c3914..b12e3a65 100644 --- a/src/main/scala/util/Batcher.scala +++ b/src/main/scala/util/Batcher.scala @@ -19,18 +19,25 @@ final class Batcher[Key, Elem, Batch]( private val buffers = ConcurrentHashMap[Key, Buffer](initialCapacity) def add(key: Key, elem: Elem): Unit = - val newBuffer = buffers.compute( + buffers.compute( key, (_, buffer) => val prev = Option(buffer) if prev.isEmpty then scheduler.scheduleOnce(timeout, () => emitAndRemove(key)) - Buffer( + val newBuffer = Buffer( append(prev.map(_.batch), elem), prev.fold(1)(_.counter + 1) ) + if newBuffer.counter >= maxBatchSize then + emit(key, newBuffer.batch) + null + else newBuffer ) - if newBuffer.counter >= maxBatchSize then emitAndRemove(key) private def emitAndRemove(key: Key): Unit = - Option(buffers.remove(key)).foreach: buffer => - emit(key, buffer.batch) + buffers.computeIfPresent( + key, + (_, buffer) => + emit(key, buffer.batch) + null + ) From 12a8690dafcde6cef8378fcec86aa17bb3583d51 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Mon, 30 Dec 2024 23:53:42 +0100 Subject: [PATCH 3/3] monitor more relays helps with quick succession of blitz rounds --- src/main/scala/RelayCrowd.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/RelayCrowd.scala b/src/main/scala/RelayCrowd.scala index 7ddeca90..b8353728 100644 --- a/src/main/scala/RelayCrowd.scala +++ b/src/main/scala/RelayCrowd.scala @@ -66,7 +66,7 @@ final private class RelayCrowd(roomCrowd: RoomCrowd, mongo: Mongo)(using ex: Exe ) ), BSONDocument("$sort" -> BSONDocument("order" -> 1)), - BSONDocument("$limit" -> 3), + BSONDocument("$limit" -> 5), BSONDocument("$project" -> BSONDocument("_id" -> true)) ) )