From f3bd180949b5b20fcc3d076ee1a9cdd57c4dbf43 Mon Sep 17 00:00:00 2001 From: byunjuneseok Date: Fri, 15 Aug 2025 23:41:05 +0900 Subject: [PATCH 1/4] feat: Add sharded pub/sub auto-resubscription mechanism --- .../StatefulRedisPubSubConnectionImpl.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java index 91ef777f5a..66f1cbf170 100644 --- a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java @@ -24,6 +24,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import io.lettuce.core.RedisChannelWriter; import io.lettuce.core.RedisCommandExecutionException; @@ -53,6 +55,8 @@ public class StatefulRedisPubSubConnectionImpl extends StatefulRedisConnec private final PubSubEndpoint endpoint; + private final ShardedPubSubAutoResubscribeListener autoResubscribeListener; + /** * Initialize a new connection. * @@ -67,6 +71,10 @@ public StatefulRedisPubSubConnectionImpl(PubSubEndpoint endpoint, RedisCha super(writer, endpoint, codec, timeout, DEFAULT_JSON_PARSER); this.endpoint = endpoint; endpoint.setConnectionState(getConnectionState()); + + // Add internal listener for auto-resubscription on sunsubscribe events + this.autoResubscribeListener = new ShardedPubSubAutoResubscribeListener(); + endpoint.addListener(autoResubscribeListener); } /** @@ -163,4 +171,48 @@ public void activated() { } } + /** + * Internal listener that handles automatic resubscription for sharded pub/sub channels when they are unsubscribed due to + * slot rebalancing. + */ + private class ShardedPubSubAutoResubscribeListener extends RedisPubSubAdapter { + + private final Set intentionalUnsubscriptions = ConcurrentHashMap.newKeySet(); + + @Override + public void sunsubscribed(K shardChannel, long count) { + if (intentionalUnsubscriptions.remove(shardChannel)) { + return; + } + + if (shardChannel != null) { + InternalLoggerFactory.getInstance(getClass()).debug( + "Triggering auto-resubscribe to generate MovedRedirectionEvent for shard channel: {}", shardChannel); + RedisFuture resubscribeResult = async().ssubscribe(shardChannel); + resubscribeResult.exceptionally(throwable -> { + InternalLoggerFactory.getInstance(getClass()).debug( + "Auto-resubscribe triggered cluster redirection for shard channel {}: {}", shardChannel, + throwable.getMessage()); + return null; + }); + } + } + + /** + * Mark a channel as intentionally unsubscribed to prevent auto-resubscription + */ + public void markIntentionalUnsubscribe(K shardChannel) { + intentionalUnsubscriptions.add(shardChannel); + } + + } + + /** + * Mark a channel as intentionally unsubscribed to prevent auto-resubscription. This method is called by + * RedisPubSubAsyncCommandsImpl when sunsubscribe is explicitly called. + */ + public void markIntentionalUnsubscribe(K shardChannel) { + autoResubscribeListener.markIntentionalUnsubscribe(shardChannel); + } + } From f8871322aebff2ccdc706d54bbc92f558b995fc4 Mon Sep 17 00:00:00 2001 From: byunjuneseok Date: Sat, 16 Aug 2025 18:06:51 +0900 Subject: [PATCH 2/4] feat: Mark intentional sharded pub/sub unsubscriptions --- .../lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java | 8 ++++++++ .../core/pubsub/RedisPubSubReactiveCommandsImpl.java | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java index 44108e4942..1cc6512e1c 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubAsyncCommandsImpl.java @@ -114,6 +114,14 @@ public RedisFuture ssubscribe(K... channels) { @Override @SuppressWarnings("unchecked") public RedisFuture sunsubscribe(K... channels) { + // Mark these channels as intentionally unsubscribed to prevent auto-resubscription + StatefulRedisPubSubConnection connection = getStatefulConnection(); + if (connection instanceof StatefulRedisPubSubConnectionImpl) { + StatefulRedisPubSubConnectionImpl impl = (StatefulRedisPubSubConnectionImpl) connection; + for (K channel : channels) { + impl.markIntentionalUnsubscribe(channel); + } + } return (RedisFuture) dispatch(commandBuilder.sunsubscribe(channels)); } diff --git a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java index 54774b7269..a21d39015f 100644 --- a/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/RedisPubSubReactiveCommandsImpl.java @@ -171,6 +171,14 @@ public Mono ssubscribe(K... shardChannels) { @Override public Mono sunsubscribe(K... shardChannels) { + // Mark these channels as intentionally unsubscribed to prevent auto-resubscription + StatefulRedisPubSubConnection connection = getStatefulConnection(); + if (connection instanceof StatefulRedisPubSubConnectionImpl) { + StatefulRedisPubSubConnectionImpl impl = (StatefulRedisPubSubConnectionImpl) connection; + for (K channel : shardChannels) { + impl.markIntentionalUnsubscribe(channel); + } + } return createFlux(() -> commandBuilder.sunsubscribe(shardChannels)).then(); } From 429f61c7afb82a47ee8bb83cf12f98be9215e97d Mon Sep 17 00:00:00 2001 From: byunjuneseok Date: Sat, 16 Aug 2025 18:06:51 +0900 Subject: [PATCH 3/4] test: Add unit tests for sharded pub/sub auto-resubscription --- ...fulRedisPubSubConnectionImplUnitTests.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java b/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java index 9eb5528244..ee824c856c 100644 --- a/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java @@ -14,8 +14,16 @@ import static io.lettuce.TestTags.UNIT_TEST; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; +import io.lettuce.core.protocol.AsyncCommand; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; +import io.lettuce.core.pubsub.PubSubOutput; +import io.lettuce.core.pubsub.RedisPubSubAdapter; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Arrays; import java.util.HashSet; @@ -122,4 +130,92 @@ void resubscribeChannelAndPatternAndShardChanelSubscription() { assertInstanceOf(AsyncCommand.class, subscriptions.get(1)); } + @Test + void autoResubscribeListenerIsRegistered() { + // Verify that the connection has the markIntentionalUnsubscribe method + // This confirms the auto-resubscribe functionality is available + connection.markIntentionalUnsubscribe("test-channel"); + // If no exception is thrown, the method exists and works + assertTrue(true); + } + + @Test + void intentionalUnsubscribeBypassesAutoResubscribe() throws Exception { + // Test 1: Intentional unsubscribe should NOT trigger auto-resubscribe + + // Create a mock async commands to verify ssubscribe is NOT called + RedisPubSubAsyncCommands mockAsync = mock(RedisPubSubAsyncCommands.class); + StatefulRedisPubSubConnectionImpl spyConnection = spy(connection); + when(spyConnection.async()).thenReturn(mockAsync); + + // Mark the channel as intentionally unsubscribed + spyConnection.markIntentionalUnsubscribe("test-channel"); + + // Use reflection to access the private endpoint and trigger sunsubscribed event + PubSubEndpoint endpoint = getEndpointViaReflection(spyConnection); + PubSubOutput sunsubscribeMessage = createSunsubscribeMessage("test-channel", codec); + endpoint.notifyMessage(sunsubscribeMessage); + + // Wait a moment for any async processing + Thread.sleep(50); + + // Verify that ssubscribe was NOT called (intentional unsubscribe bypassed auto-resubscribe) + verify(mockAsync, never()).ssubscribe("test-channel"); + } + + @Test + void unintentionalUnsubscribeTriggersAutoResubscribe() throws Exception { + // Test 2: Unintentional unsubscribe (from Redis) should trigger auto-resubscribe + + // Create a fresh connection with a mock async + PubSubEndpoint mockEndpoint = mock(PubSubEndpoint.class); + StatefulRedisPubSubConnectionImpl testConnection = new StatefulRedisPubSubConnectionImpl<>(mockEndpoint, + mockedWriter, codec, timeout); + + // Create a mock async commands to verify ssubscribe IS called + RedisPubSubAsyncCommands mockAsync = mock(RedisPubSubAsyncCommands.class); + @SuppressWarnings("unchecked") + RedisFuture mockFuture = mock(RedisFuture.class); + when(mockAsync.ssubscribe("test-channel")).thenReturn(mockFuture); + + StatefulRedisPubSubConnectionImpl spyConnection = spy(testConnection); + when(spyConnection.async()).thenReturn(mockAsync); + + // Get the auto-resubscribe listener directly and trigger it + RedisPubSubListener autoResubscribeListener = getAutoResubscribeListener(spyConnection); + + // Do NOT mark as intentional - simulate Redis server sunsubscribe during slot movement + autoResubscribeListener.sunsubscribed("test-channel", 0); + + // Wait a moment for async processing + Thread.sleep(50); + + // Verify that ssubscribe WAS called (auto-resubscribe triggered) + verify(mockAsync, times(1)).ssubscribe("test-channel"); + } + + @SuppressWarnings("unchecked") + private PubSubEndpoint getEndpointViaReflection( + StatefulRedisPubSubConnectionImpl connection) throws Exception { + Field endpointField = StatefulRedisPubSubConnectionImpl.class.getDeclaredField("endpoint"); + endpointField.setAccessible(true); + return (PubSubEndpoint) endpointField.get(connection); + } + + @SuppressWarnings("unchecked") + private RedisPubSubListener getAutoResubscribeListener( + StatefulRedisPubSubConnectionImpl connection) throws Exception { + Field listenerField = StatefulRedisPubSubConnectionImpl.class.getDeclaredField("autoResubscribeListener"); + listenerField.setAccessible(true); + return (RedisPubSubListener) listenerField.get(connection); + } + + private PubSubOutput createSunsubscribeMessage(String channel, RedisCodec codec) { + PubSubOutput output = new PubSubOutput<>(codec); + output.set(ByteBuffer.wrap("sunsubscribe".getBytes())); + output.set(ByteBuffer.wrap(channel.getBytes())); + output.set(0L); // count + return output; + } + } From cf1b45c00f52e625954661f4c978181856d3d805 Mon Sep 17 00:00:00 2001 From: byunjuneseok Date: Sat, 16 Aug 2025 18:06:51 +0900 Subject: [PATCH 4/4] test: Add integration tests for sharded pub/sub auto-resubscription --- .../pubsub/PubSubCommandIntegrationTests.java | 64 ++++++++++++++++ ...fulRedisPubSubConnectionImplUnitTests.java | 74 ++----------------- 2 files changed, 70 insertions(+), 68 deletions(-) diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java index 3472660281..a657ad35a3 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java @@ -611,4 +611,68 @@ void echoAllowedInSubscriptionState() { pubsub.unsubscribe(channel); } + @Test + void autoResubscribeOnShardChannelUnsubscribed() throws Exception { + final BlockingQueue subscribedChannels = LettuceFactories.newBlockingQueue(); + final BlockingQueue unsubscribedChannels = LettuceFactories.newBlockingQueue(); + + RedisPubSubListener listener = new RedisPubSubAdapter() { + + @Override + public void ssubscribed(String channel, long count) { + subscribedChannels.add(channel); + } + + @Override + public void sunsubscribed(String channel, long count) { + unsubscribedChannels.add(channel); + } + + }; + + pubsub.getStatefulConnection().addListener(listener); + pubsub.ssubscribe(shardChannel); + + assertThat(subscribedChannels.take()).isEqualTo(shardChannel); + + pubsub.sunsubscribe(shardChannel); + + assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel); + assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull(); + + pubsub.getStatefulConnection().removeListener(listener); + } + + @Test + void noAutoResubscribeOnIntentionalUnsubscribe() throws Exception { + final BlockingQueue subscribedChannels = LettuceFactories.newBlockingQueue(); + final BlockingQueue unsubscribedChannels = LettuceFactories.newBlockingQueue(); + + RedisPubSubListener listener = new RedisPubSubAdapter() { + + @Override + public void ssubscribed(String channel, long count) { + subscribedChannels.add(channel); + } + + @Override + public void sunsubscribed(String channel, long count) { + unsubscribedChannels.add(channel); + } + + }; + + pubsub.getStatefulConnection().addListener(listener); + pubsub.ssubscribe(shardChannel); + + assertThat(subscribedChannels.take()).isEqualTo(shardChannel); + + pubsub.sunsubscribe(shardChannel); + assertThat(unsubscribedChannels.take()).isEqualTo(shardChannel); + + assertThat(subscribedChannels.poll(50, TimeUnit.MILLISECONDS)).isNull(); + + pubsub.getStatefulConnection().removeListener(listener); + } + } diff --git a/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java b/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java index ee824c856c..5228050f20 100644 --- a/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImplUnitTests.java @@ -17,13 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.*; -import io.lettuce.core.protocol.AsyncCommand; -import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; -import io.lettuce.core.pubsub.PubSubOutput; -import io.lettuce.core.pubsub.RedisPubSubAdapter; - import java.lang.reflect.Field; -import java.nio.ByteBuffer; import java.time.Duration; import java.util.Arrays; import java.util.HashSet; @@ -132,74 +126,27 @@ void resubscribeChannelAndPatternAndShardChanelSubscription() { @Test void autoResubscribeListenerIsRegistered() { - // Verify that the connection has the markIntentionalUnsubscribe method - // This confirms the auto-resubscribe functionality is available connection.markIntentionalUnsubscribe("test-channel"); - // If no exception is thrown, the method exists and works assertTrue(true); } @Test void intentionalUnsubscribeBypassesAutoResubscribe() throws Exception { - // Test 1: Intentional unsubscribe should NOT trigger auto-resubscribe - - // Create a mock async commands to verify ssubscribe is NOT called - RedisPubSubAsyncCommands mockAsync = mock(RedisPubSubAsyncCommands.class); - StatefulRedisPubSubConnectionImpl spyConnection = spy(connection); - when(spyConnection.async()).thenReturn(mockAsync); - - // Mark the channel as intentionally unsubscribed - spyConnection.markIntentionalUnsubscribe("test-channel"); - - // Use reflection to access the private endpoint and trigger sunsubscribed event - PubSubEndpoint endpoint = getEndpointViaReflection(spyConnection); - PubSubOutput sunsubscribeMessage = createSunsubscribeMessage("test-channel", codec); - endpoint.notifyMessage(sunsubscribeMessage); + connection.markIntentionalUnsubscribe("test-channel"); - // Wait a moment for any async processing - Thread.sleep(50); + RedisPubSubListener autoResubscribeListener = getAutoResubscribeListener(connection); - // Verify that ssubscribe was NOT called (intentional unsubscribe bypassed auto-resubscribe) - verify(mockAsync, never()).ssubscribe("test-channel"); + autoResubscribeListener.sunsubscribed("test-channel", 0); + verify(mockedWriter, never()).write(any(io.lettuce.core.protocol.RedisCommand.class)); } @Test void unintentionalUnsubscribeTriggersAutoResubscribe() throws Exception { - // Test 2: Unintentional unsubscribe (from Redis) should trigger auto-resubscribe - - // Create a fresh connection with a mock async - PubSubEndpoint mockEndpoint = mock(PubSubEndpoint.class); - StatefulRedisPubSubConnectionImpl testConnection = new StatefulRedisPubSubConnectionImpl<>(mockEndpoint, - mockedWriter, codec, timeout); - - // Create a mock async commands to verify ssubscribe IS called - RedisPubSubAsyncCommands mockAsync = mock(RedisPubSubAsyncCommands.class); - @SuppressWarnings("unchecked") - RedisFuture mockFuture = mock(RedisFuture.class); - when(mockAsync.ssubscribe("test-channel")).thenReturn(mockFuture); - - StatefulRedisPubSubConnectionImpl spyConnection = spy(testConnection); - when(spyConnection.async()).thenReturn(mockAsync); + RedisPubSubListener autoResubscribeListener = getAutoResubscribeListener(connection); - // Get the auto-resubscribe listener directly and trigger it - RedisPubSubListener autoResubscribeListener = getAutoResubscribeListener(spyConnection); - - // Do NOT mark as intentional - simulate Redis server sunsubscribe during slot movement autoResubscribeListener.sunsubscribed("test-channel", 0); - // Wait a moment for async processing - Thread.sleep(50); - - // Verify that ssubscribe WAS called (auto-resubscribe triggered) - verify(mockAsync, times(1)).ssubscribe("test-channel"); - } - - @SuppressWarnings("unchecked") - private PubSubEndpoint getEndpointViaReflection( - StatefulRedisPubSubConnectionImpl connection) throws Exception { - Field endpointField = StatefulRedisPubSubConnectionImpl.class.getDeclaredField("endpoint"); - endpointField.setAccessible(true); - return (PubSubEndpoint) endpointField.get(connection); + verify(mockedWriter, times(1)).write(any(io.lettuce.core.protocol.RedisCommand.class)); } @SuppressWarnings("unchecked") @@ -209,13 +156,4 @@ private RedisPubSubListener getAutoResubscribeListener( listenerField.setAccessible(true); return (RedisPubSubListener) listenerField.get(connection); } - - private PubSubOutput createSunsubscribeMessage(String channel, RedisCodec codec) { - PubSubOutput output = new PubSubOutput<>(codec); - output.set(ByteBuffer.wrap("sunsubscribe".getBytes())); - output.set(ByteBuffer.wrap(channel.getBytes())); - output.set(0L); // count - return output; - } - }