Merge pull request #16 from scoquelin/poc-cache-client
Add support for testing operations with LongCodec
scoquelin authored Jul 30, 2024
2 parents f565661 + 88adfae commit 9c085a5
Showing 5 changed files with 166 additions and 37 deletions.
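In short, the integration-test helpers now take a RedisCodec, so each test picks the key/value types it exercises. A minimal sketch of the pattern this commit introduces (the key name and assertion are illustrative only; the helper and codec are defined in the diff below, and the real tests use randomKey() to avoid collisions):

withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsLongCodec) { client =>
  val key = "counter" // illustrative; the actual tests use randomKey()
  for {
    _ <- client.incr(key)
    value <- client.get(key)
  } yield value shouldBe Some(1L)
}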
@@ -9,7 +9,7 @@ object RedisCodec {
val Utf8WithValueAsLongCodec: RedisCodec[String, Long] = RedisCodec(LongCodec)
}

private object LongCodec extends JRedisCodec[String, Long] with ToByteBufEncoder[String, Long] {
private[arugula] object LongCodec extends JRedisCodec[String, Long] with ToByteBufEncoder[String, Long] {

import java.nio.ByteBuffer
import io.netty.buffer.ByteBuf
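The hunk above only widens LongCodec from private to private[arugula] so the new test helpers can reference it; the codec body itself is unchanged and mostly collapsed here. For orientation, a minimal sketch of what a String-key / Long-value Lettuce codec can look like (hypothetical, not this repository's implementation, and omitting the ToByteBufEncoder methods; it assumes values are stored as their decimal UTF-8 representation):

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import io.lettuce.core.codec.{RedisCodec => JRedisCodec, StringCodec => JStringCodec}

// Hypothetical sketch: keys reuse Lettuce's UTF-8 string codec, values are Longs
// serialized as their decimal string form.
object LongValueCodecSketch extends JRedisCodec[String, Long] {
  override def decodeKey(bytes: ByteBuffer): String = JStringCodec.UTF8.decodeKey(bytes)
  override def encodeKey(key: String): ByteBuffer = JStringCodec.UTF8.encodeKey(key)

  override def decodeValue(bytes: ByteBuffer): Long =
    StandardCharsets.UTF_8.decode(bytes).toString.toLong

  override def encodeValue(value: Long): ByteBuffer =
    ByteBuffer.wrap(value.toString.getBytes(StandardCharsets.UTF_8))
}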
@@ -3,6 +3,7 @@ package com.github.scoquelin.arugula
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import com.dimafeng.testcontainers.{DockerComposeContainer, ExposedService}
import com.github.scoquelin.arugula.BaseRedisCommandsIntegrationSpec._
import com.github.scoquelin.arugula.codec.RedisCodec
import com.github.scoquelin.arugula.config.LettuceRedisClientConfig
import io.lettuce.core.internal.HostAndPort
import io.lettuce.core.resource.{ClientResources, DnsResolvers, MappingSocketAddressResolver}
@@ -11,11 +12,11 @@ import org.scalatest.wordspec.AsyncWordSpecLike
import org.testcontainers.containers.wait.strategy.Wait

import java.io.File
import scala.concurrent.Future
import scala.jdk.FunctionConverters.enrichAsJavaFunction

trait BaseRedisCommandsIntegrationSpec extends AsyncWordSpecLike with TestContainerForAll with BeforeAndAfterEach {
var redisSingleNodeCommandsClient: RedisCommandsClient[String, String] = null
var redisClusterCommandsClient: RedisCommandsClient[String, String] = null
var cachedClients: CachedClients = null

override val containerDef: DockerComposeContainer.Def = {
DockerComposeContainer.Def(
@@ -30,11 +31,9 @@ trait BaseRedisCommandsIntegrationSpec extends AsyncWordSpecLike with TestContai
override def afterContainersStart(containers: Containers): Unit = {
super.afterContainersStart(containers)

redisSingleNodeCommandsClient = LettuceRedisCommandsClient(
LettuceRedisClientConfig(
host = containers.getServiceHost(RedisSingleNode, RedisSingleNodePort),
port = containers.getServicePort(RedisSingleNode, RedisSingleNodePort)
)
val redisSingleNodeConfig = LettuceRedisClientConfig(
host = containers.getServiceHost(RedisSingleNode, RedisSingleNodePort),
port = containers.getServicePort(RedisSingleNode, RedisSingleNodePort)
)

// Special hack to get cluster client topology refresh working, since we need direct connectivity to cluster nodes; see https://github.com/lettuce-io/lettuce-core/issues/941
@@ -49,33 +48,39 @@ trait BaseRedisCommandsIntegrationSpec extends AsyncWordSpecLike with TestContai
val resolver = MappingSocketAddressResolver.create(DnsResolvers.UNRESOLVED, mapHostAndPort.asJavaFunction)
val clientResources = ClientResources.builder.socketAddressResolver(resolver).build

redisClusterCommandsClient = LettuceRedisCommandsClient(
LettuceRedisClientConfig(
val redisClusterConfig = LettuceRedisClientConfig(
host = containers.getServiceHost(RedisClusterNode, RedisClusterPort),
port = containers.getServicePort(RedisClusterNode, RedisClusterPort),
clientResources = clientResources
)
)

cachedClients = RedisCommandCachedClients(redisSingleNodeConfig, redisClusterConfig)
}

override def afterEach(): Unit = {
//flushing both redis instances after each test
redisSingleNodeCommandsClient.flushAll
redisClusterCommandsClient.flushAll
cachedClients.getClient(RedisCodec.Utf8WithValueAsStringCodec, SingleNode).flushAll
cachedClients.getClient(RedisCodec.Utf8WithValueAsStringCodec, Cluster).flushAll
}

def withRedisSingleNode[K, V, A](runTest: RedisCommandsClient[String, String] => A): A = {
withContainers(_ => runTest(redisSingleNodeCommandsClient))
def withRedisSingleNode[K, V, A](codec: RedisCodec[K, V])(runTest: RedisCommandsClient[K, V] => Future[A]): Future[A] = {
withContainers(_ => {
runTest(cachedClients.getClient(codec, SingleNode))
})
}

def withRedisCluster[K, V, A](runTest: RedisCommandsClient[String, String] => A): A = {
withContainers(_ => runTest(redisClusterCommandsClient))
def withRedisCluster[K, V, A](codec: RedisCodec[K, V])(runTest: RedisCommandsClient[K, V] => Future[A]): Future[A] = {
withContainers(_ => {
runTest(cachedClients.getClient(codec, Cluster))
})
}

def withRedisSingleNodeAndCluster[K, V, A](runTest: RedisCommandsClient[String, String] => A): A = {
def withRedisSingleNodeAndCluster[K, V, A](codec: RedisCodec[K, V])(runTest: RedisCommandsClient[K, V] => Future[A]): Future[A] = {
withContainers(_ => {
runTest(redisSingleNodeCommandsClient)
runTest(redisClusterCommandsClient)
for {
_ <- runTest(cachedClients.getClient(codec, SingleNode))
outcome <- runTest(cachedClients.getClient(codec, Cluster))
} yield outcome
})
}

@@ -0,0 +1,49 @@
package com.github.scoquelin.arugula

import com.github.scoquelin.arugula.codec.RedisCodec
import com.github.scoquelin.arugula.codec.LongCodec
import com.github.scoquelin.arugula.config.LettuceRedisClientConfig
import io.lettuce.core.codec.{StringCodec => JStringCodec, RedisCodec => JRedisCodec}

import scala.concurrent.ExecutionContext

sealed trait RedisConnectionType

case object SingleNode extends RedisConnectionType

case object Cluster extends RedisConnectionType

trait CachedClients {
def getClient[K, V](codec: RedisCodec[K, V], connectionType: RedisConnectionType): RedisCommandsClient[K, V]
}

private class RedisCommandCachedClients(redisSingleNodeClientWithStringValue: RedisCommandsClient[String, String],
redisSingleNodeClientWithLongValue: RedisCommandsClient[String, Long],
redisClusterClientWithStringValue: RedisCommandsClient[String, String],
redisClusterClientWithLongValue: RedisCommandsClient[String, Long]) extends CachedClients {

def getClient[K, V](codec: RedisCodec[K, V], connectionType: RedisConnectionType): RedisCommandsClient[K, V] = {
(codec, connectionType) match {
case (RedisCodec(JStringCodec.UTF8), SingleNode) => redisSingleNodeClientWithStringValue.asInstanceOf[RedisCommandsClient[K, V]]
case (RedisCodec(JStringCodec.UTF8), Cluster) => redisClusterClientWithStringValue.asInstanceOf[RedisCommandsClient[K, V]]
case (RedisCodec(LongCodec), SingleNode) => redisSingleNodeClientWithLongValue.asInstanceOf[RedisCommandsClient[K, V]]
case (RedisCodec(LongCodec), Cluster) => redisClusterClientWithLongValue.asInstanceOf[RedisCommandsClient[K, V]]
case (codec, connectionType) => throw new IllegalStateException(s"Codec $codec not supported for connection type $connectionType")
}
}
}

object RedisCommandCachedClients {
def apply(singleNodeConfig: LettuceRedisClientConfig, clusterConfig: LettuceRedisClientConfig)(implicit ec: ExecutionContext): CachedClients = {
val redisSingleNodeClientWithStringValue = LettuceRedisCommandsClient(singleNodeConfig, RedisCodec.Utf8WithValueAsStringCodec)
val redisSingleNodeClientWithLongValue = LettuceRedisCommandsClient(singleNodeConfig, RedisCodec.Utf8WithValueAsLongCodec)
val redisClusterClientWithStringValue = LettuceRedisCommandsClient(clusterConfig, RedisCodec.Utf8WithValueAsStringCodec)
val redisClusterClientWithLongValue = LettuceRedisCommandsClient(clusterConfig, RedisCodec.Utf8WithValueAsLongCodec)
new RedisCommandCachedClients(
redisSingleNodeClientWithStringValue,
redisSingleNodeClientWithLongValue,
redisClusterClientWithStringValue,
redisClusterClientWithLongValue
)
}
}
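For context, a hypothetical sketch of how these cached clients get wired up and consumed (the host/port values are placeholders standing in for the test-container endpoints configured in BaseRedisCommandsIntegrationSpec):

package com.github.scoquelin.arugula

import scala.concurrent.ExecutionContext.Implicits.global

import com.github.scoquelin.arugula.codec.RedisCodec
import com.github.scoquelin.arugula.config.LettuceRedisClientConfig

object CachedClientsUsageSketch {
  private val singleNodeConfig = LettuceRedisClientConfig(host = "localhost", port = 6379)
  private val clusterConfig = LettuceRedisClientConfig(host = "localhost", port = 7000)

  // All four clients (String/Long values x single-node/cluster) are built once, up front.
  val cachedClients: CachedClients = RedisCommandCachedClients(singleNodeConfig, clusterConfig)

  // getClient simply hands back the matching pre-built instance.
  val stringClient: RedisCommandsClient[String, String] =
    cachedClients.getClient(RedisCodec.Utf8WithValueAsStringCodec, SingleNode)
  val longClient: RedisCommandsClient[String, Long] =
    cachedClients.getClient(RedisCodec.Utf8WithValueAsLongCodec, Cluster)
}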
@@ -2,8 +2,10 @@ package com.github.scoquelin.arugula

import scala.collection.immutable.ListMap

import com.github.scoquelin.arugula.codec.RedisCodec
import com.github.scoquelin.arugula.commands.RedisSortedSetAsyncCommands.{RangeLimit, ScoreWithValue, ZAddOptions, ZRange}
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._

import java.util.concurrent.TimeUnit
@@ -16,7 +18,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
"leveraging RedisBaseAsyncCommands" should {

"ping" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
for {
response <- client.ping
_ <- response shouldBe "PONG"
@@ -27,8 +29,44 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with

"leveraging RedisStringAsyncCommands" should {

"create, check, retrieve, and delete a key" in {
withRedisSingleNodeAndCluster { client =>
"create, check, retrieve, and delete a key holding a Long value" in {
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsLongCodec) { client =>
val key = randomKey()
val value = 1L

for {
_ <- client.set(key, value)
keyExists <- client.exists(key)
_ <- keyExists shouldBe true
existingKeyAdded <- client.setNx(key, value) //noop since key already exists
_ <- existingKeyAdded shouldBe false
newKeyAdded <- client.setNx("newKey", value)
_ <- newKeyAdded shouldBe true
keyValue <- client.get(key)
_ <- keyValue match {
case Some(expectedValue) => expectedValue shouldBe value
case None => fail("Expected value not found")
}
deleted <- client.del(key)
_ <- deleted shouldBe 1L
keyExists <- client.exists(key)
_ <- keyExists shouldBe false
valuePriorToSet <- client.getSet(key, value)
_ <- valuePriorToSet match {
case Some(_) => fail("Expected value not found")
case None => succeed
}
priorValue <- client.getDel(key)
_ <- priorValue match {
case Some(expectedValue) => expectedValue shouldBe value
case None => fail("Expected value not found")
}
} yield succeed
}
}

"create, check, retrieve, and delete a key holding a String value" in {
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey()
val value = "value"

@@ -63,8 +101,45 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
}
}

"increment and decrement a key" in {
withRedisSingleNodeAndCluster { client =>
"increment and decrement a key holding a Long value" in {
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsLongCodec) { client =>
val key = randomKey("increment-key")
for {
_ <- client.incr(key)
value <- client.get(key)
_ <- value match {
case Some(expectedValue) => expectedValue shouldBe 1
case None => fail("Expected value not found")
}
_ <- client.incrBy(key, 5)
value <- client.get(key)
_ <- value match {
case Some(expectedValue) => expectedValue shouldBe 6
case None => fail("Expected value not found")
}
_ <- client.decr(key)

value <- client.get(key)
_ <- value match {
case Some(expectedValue) => expectedValue shouldBe 5
case None => fail("Expected value not found")
}

_ <- client.decrBy(key, 3)

value <- client.get(key)

_ <- value match {
case Some(expectedValue) => expectedValue shouldBe 2
case None => fail("Expected value not found")
}

} yield succeed
}
}

"increment and decrement a key holding a String value" in {
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey("increment-key")
for {
_ <- client.incr(key)
@@ -110,7 +185,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
}

"create, check, retrieve, and delete a key with expiration" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey("expiring-key")
val value = "value"
val expireIn = 30.minutes
@@ -149,7 +224,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
}

"support string range operations" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey("range-key")
for {
lenResult <- client.append(key, "Hello")
Expand All @@ -171,7 +246,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
}

"support multiple key operations" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val suffix = "{user1}"
val key1 = randomKey("k1") + suffix
val key2 = randomKey("k2") + suffix
Expand All @@ -195,7 +270,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
"leveraging RedisListAsyncCommands" should {

"create, retrieve, and delete values in a list" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey("list-key")
val values = List("one", "two", "three")

@@ -242,7 +317,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
"leveraging RedisSortedSetAsyncCommands" should {

"create, retrieve, scan and delete values in a sorted set" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey("sorted-set")

for {
@@ -291,7 +366,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
"leveraging RedisHashAsyncCommands" should {

"create, retrieve, and delete a field with a string value for a hash key" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey("hash-key")
val field = "field"
val value = "value"
@@ -322,7 +397,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
}

"create, retrieve, and delete a field with an integer value for a hash key" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey("int-hash-key")
val field = "field"
val value = 1
Expand All @@ -349,7 +424,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
"leveraging RedisPipelineAsyncCommands" should {

"allow to send a batch of commands using pipeline" in {
withRedisSingleNodeAndCluster { client =>
withRedisSingleNodeAndCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey("pipeline-key")
val field = "field"
val value = "value"
@@ -380,7 +455,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
"leveraging RedisServerAsyncCommands" should {

"allow to get information from server and retrieve at least 199 keys (single-node)" in {
withRedisSingleNode { client =>
withRedisSingleNode(RedisCodec.Utf8WithValueAsStringCodec) { client =>
for {
info <- client.info
_ <- info.isEmpty shouldBe false
Expand All @@ -390,7 +465,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with
}

"allow to get information from server and retrieve at least 200 keys (cluster)" in {
withRedisCluster { client =>
withRedisCluster(RedisCodec.Utf8WithValueAsStringCodec) { client =>
for {
info <- client.info
_ <- info.isEmpty shouldBe false
Expand All @@ -401,7 +476,7 @@ class RedisCommandsIntegrationSpec extends BaseRedisCommandsIntegrationSpec with

"allow to flush all keys from all databases" in {
// single node only for now, as the Redis cluster randomly fails with "READONLY You can't write against a read only replica" (port 7005)
withRedisSingleNode { client =>
withRedisSingleNode(RedisCodec.Utf8WithValueAsStringCodec) { client =>
val key = randomKey()
val value = "value"
val field = "field"
@@ -441,7 +441,7 @@ class LettuceRedisCommandsClientSpec extends wordspec.FixtureAsyncWordSpec with
}
}

"delete INCR command to Lettuce and lift result into a Future" in { testContext =>
"delegate INCR command to Lettuce and lift result into a Future" in { testContext =>
import testContext._

val expectedValue = 1L