diff --git a/core/src/main/scala/com/evolutiongaming/catshelper/Serial.scala b/core/src/main/scala/com/evolutiongaming/catshelper/Serial.scala index e53eb8bc..144a16c6 100644 --- a/core/src/main/scala/com/evolutiongaming/catshelper/Serial.scala +++ b/core/src/main/scala/com/evolutiongaming/catshelper/Serial.scala @@ -1,6 +1,5 @@ package com.evolutiongaming.catshelper -import cats.data.{NonEmptyList => Nel} import cats.effect.Concurrent import cats.effect.concurrent.{Deferred, Ref} import cats.effect.implicits._ @@ -19,44 +18,29 @@ object Serial { def of[F[_]: Concurrent]: F[Serial[F]] = { Ref[F] - .of(none[List[F[Unit]]]) + .of(().pure[F]) .map { ref => new Serial[F] { - def apply[A](fa: F[A]) = { - - def start(f: F[Unit]): F[Unit] = { - Nel - .of(f) - .tailRecM[F, Unit] { fs => - fs - .reverse - .foldMapM(identity) - .productR { - ref.modify { - case Some(f :: fs) => (List.empty[F[Unit]].some, Nel(f, fs).asLeft[Unit]) - case _ => (none[List[F[Unit]]], ().asRight[Nel[F[Unit]]]) - } - } - } - .start - .void - } + def apply[A](fa: F[A]): F[F[A]] = { Concurrent[F].uncancelable { for { - d <- Deferred.uncancelable[F, Either[Throwable, A]] - f = fa.attempt.flatMap { a => d.complete(a) } - r <- ref.modify { - case Some(fs) => ((f :: fs).some, ().pure[F]) - case None => (List.empty[F[Unit]].some, start(f)) - } - _ <- r - } yield for { - a <- d.get - a <- a.liftTo[F] - } yield a + deferred <- Deferred.uncancelable[F, Either[Throwable, A]] + gate <- Deferred.uncancelable[F, Unit] + next = gate.get + prev <- ref.modify { prev => (next, prev) } + task = for { + _ <- prev + a <- fa.attempt + _ <- gate.complete(()) + a <- deferred.complete(a) + } yield a + _ <- task.start + } yield { + deferred.get.rethrow + } } } }