Skip to content

Commit

Permalink
Some reorganization to make the memcached commands clearer. Added qui…
Browse files Browse the repository at this point in the history
…et versions of a number of commands (still need tests). Some additional comments.
  • Loading branch information
petergoldstein committed Dec 7, 2021
1 parent 4fe2bdf commit 27d444d
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 155 deletions.
6 changes: 3 additions & 3 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
Metrics/AbcSize:
Max: 19

# Offense count: 4
# Offense count: 5
# Configuration parameters: CountComments, CountAsOne.
Metrics/ClassLength:
Max: 387
Max: 241

# Offense count: 6
# Offense count: 5
# Configuration parameters: CountComments, CountAsOne, ExcludedMethods, IgnoredMethods.
Metrics/MethodLength:
Exclude:
Expand Down
256 changes: 144 additions & 112 deletions lib/dalli/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,42 @@ def initialize(servers = nil, options = {})
# The standard memcached instruction set
#

##
# Turn on quiet aka noreply support.
# All relevant operations within this block will be effectively
# pipelined as Dalli will use 'quiet' operations where possible.
# Currently supports the set, add, replace and delete operations.
def multi
old = Thread.current[::Dalli::QUIET]
Thread.current[::Dalli::QUIET] = true
yield
ensure
@ring&.pipeline_consume_and_ignore_responses
Thread.current[::Dalli::QUIET] = old
end

##
# Get the value associated with the key.
# If a value is not found, then +nil+ is returned.
def get(key, req_options = nil)
perform(:get, key, req_options)
end

##
# Gat (get and touch) fetch an item and simultaneously update its expiration time.
#
# If a value is not found, then +nil+ is returned.
def gat(key, ttl = nil)
perform(:gat, key, ttl_or_default(ttl))
end

##
# Touch updates expiration time for a given key.
#
# Returns true if key exists, otherwise nil.
def touch(key, ttl = nil)
resp = perform(:touch, key, ttl_or_default(ttl))
resp.nil? ? nil : true
end

##
# Get the value and CAS ID associated with the key. If a block is provided,
# value and CAS will be passed to the block.
def get_cas(key)
(value, cas) = perform(:cas, key)
# TODO: This is odd. Confirm this is working as expected.
value = nil if !value || value == 'Not found'
return [value, cas] unless block_given?

yield value, cas
end

##
# Fetch multiple keys efficiently.
# If a block is given, yields key/value pairs one at a time.
Expand All @@ -97,7 +112,21 @@ def get_multi(*keys)
end
end

CACHE_NILS = { cache_nils: true }.freeze
##
# Fetch multiple keys efficiently, including available metadata such as CAS.
# If a block is given, yields key/data pairs one a time. Data is an array:
# [value, cas_id]
# If no block is given, returns a hash of
# { 'key' => [value, cas_id] }
def get_multi_cas(*keys)
if block_given?
pipelined_getter.process(keys) { |*args| yield(*args) }
else
{}.tap do |hash|
pipelined_getter.process(keys) { |k, data| hash[k] = data }
end
end
end

# Fetch the value associated with the key.
# If a value is found, then it is returned.
Expand All @@ -110,19 +139,11 @@ def get_multi(*keys)
def fetch(key, ttl = nil, req_options = nil)
req_options = req_options.nil? ? CACHE_NILS : req_options.merge(CACHE_NILS) if cache_nils
val = get(key, req_options)
if not_found?(val) && block_given?
val = yield
add(key, val, ttl_or_default(ttl), req_options)
end
val
end
return val unless block_given? && not_found?(val)

def not_found?(val)
cache_nils ? val == ::Dalli::NOT_FOUND : val.nil?
end

def cache_nils
@options[:cache_nils]
new_val = yield
add(key, new_val, ttl_or_default(ttl), req_options)
new_val
end

##
Expand Down Expand Up @@ -151,8 +172,42 @@ def cas!(key, ttl = nil, req_options = nil, &block)
cas_core(key, true, ttl, req_options, &block)
end

##
# Turn on quiet aka noreply support for a number of
# memcached operations.
#
# All relevant operations within this block will be effectively
# pipelined as Dalli will use 'quiet' versions. The invoked methods
# will all return nil, rather than their usual response. Method
# latency will be substantially lower, as the caller will not be
# blocking on responses.
#
# Currently supports storage (set, add, replace, append, prepend),
# arithmetic (incr, decr), flush and delete operations. Use of
# unsupported operations inside a block will raise an error.
#
# Any error replies will be discarded at the end of the block, and
# Dalli client methods invoked inside the block will not
# have return values
def quiet
old = Thread.current[::Dalli::QUIET]
Thread.current[::Dalli::QUIET] = true
yield
ensure
@ring&.pipeline_consume_and_ignore_responses
Thread.current[::Dalli::QUIET] = old
end
alias multi quiet

def set(key, value, ttl = nil, req_options = nil)
perform(:set, key, value, ttl_or_default(ttl), 0, req_options)
set_cas(key, value, 0, ttl, req_options)
end

##
# Set the key-value pair, verifying existing CAS.
# Returns the resulting CAS value if succeeded, and falsy otherwise.
def set_cas(key, value, cas, ttl = nil, req_options = nil)
perform(:set, key, value, ttl_or_default(ttl), cas, req_options)
end

##
Expand All @@ -166,11 +221,25 @@ def add(key, value, ttl = nil, req_options = nil)
# Conditionally add a key/value pair, only if the key already exists
# on the server. Returns truthy if the operation succeeded.
def replace(key, value, ttl = nil, req_options = nil)
perform(:replace, key, value, ttl_or_default(ttl), 0, req_options)
replace_cas(key, value, 0, ttl, req_options)
end

##
# Conditionally add a key/value pair, verifying existing CAS, only if the
# key already exists on the server. Returns the new CAS value if the
# operation succeeded, or falsy otherwise.
def replace_cas(key, value, cas, ttl = nil, req_options = nil)
perform(:replace, key, value, ttl_or_default(ttl), cas, req_options)
end

# Delete a key/value pair, verifying existing CAS.
# Returns true if succeeded, and falsy otherwise.
def delete_cas(key, cas = 0)
perform(:delete, key, cas)
end

def delete(key)
perform(:delete, key, 0)
delete_cas(key, 0)
end

##
Expand All @@ -187,16 +256,6 @@ def prepend(key, value)
perform(:prepend, key, value.to_s)
end

##
# Flush the memcached server, at 'delay' seconds in the future.
# Delay defaults to zero seconds, which means an immediate flush.
##
def flush(delay = 0)
ring.servers.map { |s| s.request(:flush, delay) }
end

alias flush_all flush

##
# Incr adds the given amount to the counter on the memcached server.
# Amt must be a positive integer value.
Expand All @@ -208,8 +267,10 @@ def flush(delay = 0)
# Note that the ttl will only apply if the counter does not already
# exist. To increase an existing counter and update its TTL, use
# #cas.
#
# If the value already exists, it must have been set with raw: true
def incr(key, amt = 1, ttl = nil, default = nil)
raise ArgumentError, "Positive values only: #{amt}" if amt.negative?
check_positive!(amt)

perform(:incr, key, amt.to_i, ttl_or_default(ttl), default)
end
Expand All @@ -228,35 +289,31 @@ def incr(key, amt = 1, ttl = nil, default = nil)
# Note that the ttl will only apply if the counter does not already
# exist. To decrease an existing counter and update its TTL, use
# #cas.
#
# If the value already exists, it must have been set with raw: true
def decr(key, amt = 1, ttl = nil, default = nil)
raise ArgumentError, "Positive values only: #{amt}" if amt.negative?
check_positive!(amt)

perform(:decr, key, amt.to_i, ttl_or_default(ttl), default)
end

##
# Touch updates expiration time for a given key.
#
# Returns true if key exists, otherwise nil.
def touch(key, ttl = nil)
resp = perform(:touch, key, ttl_or_default(ttl))
resp.nil? ? nil : true
end

# Flush the memcached server, at 'delay' seconds in the future.
# Delay defaults to zero seconds, which means an immediate flush.
##
# Gat (get and touch) fetch an item and simultaneously update its expiration time.
#
# If a value is not found, then +nil+ is returned.
def gat(key, ttl = nil)
perform(:gat, key, ttl_or_default(ttl))
def flush(delay = 0)
ring.servers.map { |s| s.request(:flush, delay) }
end
alias flush_all flush

ALLOWED_STAT_KEYS = %i[items slabs settings].freeze

##
# Collect the stats for each server.
# You can optionally pass a type including :items, :slabs or :settings to get specific stats
# Returns a hash like { 'hostname:port' => { 'stat1' => 'value1', ... }, 'hostname2:port' => { ... } }
def stats(type = nil)
type = nil unless [nil, :items, :slabs, :settings].include? type
type = nil unless ALLOWED_STAT_KEYS.include? type
values = {}
ring.servers.each do |server|
values[server.name.to_s] = server.alive? ? server.request(:stats, type.to_s) : nil
Expand All @@ -272,12 +329,6 @@ def reset_stats
end
end

##
## Make sure memcache servers are alive, or raise an Dalli::RingError
def alive!
ring.server_for_key('')
end

##
## Version of the memcache servers.
def version
Expand All @@ -289,55 +340,9 @@ def version
end

##
# Get the value and CAS ID associated with the key. If a block is provided,
# value and CAS will be passed to the block.
def get_cas(key)
(value, cas) = perform(:cas, key)
value = nil if !value || value == 'Not found'
if block_given?
yield value, cas
else
[value, cas]
end
end

##
# Fetch multiple keys efficiently, including available metadata such as CAS.
# If a block is given, yields key/data pairs one a time. Data is an array:
# [value, cas_id]
# If no block is given, returns a hash of
# { 'key' => [value, cas_id] }
def get_multi_cas(*keys)
if block_given?
pipelined_getter.process(keys) { |*args| yield(*args) }
else
{}.tap do |hash|
pipelined_getter.process(keys) { |k, data| hash[k] = data }
end
end
end

##
# Set the key-value pair, verifying existing CAS.
# Returns the resulting CAS value if succeeded, and falsy otherwise.
def set_cas(key, value, cas, ttl = nil, req_options = nil)
ttl ||= @options[:expires_in].to_i
perform(:set, key, value, ttl, cas, req_options)
end

##
# Conditionally add a key/value pair, verifying existing CAS, only if the
# key already exists on the server. Returns the new CAS value if the
# operation succeeded, or falsy otherwise.
def replace_cas(key, value, cas, ttl = nil, req_options = nil)
ttl ||= @options[:expires_in].to_i
perform(:replace, key, value, ttl, cas, req_options)
end

# Delete a key/value pair, verifying existing CAS.
# Returns true if succeeded, and falsy otherwise.
def delete_cas(key, cas = 0)
perform(:delete, key, cas)
## Make sure memcache servers are alive, or raise an Dalli::RingError
def alive!
ring.server_for_key('')
end

##
Expand All @@ -349,13 +354,27 @@ def close
end
alias reset close

CACHE_NILS = { cache_nils: true }.freeze

def not_found?(val)
cache_nils ? val == ::Dalli::NOT_FOUND : val.nil?
end

def cache_nils
@options[:cache_nils]
end

# Stub method so a bare Dalli client can pretend to be a connection pool.
def with
yield self
end

private

def check_positive!(amt)
raise ArgumentError, "Positive values only: #{amt}" if amt.negative?
end

def cas_core(key, always_set, ttl = nil, req_options = nil)
(value, cas) = perform(:cas, key)
value = nil if !value || value == 'Not found'
Expand All @@ -365,6 +384,10 @@ def cas_core(key, always_set, ttl = nil, req_options = nil)
perform(:set, key, newvalue, ttl_or_default(ttl), cas, req_options)
end

##
# Uses the argument TTL or the client-wide default. Ensures
# that the value is an integer
##
def ttl_or_default(ttl)
(ttl || @options[:expires_in]).to_i
rescue NoMethodError
Expand All @@ -385,7 +408,16 @@ def protocol_implementation
@protocol_implementation ||= @options.fetch(:protocol_implementation, Dalli::Protocol::Binary)
end

# Chokepoint method for instrumentation
##
# Chokepoint method for memcached methods with a key argument.
# Validates the key, resolves the key to the appropriate server
# instance, and invokes the memcached method on the appropriate
# server.
#
# This method also forces retries on network errors - when
# a particular memcached instance becomes unreachable, or the
# operational times out.
##
def perform(*all_args)
return yield if block_given?

Expand Down
Loading

0 comments on commit 27d444d

Please sign in to comment.