diff --git a/build.sbt b/build.sbt index 7f6515d7..6dcf636e 100644 --- a/build.sbt +++ b/build.sbt @@ -37,7 +37,7 @@ lazy val `lila-ws` = project .classifier(s"linux-$arch_"), ("io.netty" % s"netty-transport-native-kqueue" % nettyVersion) .classifier(s"osx-$arch_"), - "org.lichess" %% "scalalib-lila" % "11.3.2", + "org.lichess" %% "scalalib-lila" % "11.5.0", "org.lichess" %% "scalachess" % chessVersion, "org.lichess" %% "scalachess-play-json" % chessVersion, "org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion, diff --git a/src/main/scala/evalCache/EvalCacheApi.scala b/src/main/scala/evalCache/EvalCacheApi.scala index 0d9b8c90..d42bcdcc 100644 --- a/src/main/scala/evalCache/EvalCacheApi.scala +++ b/src/main/scala/evalCache/EvalCacheApi.scala @@ -1,6 +1,7 @@ package lila.ws package evalCache +import com.softwaremill.macwire.* import cats.syntax.all.* import chess.ErrorStr import chess.format.Fen @@ -14,13 +15,13 @@ import java.time.LocalDateTime import lila.ws.ipc.ClientIn import lila.ws.ipc.ClientOut.{ EvalGet, EvalGetMulti, EvalPut } -final class EvalCacheApi(mongo: Mongo)(using +final class EvalCacheApi(mongo: Mongo, settings: util.SettingStore)(using Executor, org.apache.pekko.actor.typed.Scheduler ): - private val truster = EvalCacheTruster(mongo) - private val upgrade = EvalCacheUpgrade() + private val truster = wire[EvalCacheTruster] + private val upgrade = wire[EvalCacheUpgrade] private val multi = EvalCacheMulti() import EvalCacheEntry.* @@ -72,8 +73,9 @@ final class EvalCacheApi(mongo: Mongo)(using depth = e.depth, by = user, trust = trust - ) - ).foreach(putTrusted(sri, user, _)) + ), + sri + ).foreach(putTrusted(user, _)) // reduce the number of evals stored and, // perhaps more importantly, distributed to subscribers @@ -104,7 +106,7 @@ final class EvalCacheApi(mongo: Mongo)(using if res.isDefined then mongo.evalCacheUsedNow(id) res - private def putTrusted(sri: Sri, user: User.Id, input: Input): Future[Unit] = + private def putTrusted(user: User.Id, input: Input): Future[Unit] = mongo.evalCacheColl.flatMap: c => validate(input).match case Left(error) => @@ -124,7 +126,7 @@ final class EvalCacheApi(mongo: Mongo)(using .one(entry) .recover(mongo.ignoreDuplicateKey) .map: _ => - afterPut(input, sri, entry) + afterPut(input, entry) case Some(oldEntry) => val entry = oldEntry.add(input.eval) if entry.similarTo(oldEntry) then Future.successful(()) @@ -132,13 +134,13 @@ final class EvalCacheApi(mongo: Mongo)(using c.update .one(BSONDocument("_id" -> entry.id), entry, upsert = true) .map: _ => - afterPut(input, sri, entry) + afterPut(input, entry) - private def afterPut(input: Input, sri: Sri, entry: EvalCacheEntry): Unit = + private def afterPut(input: Input, entry: EvalCacheEntry): Unit = cache.put(input.id, Future.successful(entry.some)) // todo: debounce upgrades in hot rooms - upgrade.onEval(input, sri) - multi.onEval(input, sri) + upgrade.onEval(input) + multi.onEval(input) private def validate(in: EvalCacheEntry.Input): Either[ErrorStr, Unit] = in.eval.pvs.traverse_ { pv => diff --git a/src/main/scala/evalCache/EvalCacheEntry.scala b/src/main/scala/evalCache/EvalCacheEntry.scala index 8049a555..5ecd1008 100644 --- a/src/main/scala/evalCache/EvalCacheEntry.scala +++ b/src/main/scala/evalCache/EvalCacheEntry.scala @@ -52,7 +52,7 @@ case class EvalCacheEntry( object EvalCacheEntry: - case class Input(id: Id, fen: Fen.Full, situation: Situation, eval: Eval) + case class Input(id: Id, fen: Fen.Full, situation: Situation, eval: Eval, sri: Sri) case class Eval(pvs: NonEmptyList[Pv], knodes: Knodes, depth: Depth, by: User.Id, trust: Trust): @@ -83,10 +83,10 @@ object EvalCacheEntry: def truncate = copy(moves = Moves.truncate(moves)) - def makeInput(variant: Variant, fen: Fen.Full, eval: Eval) = + def makeInput(variant: Variant, fen: Fen.Full, eval: Eval, sri: Sri) = Fen .read(variant, fen) .filter(_.playable(false)) .ifTrue(eval.looksValid) .map: situation => - Input(Id(situation), fen, situation, eval.truncatePvs) + Input(Id(situation), fen, situation, eval.truncatePvs, sri) diff --git a/src/main/scala/evalCache/EvalCacheMulti.scala b/src/main/scala/evalCache/EvalCacheMulti.scala index b429304e..094544c5 100644 --- a/src/main/scala/evalCache/EvalCacheMulti.scala +++ b/src/main/scala/evalCache/EvalCacheMulti.scala @@ -40,7 +40,7 @@ final private class EvalCacheMulti(using evals.compute(id, (_, prev) => Option(prev).fold(EvalState(Set(sri), Depth(0)))(_.addSri(sri))) expirableSris.put(sri) - def onEval(input: EvalCacheEntry.Input, fromSri: Sri): Unit = + def onEval(input: EvalCacheEntry.Input): Unit = Option( evals.computeIfPresent( input.id, @@ -50,7 +50,7 @@ final private class EvalCacheMulti(using ) ).filter(_.depth == input.eval.depth) .foreach: eval => - val sris = eval.sris.filter(_ != fromSri) + val sris = eval.sris.filter(_ != input.sri) if sris.nonEmpty then val hit = EvalHitMulti: EvalCacheJsonHandlers.writeMultiHit(input.fen, input.eval) diff --git a/src/main/scala/evalCache/EvalCacheUpgrade.scala b/src/main/scala/evalCache/EvalCacheUpgrade.scala index 15861e6a..48285c2d 100644 --- a/src/main/scala/evalCache/EvalCacheUpgrade.scala +++ b/src/main/scala/evalCache/EvalCacheUpgrade.scala @@ -1,10 +1,10 @@ package lila.ws package evalCache +import java.util.concurrent.ConcurrentHashMap import chess.format.UciPath import play.api.libs.json.JsString - -import java.util.concurrent.ConcurrentHashMap +import scalalib.DebouncerFunction import lila.ws.ipc.ClientIn.EvalHit import lila.ws.ipc.ClientOut.EvalGet @@ -14,7 +14,9 @@ import lila.ws.util.ExpireCallbackMemo * by remembering the last evalGet of each socket member, * and listening to new evals stored. */ -final private class EvalCacheUpgrade(using +final private class EvalCacheUpgrade( + settings: util.SettingStore +)(using ec: Executor, scheduler: org.apache.pekko.actor.typed.Scheduler ): @@ -24,6 +26,11 @@ final private class EvalCacheUpgrade(using private val evals = ConcurrentHashMap[SetupId, EvalState](1024) private val expirableSris = ExpireCallbackMemo[Sri](scheduler, 3 minutes, expire) + private def debouncerSetting = + settings.makeSetting[Boolean]("lila-ws.EvalCacheUpgrade.debouncerEnable", false) + + private val debouncer = DebouncerFunction[SetupId](scheduler.scheduleOnce(5.seconds, _), 64) + private val upgradeMon = Monitor.evalCache.single.upgrade def register(sri: Sri, e: EvalGet): Unit = @@ -42,27 +49,33 @@ final private class EvalCacheUpgrade(using ) expirableSris.put(sri) - def onEval(input: EvalCacheEntry.Input, fromSri: Sri): Unit = + def onEval(input: EvalCacheEntry.Input): Unit = (1 to input.eval.multiPv.value).foreach: multiPv => val setupId = SetupId(input.id, MultiPv(multiPv)) - Option( - evals.computeIfPresent( - setupId, - (_, ev) => - if ev.depth >= input.eval.depth then ev - else ev.copy(depth = input.eval.depth) - ) - ).filter(_.depth == input.eval.depth) - .foreach: eval => - val wms = eval.sris.withFilter(_ != fromSri).flatMap(sri => Option(members.get(sri.value))) - if wms.nonEmpty then - val evalJson = EvalCacheJsonHandlers.writeEval(input.eval, input.fen) - wms - .groupBy(_.path) - .map: (path, members) => - val hit = EvalHit(evalJson + ("path" -> JsString(path.value))) - members.foreach(m => Bus.publish(_.sri(m.sri), hit)) - upgradeMon.count.increment(wms.size) + if debouncerSetting.get() + then debouncer.push(setupId)(() => publishEval(setupId, input)) + else publishEval(setupId, input) + + private def publishEval(setupId: SetupId, input: EvalCacheEntry.Input) = + val newEvalState = Option: + evals.computeIfPresent( + setupId, + (_, stored) => + if stored.depth >= input.eval.depth then stored + else stored.copy(depth = input.eval.depth) + ) + newEvalState + .filter(_.depth == input.eval.depth) // ensure the new one from input + .foreach: eval => + val wms = eval.sris.withFilter(_ != input.sri).flatMap(sri => Option(members.get(sri.value))) + if wms.nonEmpty then + val evalJson = EvalCacheJsonHandlers.writeEval(input.eval, input.fen) + wms + .groupBy(_.path) + .map: (path, members) => + val hit = EvalHit(evalJson + ("path" -> JsString(path.value))) + members.foreach(m => Bus.publish(_.sri(m.sri), hit)) + upgradeMon.count.increment(wms.size) private def expire(sri: Sri): Unit = Option(members.remove(sri.value)).foreach: m => diff --git a/src/main/scala/util/SettingStore.scala b/src/main/scala/util/SettingStore.scala index 5704032c..ce502a01 100644 --- a/src/main/scala/util/SettingStore.scala +++ b/src/main/scala/util/SettingStore.scala @@ -17,7 +17,7 @@ import reactivemongo.api.bson.* * db.setting.updateOne({_id:'dogs'},{$set:{value:50}}) */ -case class Setting[A](default: A, ttl: FiniteDuration)(fetch: () => Future[Option[A]])(using +final class Setting[A](default: A, ttl: FiniteDuration)(fetch: () => Future[Option[A]])(using ec: Executor, scheduler: Scheduler ):