Modelling of ephemeral topics. #2355
Answered
by
ValdemarGr
ValdemarGr
asked this question in
Q&A
-
I have narrowed down a problem I am working on to the following points:
The code turned out to be pretty complicated, and I am not sure it is even entirely safe.
import fs2.concurrent.Topic
import fs2.Stream
import cats.effect.concurrent.Deferred
import cats.effect.concurrent.Ref
import cats.effect.Concurrent
import cats.effect.Sync
import cats.data.StateT
import scala.collection.concurrent.TrieMap
import java.util.UUID
import cats.effect.concurrent.Semaphore
import fs2.concurrent.SignallingRef
import cats.effect.Async
import cats.effect.IO
import cats.effect.ContextShift
import cats.effect.Timer
/** Per-id cache of message topics.
  *
  * `getCached` looks an id up in a [[StateMap]]. On a miss it loads the initial
  * message with `get`, wraps it in a `Topic`, and starts a background stream that
  * removes the cache entry again once the topic is found idle (see `getCached`).
  */
sealed trait EntityState[A] {
  type MessageTopic = Topic[IO, Message]

  final case class Message(
    currentValue: A,
  )

  /** Shared cache state.
    *
    * @param state one `Deferred` per id, completed with the topic or with the error that
    *              prevented its creation
    * @param sem   semaphore held around every lookup, insert and removal on `state`
    */
  final case class StateMap(
    state: TrieMap[UUID, Deferred[IO, Either[Throwable, MessageTopic]]],
    sem: Semaphore[IO]
  )

  /** Loads the message that is used as the initial value of the topic for `id`. */
  def get(id: UUID): IO[Message]

  import cats.implicits._
  import cats.effect.syntax._

  /** Returns the topic cached for `id`, creating and caching it on a miss.
    *
    * The map lookup/insert happens while holding `sm.sem`; the (possibly slow) wait for
    * the topic itself is started as a fiber inside the permit and joined outside of it.
    *
    * @param id entity whose topic is requested
    * @param sm shared cache state
    * @return the topic, or a failed `IO` carrying the error raised while creating it
    */
  def getCached(
    id: UUID,
    sm: StateMap
  )(implicit cs: ContextShift[IO], conc: Concurrent[IO], sync: Sync[IO], timer: Timer[IO]): IO[MessageTopic] =
    (sm.sem withPermit (sync.delay(sm.state.get(id)) flatMap {
      // Hit: wait for whoever is (or was) creating the topic.
      case Some(x) => (x.get >>= IO.fromEither).start
      case None =>
        import scala.concurrent.duration._

        // Drops the cache entry for `id`; callers must hold `sm.sem`.
        val rem = sync.delay(sm.state.remove(id)) *> IO.unit

        // Under the semaphore: remove the entry if the topic currently has no
        // subscribers, otherwise go back to waiting in `timeout`.
        def tryKill(t: Topic[IO, Message]): Stream[IO, Unit] =
          Stream.eval(sm.sem.acquire) ++ t.subscribers.head.flatMap { i =>
            val maybeRem = (if (i == 0) Stream.eval(rem) else Stream.empty)
            val maybeContinue = (if (i == 0) Stream.empty else timeout(t))
            maybeRem ++ Stream.eval(sm.sem.release) ++ maybeContinue
          }

        // Wait until the subscriber count is 0, sleep one minute, then re-check in
        // `tryKill`. A single `Stream.sleep` is used here: `Stream.fixedDelay` never
        // terminates, so anything appended after it would never run.
        def timeout(t: Topic[IO, Message]): Stream[IO, Unit] =
          t.subscribers.dropWhile(_ > 0).head.as(()) ++ Stream.sleep[IO](1.minute) ++
            tryKill(t)

        // Creates the topic and publishes the outcome through `d`. On success the
        // eviction stream is started in the background; on failure the entry is
        // removed so that a later call can retry.
        def startWithDeferred(d: Deferred[IO, Either[Throwable, MessageTopic]]) =
          (get(id) >>= Topic[IO, Message]).attemptTap {
            case r @ Right(_) => timeout(r.value).compile.drain.start *> d.complete(r)
            case l @ Left(_)  => sm.sem withPermit ((rem, d.complete(l)).parTupled)
          }

        for {
          d <- Deferred[IO, Either[Throwable, MessageTopic]]
          _ <- sync.delay(sm.state.update(id, d))
          f <- startWithDeferred(d).start
        } yield f
    })).flatMap(_.join)
} I was hoping someone more experienced could give me some pointers in the right direction. Thank you. |
Beta Was this translation helpful? Give feedback.
Answered by
ValdemarGr
Apr 14, 2021
Replies: 1 comment
-
I ended up implementing it with `Ref` and `Deferred` and lease counting.
import cats.effect._
import cats.effect.concurrent.Ref
import cats.effect.concurrent.Deferred
import cats.data.OptionT
import java.util.UUID
import cats.Monad
/** A map of lazily allocated, lease-counted values.
  *
  * Each key holds at most one [[ManagedRefMap.ManagedValue]]: a `Deferred` that will
  * carry the allocated value (or the allocation error), a per-allocation instance id,
  * and the number of outstanding leases. Leases are handed out as `Resource`s; the
  * entry is removed when the last lease of that instance is released.
  */
abstract class ManagedRefMap[F[_]: Concurrent, K, V] {
  import ManagedRefMap._

  /** Allocates the value for a key that currently has no entry. */
  def alloc: K => F[V]

  /** Backing state: key -> managed value. */
  val state: State[F, K, V]

  import cats.implicits._

  /** Looks up `id` without taking a lease.
    *
    * @return `None` when there is no entry; otherwise waits for the entry's allocation
    *         to finish and returns its value, failing if that allocation failed
    */
  def get(id: K): F[Option[V]] =
    state.get.map(_.get(id)).flatMap[Option[V]] {
      case None    => Concurrent[F].pure(None)
      case Some(v) => v.value.get.flatMap(Concurrent[F].fromEither).map(x => Some(x))
    }

  /** Leases the value for `id`, allocating it with `alloc` if there is no entry yet. */
  def use(id: K): Resource[F, V] =
    useUnderlying(id, None).map { case (v, _) => v }

  /** Like `use`, but runs `initial` instead of `alloc(id)` when this call is the one
    * that creates the entry, and reports whether it did.
    */
  def useInitial(id: K, initial: F[V]): Resource[F, (V, AllocationResult)] =
    useUnderlying(id, Some(initial))

  /** Shared implementation of `use` / `useInitial`.
    *
    * @param id      key to lease
    * @param initial allocation effect to use instead of `alloc(id)` when this call
    *                creates the entry
    * @return the value together with whether this call performed the allocation; the
    *         resource fails if the allocation failed
    */
  def useUnderlying(id: K, initial: Option[F[V]]): Resource[F, (V, AllocationResult)] = {
    // Atomically either join the existing entry (bump its lease count) or insert a
    // fresh one with a single lease. The Deferred and uuid are only used on insert.
    val valueOrAllocated = for {
      d <- Deferred[F, EffectResult[V]]
      uuid <- Concurrent[F].delay(UUID.randomUUID)
      v <- state.modify { m =>
        m.get(id) match {
          case Some(value) =>
            val nv = id -> value.copy(refs = value.refs + 1)
            (m + nv, (value.value, value.instanceId, Noop))
          case None =>
            (m + (id -> ManagedValue(d, uuid, 1)), (d, uuid, AddedNewValue))
        }
      }
    } yield v

    // Gives back one lease of the instance `instanceId` (or drops the entry outright
    // when `force` is set). Entries belonging to another instance are left alone.
    def remMaybeInstance(instanceId: UUID, force: Boolean = false): F[Unit] =
      state.update { m =>
        m.get(id) match {
          case Some(value) if instanceId == value.instanceId =>
            if (force || value.refs <= 1) {
              m - id
            } else {
              m + (id -> value.copy(refs = value.refs - 1))
            }
          case Some(_) =>
            // The key has been re-allocated since this lease was taken (the old
            // instance was force-removed after a failed allocation). The stale lease
            // must not touch the new entry; without this case the match would throw
            // a MatchError.
            m
          case None =>
            m
        }
      }

    val er: F[(EffectResult[(V, AllocationResult)], UUID)] = for {
      v <- valueOrAllocated
      (d, uuid, op) = v
      av <- op match {
        case Noop =>
          // Joined an existing entry: wait for its allocation outcome.
          d.get.map(v => v.map(x => x -> NoAllocation))
        case AddedNewValue =>
          for {
            e <- (initial.getOrElse(alloc(id))).attempt
            _ <- d.complete(e)
            // A failed allocation is removed immediately so later callers retry;
            // callers already waiting on `d` observe the failure.
            _ <- if (e.isLeft) remMaybeInstance(uuid, force = true) else Concurrent[F].unit
          } yield e.map(v => v -> Allocated)
      }
    } yield (av, uuid)

    Resource
      .make(er) { case (_, instanceId) => remMaybeInstance(instanceId) }
      .map { case (v, _) => v }
      .evalMap(Concurrent[F].fromEither)
  }
}
/** Supporting types and the factory for `ManagedRefMap`. */
object ManagedRefMap {

  /** Tells a caller whether its lease ran the allocation or joined an existing entry. */
  sealed trait AllocationResult
  case object NoAllocation extends AllocationResult
  case object Allocated extends AllocationResult

  /** Outcome of the atomic state update performed when a lease is taken. */
  sealed trait StateInsert
  case object AddedNewValue extends StateInsert
  case object Noop extends StateInsert

  type EffectResult[V] = Either[Throwable, V]
  type DefV[F[_], V] = Deferred[F, EffectResult[V]]

  /** One map entry.
    *
    * @param value      completed with the allocated value or the allocation error
    * @param instanceId identifies this particular allocation of the key
    * @param refs       number of outstanding leases
    */
  final case class ManagedValue[F[_], V](
    value: DefV[F, V],
    instanceId: UUID,
    refs: Long
  )

  type State[F[_], K, V] = Ref[F, Map[K, ManagedValue[F, V]]]

  import cats.implicits._

  /** Creates an empty map whose values are allocated on demand with `allocate`. */
  def apply[F[_]: Concurrent, K, V](allocate: K => F[V]): F[ManagedRefMap[F, K, V]] =
    Ref.of[F, Map[K, ManagedValue[F, V]]](Map.empty).map { r =>
      new ManagedRefMap[F, K, V] {
        override def alloc: K => F[V] = allocate
        override val state: State[F, K, V] = r
      }
    }
} I also wrote a test for it if it can help anyone. import munit.CatsEffectSuite
import cats.effect.IO
import cats.effect.concurrent.Deferred
import cats.effect.concurrent.Ref
/** Tests for `ManagedRefMap`.
  *
  * NOTE: the tests share one map and a list of pending release actions, and each test
  * builds on the state left by the tests registered before it, so they rely on running
  * in registration order.
  */
class ManagedRefMapTest extends CatsEffectSuite {
  import cats.implicits._

  /** Registers a test named `name` that creates a `ManagedRefMap` backed by `f`, and
    * returns an `IO` that yields that map once the registered test has produced it.
    */
  def allocatedSharedMap(name: String, f: String => IO[String])(implicit loc: munit.Location): IO[ManagedRefMap[IO, String, String]] = {
    val mrfd = Deferred.unsafe[IO, ManagedRefMap[IO, String, String]]
    test(name) {
      ManagedRefMap[IO, String, String](f) >>= mrfd.complete
    }
    mrfd.get
  }

  // Map whose allocator simply echoes the key.
  val mrf = allocatedSharedMap("should allocate an immediate shared ManagedRefMap", x => IO(x))
  val key = "key"

  /** Asserts what a lease-free lookup of `key` currently returns. */
  def checkKey(exp: Option[String])(implicit loc: munit.Location) =
    assertIO(mrf flatMap (_.get(key)), exp)

  // Takes a lease on `key`, yielding the value and the action that releases the lease.
  val use = mrf flatMap (_.use(key).allocated)

  test(s"ref map should be empty for key '$key'") {
    checkKey(None)
  }

  // Release actions of the leases currently held by the suite.
  val releasers = Ref.unsafe[IO, List[IO[Unit]]](Nil)
  // Runs (in parallel) and forgets every pending release action.
  val reset = releasers.modify(xs => (Nil, xs.parSequence)).flatten
  def add(rel: IO[Unit]) = releasers.update(rel :: _)

  test(s"ref map should push a value when a useage is requsted") {
    val instance: IO[(String, IO[Unit])] = mrf flatMap (_.use(key).allocated)
    instance.flatMap {
      case (str, release) =>
        add(release) as assertEquals(str, key)
    }
  }

  test(s"ref map should now contain the key '$key'") {
    checkKey(Some(key))
  }

  test(s"ref map should not release if another user allocates") {
    use.flatMap {
      case (_, release) =>
        // Release the earlier lease while the new one is still held.
        reset *> add(release) *> checkKey(Some(key))
    }
  }

  test(s"ref map should be empty once final holder releases") {
    checkKey(Some(key)) *> reset *> checkKey(None)
  }

  // Holds the failing allocator back until both leases have been requested.
  val blocker = Deferred.unsafe[IO, Boolean]
  val blockedFailingMrf = allocatedSharedMap("should allocate a failing shared ManagedRefMap",
    x => blocker.get *> IO.raiseError(new Exception(s"failed with k $x")))

  test(s"should allocate two instances of $key and then fail them both") {
    val getter = blockedFailingMrf.flatMap(_.use(key).allocated).attempt
    val bothFail = (getter, getter).parMapN {
      case (r1, r2) =>
        assert(clue(r1).isLeft && clue(r2).isLeft)
    }
    (bothFail, blocker.complete(true)).parTupled
  }
} |
Beta Was this translation helpful? Give feedback.
0 replies
Answer selected by
ValdemarGr
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I ended up implementing it with `Ref` and `Deferred` and lease counting.