Skip to content

Commit

Permalink
Upgrade to Jedis 5.0
Browse files Browse the repository at this point in the history
Adapt to API changes in the Jedis 5.0 driver.
Fix bzPopMaxShouldWorkCorrectly() and bzPopMinShouldWorkCorrectly() tests in JedisClusterConnectionTests.

Jedis 5.0 changed the bzpopmax and bzpopmin Redis commands to no longer return an empty (Array)List internally when evaluating and popping from an empty sorted set. A NullPointerException will be thrown if either bzpopmax or bzpopmin commands are executd on an empty Redis sorted set in Jedis 5.0 (vs. Jedis 4.x):

Closes #2612
Original pull request: #2716
  • Loading branch information
jxblum authored and christophstrobl committed Oct 4, 2023
1 parent d63fd19 commit 2075633
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 143 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<xstream>1.4.20</xstream>
<pool>2.11.1</pool>
<lettuce>6.2.6.RELEASE</lettuce>
<jedis>4.4.5</jedis>
<jedis>5.0.1</jedis>
<multithreadedtc>1.01</multithreadedtc>
<netty>4.1.96.Final</netty>
<java-module-name>spring.data.redis</java-module-name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import org.springframework.data.redis.core.ScanCursor;
import org.springframework.data.redis.core.ScanIteration;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.util.Streamable;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* Cluster {@link RedisHashCommands} implementation for Jedis.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author John Blum
* @since 2.0
*/
class JedisClusterHashCommands implements RedisHashCommands {
Expand Down Expand Up @@ -160,8 +162,8 @@ public Entry<byte[], byte[]> hRandFieldWithValues(byte[] key) {
Assert.notNull(key, "Key must not be null");

try {
Map<byte[], byte[]> map = connection.getCluster().hrandfieldWithValues(key, 1);
return map.isEmpty() ? null : map.entrySet().iterator().next();
List<Entry<byte[], byte[]>> mapEntryList = connection.getCluster().hrandfieldWithValues(key, 1);
return mapEntryList.isEmpty() ? null : mapEntryList.get(0);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand All @@ -185,8 +187,7 @@ public List<byte[]> hRandField(byte[] key, long count) {
public List<Entry<byte[], byte[]>> hRandFieldWithValues(byte[] key, long count) {

try {
Map<byte[], byte[]> map = connection.getCluster().hrandfieldWithValues(key, count);
return Streamable.of(() -> map.entrySet().iterator()).toList();
return connection.getCluster().hrandfieldWithValues(key, count);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -249,21 +250,26 @@ public Properties getConfig(String pattern) {

Assert.notNull(pattern, "Pattern must not be null");

List<NodeResult<List<String>>> mapResult = connection.getClusterCommandExecutor()
.executeCommandOnAllNodes((JedisClusterCommandCallback<List<String>>) client -> client.configGet(pattern))
JedisClusterCommandCallback<Map<String, String>> command = jedis -> jedis.configGet(pattern);

List<NodeResult<Map<String, String>>> nodeResults = connection.getClusterCommandExecutor()
.executeCommandOnAllNodes(command)
.getResults();

List<String> result = new ArrayList<>();
for (NodeResult<List<String>> entry : mapResult) {
Properties nodesConfiguration = new Properties();

for (NodeResult<Map<String, String>> nodeResult : nodeResults) {

String prefix = nodeResult.getNode().asString();

String prefix = entry.getNode().asString();
int i = 0;
for (String value : entry.getValue()) {
result.add((i++ % 2 == 0 ? (prefix + ".") : "") + value);
for (Entry<String, String> entry : nodeResult.getValue().entrySet()) {
String newKey = prefix.concat(".").concat(entry.getKey());
String value = entry.getValue();
nodesConfiguration.setProperty(newKey, value);
}
}

return Converters.toProperties(result);
return nodesConfiguration;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import static org.springframework.data.redis.connection.jedis.StreamConverters.*;

import org.springframework.util.StringUtils;
import redis.clients.jedis.BuilderFactory;
import redis.clients.jedis.params.XAddParams;
import redis.clients.jedis.params.XClaimParams;
import redis.clients.jedis.params.XPendingParams;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

Expand Down Expand Up @@ -269,9 +271,19 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op

try {

List<Object> response = connection.getCluster().xpending(key, group,
JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)),
options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName()));
@SuppressWarnings("all")
XPendingParams pendingParams = new XPendingParams(
JedisConverters.toBytes(StreamConverters.getLowerValue(range)),
JedisConverters.toBytes(StreamConverters.getUpperValue(range)),
options.getCount().intValue());

String consumerName = options.getConsumerName();

if (StringUtils.hasText(consumerName)) {
pendingParams = pendingParams.consumer(consumerName);
}

List<Object> response = connection.getCluster().xpending(key, group, pendingParams);

return StreamConverters.toPendingMessages(groupName, range,
BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import redis.clients.jedis.params.ZParams;
import redis.clients.jedis.params.ZRangeParams;
import redis.clients.jedis.resps.ScanResult;
import redis.clients.jedis.util.KeyValue;

import java.util.ArrayList;
import java.util.LinkedHashSet;
Expand All @@ -34,7 +35,6 @@
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.connection.convert.SetConverter;
import org.springframework.data.redis.connection.zset.Aggregate;
import org.springframework.data.redis.connection.zset.DefaultTuple;
import org.springframework.data.redis.connection.zset.Tuple;
import org.springframework.data.redis.connection.zset.Weights;
import org.springframework.data.redis.core.Cursor;
Expand All @@ -46,18 +46,22 @@
import org.springframework.util.Assert;

/**
* Cluster {@link RedisZSetCommands} implementation for Jedis.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Clement Ong
* @author Andrey Shlykov
* @author Jens Deppe
* @author Shyngys Sapraliyev
* @author John Blum
* @since 2.0
*/
class JedisClusterZSetCommands implements RedisZSetCommands {

private static final SetConverter<redis.clients.jedis.resps.Tuple, Tuple> TUPLE_SET_CONVERTER = new SetConverter<>(
JedisConverters::toTuple);
private static final SetConverter<redis.clients.jedis.resps.Tuple, Tuple> TUPLE_SET_CONVERTER =
new SetConverter<>(JedisConverters::toTuple);

private final JedisClusterConnection connection;

JedisClusterZSetCommands(JedisClusterConnection connection) {
Expand Down Expand Up @@ -818,7 +822,7 @@ public Set<byte[]> zDiff(byte[]... sets) {
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {

try {
return connection.getCluster().zdiff(sets);
return JedisConverters.toSet(connection.getCluster().zdiff(sets));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand All @@ -835,7 +839,7 @@ public Set<Tuple> zDiffWithScores(byte[]... sets) {
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {

try {
return JedisConverters.toTupleSet(connection.getCluster().zdiffWithScores(sets));
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster().zdiffWithScores(sets)));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand Down Expand Up @@ -872,7 +876,7 @@ public Set<byte[]> zInter(byte[]... sets) {
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {

try {
return connection.getCluster().zinter(new ZParams(), sets);
return JedisConverters.toSet(connection.getCluster().zinter(new ZParams(), sets));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand All @@ -889,7 +893,8 @@ public Set<Tuple> zInterWithScores(byte[]... sets) {
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {

try {
return JedisConverters.toTupleSet(connection.getCluster().zinterWithScores(new ZParams(), sets));
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster()
.zinterWithScores(new ZParams(), sets)));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand All @@ -909,8 +914,8 @@ public Set<Tuple> zInterWithScores(Aggregate aggregate, Weights weights, byte[].
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {

try {
return JedisConverters
.toTupleSet(connection.getCluster().zinterWithScores(toZParams(aggregate, weights), sets));
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster()
.zinterWithScores(toZParams(aggregate, weights), sets)));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand Down Expand Up @@ -971,7 +976,7 @@ public Set<byte[]> zUnion(byte[]... sets) {
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {

try {
return connection.getCluster().zunion(new ZParams(), sets);
return JedisConverters.toSet(connection.getCluster().zunion(new ZParams(), sets));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand All @@ -988,7 +993,8 @@ public Set<Tuple> zUnionWithScores(byte[]... sets) {
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {

try {
return JedisConverters.toTupleSet(connection.getCluster().zunionWithScores(new ZParams(), sets));
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster()
.zunionWithScores(new ZParams(), sets)));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
Expand All @@ -1008,10 +1014,11 @@ public Set<Tuple> zUnionWithScores(Aggregate aggregate, Weights weights, byte[].
if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) {

try {
return JedisConverters
.toTupleSet(connection.getCluster().zunionWithScores(toZParams(aggregate, weights), sets));
return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster()
.zunionWithScores(toZParams(aggregate, weights), sets)));
} catch (Exception ex) {
throw convertJedisAccessException(ex);

}
}

Expand Down Expand Up @@ -1126,21 +1133,14 @@ private static ZParams toZParams(Aggregate aggregate, Weights weights) {
return new ZParams().weights(weights.toArray()).aggregate(ZParams.Aggregate.valueOf(aggregate.name()));
}

/**
* Workaround for broken Jedis BZPOP signature.
*
* @param bytes
* @return
*/
@Nullable
@SuppressWarnings("unchecked")
private static Tuple toTuple(@Nullable List<?> bytes) {
private static Tuple toTuple(@Nullable KeyValue<?, redis.clients.jedis.resps.Tuple> keyValue) {

if (bytes == null || bytes.isEmpty()) {
return null;
if (keyValue != null) {
redis.clients.jedis.resps.Tuple tuple = keyValue.getValue();
return tuple != null ? JedisConverters.toTuple(tuple) : null;
}

return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2))));
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import redis.clients.jedis.BuilderFactory;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.CommandObject;
Expand Down Expand Up @@ -130,6 +127,7 @@ public class JedisConnection extends AbstractRedisConnection {

private final Log LOGGER = LogFactory.getLog(getClass());

@SuppressWarnings("rawtypes")
private List<JedisResult> pipelinedResults = new ArrayList<>();

private final @Nullable Pool<Jedis> pool;
Expand Down Expand Up @@ -348,7 +346,6 @@ public void close() throws DataAccessException {
jedis.close();
}
else {
doExceptionThrowingOperationSafely(jedis::quit, "Failed to quit during close");
doExceptionThrowingOperationSafely(jedis::disconnect, "Failed to disconnect during close");
}
}
Expand Down Expand Up @@ -480,6 +477,7 @@ public void discard() {
public List<Object> exec() {

try {

if (transaction == null) {
throw new InvalidDataAccessApiUsageException("No ongoing transaction; Did you forget to call multi");
}
Expand All @@ -489,6 +487,7 @@ public List<Object> exec() {
return !CollectionUtils.isEmpty(results)
? new TransactionResultConverter<>(txResults, JedisExceptionConverter.INSTANCE).convert(results)
: results;

} catch (Exception cause) {
throw convertJedisAccessException(cause);
} finally {
Expand Down
Loading

0 comments on commit 2075633

Please sign in to comment.