diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index 57ce4a52..54d54b05 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -39,27 +39,36 @@ def build_connection_prelude class << self def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity startup_size = options.size > MAX_STARTUP_SAMPLE ? MAX_STARTUP_SAMPLE : options.size - node_info_list = Array.new(startup_size) - errors = Array.new(startup_size) + node_info_list = errors = nil startup_options = options.to_a.sample(MAX_STARTUP_SAMPLE).to_h startup_nodes = ::RedisClient::Cluster::Node.new(startup_options, **kwargs) startup_nodes.each_slice(MAX_THREADS).with_index do |chuncked_startup_nodes, chuncked_idx| threads = chuncked_startup_nodes.each_with_index.map do |raw_client, idx| Thread.new(raw_client, (MAX_THREADS * chuncked_idx) + idx) do |cli, i| Thread.pass + Thread.current.thread_variable_set(:index, i) reply = cli.call('CLUSTER', 'NODES') - node_info_list[i] = parse_node_info(reply) + Thread.current.thread_variable_set(:info, parse_node_info(reply)) rescue StandardError => e - errors[i] = e + Thread.current.thread_variable_set(:error, e) ensure cli&.close end end - threads.each(&:join) + threads.each do |t| + t.join + if t.thread_variable?(:info) + node_info_list ||= Array.new(startup_size) + node_info_list[t.thread_variable_get(:index)] = t.thread_variable_get(:info) + elsif t.thread_variable?(:error) + errors ||= Array.new(startup_size) + errors[t.thread_variable_get(:index)] = t.thread_variable_get(:error) + end + end end - raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.all?(&:nil?) + raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil? grouped = node_info_list.compact.group_by do |rows| rows.sort_by { |row| row[:id] } @@ -147,7 +156,7 @@ def call_replicas(method, command, args, &block) def send_ping(method, command, args, &block) result_values, errors = call_multiple_nodes(@topology.clients, method, command, args, &block) - return result_values if errors.empty? + return result_values if errors.nil? || errors.empty? raise ReloadNeeded if errors.values.any?(::RedisClient::ConnectionError) @@ -228,26 +237,35 @@ def call_multiple_nodes(clients, method, command, args, &block) def call_multiple_nodes!(clients, method, command, args, &block) result_values, errors = call_multiple_nodes(clients, method, command, args, &block) - return result_values if errors.empty? + return result_values if errors.nil? || errors.empty? raise ::RedisClient::Cluster::ErrorCollection, errors end - def try_map(clients) # rubocop:disable Metrics/MethodLength - results = {} - errors = {} + def try_map(clients) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + results = errors = nil clients.each_slice(MAX_THREADS) do |chuncked_clients| threads = chuncked_clients.map do |k, v| Thread.new(k, v) do |node_key, client| Thread.pass + Thread.current.thread_variable_set(:node_key, node_key) reply = yield(node_key, client) - results[node_key] = reply unless reply.nil? + Thread.current.thread_variable_set(:result, reply) rescue StandardError => e - errors[node_key] = e + Thread.current.thread_variable_set(:error, e) end end - threads.each(&:join) + threads.each do |t| + t.join + if t.thread_variable?(:result) + results ||= {} + results[t.thread_variable_get(:node_key)] = t.thread_variable_get(:result) + elsif t.thread_variable?(:error) + errors ||= {} + errors[t.thread_variable_get(:node_key)] = t.thread_variable_get(:error) + end + end end [results, errors] diff --git a/lib/redis_client/cluster/node/latency_replica.rb b/lib/redis_client/cluster/node/latency_replica.rb index 3286d6be..43623a50 100644 --- a/lib/redis_client/cluster/node/latency_replica.rb +++ b/lib/redis_client/cluster/node/latency_replica.rb @@ -40,12 +40,12 @@ def any_replica_node_key(seed: nil) private def measure_latencies(clients) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize - latencies = {} - + latencies = nil clients.each_slice(::RedisClient::Cluster::Node::MAX_THREADS) do |chuncked_clients| threads = chuncked_clients.map do |k, v| Thread.new(k, v) do |node_key, client| Thread.pass + Thread.current.thread_variable_set(:node_key, node_key) min = DUMMY_LATENCY_NSEC MEASURE_ATTEMPT_COUNT.times do @@ -55,13 +55,17 @@ def measure_latencies(clients) # rubocop:disable Metrics/MethodLength, Metrics/A min = duration if duration < min end - latencies[node_key] = min + Thread.current.thread_variable_set(:latency, min) rescue StandardError - latencies[node_key] = DUMMY_LATENCY_NSEC + Thread.current.thread_variable_set(:latency, DUMMY_LATENCY_NSEC) end end - threads.each(&:join) + threads.each do |t| + t.join + latencies ||= {} + latencies[t.thread_variable_get(:node_key)] = t.thread_variable_get(:latency) + end end latencies diff --git a/lib/redis_client/cluster/pipeline.rb b/lib/redis_client/cluster/pipeline.rb index 5523ff2d..759f1a1b 100644 --- a/lib/redis_client/cluster/pipeline.rb +++ b/lib/redis_client/cluster/pipeline.rb @@ -20,37 +20,37 @@ def initialize(router, command_builder, seed: Random.new_seed) def call(*args, **kwargs, &block) command = @command_builder.generate(args, kwargs) node_key = @router.find_node_key(command, seed: @seed) - add_line(node_key, [@size, :call_v, command, block]) + add_row(node_key, [@size, :call_v, command, block]) end def call_v(args, &block) command = @command_builder.generate(args) node_key = @router.find_node_key(command, seed: @seed) - add_line(node_key, [@size, :call_v, command, block]) + add_row(node_key, [@size, :call_v, command, block]) end def call_once(*args, **kwargs, &block) command = @command_builder.generate(args, kwargs) node_key = @router.find_node_key(command, seed: @seed) - add_line(node_key, [@size, :call_once_v, command, block]) + add_row(node_key, [@size, :call_once_v, command, block]) end def call_once_v(args, &block) command = @command_builder.generate(args) node_key = @router.find_node_key(command, seed: @seed) - add_line(node_key, [@size, :call_once_v, command, block]) + add_row(node_key, [@size, :call_once_v, command, block]) end def blocking_call(timeout, *args, **kwargs, &block) command = @command_builder.generate(args, kwargs) node_key = @router.find_node_key(command, seed: @seed) - add_line(node_key, [@size, :blocking_call_v, timeout, command, block]) + add_row(node_key, [@size, :blocking_call_v, timeout, command, block]) end def blocking_call_v(timeout, args, &block) command = @command_builder.generate(args) node_key = @router.find_node_key(command, seed: @seed) - add_line(node_key, [@size, :blocking_call_v, timeout, command, block]) + add_row(node_key, [@size, :blocking_call_v, timeout, command, block]) end def empty? @@ -59,44 +59,57 @@ def empty? # TODO: https://github.com/redis-rb/redis-cluster-client/issues/37 handle redirections def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity - all_replies = Array.new(@size) - errors = {} + all_replies = errors = nil @grouped.each_slice(MAX_THREADS) do |chuncked_grouped| threads = chuncked_grouped.map do |k, v| Thread.new(@router, k, v) do |router, node_key, rows| Thread.pass - replies = router.find_node(node_key).pipelined do |pipeline| - rows.each do |row| - case row.size - when 4 then pipeline.send(row[1], row[2], &row[3]) - when 5 then pipeline.send(row[1], row[2], row[3], &row[4]) - end - end - end - + replies = do_pipelining(router.find_node(node_key), rows) raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size - rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] } + Thread.current.thread_variable_set(:rows, rows) + Thread.current.thread_variable_set(:replies, replies) rescue StandardError => e - errors[node_key] = e + Thread.current.thread_variable_set(:node_key, node_key) + Thread.current.thread_variable_set(:error, e) end end - threads.each(&:join) + threads.each do |t| + t.join + if t.thread_variable?(:replies) + all_replies ||= Array.new(@size) + t.thread_variable_get(:rows).each_with_index { |r, i| all_replies[r.first] = t.thread_variable_get(:replies)[i] } + elsif t.thread_variable?(:error) + errors ||= {} + errors[t.thread_variable_get(:node_key)] = t.thread_variable_get(:error) + end + end end - return all_replies if errors.empty? + return all_replies if errors.nil? raise ::RedisClient::Cluster::ErrorCollection, errors end private - def add_line(node_key, line) + def add_row(node_key, row) @grouped[node_key] = [] unless @grouped.key?(node_key) - @grouped[node_key] << line + @grouped[node_key] << row @size += 1 end + + def do_pipelining(node, rows) + node.pipelined do |pipeline| + rows.each do |row| + case row.size + when 4 then pipeline.send(row[1], row[2], &row[3]) + when 5 then pipeline.send(row[1], row[2], row[3], &row[4]) + end + end + end + end end end end diff --git a/test/redis_client/cluster/test_normalized_cmd_name.rb b/test/redis_client/cluster/test_normalized_cmd_name.rb index 0c42ace6..1c838625 100644 --- a/test/redis_client/cluster/test_normalized_cmd_name.rb +++ b/test/redis_client/cluster/test_normalized_cmd_name.rb @@ -67,17 +67,22 @@ def test_thread_safety threads = results.each_with_index.map do |_, i| Thread.new do Thread.pass + Thread.current.thread_variable_set(:index, i) if i.even? - results[i] = @subject.get_by_command(%w[SET foo bar]) == 'set' + Thread.current.thread_variable_set(:result, @subject.get_by_command(%w[SET foo bar]) == 'set') else @subject.clear end rescue StandardError - results[i] = false + Thread.current.thread_variable_set(:result, false) end end - threads.each(&:join) + threads.each do |t| + t.join + results[t.thread_variable_get(:index)] = t.thread_variable_get(:result) + end + refute_includes(results, false) end end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index 8c65b5a3..bd51b68f 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -139,6 +139,29 @@ def test_pipelined assert_equal(want, got) end + def test_pipelined_with_errors + assert_raises(::RedisClient::Cluster::ErrorCollection) do + @client.pipelined do |pipeline| + 10.times do |i| + pipeline.call('SET', "string#{i}", i) + pipeline.call('SET', "string#{i}", i, 'too many args') + pipeline.call('SET', "string#{i}", i + 10) + end + end + end + + wait_for_replication + + 10.times { |i| assert_equal((i + 10).to_s, @client.call('GET', "string#{i}")) } + end + + def test_pipelined_with_many_commands + @client.pipelined { |pi| 1000.times { |i| pi.call('SET', i, i) } } + wait_for_replication + results = @client.pipelined { |pi| 1000.times { |i| pi.call('GET', i) } } + results.each_with_index { |got, i| assert_equal(i.to_s, got) } + end + def test_pubsub 10.times do |i| pubsub = @client.pubsub