Skip to content

Commit

Permalink
fix: use thread local variables instead of relying on GVL for thread …
Browse files Browse the repository at this point in the history
…safety (#128)
  • Loading branch information
supercaracal committed Sep 21, 2022
1 parent 12e8880 commit fad3e42
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 45 deletions.
46 changes: 32 additions & 14 deletions lib/redis_client/cluster/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,36 @@ def build_connection_prelude
class << self
def load_info(options, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
startup_size = options.size > MAX_STARTUP_SAMPLE ? MAX_STARTUP_SAMPLE : options.size
node_info_list = Array.new(startup_size)
errors = Array.new(startup_size)
node_info_list = errors = nil
startup_options = options.to_a.sample(MAX_STARTUP_SAMPLE).to_h
startup_nodes = ::RedisClient::Cluster::Node.new(startup_options, **kwargs)
startup_nodes.each_slice(MAX_THREADS).with_index do |chuncked_startup_nodes, chuncked_idx|
threads = chuncked_startup_nodes.each_with_index.map do |raw_client, idx|
Thread.new(raw_client, (MAX_THREADS * chuncked_idx) + idx) do |cli, i|
Thread.pass
Thread.current.thread_variable_set(:index, i)
reply = cli.call('CLUSTER', 'NODES')
node_info_list[i] = parse_node_info(reply)
Thread.current.thread_variable_set(:info, parse_node_info(reply))
rescue StandardError => e
errors[i] = e
Thread.current.thread_variable_set(:error, e)
ensure
cli&.close
end
end

threads.each(&:join)
threads.each do |t|
t.join
if t.thread_variable?(:info)
node_info_list ||= Array.new(startup_size)
node_info_list[t.thread_variable_get(:index)] = t.thread_variable_get(:info)
elsif t.thread_variable?(:error)
errors ||= Array.new(startup_size)
errors[t.thread_variable_get(:index)] = t.thread_variable_get(:error)
end
end
end

raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.all?(&:nil?)
raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil?

grouped = node_info_list.compact.group_by do |rows|
rows.sort_by { |row| row[:id] }
Expand Down Expand Up @@ -147,7 +156,7 @@ def call_replicas(method, command, args, &block)

def send_ping(method, command, args, &block)
result_values, errors = call_multiple_nodes(@topology.clients, method, command, args, &block)
return result_values if errors.empty?
return result_values if errors.nil? || errors.empty?

raise ReloadNeeded if errors.values.any?(::RedisClient::ConnectionError)

Expand Down Expand Up @@ -228,26 +237,35 @@ def call_multiple_nodes(clients, method, command, args, &block)

def call_multiple_nodes!(clients, method, command, args, &block)
result_values, errors = call_multiple_nodes(clients, method, command, args, &block)
return result_values if errors.empty?
return result_values if errors.nil? || errors.empty?

raise ::RedisClient::Cluster::ErrorCollection, errors
end

def try_map(clients) # rubocop:disable Metrics/MethodLength
results = {}
errors = {}
def try_map(clients) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
results = errors = nil
clients.each_slice(MAX_THREADS) do |chuncked_clients|
threads = chuncked_clients.map do |k, v|
Thread.new(k, v) do |node_key, client|
Thread.pass
Thread.current.thread_variable_set(:node_key, node_key)
reply = yield(node_key, client)
results[node_key] = reply unless reply.nil?
Thread.current.thread_variable_set(:result, reply)
rescue StandardError => e
errors[node_key] = e
Thread.current.thread_variable_set(:error, e)
end
end

threads.each(&:join)
threads.each do |t|
t.join
if t.thread_variable?(:result)
results ||= {}
results[t.thread_variable_get(:node_key)] = t.thread_variable_get(:result)
elsif t.thread_variable?(:error)
errors ||= {}
errors[t.thread_variable_get(:node_key)] = t.thread_variable_get(:error)
end
end
end

[results, errors]
Expand Down
14 changes: 9 additions & 5 deletions lib/redis_client/cluster/node/latency_replica.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ def any_replica_node_key(seed: nil)
private

def measure_latencies(clients) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
latencies = {}

latencies = nil
clients.each_slice(::RedisClient::Cluster::Node::MAX_THREADS) do |chuncked_clients|
threads = chuncked_clients.map do |k, v|
Thread.new(k, v) do |node_key, client|
Thread.pass
Thread.current.thread_variable_set(:node_key, node_key)

min = DUMMY_LATENCY_NSEC
MEASURE_ATTEMPT_COUNT.times do
Expand All @@ -55,13 +55,17 @@ def measure_latencies(clients) # rubocop:disable Metrics/MethodLength, Metrics/A
min = duration if duration < min
end

latencies[node_key] = min
Thread.current.thread_variable_set(:latency, min)
rescue StandardError
latencies[node_key] = DUMMY_LATENCY_NSEC
Thread.current.thread_variable_set(:latency, DUMMY_LATENCY_NSEC)
end
end

threads.each(&:join)
threads.each do |t|
t.join
latencies ||= {}
latencies[t.thread_variable_get(:node_key)] = t.thread_variable_get(:latency)
end
end

latencies
Expand Down
59 changes: 36 additions & 23 deletions lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,37 @@ def initialize(router, command_builder, seed: Random.new_seed)
def call(*args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
node_key = @router.find_node_key(command, seed: @seed)
add_line(node_key, [@size, :call_v, command, block])
add_row(node_key, [@size, :call_v, command, block])
end

def call_v(args, &block)
command = @command_builder.generate(args)
node_key = @router.find_node_key(command, seed: @seed)
add_line(node_key, [@size, :call_v, command, block])
add_row(node_key, [@size, :call_v, command, block])
end

def call_once(*args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
node_key = @router.find_node_key(command, seed: @seed)
add_line(node_key, [@size, :call_once_v, command, block])
add_row(node_key, [@size, :call_once_v, command, block])
end

def call_once_v(args, &block)
command = @command_builder.generate(args)
node_key = @router.find_node_key(command, seed: @seed)
add_line(node_key, [@size, :call_once_v, command, block])
add_row(node_key, [@size, :call_once_v, command, block])
end

def blocking_call(timeout, *args, **kwargs, &block)
command = @command_builder.generate(args, kwargs)
node_key = @router.find_node_key(command, seed: @seed)
add_line(node_key, [@size, :blocking_call_v, timeout, command, block])
add_row(node_key, [@size, :blocking_call_v, timeout, command, block])
end

def blocking_call_v(timeout, args, &block)
command = @command_builder.generate(args)
node_key = @router.find_node_key(command, seed: @seed)
add_line(node_key, [@size, :blocking_call_v, timeout, command, block])
add_row(node_key, [@size, :blocking_call_v, timeout, command, block])
end

def empty?
Expand All @@ -59,44 +59,57 @@ def empty?

# TODO: https://github.com/redis-rb/redis-cluster-client/issues/37 handle redirections
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
all_replies = Array.new(@size)
errors = {}
all_replies = errors = nil
@grouped.each_slice(MAX_THREADS) do |chuncked_grouped|
threads = chuncked_grouped.map do |k, v|
Thread.new(@router, k, v) do |router, node_key, rows|
Thread.pass
replies = router.find_node(node_key).pipelined do |pipeline|
rows.each do |row|
case row.size
when 4 then pipeline.send(row[1], row[2], &row[3])
when 5 then pipeline.send(row[1], row[2], row[3], &row[4])
end
end
end

replies = do_pipelining(router.find_node(node_key), rows)
raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size

rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] }
Thread.current.thread_variable_set(:rows, rows)
Thread.current.thread_variable_set(:replies, replies)
rescue StandardError => e
errors[node_key] = e
Thread.current.thread_variable_set(:node_key, node_key)
Thread.current.thread_variable_set(:error, e)
end
end

threads.each(&:join)
threads.each do |t|
t.join
if t.thread_variable?(:replies)
all_replies ||= Array.new(@size)
t.thread_variable_get(:rows).each_with_index { |r, i| all_replies[r.first] = t.thread_variable_get(:replies)[i] }
elsif t.thread_variable?(:error)
errors ||= {}
errors[t.thread_variable_get(:node_key)] = t.thread_variable_get(:error)
end
end
end

return all_replies if errors.empty?
return all_replies if errors.nil?

raise ::RedisClient::Cluster::ErrorCollection, errors
end

private

def add_line(node_key, line)
def add_row(node_key, row)
@grouped[node_key] = [] unless @grouped.key?(node_key)
@grouped[node_key] << line
@grouped[node_key] << row
@size += 1
end

def do_pipelining(node, rows)
node.pipelined do |pipeline|
rows.each do |row|
case row.size
when 4 then pipeline.send(row[1], row[2], &row[3])
when 5 then pipeline.send(row[1], row[2], row[3], &row[4])
end
end
end
end
end
end
end
11 changes: 8 additions & 3 deletions test/redis_client/cluster/test_normalized_cmd_name.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,22 @@ def test_thread_safety
threads = results.each_with_index.map do |_, i|
Thread.new do
Thread.pass
Thread.current.thread_variable_set(:index, i)
if i.even?
results[i] = @subject.get_by_command(%w[SET foo bar]) == 'set'
Thread.current.thread_variable_set(:result, @subject.get_by_command(%w[SET foo bar]) == 'set')
else
@subject.clear
end
rescue StandardError
results[i] = false
Thread.current.thread_variable_set(:result, false)
end
end

threads.each(&:join)
threads.each do |t|
t.join
results[t.thread_variable_get(:index)] = t.thread_variable_get(:result)
end

refute_includes(results, false)
end
end
Expand Down
23 changes: 23 additions & 0 deletions test/redis_client/test_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,29 @@ def test_pipelined
assert_equal(want, got)
end

def test_pipelined_with_errors
assert_raises(::RedisClient::Cluster::ErrorCollection) do
@client.pipelined do |pipeline|
10.times do |i|
pipeline.call('SET', "string#{i}", i)
pipeline.call('SET', "string#{i}", i, 'too many args')
pipeline.call('SET', "string#{i}", i + 10)
end
end
end

wait_for_replication

10.times { |i| assert_equal((i + 10).to_s, @client.call('GET', "string#{i}")) }
end

def test_pipelined_with_many_commands
@client.pipelined { |pi| 1000.times { |i| pi.call('SET', i, i) } }
wait_for_replication
results = @client.pipelined { |pi| 1000.times { |i| pi.call('GET', i) } }
results.each_with_index { |got, i| assert_equal(i.to_s, got) }
end

def test_pubsub
10.times do |i|
pubsub = @client.pubsub
Expand Down

0 comments on commit fad3e42

Please sign in to comment.