1
1
package lila .ws
2
2
package evalCache
3
3
4
+ import java .util .concurrent .ConcurrentHashMap
5
+ import org .apache .pekko .actor .typed .Scheduler
4
6
import chess .format .Fen
5
7
import chess .variant .Variant
6
-
7
- import java .util .concurrent .ConcurrentHashMap
8
+ import scalalib .zeros .given
8
9
9
10
import lila .ws .ipc .ClientIn .EvalHitMulti
10
11
import lila .ws .ipc .ClientOut .EvalGetMulti
11
12
import lila .ws .util .ExpireCallbackMemo
13
+ import lila .ws .util .ExpireMemo
12
14
13
15
/* Compared to EvalCacheUpgrade, accepts multiple positions per member,
14
16
* only sends cp/mate
15
17
*/
16
- final private class EvalCacheMulti (using
17
- ec : Executor ,
18
- scheduler : org.apache.pekko.actor.typed.Scheduler
19
- ):
18
+ final private class EvalCacheMulti private (makeExpirableSris : (Sri => Unit ) => ExpireMemo [Sri ]):
20
19
import EvalCacheMulti .*
21
20
import EvalCacheUpgrade .{ EvalState , SriString }
22
21
23
- private val members = ConcurrentHashMap [SriString , WatchingMember ](4096 )
24
- private val evals = ConcurrentHashMap [Id , EvalState ](1024 )
25
- private val expirableSris = ExpireCallbackMemo [Sri ](scheduler, 1 minute, expire)
22
+ private val members = ConcurrentHashMap [SriString , WatchingMember ](4096 )
23
+ private val evals = ConcurrentHashMap [Id , EvalState ](1024 )
24
+ private val expirableSris : ExpireMemo [Sri ] = makeExpirableSris( expire)
26
25
27
- private val upgradeMon = Monitor .evalCache.multi.upgrade
28
26
29
27
def register (sri : Sri , e : EvalGetMulti ): Unit =
30
28
members
@@ -41,22 +39,24 @@ final private class EvalCacheMulti(using
41
39
expirableSris.put(sri)
42
40
43
41
def onEval (input : EvalCacheEntry .Input ): Unit =
44
- Option (
42
+ val sris = onEvalSrisToUpgrade(input)
43
+ if sris.nonEmpty then
44
+ val hit = EvalHitMulti :
45
+ EvalCacheJsonHandlers .writeMultiHit(input.fen, input.eval)
46
+ sris.foreach: sri =>
47
+ Bus .publish(_.sri(sri), hit)
48
+ upgradeMon.count.increment(sris.size)
49
+
50
+ private def onEvalSrisToUpgrade (input : EvalCacheEntry .Input ): Set [Sri ] =
51
+ val newEval = Option (
45
52
evals.computeIfPresent(
46
53
input.id,
47
54
(_, ev) =>
48
55
if ev.depth >= input.eval.depth then ev
49
56
else ev.copy(depth = input.eval.depth)
50
57
)
51
58
).filter(_.depth == input.eval.depth)
52
- .foreach: eval =>
53
- val sris = eval.sris.filter(_ != input.sri)
54
- if sris.nonEmpty then
55
- val hit = EvalHitMulti :
56
- EvalCacheJsonHandlers .writeMultiHit(input.fen, input.eval)
57
- sris.foreach: sri =>
58
- Bus .publish(_.sri(sri), hit)
59
- upgradeMon.count.increment(sris.size)
59
+ newEval.so(_.sris.filter(_ != input.sri))
60
60
61
61
private def expire (sri : Sri ): Unit =
62
62
Option (members.remove(sri.value)).foreach:
@@ -71,14 +71,19 @@ final private class EvalCacheMulti(using
71
71
else eval.copy(sris = newSris)
72
72
)
73
73
74
- scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () =>
75
- upgradeMon.members.update(members.size)
76
- upgradeMon.evals.update(evals.size)
77
- upgradeMon.expirable.update(expirableSris.count)
78
-
79
74
private object EvalCacheMulti :
80
75
81
76
import EvalCacheUpgrade .*
82
77
83
78
case class WatchingMember (sri : Sri , variant : Variant , fens : List [Fen .Full ]):
84
79
def setups : List [Id ] = fens.flatMap(Id .from(variant, _))
80
+
81
+ private val upgradeMon = Monitor .evalCache.multi.upgrade
82
+
83
+ def withMonitoring ()(using ec : Executor , scheduler : Scheduler ): EvalCacheMulti =
84
+ val instance = EvalCacheMulti (expire => ExpireCallbackMemo [Sri ](1 minute, expire))
85
+ scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () =>
86
+ upgradeMon.members.update(instance.members.size)
87
+ upgradeMon.evals.update(instance.evals.size)
88
+ upgradeMon.expirable.update(instance.expirableSris.count)
89
+ instance
0 commit comments