Skip to content

Commit

Permalink
Adapt DefaultStringRedisConnection to API changes in Jedis 5.0.
Browse files Browse the repository at this point in the history
Refactor and cleanup compiler warnings.

Organize source.

Closes spring-projects#2612
  • Loading branch information
jxblum committed Sep 19, 2023
1 parent f702314 commit 8255358
Showing 1 changed file with 56 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.data.redis.domain.geo.GeoReference;
import org.springframework.data.redis.domain.geo.GeoReference.GeoMemberReference;
import org.springframework.data.redis.domain.geo.GeoShape;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
Expand Down Expand Up @@ -87,21 +86,19 @@ public class DefaultStringRedisConnection implements StringRedisConnection, Deco

private static final byte[][] EMPTY_2D_BYTE_ARRAY = new byte[0][];

private boolean deserializePipelineAndTxResults = false;

private final Log log = LogFactory.getLog(DefaultStringRedisConnection.class);

private final RedisConnection delegate;

private final RedisSerializer<String> serializer;
private Converter<byte[], String> bytesToString = new DeserializingConverter();
private Converter<String, byte[]> stringToBytes = new SerializingConverter();
private final TupleConverter tupleConverter = new TupleConverter();
private SetConverter<Tuple, StringTuple> tupleToStringTuple = new SetConverter<>(tupleConverter);
private SetConverter<StringTuple, Tuple> stringTupleToTuple = new SetConverter<>(new StringTupleConverter());
private ListConverter<Tuple, StringTuple> tupleListToStringTuple = new ListConverter<>(new TupleConverter());
private ListConverter<byte[], String> byteListToStringList = new ListConverter<>(bytesToString);
private MapConverter<byte[], String> byteMapToStringMap = new MapConverter<>(bytesToString);
private MapConverter<String, byte[]> stringMapToByteMap = new MapConverter<>(stringToBytes);
private SetConverter<byte[], String> byteSetToStringSet = new SetConverter<>(bytesToString);
private Converter<GeoResults<GeoLocation<byte[]>>, GeoResults<GeoLocation<String>>> byteGeoResultsToStringGeoResults;
private Converter<ByteRecord, StringRecord> byteMapRecordToStringMapRecordConverter = new Converter<ByteRecord, StringRecord>() {

private final Converter<byte[], String> bytesToString = new DeserializingConverter();

private final Converter<GeoResults<GeoLocation<byte[]>>, GeoResults<GeoLocation<String>>> byteGeoResultsToStringGeoResults;

private final Converter<ByteRecord, StringRecord> byteMapRecordToStringMapRecordConverter = new Converter<>() {

@Nullable
@Override
Expand All @@ -110,47 +107,55 @@ public StringRecord convert(ByteRecord source) {
}
};

private ListConverter<ByteRecord, StringRecord> listByteMapRecordToStringMapRecordConverter = new ListConverter<>(
byteMapRecordToStringMapRecordConverter);
private final ListConverter<byte[], String> byteListToStringList = new ListConverter<>(bytesToString);
private final ListConverter<Tuple, StringTuple> tupleListToStringTuple = new ListConverter<>(new TupleConverter());

@SuppressWarnings("rawtypes") private Queue<Converter> pipelineConverters = new LinkedList<>();
@SuppressWarnings("rawtypes") private Queue<Converter> txConverters = new LinkedList<>();
private boolean deserializePipelineAndTxResults = false;
private final ListConverter<ByteRecord, StringRecord> listByteMapRecordToStringMapRecordConverter =
new ListConverter<>(byteMapRecordToStringMapRecordConverter);

private final MapConverter<byte[], String> byteMapToStringMap = new MapConverter<>(bytesToString);

private final TupleConverter tupleConverter = new TupleConverter();

private final SetConverter<byte[], String> byteSetToStringSet = new SetConverter<>(bytesToString);
private final SetConverter<StringTuple, Tuple> stringTupleToTuple = new SetConverter<>(new StringTupleConverter());
private final SetConverter<Tuple, StringTuple> tupleToStringTuple = new SetConverter<>(tupleConverter);

@SuppressWarnings("rawtypes")
private final Queue<Converter> pipelineConverters = new LinkedList<>();

@SuppressWarnings("rawtypes")
private final Queue<Converter> txConverters = new LinkedList<>();

private Entry<String, String> convertEntry(Entry<byte[], byte[]> source) {
return Converters.entryOf(bytesToString.convert(source.getKey()), bytesToString.convert(source.getValue()));
}

private class DeserializingConverter implements Converter<byte[], String> {

public String convert(byte[] source) {
return serializer.deserialize(source);
}
}

private class SerializingConverter implements Converter<String, byte[]> {
private class StringTupleConverter implements Converter<StringTuple, Tuple> {

@Nullable
@Override
public byte[] convert(String source) {
return serializer.serialize(source);
public Tuple convert(StringTuple source) {
return new DefaultTuple(source.getValue(), source.getScore());
}
}

private class TupleConverter implements Converter<Tuple, StringTuple> {

public StringTuple convert(Tuple source) {
return new DefaultStringTuple(source, serializer.deserialize(source.getValue()));
}
}

private class StringTupleConverter implements Converter<StringTuple, Tuple> {
public Tuple convert(StringTuple source) {
return new DefaultTuple(source.getValue(), source.getScore());
}
}

@SuppressWarnings("rawtypes")
private class TransactionResultConverter implements Converter<List<Object>, List<Object>> {
private Queue<Converter> txConverters;

private final Queue<Converter> txConverters;

public TransactionResultConverter(Queue<Converter> txConverters) {
this.txConverters = txConverters;
Expand Down Expand Up @@ -327,7 +332,6 @@ public byte[] echo(byte[] message) {
}

@Override
@SuppressWarnings("rawtypes")
public List<Object> exec() {

try {
Expand Down Expand Up @@ -1022,7 +1026,7 @@ public Long zDiffStore(byte[] destKey, byte[]... sets) {
@Nullable
@Override
public Set<String> zDiff(String... sets) {
return convertAndReturn(delegate.zDiff(serializeMulti(sets)), byteSetToStringSet);
return convertAndReturn(delegate.zDiff(serializeMulti(sets)), byteListToStringList);
}

@Nullable
Expand Down Expand Up @@ -1059,7 +1063,7 @@ public Set<Tuple> zInterWithScores(Aggregate aggregate, Weights weights, byte[].
@Nullable
@Override
public Set<String> zInter(String... sets) {
return convertAndReturn(delegate.zInter(serializeMulti(sets)), byteSetToStringSet);
return convertAndReturn(delegate.zInter(serializeMulti(sets)), byteListToStringList);
}

@Nullable
Expand Down Expand Up @@ -1265,7 +1269,7 @@ public Set<Tuple> zUnionWithScores(Aggregate aggregate, Weights weights, byte[].
@Nullable
@Override
public Set<String> zUnion(String... sets) {
return convertAndReturn(delegate.zUnion(serializeMulti(sets)), byteSetToStringSet);
return convertAndReturn(delegate.zUnion(serializeMulti(sets)), byteListToStringList);
}

@Nullable
Expand Down Expand Up @@ -1396,11 +1400,11 @@ private org.springframework.data.domain.Range.Bound<byte[]> rawBound(
.orElseGet(org.springframework.data.domain.Range.Bound::unbounded);
}

@SuppressWarnings("unchecked")
@SuppressWarnings({ "rawtypes", "unchecked" })
private GeoReference<byte[]> serialize(GeoReference<String> data) {
return data instanceof GeoReference.GeoMemberReference
? GeoReference
.fromMember(serializer.serialize(((GeoMemberReference<String>) data).getMember()))

return data instanceof GeoReference.GeoMemberReference<String> geoMemberReference
? GeoReference.fromMember(serializer.serialize(geoMemberReference.getMember()))
: (GeoReference) data;
}

Expand Down Expand Up @@ -2887,12 +2891,14 @@ public List<StringRecord> xRange(String key, org.springframework.data.domain.Ran
}

@Override
@SuppressWarnings("unchecked")
public List<StringRecord> xReadAsString(StreamReadOptions readOptions, StreamOffset<String>... streams) {
return convertAndReturn(delegate.xRead(readOptions, serialize(streams)),
listByteMapRecordToStringMapRecordConverter);
}

@Override
@SuppressWarnings("unchecked")
public List<StringRecord> xReadGroupAsString(Consumer consumer, StreamReadOptions readOptions,
StreamOffset<String>... streams) {

Expand Down Expand Up @@ -3000,11 +3006,13 @@ public List<ByteRecord> xRange(byte[] key, org.springframework.data.domain.Range
}

@Override
@SuppressWarnings("unchecked")
public List<ByteRecord> xRead(StreamReadOptions readOptions, StreamOffset<byte[]>... streams) {
return delegate.xRead(readOptions, streams);
}

@Override
@SuppressWarnings("unchecked")
public List<ByteRecord> xReadGroup(Consumer consumer, StreamReadOptions readOptions,
StreamOffset<byte[]>... streams) {
return delegate.xReadGroup(consumer, readOptions, streams);
Expand Down Expand Up @@ -3036,12 +3044,11 @@ public void setDeserializePipelineAndTxResults(boolean deserializePipelineAndTxR
this.deserializePipelineAndTxResults = deserializePipelineAndTxResults;
}

@SuppressWarnings("unchecked")
@Nullable
@SuppressWarnings({ "rawtypes", "unchecked" })
private <T> T convertAndReturn(@Nullable Object value, Converter converter) {

if (isFutureConversion()) {

addResultConverter(converter);
return null;
}
Expand All @@ -3056,6 +3063,7 @@ private <T> T convertAndReturn(@Nullable Object value, Converter converter) {
}

private void addResultConverter(Converter<?, ?> converter) {

if (isQueueing()) {
txConverters.add(converter);
} else {
Expand All @@ -3069,20 +3077,26 @@ private boolean isFutureConversion() {

@SuppressWarnings({ "unchecked", "rawtypes" })
private List<Object> convertResults(@Nullable List<Object> results, Queue<Converter> converters) {

if (!deserializePipelineAndTxResults || results == null) {
return results;
}

if (results.size() != converters.size()) {
// Some of the commands were done directly on the delegate, don't attempt to convert
log.warn("Delegate returned an unexpected number of results; Abandoning type conversion.");
return results;
}

List<Object> convertedResults = new ArrayList<>(results.size());

for (Object result : results) {

Converter converter = converters.remove();

convertedResults.add(result == null ? null : converter.convert(result));
}

return convertedResults;
}

Expand All @@ -3095,9 +3109,11 @@ public List<Long> bitField(byte[] key, BitFieldSubCommands subCommands) {
public List<Long> bitfield(String key, BitFieldSubCommands operation) {

List<Long> results = delegate.bitField(serialize(key), operation);

if (isFutureConversion()) {
addResultConverter(Converters.identityConverter());
}

return results;
}

Expand Down

0 comments on commit 8255358

Please sign in to comment.