diff --git a/src/main/scala/LilaHandler.scala b/src/main/scala/LilaHandler.scala index 387abb43..ffcd7d26 100644 --- a/src/main/scala/LilaHandler.scala +++ b/src/main/scala/LilaHandler.scala @@ -124,6 +124,8 @@ final class LilaHandler( _ match case t: TellRoomVersion => batcher.add(t.roomId, ClientIn.Versioned(t.json, t.version, t.troll)) + case TellRoomChat(roomId, version, troll, payload) => + batcher.add(roomId, ClientIn.Versioned(payload, version, troll)) case LilaOut.RoomIsPresent(reqId, roomId, userId) => lila.emit.study(LilaIn.ReqResponse(reqId, roomCrowd.isPresent(roomId, userId).toString)) case LilaBoot => roomBoot(_.idFilter.study, lila.emit.study) diff --git a/src/main/scala/RelayCrowd.scala b/src/main/scala/RelayCrowd.scala index 7ddeca90..b8353728 100644 --- a/src/main/scala/RelayCrowd.scala +++ b/src/main/scala/RelayCrowd.scala @@ -66,7 +66,7 @@ final private class RelayCrowd(roomCrowd: RoomCrowd, mongo: Mongo)(using ex: Exe ) ), BSONDocument("$sort" -> BSONDocument("order" -> 1)), - BSONDocument("$limit" -> 3), + BSONDocument("$limit" -> 5), BSONDocument("$project" -> BSONDocument("_id" -> true)) ) ) diff --git a/src/main/scala/util/Batcher.scala b/src/main/scala/util/Batcher.scala index a06c3914..b12e3a65 100644 --- a/src/main/scala/util/Batcher.scala +++ b/src/main/scala/util/Batcher.scala @@ -19,18 +19,25 @@ final class Batcher[Key, Elem, Batch]( private val buffers = ConcurrentHashMap[Key, Buffer](initialCapacity) def add(key: Key, elem: Elem): Unit = - val newBuffer = buffers.compute( + buffers.compute( key, (_, buffer) => val prev = Option(buffer) if prev.isEmpty then scheduler.scheduleOnce(timeout, () => emitAndRemove(key)) - Buffer( + val newBuffer = Buffer( append(prev.map(_.batch), elem), prev.fold(1)(_.counter + 1) ) + if newBuffer.counter >= maxBatchSize then + emit(key, newBuffer.batch) + null + else newBuffer ) - if newBuffer.counter >= maxBatchSize then emitAndRemove(key) private def emitAndRemove(key: Key): Unit = - Option(buffers.remove(key)).foreach: buffer => - emit(key, buffer.batch) + buffers.computeIfPresent( + key, + (_, buffer) => + emit(key, buffer.batch) + null + )