Skip to content

Commit

Permalink
Merge pull request #43 from davenverse/binaryCommands
Browse files Browse the repository at this point in the history
Switch to fully binary command support
  • Loading branch information
ChristopherDavenport authored Mar 10, 2022
2 parents 34a7b40 + 3297a19 commit e076007
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import cats.data.{NonEmptyList => NEL}
import RedisProtocol._
import _root_.io.chrisdavenport.rediculous.implicits._
import scala.collection.immutable.Nil
import RedisCtx.syntax.all._
import scodec.bits.ByteVector

object RedisCommands {

Expand Down Expand Up @@ -141,6 +143,16 @@ object RedisCommands {
RedisCtx[F].keyed(key, NEL("SET", key.encode :: value.encode :: ex ::: px ::: condition ::: keepTTL))
}



def setBV[F[_]: RedisCtx](key: ByteVector, value: ByteVector, setOpts: SetOpts = SetOpts.default): F[Option[Status]] = {
val ex = setOpts.setSeconds.toList.flatMap(l => List("EX", l.encode)).map(toBV)
val px = setOpts.setMilliseconds.toList.flatMap(l => List("PX", l.encode)).map(toBV)
val condition = setOpts.setCondition.toList.map(_.encode).map(toBV)
val keepTTL = Alternative[List].guard(setOpts.keepTTL).as("KEEPTTL").map(toBV)
RedisCtx[F].keyedBV(key, NEL(toBV("SET"), key :: value :: ex ::: px ::: condition ::: keepTTL))
}

final case class ZAddOpts(
condition: Option[Condition],
change: Boolean,
Expand Down Expand Up @@ -550,6 +562,9 @@ object RedisCommands {
def get[F[_]: RedisCtx](key: String): F[Option[String]] =
RedisCtx[F].keyed(key, NEL.of("GET", key.encode))

def getBV[F[_]: RedisCtx](key: ByteVector): F[Option[ByteVector]] =
RedisCtx[F].keyedBV(key, NEL.of(toBV("GET"), key))

def getrange[F[_]: RedisCtx](key: String, start: Long, end: Long): F[String] =
RedisCtx[F].keyed(key, NEL.of("GETRANGE", key.encode, start.encode, end.encode))

Expand Down Expand Up @@ -738,4 +753,5 @@ object RedisCommands {
def publish[F[_]: RedisCtx](channel: String, message: String): F[Int] =
RedisCtx[F].unkeyed[Int](cats.data.NonEmptyList.of("PUBLISH", channel, message))

private def toBV(s: String): ByteVector = ByteVector.encodeUtf8(s).fold(throw _, identity(_))
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object RedisConnection{

private[rediculous] case class DirectConnection[F[_]](socket: Socket[F]) extends RedisConnection[F]

private[rediculous] case class Cluster[F[_]](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[String], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]
private[rediculous] case class Cluster[F[_]](queue: Queue[F, Chunk[(Either[Throwable, Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]], slots: F[ClusterSlots], usePool: (Host, Port) => Resource[F, Managed[F, Socket[F]]]) extends RedisConnection[F]

// Guarantees With Socket That Each Call Receives a Response
// Chunk must be non-empty but to do so incurs a penalty
Expand All @@ -47,8 +47,8 @@ object RedisConnection{
}

def runRequestInternal[F[_]: Concurrent](connection: RedisConnection[F])(
inputs: Chunk[NonEmptyList[String]],
key: Option[String]
inputs: Chunk[NonEmptyList[ByteVector]],
key: Option[ByteVector]
): F[Chunk[Resp]] = {
val chunk = Chunk.seq(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk)
Expand All @@ -66,7 +66,7 @@ object RedisConnection{
val x: F[Chunk[Either[Throwable, Resp]]] = c.traverse{ case (d, _) => d.get }
val y: F[Chunk[Resp]] = x.flatMap(_.sequence.liftTo[F])
y
}
}
}
case Cluster(queue, _, _) => chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map(d => (d, ({(e: Either[Throwable, Resp]) => d.complete(e).void}, key, None, 0, resp)))).flatMap{ c =>
queue.offer(c.map(_._2)) >> {
Expand All @@ -83,10 +83,10 @@ object RedisConnection{
chunk.head.liftTo[F](RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))

// Can Be used to implement any low level protocols.
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[String], key: Option[String]): F[Either[Resp, A]] =
def runRequest[F[_]: Concurrent, A: RedisResult](connection: RedisConnection[F])(input: NonEmptyList[ByteVector], key: Option[ByteVector]): F[Either[Resp, A]] =
runRequestInternal(connection)(Chunk.singleton(input), key).flatMap(head[F]).map(resp => RedisResult[A].decode(resp))

def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[String], key: Option[String]): Redis[F, A] = Redis(Kleisli{(connection: RedisConnection[F]) =>
def runRequestTotal[F[_]: Concurrent, A: RedisResult](input: NonEmptyList[ByteVector], key: Option[ByteVector]): Redis[F, A] = Redis(Kleisli{(connection: RedisConnection[F]) =>
runRequest(connection)(input, key).flatMap{
case Right(a) => a.pure[F]
case Left(e@Resp.Error(_)) => ApplicativeError[F, Throwable].raiseError[A](e)
Expand Down Expand Up @@ -409,7 +409,7 @@ object RedisConnection{
.flatMap(s => Clock[F].realTime.map(_.toMillis).flatMap(now => refTopology.set((s,now))))
}
)
queue <- Resource.eval(Queue.bounded[F, Chunk[(Either[Throwable,Resp] => F[Unit], Option[String], Option[(Host, Port)], Int, Resp)]](maxQueued))
queue <- Resource.eval(Queue.bounded[F, Chunk[(Either[Throwable,Resp] => F[Unit], Option[ByteVector], Option[(Host, Port)], Int, Resp)]](maxQueued))
cluster = Cluster(queue, refTopology.get.map(_._1), {case(host, port) => keypool.take((host, port)).map(_.map(_._1))})
_ <-
Stream.fromQueueUnterminatedChunk(queue, chunkSizeLimit).chunks.map{chunk =>
Expand Down Expand Up @@ -450,7 +450,7 @@ object RedisConnection{
serverRedirect match {
case s@Some(_) => // This is a Special One Off, Requires a Redirect
// Deferred[F, Either[Throwable, Resp]].flatMap{d => // No One Cares About this Callback
val asking = ({(_: Either[Throwable, Resp]) => Applicative[F].unit}, key, s, 6, Resp.renderRequest(NonEmptyList.of("ASKING"))) // Never Repeat Asking
val asking = ({(_: Either[Throwable, Resp]) => Applicative[F].unit}, key, s, 6, Resp.renderRequest(NonEmptyList.of(ByteVector.encodeAscii("ASKING").fold(throw _, identity(_))))) // Never Repeat Asking
val repeat = (toSet, key, s, retries + 1, initialCommand)
val chunk = Chunk(asking, repeat)
cluster.queue.tryOffer(chunk) // Offer To Have it reprocessed.
Expand Down
29 changes: 25 additions & 4 deletions core/src/main/scala/io/chrisdavenport/rediculous/RedisCtx.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.chrisdavenport.rediculous
import cats.data.NonEmptyList
import cats.effect.Concurrent
import scala.annotation.implicitNotFound
import scodec.bits.ByteVector

/**
* RedisCtx is the Context in Which RedisOperations operate.
Expand All @@ -14,18 +15,38 @@ If you are leveraging a custom context not provided by rediculous,
please consult your library documentation.
""")
trait RedisCtx[F[_]]{
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): F[A]
def unkeyed[A: RedisResult](command: NonEmptyList[String]): F[A]
def keyedBV[A: RedisResult](key: ByteVector, command: NonEmptyList[ByteVector]): F[A]
def unkeyedBV[A: RedisResult](command: NonEmptyList[ByteVector]): F[A]
}

object RedisCtx {

def apply[F[_]](implicit ev: RedisCtx[F]): ev.type = ev

object syntax {
object all extends StringSyntax

trait StringSyntax {
implicit class RedisContext[F[_]](private val ctx: RedisCtx[F]){
private def encodeUnsafe(s: String): ByteVector = ByteVector.encodeUtf8(s).fold(throw _, identity(_))
// UTF8 String
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): F[A] = {
val k = encodeUnsafe(key)
val c = command.map(encodeUnsafe)
ctx.keyedBV(k, c)
}
def unkeyed[A: RedisResult](command: NonEmptyList[String]): F[A] = {
val c = command.map(encodeUnsafe(_))
ctx.unkeyedBV(c)
}
}
}
}

implicit def redis[F[_]: Concurrent]: RedisCtx[Redis[F, *]] = new RedisCtx[Redis[F, *]]{
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): Redis[F,A] =
def keyedBV[A: RedisResult](key: ByteVector, command: NonEmptyList[ByteVector]): Redis[F,A] =
RedisConnection.runRequestTotal(command, Some(key))
def unkeyed[A: RedisResult](command: NonEmptyList[String]): Redis[F, A] =
def unkeyedBV[A: RedisResult](command: NonEmptyList[ByteVector]): Redis[F, A] =
RedisConnection.runRequestTotal(command, None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats._
import cats.implicits._
import cats.data._
import cats.effect._
import scodec.bits.ByteVector

/**
* For When you don't trust automatic pipelining.
Expand All @@ -21,15 +22,15 @@ final case class RedisPipeline[A](value: RedisTransaction.RedisTxState[RedisTran
object RedisPipeline {

implicit val ctx: RedisCtx[RedisPipeline] = new RedisCtx[RedisPipeline]{
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): RedisPipeline[A] =
def keyedBV[A: RedisResult](key: ByteVector, command: NonEmptyList[ByteVector]): RedisPipeline[A] =
RedisPipeline(RedisTransaction.RedisTxState{for {
s1 <- State.get[(Int, List[NonEmptyList[String]], Option[String])]
s1 <- State.get[(Int, List[NonEmptyList[ByteVector]], Option[ByteVector])]
(i, base, value) = s1
_ <- State.set((i + 1, command :: base, value.orElse(Some(key))))
} yield RedisTransaction.Queued(l => RedisResult[A].decode(l(i)))})

def unkeyed[A: RedisResult](command: NonEmptyList[String]): RedisPipeline[A] = RedisPipeline(RedisTransaction.RedisTxState{for {
out <- State.get[(Int, List[NonEmptyList[String]], Option[String])]
def unkeyedBV[A: RedisResult](command: NonEmptyList[ByteVector]): RedisPipeline[A] = RedisPipeline(RedisTransaction.RedisTxState{for {
out <- State.get[(Int, List[NonEmptyList[ByteVector]], Option[ByteVector])]
(i, base, value) = out
_ <- State.set((i + 1, command :: base, value))
} yield RedisTransaction.Queued(l => RedisResult[A].decode(l(i)))})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import _root_.io.chrisdavenport.rediculous.implicits._
import org.typelevel.keypool.Reusable
import scodec.bits._
import java.nio.charset.StandardCharsets
import RedisCtx.syntax.all._

/**
* A RedisPubSub Represent an connection or group of connections
Expand Down Expand Up @@ -170,7 +171,7 @@ object RedisPubSub {
def nonMessages(cb: RedisPubSub.PubSubReply => F[Unit]): F[Unit] = onNonMessage.set(cb)

private def encodeResp(nel: NonEmptyList[String]): F[Chunk[Byte]] = {
val resp = Resp.renderRequest(nel)
val resp = Resp.renderRequest(nel.map(ByteVector.encodeUtf8(_).fold(throw _, identity(_))))
Resp.CodecUtils.codec.encode(resp)
.toEither
.leftMap(err => new RuntimeException(s"Encoding Error - $err"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.chrisdavenport.rediculous

import cats._
import cats.implicits._
import scodec.bits.ByteVector

trait RedisResult[+A]{
def decode(resp: Resp): Either[Resp, A]
Expand All @@ -27,6 +28,13 @@ object RedisResult extends RedisResultLowPriority{
}
}

implicit val bytevector: RedisResult[ByteVector] = new RedisResult[ByteVector] {
def decode(resp: Resp): Either[Resp,ByteVector] = resp match {
case Resp.BulkString(Some(value)) => value.asRight
case otherwise => otherwise.asLeft
}
}

implicit def option[A: RedisResult]: RedisResult[Option[A]] = new RedisResult[Option[A]] {
def decode(resp: Resp): Either[Resp,Option[A]] = resp match {
case Resp.BulkString(None) => None.asRight
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import cats.data._
import cats.effect._
import fs2.Chunk
import RedisProtocol._
import RedisCtx.syntax.all._
import scodec.bits.ByteVector


/**
Expand Down Expand Up @@ -43,15 +45,15 @@ final case class RedisTransaction[A](value: RedisTransaction.RedisTxState[RedisT
object RedisTransaction {

implicit val ctx: RedisCtx[RedisTransaction] = new RedisCtx[RedisTransaction]{
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): RedisTransaction[A] =
def keyedBV[A: RedisResult](key: ByteVector, command: NonEmptyList[ByteVector]): RedisTransaction[A] =
RedisTransaction(RedisTxState{for {
out <- State.get[(Int, List[NonEmptyList[String]], Option[String])]
out <- State.get[(Int, List[NonEmptyList[ByteVector]], Option[ByteVector])]
(i, base, value) = out
_ <- State.set((i + 1, command :: base, value.orElse(Some(key))))
} yield Queued(l => RedisResult[A].decode(l(i)))})

def unkeyed[A: RedisResult](command: NonEmptyList[String]): RedisTransaction[A] = RedisTransaction(RedisTxState{for {
out <- State.get[(Int, List[NonEmptyList[String]], Option[String])]
def unkeyedBV[A: RedisResult](command: NonEmptyList[ByteVector]): RedisTransaction[A] = RedisTransaction(RedisTxState{for {
out <- State.get[(Int, List[NonEmptyList[ByteVector]], Option[ByteVector])]
(i, base, value) = out
_ <- State.set((i + 1, command :: base, value))
} yield Queued(l => RedisResult[A].decode(l(i)))})
Expand Down Expand Up @@ -79,11 +81,11 @@ object RedisTransaction {
final case class Error(value: String) extends TxResult[Nothing]
}

final case class RedisTxState[A](value: State[(Int, List[NonEmptyList[String]], Option[String]), A])
final case class RedisTxState[A](value: State[(Int, List[NonEmptyList[ByteVector]], Option[ByteVector]), A])
object RedisTxState {

implicit val m: Monad[RedisTxState] = new StackSafeMonad[RedisTxState]{
def pure[A](a: A): RedisTxState[A] = RedisTxState(Monad[State[(Int, List[NonEmptyList[String]], Option[String]), *]].pure(a))
def pure[A](a: A): RedisTxState[A] = RedisTxState(Monad[State[(Int, List[NonEmptyList[ByteVector]], Option[ByteVector]), *]].pure(a))
def flatMap[A, B](fa: RedisTxState[A])(f: A => RedisTxState[B]): RedisTxState[B] = RedisTxState(
fa.value.flatMap(f.andThen(_.value))
)
Expand Down Expand Up @@ -123,9 +125,9 @@ object RedisTransaction {
val ((_, commandsR, key), Queued(f)) = tx.value.value.run((0, List.empty, None)).value
val commands = commandsR.reverse
val all = NonEmptyList(
NonEmptyList.of("MULTI"),
NonEmptyList.of(ByteVector.encodeAscii("MULTI").fold(throw _, identity(_))),
commands ++
List(NonEmptyList.of("EXEC"))
List(NonEmptyList.of(ByteVector.encodeAscii("EXEC").fold(throw _, identity(_))))
)
RedisConnection.runRequestInternal(c)(Chunk.seq(all.toList), key)
.flatMap{_.last match {
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/io/chrisdavenport/rediculous/Resp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ object Resp {
// First Byte is *
case class Array(a: Option[List[Resp]]) extends Resp

def renderRequest(nel: NonEmptyList[String]): Resp = {
def renderRequest(nel: NonEmptyList[ByteVector]): Resp = {
Resp.Array(Some(
nel.toList.map(renderArg)
))
}

def renderArg(arg: String): Resp = {
Resp.BulkString(Some(ByteVector.encodeString(arg)(StandardCharsets.UTF_8).fold(throw _ , identity)))
def renderArg(arg: ByteVector): Resp = {
Resp.BulkString(Some(arg))
}

def toStringProtocol(resp: Resp)(implicit C: Charset = StandardCharsets.UTF_8) = {
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/scala/io/chrisdavenport/rediculous/RespRaw.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@ package io.chrisdavenport.rediculous
import fs2.Chunk
import cats.data.NonEmptyList
import cats.Applicative
import scodec.bits.ByteVector

object RespRaw {

sealed trait Commands[A]


object Commands {
final case class SingleCommand[A](key: Option[String], command: NonEmptyList[String]) extends Commands[A]
final case class SingleCommand[A](key: Option[ByteVector], command: NonEmptyList[ByteVector]) extends Commands[A]
final case class CompositeCommands[A](commands: Chunk[SingleCommand[_]]) extends Commands[A]

implicit val rawRespCommandsCtx: RedisCtx[Commands] = new RedisCtx[Commands] {
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): Commands[A] =
def keyedBV[A: RedisResult](key: ByteVector, command: NonEmptyList[ByteVector]): Commands[A] =
SingleCommand(Some(key), command)

def unkeyed[A: RedisResult](command: NonEmptyList[String]): Commands[A] =
def unkeyedBV[A: RedisResult](command: NonEmptyList[ByteVector]): Commands[A] =
SingleCommand(None, command)
}
def combine[C](c1: Commands[_], c2: Commands[_]): Commands[C] = (c1, c2) match {
Expand All @@ -36,7 +37,7 @@ object RespRaw {


}
final case class RawPipeline[A](key: Option[String], commands: Chunk[NonEmptyList[String]]){
final case class RawPipeline[A](key: Option[ByteVector], commands: Chunk[NonEmptyList[ByteVector]]){

final def pipeline[F[_]](c: RedisConnection[F])(implicit F: cats.effect.Concurrent[F]): F[Chunk[Resp]] =
RedisConnection.runRequestInternal(c)(commands, key)
Expand All @@ -46,10 +47,10 @@ object RespRaw {

implicit val ctx: RedisCtx[RawPipeline] = {
new RedisCtx[RawPipeline] {
def keyed[A: RedisResult](key: String, command: NonEmptyList[String]): RawPipeline[A] =
def keyedBV[A: RedisResult](key: ByteVector, command: NonEmptyList[ByteVector]): RawPipeline[A] =
RawPipeline(Some(key), Chunk.singleton(command))

def unkeyed[A: RedisResult](command: NonEmptyList[String]): RawPipeline[A] =
def unkeyedBV[A: RedisResult](command: NonEmptyList[ByteVector]): RawPipeline[A] =
RawPipeline(None, Chunk.singleton(command))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.chrisdavenport.rediculous.cluster

import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import scodec.bits.ByteVector

/**
* XMODEM CRC 16 CRC16 - 16-bit Cyclic Redundancy Check (CRC16)
Expand All @@ -26,6 +27,14 @@ object CRC16 {
crc & 0xFFFF
}

def bytevector(bv: ByteVector): Int = {
var crc: Int = 0
bv.foreach{ b =>
crc = (crc << 8) ^ table(((crc >>> 8) ^ (b & 0xff)) & 0xff)
}
crc & 0xFFFF
}

private[CRC16] lazy val table : Array[Int] = Array(
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.data.NonEmptyList
import cats.effect._
import com.comcast.ip4s._
import scodec.bits.ByteVector
import RedisCtx.syntax.all._

object ClusterCommands {

Expand Down
Loading

0 comments on commit e076007

Please sign in to comment.