diff --git a/pom.xml b/pom.xml index 778dd177dc..97ebc2565e 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ 1.4.20 2.11.1 6.2.6.RELEASE - 4.4.5 + 5.0.1 1.01 4.1.96.Final spring.data.redis diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java index 06bbf2a569..1bfbfe550d 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterHashCommands.java @@ -30,13 +30,15 @@ import org.springframework.data.redis.core.ScanCursor; import org.springframework.data.redis.core.ScanIteration; import org.springframework.data.redis.core.ScanOptions; -import org.springframework.data.util.Streamable; import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** + * Cluster {@link RedisHashCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 2.0 */ class JedisClusterHashCommands implements RedisHashCommands { @@ -160,8 +162,8 @@ public Entry hRandFieldWithValues(byte[] key) { Assert.notNull(key, "Key must not be null"); try { - Map map = connection.getCluster().hrandfieldWithValues(key, 1); - return map.isEmpty() ? null : map.entrySet().iterator().next(); + List> mapEntryList = connection.getCluster().hrandfieldWithValues(key, 1); + return mapEntryList.isEmpty() ? null : mapEntryList.get(0); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -185,8 +187,7 @@ public List hRandField(byte[] key, long count) { public List> hRandFieldWithValues(byte[] key, long count) { try { - Map map = connection.getCluster().hrandfieldWithValues(key, count); - return Streamable.of(() -> map.entrySet().iterator()).toList(); + return connection.getCluster().hrandfieldWithValues(key, count); } catch (Exception ex) { throw convertJedisAccessException(ex); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java index 43b83bf874..5e920cc85c 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterServerCommands.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -249,21 +250,26 @@ public Properties getConfig(String pattern) { Assert.notNull(pattern, "Pattern must not be null"); - List>> mapResult = connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((JedisClusterCommandCallback>) client -> client.configGet(pattern)) + JedisClusterCommandCallback> command = jedis -> jedis.configGet(pattern); + + List>> nodeResults = connection.getClusterCommandExecutor() + .executeCommandOnAllNodes(command) .getResults(); - List result = new ArrayList<>(); - for (NodeResult> entry : mapResult) { + Properties nodesConfiguration = new Properties(); + + for (NodeResult> nodeResult : nodeResults) { + + String prefix = nodeResult.getNode().asString(); - String prefix = entry.getNode().asString(); - int i = 0; - for (String value : entry.getValue()) { - result.add((i++ % 2 == 0 ? (prefix + ".") : "") + value); + for (Entry entry : nodeResult.getValue().entrySet()) { + String newKey = prefix.concat(".").concat(entry.getKey()); + String value = entry.getValue(); + nodesConfiguration.setProperty(newKey, value); } } - return Converters.toProperties(result); + return nodesConfiguration; } @Override diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 11223cb2d5..f9372ba9a4 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -17,9 +17,11 @@ import static org.springframework.data.redis.connection.jedis.StreamConverters.*; +import org.springframework.util.StringUtils; import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XClaimParams; +import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; @@ -269,9 +271,19 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op try { - List response = connection.getCluster().xpending(key, group, - JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)), - options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName())); + @SuppressWarnings("all") + XPendingParams pendingParams = new XPendingParams( + JedisConverters.toBytes(StreamConverters.getLowerValue(range)), + JedisConverters.toBytes(StreamConverters.getUpperValue(range)), + options.getCount().intValue()); + + String consumerName = options.getConsumerName(); + + if (StringUtils.hasText(consumerName)) { + pendingParams = pendingParams.consumer(consumerName); + } + + List response = connection.getCluster().xpending(key, group, pendingParams); return StreamConverters.toPendingMessages(groupName, range, BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(response)); diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java index 4d0923e43d..c090186086 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterZSetCommands.java @@ -20,6 +20,7 @@ import redis.clients.jedis.params.ZParams; import redis.clients.jedis.params.ZRangeParams; import redis.clients.jedis.resps.ScanResult; +import redis.clients.jedis.util.KeyValue; import java.util.ArrayList; import java.util.LinkedHashSet; @@ -34,7 +35,6 @@ import org.springframework.data.redis.connection.RedisZSetCommands; import org.springframework.data.redis.connection.convert.SetConverter; import org.springframework.data.redis.connection.zset.Aggregate; -import org.springframework.data.redis.connection.zset.DefaultTuple; import org.springframework.data.redis.connection.zset.Tuple; import org.springframework.data.redis.connection.zset.Weights; import org.springframework.data.redis.core.Cursor; @@ -46,18 +46,22 @@ import org.springframework.util.Assert; /** + * Cluster {@link RedisZSetCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch * @author Clement Ong * @author Andrey Shlykov * @author Jens Deppe * @author Shyngys Sapraliyev + * @author John Blum * @since 2.0 */ class JedisClusterZSetCommands implements RedisZSetCommands { - private static final SetConverter TUPLE_SET_CONVERTER = new SetConverter<>( - JedisConverters::toTuple); + private static final SetConverter TUPLE_SET_CONVERTER = + new SetConverter<>(JedisConverters::toTuple); + private final JedisClusterConnection connection; JedisClusterZSetCommands(JedisClusterConnection connection) { @@ -818,7 +822,7 @@ public Set zDiff(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return connection.getCluster().zdiff(sets); + return JedisConverters.toSet(connection.getCluster().zdiff(sets)); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -835,7 +839,7 @@ public Set zDiffWithScores(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters.toTupleSet(connection.getCluster().zdiffWithScores(sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster().zdiffWithScores(sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -872,7 +876,7 @@ public Set zInter(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return connection.getCluster().zinter(new ZParams(), sets); + return JedisConverters.toSet(connection.getCluster().zinter(new ZParams(), sets)); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -889,7 +893,8 @@ public Set zInterWithScores(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters.toTupleSet(connection.getCluster().zinterWithScores(new ZParams(), sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zinterWithScores(new ZParams(), sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -909,8 +914,8 @@ public Set zInterWithScores(Aggregate aggregate, Weights weights, byte[]. if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters - .toTupleSet(connection.getCluster().zinterWithScores(toZParams(aggregate, weights), sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zinterWithScores(toZParams(aggregate, weights), sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -971,7 +976,7 @@ public Set zUnion(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return connection.getCluster().zunion(new ZParams(), sets); + return JedisConverters.toSet(connection.getCluster().zunion(new ZParams(), sets)); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -988,7 +993,8 @@ public Set zUnionWithScores(byte[]... sets) { if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters.toTupleSet(connection.getCluster().zunionWithScores(new ZParams(), sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zunionWithScores(new ZParams(), sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); } @@ -1008,10 +1014,11 @@ public Set zUnionWithScores(Aggregate aggregate, Weights weights, byte[]. if (ClusterSlotHashUtil.isSameSlotForAllKeys(sets)) { try { - return JedisConverters - .toTupleSet(connection.getCluster().zunionWithScores(toZParams(aggregate, weights), sets)); + return JedisConverters.toSet(JedisConverters.toTupleList(connection.getCluster() + .zunionWithScores(toZParams(aggregate, weights), sets))); } catch (Exception ex) { throw convertJedisAccessException(ex); + } } @@ -1126,21 +1133,14 @@ private static ZParams toZParams(Aggregate aggregate, Weights weights) { return new ZParams().weights(weights.toArray()).aggregate(ZParams.Aggregate.valueOf(aggregate.name())); } - /** - * Workaround for broken Jedis BZPOP signature. - * - * @param bytes - * @return - */ @Nullable - @SuppressWarnings("unchecked") - private static Tuple toTuple(@Nullable List bytes) { + private static Tuple toTuple(@Nullable KeyValue keyValue) { - if (bytes == null || bytes.isEmpty()) { - return null; + if (keyValue != null) { + redis.clients.jedis.resps.Tuple tuple = keyValue.getValue(); + return tuple != null ? JedisConverters.toTuple(tuple) : null; } - return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2)))); + return null; } - } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index eaa1b5ba4e..a474bbb8fe 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -59,9 +59,6 @@ import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.CommandArguments; import redis.clients.jedis.CommandObject; @@ -130,6 +127,7 @@ public class JedisConnection extends AbstractRedisConnection { private final Log LOGGER = LogFactory.getLog(getClass()); + @SuppressWarnings("rawtypes") private List pipelinedResults = new ArrayList<>(); private final @Nullable Pool pool; @@ -348,7 +346,6 @@ public void close() throws DataAccessException { jedis.close(); } else { - doExceptionThrowingOperationSafely(jedis::quit, "Failed to quit during close"); doExceptionThrowingOperationSafely(jedis::disconnect, "Failed to disconnect during close"); } } @@ -480,6 +477,7 @@ public void discard() { public List exec() { try { + if (transaction == null) { throw new InvalidDataAccessApiUsageException("No ongoing transaction; Did you forget to call multi"); } @@ -489,6 +487,7 @@ public List exec() { return !CollectionUtils.isEmpty(results) ? new TransactionResultConverter<>(txResults, JedisExceptionConverter.INSTANCE).convert(results) : results; + } catch (Exception cause) { throw convertJedisAccessException(cause); } finally { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java index 88b8a5eb4d..ac689c75ca 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConverters.java @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -87,6 +88,8 @@ import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import redis.clients.jedis.util.SafeEncoder; + /** * Jedis type converters. * @@ -114,12 +117,22 @@ abstract class JedisConverters extends Converters { MINUS_BYTES = toBytes("-"); POSITIVE_INFINITY_BYTES = toBytes("+inf"); NEGATIVE_INFINITY_BYTES = toBytes("-inf"); + + } + + @Nullable + static Set toSet(@Nullable List list) { + return list != null ? new LinkedHashSet<>(list) : null; } public static Converter stringToBytes() { return JedisConverters::toBytes; } + static ListConverter stringListToByteList() { + return new ListConverter<>(stringToBytes()); + } + /** * {@link ListConverter} converting jedis {@link redis.clients.jedis.resps.Tuple} to {@link Tuple}. * @@ -129,16 +142,16 @@ static ListConverter tuplesToTuples() { return new ListConverter<>(JedisConverters::toTuple); } - static ListConverter stringListToByteList() { - return new ListConverter<>(stringToBytes()); + static Tuple toTuple(redis.clients.jedis.resps.Tuple source) { + return new DefaultTuple(source.getBinaryElement(), source.getScore()); } - static Set toTupleSet(Set source) { - return new SetConverter<>(JedisConverters::toTuple).convert(source); + static List toTupleList(List source) { + return tuplesToTuples().convert(source); } - public static Tuple toTuple(redis.clients.jedis.resps.Tuple source) { - return new DefaultTuple(source.getBinaryElement(), source.getScore()); + static Set toTupleSet(Set source) { + return new SetConverter<>(JedisConverters::toTuple).convert(source); } /** @@ -255,6 +268,7 @@ public static byte[][] toByteArrays(Map source) { public static SortingParams toSortingParams(@Nullable SortParameters params) { SortingParams jedisParams = null; + if (params != null) { jedisParams = new SortingParams(); byte[] byPattern = params.getByPattern(); @@ -278,6 +292,7 @@ public static SortingParams toSortingParams(@Nullable SortParameters params) { jedisParams.alpha(); } } + return jedisParams; } @@ -386,8 +401,10 @@ public static SetParams toSetCommandExPxArgument(Expiration expiration, SetParam * @since 2.6 */ static GetExParams toGetExParams(Expiration expiration) { + return toGetExParams(expiration, new GetExParams()); + } - GetExParams params = new GetExParams(); + static GetExParams toGetExParams(Expiration expiration, GetExParams params) { if (expiration.isPersistent()) { return params.persist(); @@ -584,18 +601,14 @@ static ZAddParams toZAddParams(ZAddArgs source) { return new ZAddParams(); } - ZAddParams target = new ZAddParams() { - - { - if (source.contains(ZAddArgs.Flag.GT)) { - addParam("gt"); - } - if (source.contains(ZAddArgs.Flag.LT)) { - addParam("lt"); - } - } - }; + ZAddParams target = new ZAddParams(); + if (source.contains(ZAddArgs.Flag.GT)) { + target.gt(); + } + if (source.contains(ZAddArgs.Flag.LT)) { + target.lt(); + } if (source.contains(ZAddArgs.Flag.XX)) { target.xx(); } @@ -605,6 +618,7 @@ static ZAddParams toZAddParams(ZAddArgs source) { if (source.contains(ZAddArgs.Flag.CH)) { target.ch(); } + return target; } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java index 4efaf0c85c..419d19d3f8 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisHashCommands.java @@ -37,8 +37,11 @@ import org.springframework.util.Assert; /** + * {@link RedisHashCommands} implementation for Jedis. + * * @author Christoph Strobl * @author Mark Paluch + * @author John Blum * @since 2.0 */ class JedisHashCommands implements RedisHashCommands { @@ -122,7 +125,7 @@ public Entry hRandFieldWithValues(byte[] key) { Assert.notNull(key, "Key must not be null"); return connection.invoke().from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, 1L) - .get(it -> it.isEmpty() ? null : it.entrySet().iterator().next()); + .get(mapEntryList -> mapEntryList.isEmpty() ? null : mapEntryList.get(0)); } @Nullable @@ -141,12 +144,16 @@ public List> hRandFieldWithValues(byte[] key, long count) Assert.notNull(key, "Key must not be null"); return connection.invoke() - .from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count).get(it -> { + .from(Jedis::hrandfieldWithValues, PipelineBinaryCommands::hrandfieldWithValues, key, count) + .get(mapEntryList -> { + + List> convertedMapEntryList = new ArrayList<>(mapEntryList.size()); + + mapEntryList.forEach(entry -> + convertedMapEntryList.add(Converters.entryOf(entry.getKey(), entry.getValue()))); - List> entries = new ArrayList<>(it.size()); - it.forEach((k, v) -> entries.add(Converters.entryOf(k, v))); + return convertedMapEntryList; - return entries; }); } diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java index 805efabe4b..ef1771969d 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisInvoker.java @@ -17,7 +17,6 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Queable; import redis.clients.jedis.Response; import redis.clients.jedis.Transaction; import redis.clients.jedis.commands.DatabasePipelineCommands; @@ -47,7 +46,7 @@ * composing a functional pipeline to transform the result using a {@link Converter}. *

* Usage example: - * + *

*

  * JedisInvoker invoker = …;
  *
@@ -62,6 +61,7 @@
  *
  * @author Mark Paluch
  * @author Christoph Strobl
+ * @author John Blum
  * @since 2.5
  */
 class JedisInvoker {
@@ -937,10 +937,7 @@ static class DefaultSingleInvocationSpec implements SingleInvocationSpec {
 
 		@Override
 		public  T get(Converter converter) {
-
-			Assert.notNull(converter, "Converter must not be null");
-
-			return synchronizer.invoke(parentFunction, parentPipelineFunction, converter, () -> null);
+			return getOrElse(converter, () -> null);
 		}
 
 		@Nullable
@@ -959,6 +956,7 @@ static class DefaultManyInvocationSpec implements ManyInvocationSpec {
 		private final Function>> parentPipelineFunction;
 		private final Synchronizer synchronizer;
 
+		@SuppressWarnings({ "rawtypes", "unchecked" })
 		DefaultManyInvocationSpec(Function> parentFunction,
 				Function>> parentPipelineFunction,
 				Synchronizer synchronizer) {
@@ -969,6 +967,7 @@ static class DefaultManyInvocationSpec implements ManyInvocationSpec {
 		}
 
 		@Override
+		@SuppressWarnings("all")
 		public  List toList(Converter converter) {
 
 			Assert.notNull(converter, "Converter must not be null");
@@ -981,15 +980,17 @@ public  List toList(Converter converter) {
 
 				List result = new ArrayList<>(source.size());
 
-				for (S s : source) {
-					result.add(converter.convert(s));
+				for (S element : source) {
+					result.add(converter.convert(element));
 				}
 
 				return result;
+
 			}, Collections::emptyList);
 		}
 
 		@Override
+		@SuppressWarnings("all")
 		public  Set toSet(Converter converter) {
 
 			Assert.notNull(converter, "Converter must not be null");
@@ -1002,11 +1003,12 @@ public  Set toSet(Converter converter) {
 
 				Set result = new LinkedHashSet<>(source.size());
 
-				for (S s : source) {
-					result.add(converter.convert(s));
+				for (S element : source) {
+					result.add(converter.convert(element));
 				}
 
 				return result;
+
 			}, Collections::emptySet);
 		}
 	}
@@ -1020,6 +1022,7 @@ interface Synchronizer {
 		@Nullable
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		default  T invoke(Function callFunction, Function> pipelineFunction) {
+
 			return (T) doInvoke((Function) callFunction, (Function) pipelineFunction, Converters.identityConverter(),
 					() -> null);
 		}
@@ -1046,15 +1049,13 @@ interface ResponseCommands extends PipelineBinaryCommands, DatabasePipelineComma
 	/**
 	 * Create a proxy to invoke methods dynamically on {@link Pipeline} or {@link Transaction} as those share many
 	 * commands that are not defined on a common super-type.
-	 *
-	 * @param pipelineOrTransaction
-	 * @return
 	 */
-	static ResponseCommands createCommands(Queable pipelineOrTransaction) {
+	static ResponseCommands createCommands(Object pipelineOrTransaction) {
 
 		ProxyFactory proxyFactory = new ProxyFactory(pipelineOrTransaction);
+
 		proxyFactory.addInterface(ResponseCommands.class);
+
 		return (ResponseCommands) proxyFactory.getProxy(JedisInvoker.class.getClassLoader());
 	}
-
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
index 1aab87de0a..a19247a8d1 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisZSetCommands.java
@@ -22,6 +22,7 @@
 import redis.clients.jedis.params.ZParams;
 import redis.clients.jedis.params.ZRangeParams;
 import redis.clients.jedis.resps.ScanResult;
+import redis.clients.jedis.util.KeyValue;
 
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -31,7 +32,6 @@
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.data.redis.connection.RedisZSetCommands;
 import org.springframework.data.redis.connection.zset.Aggregate;
-import org.springframework.data.redis.connection.zset.DefaultTuple;
 import org.springframework.data.redis.connection.zset.Tuple;
 import org.springframework.data.redis.connection.zset.Weights;
 import org.springframework.data.redis.core.Cursor;
@@ -42,11 +42,14 @@
 import org.springframework.util.Assert;
 
 /**
+ * {@link RedisZSetCommands} implementation for Jedis.
+ *
  * @author Christoph Strobl
  * @author Clement Ong
  * @author Mark Paluch
  * @author Andrey Shlykov
  * @author Shyngys Sapraliyev
+ * @author John Blum
  * @since 2.0
  */
 class JedisZSetCommands implements RedisZSetCommands {
@@ -74,8 +77,10 @@ public Long zAdd(byte[] key, Set tuples, ZAddArgs args) {
 		Assert.notNull(key, "Key must not be null");
 		Assert.notNull(tuples, "Tuples must not be null");
 
-		return connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key, JedisConverters.toTupleMap(tuples),
-				JedisConverters.toZAddParams(args));
+		Long count = connection.invoke().just(Jedis::zadd, PipelineBinaryCommands::zadd, key,
+				JedisConverters.toTupleMap(tuples), JedisConverters.toZAddParams(args));
+
+		return count != null ? count : 0L;
 	}
 
 	@Override
@@ -424,7 +429,7 @@ public Set zDiff(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zdiff, PipelineBinaryCommands::zdiff, sets);
+		return connection.invoke().fromMany(Jedis::zdiff, PipelineBinaryCommands::zdiff, sets).toSet();
 	}
 
 	@Override
@@ -450,7 +455,7 @@ public Set zInter(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zinter, PipelineBinaryCommands::zinter, new ZParams(), sets);
+		return connection.invoke().fromMany(Jedis::zinter, PipelineBinaryCommands::zinter, new ZParams(), sets).toSet();
 	}
 
 	@Override
@@ -504,7 +509,7 @@ public Set zUnion(byte[]... sets) {
 
 		Assert.notNull(sets, "Sets must not be null");
 
-		return connection.invoke().just(Jedis::zunion, PipelineBinaryCommands::zunion, new ZParams(), sets);
+		return connection.invoke().fromMany(Jedis::zunion, PipelineBinaryCommands::zunion, new ZParams(), sets).toSet();
 	}
 
 	@Override
@@ -772,21 +777,8 @@ static ZRangeParams toZRangeParams(Protocol.Keyword by, byte[] min, byte[] max,
 		return zRangeParams;
 	}
 
-	/**
-	 * Workaround for broken Jedis BZPOP signature.
-	 *
-	 * @param bytes
-	 * @return
-	 */
 	@Nullable
-	@SuppressWarnings("unchecked")
-	private static Tuple toTuple(List bytes) {
-
-		if (bytes.isEmpty()) {
-			return null;
-		}
-
-		return new DefaultTuple((byte[]) bytes.get(1), Double.parseDouble(new String((byte[]) bytes.get(2))));
+	private static Tuple toTuple(@Nullable KeyValue keyValue) {
+		return keyValue != null ? JedisConverters.toTuple(keyValue.getValue()) : null;
 	}
-
 }
diff --git a/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java b/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
index eb2c198e8e..3862985bb8 100644
--- a/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
+++ b/src/main/java/org/springframework/data/redis/connection/zset/DefaultTuple.java
@@ -24,17 +24,20 @@
  *
  * @author Costin Leau
  * @author Christoph Strobl
+ * @author John Blum
  */
 public class DefaultTuple implements Tuple {
 
+	private static final Double ZERO = 0.0d;
+
 	private final Double score;
 	private final byte[] value;
 
 	/**
-	 * Constructs a new DefaultTuple instance.
+	 * Constructs a new {@link DefaultTuple}.
 	 *
-	 * @param value
-	 * @param score
+	 * @param value {@link byte[]} of the member's raw value.
+	 * @param score {@link Double score} of the raw value used in sorting.
 	 */
 	public DefaultTuple(byte[] value, Double score) {
 
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java
index 78ffeb756e..13621d31fc 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisClusterConnectionTests.java
@@ -2279,7 +2279,7 @@ public void zPopMinShouldWorkCorrectly() {
 	@EnabledOnCommand("BZPOPMIN")
 	public void bzPopMinShouldWorkCorrectly() {
 
-		assertThat(clusterConnection.bZPopMin(KEY_1_BYTES, 10, TimeUnit.MILLISECONDS)).isNull();
+		assertThat(clusterConnection.zSetCommands().zCard(KEY_1_BYTES)).isZero();
 
 		nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES);
 		nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES);
@@ -2306,7 +2306,7 @@ public void zPopMaxShouldWorkCorrectly() {
 	@EnabledOnCommand("BZPOPMAX")
 	public void bzPopMaxShouldWorkCorrectly() {
 
-		assertThat(clusterConnection.bZPopMax(KEY_1_BYTES, 10, TimeUnit.MILLISECONDS)).isNull();
+		assertThat(clusterConnection.zSetCommands().zCard(KEY_1_BYTES)).isZero();
 
 		nativeConnection.zadd(KEY_1_BYTES, 10D, VALUE_1_BYTES);
 		nativeConnection.zadd(KEY_1_BYTES, 20D, VALUE_2_BYTES);
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
index 2a39274473..7f61f9aea5 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java
@@ -165,7 +165,7 @@ public void scanShouldKeepTheConnectionOpen() {
 
 			connection.scan(ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531, GH-2006
@@ -177,7 +177,7 @@ public void scanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.scan(ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -188,7 +188,7 @@ public void sScanShouldKeepTheConnectionOpen() {
 
 			connection.sScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -200,7 +200,7 @@ public void sScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.sScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -211,7 +211,7 @@ public void zScanShouldKeepTheConnectionOpen() {
 
 			connection.zScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -223,7 +223,7 @@ public void zScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor cursor = connection.zScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -234,7 +234,7 @@ public void hScanShouldKeepTheConnectionOpen() {
 
 			connection.hScan("foo".getBytes(), ScanOptions.NONE);
 
-			verify(jedisSpy, never()).quit();
+			verify(jedisSpy, never()).disconnect();
 		}
 
 		@Test // DATAREDIS-531
@@ -246,7 +246,7 @@ public void hScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
 			Cursor> cursor = connection.hScan("foo".getBytes(), ScanOptions.NONE);
 			cursor.close();
 
-			verify(jedisSpy, times(1)).quit();
+			verify(jedisSpy, times(1)).disconnect();
 		}
 
 		@Test // DATAREDIS-714
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
index eb37cf171d..7cbc1b6e92 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConvertersUnitTests.java
@@ -16,7 +16,15 @@
 package org.springframework.data.redis.connection.jedis;
 
 import static org.assertj.core.api.Assertions.*;
-
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import redis.clients.jedis.Protocol;
 import redis.clients.jedis.params.GetExParams;
 import redis.clients.jedis.params.SetParams;
 
@@ -35,12 +43,15 @@
 import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
 import org.springframework.data.redis.core.types.Expiration;
 import org.springframework.data.redis.core.types.RedisClientInfo;
+import org.springframework.lang.Nullable;
+import org.springframework.test.util.ReflectionTestUtils;
 
 /**
  * Unit tests for {@link JedisConverters}.
  *
  * @author Christoph Strobl
  * @author Mark Paluch
+ * @author John Blum
  */
 class JedisConvertersUnitTests {
 
@@ -212,15 +223,31 @@ void toSetCommandExPxOptionShouldReturnEXforMilliseconds() {
 	@Test // GH-2050
 	void convertsExpirationToSetPXAT() {
 
-		assertThat(JedisConverters.toSetCommandExPxArgument(Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS)))
-				.extracting(SetParams::toString).isEqualTo(SetParams.setParams().pxAt(10).toString());
+		SetParams mockSetParams = mock(SetParams.class);
+
+		doReturn(mockSetParams).when(mockSetParams).pxAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS);
+
+		assertThat(JedisConverters.toSetCommandExPxArgument(expiration, mockSetParams)).isNotNull();
+
+		verify(mockSetParams, times(1)).pxAt(eq(10L));
+		verifyNoMoreInteractions(mockSetParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToSetEXAT() {
 
-		assertThat(JedisConverters.toSetCommandExPxArgument(Expiration.unixTimestamp(1, TimeUnit.MINUTES)))
-				.extracting(SetParams::toString).isEqualTo(SetParams.setParams().exAt(60).toString());
+		SetParams mockSetParams = mock(SetParams.class);
+
+		doReturn(mockSetParams).when(mockSetParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toSetCommandExPxArgument(expiration, mockSetParams)).isNotNull();
+
+		verify(mockSetParams, times(1)).exAt(eq(60L));
+		verifyNoMoreInteractions(mockSetParams);
 	}
 
 	@Test // DATAREDIS-316, DATAREDIS-749
@@ -241,43 +268,91 @@ void toSetCommandNxXxOptionShouldReturnEmptyArrayforUpsert() {
 	@Test // GH-2050
 	void convertsExpirationToGetExEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.seconds(10))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().ex(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).ex(anyLong());
+
+		Expiration expiration = Expiration.seconds(10);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).ex(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationWithTimeUnitToGetExEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.from(1, TimeUnit.MINUTES))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().ex(60).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).ex(anyLong());
+
+		Expiration expiration = Expiration.from(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).ex(eq(60L)); // seconds
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExPEX() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.milliseconds(10))).extracting(GetExParams::toString)
-				.isEqualTo(new GetExParams().px(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).px(anyLong());
+
+		Expiration expiration = Expiration.milliseconds(10L);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).px(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExEXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(10, TimeUnit.SECONDS)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().exAt(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.SECONDS);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).exAt(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationWithTimeUnitToGetExEXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(1, TimeUnit.MINUTES)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().exAt(60).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).exAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(1, TimeUnit.MINUTES);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).exAt(eq(60L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	@Test // GH-2050
 	void convertsExpirationToGetExPXAT() {
 
-		assertThat(JedisConverters.toGetExParams(Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS)))
-				.extracting(GetExParams::toString).isEqualTo(new GetExParams().pxAt(10).toString());
+		GetExParams mockGetExParams = mock(GetExParams.class);
+
+		doReturn(mockGetExParams).when(mockGetExParams).pxAt(anyLong());
+
+		Expiration expiration = Expiration.unixTimestamp(10, TimeUnit.MILLISECONDS);
+
+		assertThat(JedisConverters.toGetExParams(expiration, mockGetExParams)).isNotNull();
+
+		verify(mockGetExParams, times(1)).pxAt(eq(10L));
+		verifyNoMoreInteractions(mockGetExParams);
 	}
 
 	private void verifyRedisServerInfo(RedisServer server, Map values) {
@@ -289,18 +364,22 @@ private void verifyRedisServerInfo(RedisServer server, Map value
 
 	private static String toString(SetParams setParams) {
 
-		StringBuilder builder = new StringBuilder();
+		StringBuilder stringBuilder = new StringBuilder();
 
-		for (byte[] parameter : setParams.getByteParams()) {
+		stringBuilder.append(toString((Protocol.Keyword) ReflectionTestUtils.getField(setParams, "existance")));
+		stringBuilder.append(toString((Protocol.Keyword) ReflectionTestUtils.getField(setParams, "expiration")));
 
-			if (builder.length() != 0) {
-				builder.append(' ');
-			}
+		Long expirationValue = (Long) ReflectionTestUtils.getField(setParams, "expirationValue");
 
-			builder.append(new String(parameter));
+		if (expirationValue != null) {
+			stringBuilder.append(" ").append(expirationValue);
 		}
 
-		return builder.toString();
+		return stringBuilder.toString().trim();
+	}
+
+	private static String toString(@Nullable Enum value) {
+		return value != null ? value.name().toLowerCase() : "";
 	}
 
 	private Map getRedisServerInfoMap(String name, int port) {