Skip to content

Commit

Permalink
Merge pull request #49 from syther-labs/fix-stream-continuous-subscri…
Browse files Browse the repository at this point in the history
…ption

Fix an error with continuous subscription
  • Loading branch information
ChristopherDavenport authored Apr 29, 2022
2 parents 293c825 + befd0a1 commit 04a2490
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 5 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ThisBuild / tlBaseVersion := "0.2" // your current series x.y
ThisBuild / tlBaseVersion := "0.3" // your current series x.y

ThisBuild / organization := "io.chrisdavenport"
ThisBuild / organizationName := "Christopher Davenport"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,22 @@ object RedisCommands {
RedisCtx[F].unkeyed(NEL("XREAD", block ::: count ::: noAck ::: streamPairs))
}

def xrange[F[_]: RedisCtx](stream: String, startOpt: Option[String] = None, endOpt: Option[String] = None, countOpt: Option[Int] = None): F[Option[List[StreamsRecord]]] = {
val start = List(startOpt.getOrElse("-"))
val end = List(endOpt.getOrElse("+"))
val count = countOpt.toList.flatMap(l => List("COUNT", l.encode))

RedisCtx[F].unkeyed(NEL("XRANGE", stream :: start ::: end ::: count))
}

def xrevrange[F[_]: RedisCtx](stream: String, endOpt: Option[String] = None, startOpt: Option[String] = None, countOpt: Option[Int] = None): F[Option[List[StreamsRecord]]] = {
val end = List(endOpt.getOrElse("+"))
val start = List(startOpt.getOrElse("-"))
val count = countOpt.toList.flatMap(l => List("COUNT", l.encode))

RedisCtx[F].unkeyed(NEL("XREVRANGE", stream :: end ::: start ::: count))
}

def xgroupcreate[F[_]: RedisCtx](stream: String, groupName: String, startId: String): F[Status] =
RedisCtx[F].unkeyed(NEL.of("XGROUP", "CREATE", stream, groupName, startId))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ object RedisStream {
private val nextOffset: String => RedisCommands.StreamsRecord => StreamOffset =
key => msg => StreamOffset.From(key, msg.recordId)

private val offsetsByKey: List[RedisCommands.StreamsRecord] => Map[String, Option[StreamOffset]] =
list => list.groupBy(_.recordId).map { case (k, values) => k -> values.lastOption.map(nextOffset(k)) }
private val offsetsByKey: List[RedisCommands.XReadResponse] => Map[String, Option[StreamOffset]] =
list => list.groupBy(_.stream).map { case (k, values) => k -> values.flatMap(_.records).lastOption.map(nextOffset(k)) }

def read(keys: Set[String], initialOffset: String => StreamOffset, block: Duration, count: Option[Long]): Stream[F, RedisCommands.XReadResponse] = {
val initial = keys.map(k => k -> initialOffset(k)).toMap
Expand All @@ -50,7 +50,7 @@ object RedisStream {
(for {
offsets <- Stream.eval(ref.get)
list <- Stream.eval(xread(offsets.values.toSet, opts).run(connection)).flattenOption
newOffsets = offsetsByKey(list.flatMap(_.records)).collect { case (key, Some(value)) => key -> value }.toList
newOffsets = offsetsByKey(list).collect { case (key, Some(value)) => key -> value }.toList
_ <- Stream.eval(newOffsets.map { case (k, v) => ref.update(_.updated(k, v)) }.sequence)
result <- Stream.emits(list)
} yield result).repeat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@ class RedisStreamSpec extends CatsEffectSuite {
}
}

test("consume messages from offset"){ //connection =>
val messages = fs2.Chunk(
RedisStream.XAddMessage("fee", List("zoom" -> "zad")),
RedisStream.XAddMessage("fee", List("bar" -> "baz"))
)
redisConnection().flatMap{connection =>

val rStream = RedisStream.fromConnection(connection)
rStream.append(messages) >>
rStream
.read(Set("fee"), (_ => RedisCommands.StreamOffset.From("fee", "0-0")), Duration.Zero, 100L.some)
.take(4)
.timeout(100.milli)
.handleErrorWith(_ => fs2.Stream.empty)
.compile
.toList

}.map{ resps =>
val records = resps.flatMap(_.records)
assertEquals(records.length, 2)
}
}
}


2 changes: 1 addition & 1 deletion examples/src/main/scala/StreamsExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object StreamRate {
}
}

object StreamProducerExample extends IOApp {
object StreamExample extends IOApp {
import StreamRate._

def randomMessage: IO[List[(String, String)]] = {
Expand Down

0 comments on commit 04a2490

Please sign in to comment.