Skip to content

Commit

Permalink
Merge pull request #46 from davenverse/genericConnection
Browse files Browse the repository at this point in the history
Make Connection into an abstracted trait
  • Loading branch information
ChristopherDavenport authored Apr 17, 2022
2 parents de010ef + 171f33f commit 293c825
Showing 1 changed file with 57 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,59 @@ import _root_.io.chrisdavenport.rediculous.cluster.ClusterCommands.ClusterSlots
import fs2.io.net.SocketGroupCompanionPlatform
import scodec.bits.ByteVector

sealed trait RedisConnection[F[_]]
trait RedisConnection[F[_]]{
def runRequest(
inputs: Chunk[NonEmptyList[ByteVector]],
key: Option[ByteVector]
): F[Chunk[Resp]]
}
object RedisConnection{

private[rediculous] case class Queued[F[_]](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Resp)]], usePool: Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]
private[rediculous] case class PooledConnection[F[_]](
private[rediculous] case class Queued[F[_]: Concurrent](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Resp)]], usePool: Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]{
def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, resp)))).flatMap{ c =>
queue.offer(c.map(_._2)) >> {
val x: F[Chunk[Either[Throwable, Resp]]] = c.traverse{ case (d, _) => d.get }
val y: F[Chunk[Resp]] = x.flatMap(_.sequence.liftTo[F])
y
}
}
}
}
private[rediculous] case class PooledConnection[F[_]: Concurrent](
pool: KeyPool[F, Unit, (Socket[F], F[Unit])]
) extends RedisConnection[F]
) extends RedisConnection[F]{
def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk)
Functor[KeyPool[F, Unit, *]].map(pool)(_._1).take(()).use{
m => withSocket(m.value).attempt.flatTap{
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
}.rethrow
}
}

private[rediculous] case class DirectConnection[F[_]](socket: Socket[F]) extends RedisConnection[F]
private[rediculous] case class DirectConnection[F[_]: Concurrent](socket: Socket[F]) extends RedisConnection[F]{
def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk)
withSocket(socket)
}
}

private[rediculous] case class Cluster[F[_]](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]
private[rediculous] case class Cluster[F[_]: Concurrent](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]{
def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c =>
queue.offer(c.map(_._2)) >> {
c.traverse(_._1.get).flatMap(_.sequence.liftTo[F])
}
}
}
}

// Guarantees With Socket That Each Call Receives a Response
// Chunk must be non-empty but to do so incurs a penalty
Expand All @@ -49,32 +91,7 @@ object RedisConnection{
def runRequestInternal[F[_]: Concurrent](connection: RedisConnection[F])(
inputs: Chunk[NonEmptyList[ByteVector]],
key: Option[ByteVector]
): F[Chunk[Resp]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk)

connection match {
case PooledConnection(pool) => Functor[KeyPool[F, Unit, *]].map(pool)(_._1).take(()).use{
m => withSocket(m.value).attempt.flatTap{
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
}.rethrow
case DirectConnection(socket) => withSocket(socket)
case Queued(queue, _) => chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, resp)))).flatMap{ c =>
queue.offer(c.map(_._2)) >> {
val x: F[Chunk[Either[Throwable, Resp]]] = c.traverse{ case (d, _) => d.get }
val y: F[Chunk[Resp]] = x.flatMap(_.sequence.liftTo[F])
y
}
}
case Cluster(queue, _, _) => chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c =>
queue.offer(c.map(_._2)) >> {
c.traverse(_._1.get).flatMap(_.sequence.liftTo[F])
}
}
}
}
): F[Chunk[Resp]] = connection.runRequest(inputs, key)

def toNel[F[_]: ApplicativeThrow](chunk: Chunk[Resp]): F[NonEmptyList[Resp]] =
chunk.toNel.liftTo[F](RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))
Expand Down Expand Up @@ -123,8 +140,8 @@ object RedisConnection{

class DirectConnectionBuilder[F[_]: Concurrent] private[RedisConnection](
private val sg: SocketGroup[F],
private val host: Host,
private val port: Port,
val host: Host,
val port: Port,
private val tlsContext: Option[TLSContext[F]],
private val tlsParameters: TLSParameters
) { self =>
Expand Down Expand Up @@ -168,8 +185,8 @@ object RedisConnection{

class PooledConnectionBuilder[F[_]: Async] private[RedisConnection] (
private val sg: SocketGroup[F],
private val host: Host,
private val port: Port,
val host: Host,
val port: Port,
private val tlsContext: Option[TLSContext[F]],
private val tlsParameters: TLSParameters
) { self =>
Expand Down Expand Up @@ -219,8 +236,8 @@ object RedisConnection{

class QueuedConnectionBuilder[F[_]: Async] private[RedisConnection](
private val sg: SocketGroup[F],
private val host: Host,
private val port: Port,
val host: Host,
val port: Port,
private val tlsContext: Option[TLSContext[F]],
private val tlsParameters: TLSParameters,
private val maxQueued: Int,
Expand Down Expand Up @@ -320,8 +337,8 @@ object RedisConnection{

class ClusterConnectionBuilder[F[_]: Async] private[RedisConnection] (
private val sg: SocketGroup[F],
private val host: Host,
private val port: Port,
val host: Host,
val port: Port,
private val tlsContext: Option[TLSContext[F]],
private val tlsParameters: TLSParameters,
private val maxQueued: Int,
Expand Down

0 comments on commit 293c825

Please sign in to comment.