From 8cb791f553d13802a64764f37b0b8737f030e959 Mon Sep 17 00:00:00 2001
From: Taishi Kasuga
Date: Fri, 27 Sep 2024 14:12:22 +0900
Subject: [PATCH] fix: ensure recoverability for pubsub (#383)

---
 bin/pubsub                          | 92 +++++++++++++++++++++++++++++
 lib/redis_client/cluster/pub_sub.rb | 25 ++++----
 lib/redis_client/cluster/router.rb  |  4 +-
 test/cluster_controller.rb          |  4 +-
 4 files changed, 108 insertions(+), 17 deletions(-)
 create mode 100755 bin/pubsub

diff --git a/bin/pubsub b/bin/pubsub
new file mode 100755
index 00000000..06d93a71
--- /dev/null
+++ b/bin/pubsub
@@ -0,0 +1,92 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+require 'bundler/setup'
+require 'redis_cluster_client'
+
+module PubSubDebug
+  module_function
+
+  def spawn_publisher(cli, chan)
+    Thread.new(cli, chan) do |r, c|
+      role = ' Publisher'
+      i = 0
+
+      loop do
+        handle_errors(role) do
+          msg = format('%05d', i)
+          r.call('spublish', c, msg)
+          log "#{role}: sent: #{msg}"
+          i += 1
+        end
+      ensure
+        sleep 1.0
+      end
+    rescue StandardError => e
+      log "#{role}: dead: #{e.class}: #{e.message}"
+      raise
+    end
+  end
+
+  def spawn_subscriber(cli, chan) # rubocop:disable Metrics/AbcSize
+    Thread.new(cli, chan) do |r, c|
+      role = 'Subscriber'
+      ps = nil
+
+      loop do
+        ps = r.pubsub
+        ps.call('ssubscribe', c)
+        log "#{role}: done: subscription started to #{c}"
+        break
+      rescue StandardError => e
+        log "#{role}: init: #{e.class}: #{e.message}"
+        ps&.close
+      ensure
+        sleep 1.0
+      end
+
+      loop do
+        handle_errors('Subscriber') do
+          e = ps.next_event(0.01)
+          log "#{role}: recv: #{e.nil? ? 'nil' : e}"
+        end
+      ensure
+        sleep 1.0
+      end
+    rescue StandardError => e
+      log "#{role}: dead: #{e.class}: #{e.message}"
+      raise
+    end
+  end
+
+  def handle_errors(role)
+    yield
+  rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
+    log "#{role}: recv: #{e.class}"
+  rescue RedisClient::CommandError => e
+    log "#{role}: recv: #{e.class}: #{e.message}"
+    raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served')
+  rescue StandardError => e
+    log "#{role}: recv: #{e.class}: #{e.message}"
+    raise
+  end
+
+  def log(msg)
+    print "#{msg}\n"
+  end
+end
+
+clients = Array.new(2) { RedisClient.cluster(connect_with_original_config: true).new_client }
+threads = []
+channel = 'chan1'
+
+Signal.trap(:INT) do
+  threads.each(&:exit)
+  clients.each(&:close)
+  PubSubDebug.log("\nBye bye")
+  exit 0
+end
+
+threads << PubSubDebug.spawn_subscriber(clients[0], channel)
+threads << PubSubDebug.spawn_publisher(clients[1], channel)
+threads.each(&:join)
diff --git a/lib/redis_client/cluster/pub_sub.rb b/lib/redis_client/cluster/pub_sub.rb
index fd281674..7c86589a 100644
--- a/lib/redis_client/cluster/pub_sub.rb
+++ b/lib/redis_client/cluster/pub_sub.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 require 'redis_client'
+require 'redis_client/cluster/errors'
 require 'redis_client/cluster/normalized_cmd_name'
 
 class RedisClient
@@ -91,10 +92,8 @@ def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/Cycloma
           when ::RedisClient::CommandError
             raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
 
-            @router.renew_cluster_state
             break start_over
           when ::RedisClient::ConnectionError
-            @router.renew_cluster_state
             break start_over
           when StandardError then raise event
           when Array then break event
@@ -151,25 +150,23 @@ def calc_max_duration(timeout)
       def handle_connection_error(node_key, ignore: false)
         yield
       rescue ::RedisClient::ConnectionError
-        @state_dict[node_key].close
+        @state_dict[node_key]&.close
         @state_dict.delete(node_key)
         @router.renew_cluster_state
         raise unless ignore
       end
 
       def start_over
-        @state_dict.each_value(&:close)
-        @state_dict.clear
-        @commands.each do |command|
-          loop do
-            _call(command)
-            break
-          rescue ::RedisClient::ConnectionError
-            sleep 1.0
-          end
+        loop do
+          @router.renew_cluster_state
+          @state_dict.each_value(&:close)
+          @state_dict.clear
+          @queue.clear
+          @commands.each { |command| _call(command) }
+          break
+        rescue ::RedisClient::ConnectionError, ::RedisClient::Cluster::NodeMightBeDown
+          sleep 1.0
         end
-
-        nil
       end
     end
   end
diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb
index 77441b8a..2a6c1e17 100644
--- a/lib/redis_client/cluster/router.rb
+++ b/lib/redis_client/cluster/router.rb
@@ -29,7 +29,7 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
         @pool = pool
         @client_kwargs = kwargs
         @node = ::RedisClient::Cluster::Node.new(concurrent_worker, config: config, pool: pool, **kwargs)
-        renew_cluster_state
+        @node.reload!
         @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout)
         @command_builder = @config.command_builder
       end
@@ -241,6 +241,8 @@ def node_keys
 
       def renew_cluster_state
         @node.reload!
+      rescue ::RedisClient::Cluster::InitialSetupError
+        # ignore
       end
 
       def close
diff --git a/test/cluster_controller.rb b/test/cluster_controller.rb
index 1aacc6e2..af3e67c9 100644
--- a/test/cluster_controller.rb
+++ b/test/cluster_controller.rb
@@ -270,7 +270,7 @@ def flush_all_data(clients)
       print_debug("#{c.config.host}:#{c.config.port} ... FLUSHALL")
     rescue ::RedisClient::CommandError, ::RedisClient::ReadOnlyError
       # READONLY You can't write against a read only replica.
-    rescue ::RedisClient::CannotConnectError => e
+    rescue ::RedisClient::ConnectionError => e
       print_debug("#{c.config.host}:#{c.config.port} ... FLUSHALL: #{e.class}: #{e.message}")
     end
   end
@@ -279,7 +279,7 @@ def reset_cluster(clients)
     clients.each do |c|
       c.call('CLUSTER', 'RESET', 'HARD')
       print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER RESET HARD")
-    rescue ::RedisClient::CannotConnectError => e
+    rescue ::RedisClient::ConnectionError => e
       print_debug("#{c.config.host}:#{c.config.port} ... CLUSTER RESET HARD: #{e.class}: #{e.message}")
     end
   end