Skip to content

Commit

Permalink
Merge branch 'master' of github.com:lichess-org/lila-ws
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Dec 31, 2024
2 parents 1a471d8 + 12a8690 commit e539b61
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
2 changes: 2 additions & 0 deletions src/main/scala/LilaHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ final class LilaHandler(

_ match
case t: TellRoomVersion => batcher.add(t.roomId, ClientIn.Versioned(t.json, t.version, t.troll))
case TellRoomChat(roomId, version, troll, payload) =>
batcher.add(roomId, ClientIn.Versioned(payload, version, troll))
case LilaOut.RoomIsPresent(reqId, roomId, userId) =>
lila.emit.study(LilaIn.ReqResponse(reqId, roomCrowd.isPresent(roomId, userId).toString))
case LilaBoot => roomBoot(_.idFilter.study, lila.emit.study)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/RelayCrowd.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ final private class RelayCrowd(roomCrowd: RoomCrowd, mongo: Mongo)(using ex: Exe
)
),
BSONDocument("$sort" -> BSONDocument("order" -> 1)),
BSONDocument("$limit" -> 3),
BSONDocument("$limit" -> 5),
BSONDocument("$project" -> BSONDocument("_id" -> true))
)
)
Expand Down
17 changes: 12 additions & 5 deletions src/main/scala/util/Batcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,25 @@ final class Batcher[Key, Elem, Batch](
private val buffers = ConcurrentHashMap[Key, Buffer](initialCapacity)

def add(key: Key, elem: Elem): Unit =
val newBuffer = buffers.compute(
buffers.compute(
key,
(_, buffer) =>
val prev = Option(buffer)
if prev.isEmpty then scheduler.scheduleOnce(timeout, () => emitAndRemove(key))
Buffer(
val newBuffer = Buffer(
append(prev.map(_.batch), elem),
prev.fold(1)(_.counter + 1)
)
if newBuffer.counter >= maxBatchSize then
emit(key, newBuffer.batch)
null
else newBuffer
)
if newBuffer.counter >= maxBatchSize then emitAndRemove(key)

private def emitAndRemove(key: Key): Unit =
Option(buffers.remove(key)).foreach: buffer =>
emit(key, buffer.batch)
buffers.computeIfPresent(
key,
(_, buffer) =>
emit(key, buffer.batch)
null
)

0 comments on commit e539b61

Please sign in to comment.