Skip to content

Commit

Permalink
Merge pull request #27 from davenverse/connectionAcquisitionError
Browse files Browse the repository at this point in the history
Handle Errors on Connection Acquisition
  • Loading branch information
ChristopherDavenport authored Sep 23, 2021
2 parents e75ad75 + c9cf991 commit d798008
Showing 1 changed file with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,14 @@ object RedisConnection{
Stream.fromQueueUnterminatedChunk(queue).chunks.map{chunk =>
val s = if (chunk.nonEmpty) {
Stream.eval(
Functor[({type M[A] = KeyPool[F, Unit, A]})#M].map(keypool)(_._1).take(()).use{m =>
val out = chunk.map(_._2)
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
Functor[({type M[A] = KeyPool[F, Unit, A]})#M].map(keypool)(_._1).take(()).attempt.use{
case Right(m) =>
val out = chunk.map(_._2)
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
case l@Left(_) => l.rightCast[List[Resp]].pure[F]
}.flatMap{
case Right(n) =>
n.zipWithIndex.traverse_{
Expand Down Expand Up @@ -241,12 +243,14 @@ object RedisConnection{
}.toSeq
).evalMap{
case (server, rest) =>
Functor[({type M[A] = KeyPool[F, (Host, Port), A]})#M].map(keypool)(_._1).take(server).use{m =>
val out = Chunk.seq(rest.map(_._5))
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
Functor[({type M[A] = KeyPool[F, (Host, Port), A]})#M].map(keypool)(_._1).take(server).attempt.use{
case Right(m) =>
val out = Chunk.seq(rest.map(_._5))
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
case l@Left(_) => l.rightCast[List[Resp]].pure[F]
}.flatMap{
case Right(n) =>
n.zipWithIndex.traverse_{
Expand Down

0 comments on commit d798008

Please sign in to comment.