diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml index e5682777..8dd2738d 100644 --- a/.rubocop_todo.yml +++ b/.rubocop_todo.yml @@ -11,12 +11,12 @@ Metrics/AbcSize: Max: 19 -# Offense count: 4 +# Offense count: 5 # Configuration parameters: CountComments, CountAsOne. Metrics/ClassLength: - Max: 387 + Max: 241 -# Offense count: 6 +# Offense count: 5 # Configuration parameters: CountComments, CountAsOne, ExcludedMethods, IgnoredMethods. Metrics/MethodLength: Exclude: diff --git a/lib/dalli/client.rb b/lib/dalli/client.rb index 67564fe2..13f41b19 100644 --- a/lib/dalli/client.rb +++ b/lib/dalli/client.rb @@ -57,20 +57,6 @@ def initialize(servers = nil, options = {}) # The standard memcached instruction set # - ## - # Turn on quiet aka noreply support. - # All relevant operations within this block will be effectively - # pipelined as Dalli will use 'quiet' operations where possible. - # Currently supports the set, add, replace and delete operations. - def multi - old = Thread.current[::Dalli::QUIET] - Thread.current[::Dalli::QUIET] = true - yield - ensure - @ring&.pipeline_consume_and_ignore_responses - Thread.current[::Dalli::QUIET] = old - end - ## # Get the value associated with the key. # If a value is not found, then +nil+ is returned. @@ -78,6 +64,35 @@ def get(key, req_options = nil) perform(:get, key, req_options) end + ## + # Gat (get and touch) fetch an item and simultaneously update its expiration time. + # + # If a value is not found, then +nil+ is returned. + def gat(key, ttl = nil) + perform(:gat, key, ttl_or_default(ttl)) + end + + ## + # Touch updates expiration time for a given key. + # + # Returns true if key exists, otherwise nil. + def touch(key, ttl = nil) + resp = perform(:touch, key, ttl_or_default(ttl)) + resp.nil? ? nil : true + end + + ## + # Get the value and CAS ID associated with the key. If a block is provided, + # value and CAS will be passed to the block. + def get_cas(key) + (value, cas) = perform(:cas, key) + # TODO: This is odd. Confirm this is working as expected. + value = nil if !value || value == 'Not found' + return [value, cas] unless block_given? + + yield value, cas + end + ## # Fetch multiple keys efficiently. # If a block is given, yields key/value pairs one at a time. @@ -97,7 +112,21 @@ def get_multi(*keys) end end - CACHE_NILS = { cache_nils: true }.freeze + ## + # Fetch multiple keys efficiently, including available metadata such as CAS. + # If a block is given, yields key/data pairs one a time. Data is an array: + # [value, cas_id] + # If no block is given, returns a hash of + # { 'key' => [value, cas_id] } + def get_multi_cas(*keys) + if block_given? + pipelined_getter.process(keys) { |*args| yield(*args) } + else + {}.tap do |hash| + pipelined_getter.process(keys) { |k, data| hash[k] = data } + end + end + end # Fetch the value associated with the key. # If a value is found, then it is returned. @@ -110,19 +139,11 @@ def get_multi(*keys) def fetch(key, ttl = nil, req_options = nil) req_options = req_options.nil? ? CACHE_NILS : req_options.merge(CACHE_NILS) if cache_nils val = get(key, req_options) - if not_found?(val) && block_given? - val = yield - add(key, val, ttl_or_default(ttl), req_options) - end - val - end + return val unless block_given? && not_found?(val) - def not_found?(val) - cache_nils ? val == ::Dalli::NOT_FOUND : val.nil? - end - - def cache_nils - @options[:cache_nils] + new_val = yield + add(key, new_val, ttl_or_default(ttl), req_options) + new_val end ## @@ -151,8 +172,42 @@ def cas!(key, ttl = nil, req_options = nil, &block) cas_core(key, true, ttl, req_options, &block) end + ## + # Turn on quiet aka noreply support for a number of + # memcached operations. + # + # All relevant operations within this block will be effectively + # pipelined as Dalli will use 'quiet' versions. The invoked methods + # will all return nil, rather than their usual response. Method + # latency will be substantially lower, as the caller will not be + # blocking on responses. + # + # Currently supports storage (set, add, replace, append, prepend), + # arithmetic (incr, decr), flush and delete operations. Use of + # unsupported operations inside a block will raise an error. + # + # Any error replies will be discarded at the end of the block, and + # Dalli client methods invoked inside the block will not + # have return values + def quiet + old = Thread.current[::Dalli::QUIET] + Thread.current[::Dalli::QUIET] = true + yield + ensure + @ring&.pipeline_consume_and_ignore_responses + Thread.current[::Dalli::QUIET] = old + end + alias multi quiet + def set(key, value, ttl = nil, req_options = nil) - perform(:set, key, value, ttl_or_default(ttl), 0, req_options) + set_cas(key, value, 0, ttl, req_options) + end + + ## + # Set the key-value pair, verifying existing CAS. + # Returns the resulting CAS value if succeeded, and falsy otherwise. + def set_cas(key, value, cas, ttl = nil, req_options = nil) + perform(:set, key, value, ttl_or_default(ttl), cas, req_options) end ## @@ -166,11 +221,25 @@ def add(key, value, ttl = nil, req_options = nil) # Conditionally add a key/value pair, only if the key already exists # on the server. Returns truthy if the operation succeeded. def replace(key, value, ttl = nil, req_options = nil) - perform(:replace, key, value, ttl_or_default(ttl), 0, req_options) + replace_cas(key, value, 0, ttl, req_options) + end + + ## + # Conditionally add a key/value pair, verifying existing CAS, only if the + # key already exists on the server. Returns the new CAS value if the + # operation succeeded, or falsy otherwise. + def replace_cas(key, value, cas, ttl = nil, req_options = nil) + perform(:replace, key, value, ttl_or_default(ttl), cas, req_options) + end + + # Delete a key/value pair, verifying existing CAS. + # Returns true if succeeded, and falsy otherwise. + def delete_cas(key, cas = 0) + perform(:delete, key, cas) end def delete(key) - perform(:delete, key, 0) + delete_cas(key, 0) end ## @@ -187,16 +256,6 @@ def prepend(key, value) perform(:prepend, key, value.to_s) end - ## - # Flush the memcached server, at 'delay' seconds in the future. - # Delay defaults to zero seconds, which means an immediate flush. - ## - def flush(delay = 0) - ring.servers.map { |s| s.request(:flush, delay) } - end - - alias flush_all flush - ## # Incr adds the given amount to the counter on the memcached server. # Amt must be a positive integer value. @@ -208,8 +267,10 @@ def flush(delay = 0) # Note that the ttl will only apply if the counter does not already # exist. To increase an existing counter and update its TTL, use # #cas. + # + # If the value already exists, it must have been set with raw: true def incr(key, amt = 1, ttl = nil, default = nil) - raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + check_positive!(amt) perform(:incr, key, amt.to_i, ttl_or_default(ttl), default) end @@ -228,35 +289,31 @@ def incr(key, amt = 1, ttl = nil, default = nil) # Note that the ttl will only apply if the counter does not already # exist. To decrease an existing counter and update its TTL, use # #cas. + # + # If the value already exists, it must have been set with raw: true def decr(key, amt = 1, ttl = nil, default = nil) - raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + check_positive!(amt) perform(:decr, key, amt.to_i, ttl_or_default(ttl), default) end ## - # Touch updates expiration time for a given key. - # - # Returns true if key exists, otherwise nil. - def touch(key, ttl = nil) - resp = perform(:touch, key, ttl_or_default(ttl)) - resp.nil? ? nil : true - end - + # Flush the memcached server, at 'delay' seconds in the future. + # Delay defaults to zero seconds, which means an immediate flush. ## - # Gat (get and touch) fetch an item and simultaneously update its expiration time. - # - # If a value is not found, then +nil+ is returned. - def gat(key, ttl = nil) - perform(:gat, key, ttl_or_default(ttl)) + def flush(delay = 0) + ring.servers.map { |s| s.request(:flush, delay) } end + alias flush_all flush + + ALLOWED_STAT_KEYS = %i[items slabs settings].freeze ## # Collect the stats for each server. # You can optionally pass a type including :items, :slabs or :settings to get specific stats # Returns a hash like { 'hostname:port' => { 'stat1' => 'value1', ... }, 'hostname2:port' => { ... } } def stats(type = nil) - type = nil unless [nil, :items, :slabs, :settings].include? type + type = nil unless ALLOWED_STAT_KEYS.include? type values = {} ring.servers.each do |server| values[server.name.to_s] = server.alive? ? server.request(:stats, type.to_s) : nil @@ -272,12 +329,6 @@ def reset_stats end end - ## - ## Make sure memcache servers are alive, or raise an Dalli::RingError - def alive! - ring.server_for_key('') - end - ## ## Version of the memcache servers. def version @@ -289,55 +340,9 @@ def version end ## - # Get the value and CAS ID associated with the key. If a block is provided, - # value and CAS will be passed to the block. - def get_cas(key) - (value, cas) = perform(:cas, key) - value = nil if !value || value == 'Not found' - if block_given? - yield value, cas - else - [value, cas] - end - end - - ## - # Fetch multiple keys efficiently, including available metadata such as CAS. - # If a block is given, yields key/data pairs one a time. Data is an array: - # [value, cas_id] - # If no block is given, returns a hash of - # { 'key' => [value, cas_id] } - def get_multi_cas(*keys) - if block_given? - pipelined_getter.process(keys) { |*args| yield(*args) } - else - {}.tap do |hash| - pipelined_getter.process(keys) { |k, data| hash[k] = data } - end - end - end - - ## - # Set the key-value pair, verifying existing CAS. - # Returns the resulting CAS value if succeeded, and falsy otherwise. - def set_cas(key, value, cas, ttl = nil, req_options = nil) - ttl ||= @options[:expires_in].to_i - perform(:set, key, value, ttl, cas, req_options) - end - - ## - # Conditionally add a key/value pair, verifying existing CAS, only if the - # key already exists on the server. Returns the new CAS value if the - # operation succeeded, or falsy otherwise. - def replace_cas(key, value, cas, ttl = nil, req_options = nil) - ttl ||= @options[:expires_in].to_i - perform(:replace, key, value, ttl, cas, req_options) - end - - # Delete a key/value pair, verifying existing CAS. - # Returns true if succeeded, and falsy otherwise. - def delete_cas(key, cas = 0) - perform(:delete, key, cas) + ## Make sure memcache servers are alive, or raise an Dalli::RingError + def alive! + ring.server_for_key('') end ## @@ -349,6 +354,16 @@ def close end alias reset close + CACHE_NILS = { cache_nils: true }.freeze + + def not_found?(val) + cache_nils ? val == ::Dalli::NOT_FOUND : val.nil? + end + + def cache_nils + @options[:cache_nils] + end + # Stub method so a bare Dalli client can pretend to be a connection pool. def with yield self @@ -356,6 +371,10 @@ def with private + def check_positive!(amt) + raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + end + def cas_core(key, always_set, ttl = nil, req_options = nil) (value, cas) = perform(:cas, key) value = nil if !value || value == 'Not found' @@ -365,6 +384,10 @@ def cas_core(key, always_set, ttl = nil, req_options = nil) perform(:set, key, newvalue, ttl_or_default(ttl), cas, req_options) end + ## + # Uses the argument TTL or the client-wide default. Ensures + # that the value is an integer + ## def ttl_or_default(ttl) (ttl || @options[:expires_in]).to_i rescue NoMethodError @@ -385,7 +408,16 @@ def protocol_implementation @protocol_implementation ||= @options.fetch(:protocol_implementation, Dalli::Protocol::Binary) end - # Chokepoint method for instrumentation + ## + # Chokepoint method for memcached methods with a key argument. + # Validates the key, resolves the key to the appropriate server + # instance, and invokes the memcached method on the appropriate + # server. + # + # This method also forces retries on network errors - when + # a particular memcached instance becomes unreachable, or the + # operational times out. + ## def perform(*all_args) return yield if block_given? diff --git a/lib/dalli/pipelined_getter.rb b/lib/dalli/pipelined_getter.rb index 2a520480..801885cb 100644 --- a/lib/dalli/pipelined_getter.rb +++ b/lib/dalli/pipelined_getter.rb @@ -19,13 +19,7 @@ def process(keys, &block) @ring.lock do servers = setup_requests(keys) start_time = Time.now - loop do - # Remove any servers which are not connected - servers.delete_if { |s| !s.connected? } - break if servers.empty? - - servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) - end + servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty? end rescue NetworkError => e Dalli.logger.debug { e.inspect } @@ -44,6 +38,10 @@ def setup_requests(keys) ## # Loop through the server-grouped sets of keys, writing # the corresponding getkq requests to the appropriate servers + # + # It's worth noting that we could potentially reduce bytes + # on the wire by switching from getkq to getq, and using + # the opaque value to match requests to responses. ## def make_getkq_requests(groups) groups.each do |server, keys_for_server| @@ -95,6 +93,10 @@ def abort_without_timeout(servers) end def fetch_responses(servers, start_time, timeout, &block) + # Remove any servers which are not connected + servers.delete_if { |s| !s.connected? } + return [] if servers.empty? + time_left = remaining_time(start_time, timeout) readable_servers = servers_with_response(servers, time_left) if readable_servers.empty? diff --git a/lib/dalli/protocol/binary.rb b/lib/dalli/protocol/binary.rb index 3ad477ba..79eb9477 100644 --- a/lib/dalli/protocol/binary.rb +++ b/lib/dalli/protocol/binary.rb @@ -120,16 +120,18 @@ def pipeline_next_responses @connection_manager.error_on_request!(e) end - # Abort an earlier #pipeline_response_setup. Used to signal an external - # timeout. The underlying socket is disconnected, and the exception is - # swallowed. + # Abort current pipelined get. Generally used to signal an external + # timeout during pipelined get. The underlying socket is + # disconnected, and the exception is swallowed. # # Returns nothing. def pipeline_abort response_buffer.clear @connection_manager.abort_request! - return true unless @sock + return true unless connected? + # Closes the connection, which ensures that our connection + # is in a clean state for future requests @connection_manager.error_on_request!('External timeout') rescue NetworkError true @@ -165,9 +167,9 @@ def verify_state(opkey) verify_allowed_quiet!(opkey) if quiet? end - ALLOWED_MULTI_OPS = %i[add addq delete deleteq replace replaceq set setq noop].freeze + ALLOWED_QUIET_OPS = %i[add delete replace set incr decr append prepend noop flush].freeze def verify_allowed_quiet!(opkey) - return if ALLOWED_MULTI_OPS.include?(opkey) + return if ALLOWED_QUIET_OPS.include?(opkey) raise Dalli::NotPermittedMultiOpError, "The operation #{opkey} is not allowed in a multi block." end @@ -198,7 +200,8 @@ def gat(key, ttl, options = nil) def touch(key, ttl) ttl = TtlSanitizer.sanitize(ttl) - write_generic RequestFormatter.standard_request(opkey: :touch, key: key, ttl: ttl) + write(RequestFormatter.standard_request(opkey: :touch, key: key, ttl: ttl)) + @response_processor.generic_response end # TODO: This is confusing, as there's a cas command in memcached @@ -234,20 +237,23 @@ def storage_req(opkey, key, value, ttl, cas, options) value: value, bitflags: bitflags, ttl: ttl, cas: cas) write(req) - @response_processor.cas_response unless quiet? + @response_processor.storage_response unless quiet? end # rubocop:enable Metrics/ParameterLists def append(key, value) - write_append_prepend :append, key, value + opkey = quiet? ? :appendq : :append + write_append_prepend opkey, key, value end def prepend(key, value) - write_append_prepend :prepend, key, value + opkey = quiet? ? :prependq : :prepend + write_append_prepend opkey, key, value end def write_append_prepend(opkey, key, value) - write_generic RequestFormatter.standard_request(opkey: opkey, key: key, value: value) + write(RequestFormatter.standard_request(opkey: opkey, key: key, value: value)) + @response_processor.no_body_response unless quiet? end # Delete Commands @@ -255,16 +261,18 @@ def delete(key, cas) opkey = quiet? ? :deleteq : :delete req = RequestFormatter.standard_request(opkey: opkey, key: key, cas: cas) write(req) - @response_processor.generic_response unless quiet? + @response_processor.no_body_response unless quiet? end # Arithmetic Commands def decr(key, count, ttl, initial) - decr_incr :decr, key, count, ttl, initial + opkey = quiet? ? :decrq : :decr + decr_incr opkey, key, count, ttl, initial end def incr(key, count, ttl, initial) - decr_incr :incr, key, count, ttl, initial + opkey = quiet? ? :incrq : :incr + decr_incr opkey, key, count, ttl, initial end # This allows us to special case a nil initial value, and @@ -279,14 +287,14 @@ def decr_incr(opkey, key, count, ttl, initial) initial ||= 0 write(RequestFormatter.decr_incr_request(opkey: opkey, key: key, count: count, initial: initial, expiry: expiry)) - @response_processor.decr_incr_response + @response_processor.decr_incr_response unless quiet? end # Other Commands def flush(ttl = 0) - req = RequestFormatter.standard_request(opkey: :flush, ttl: ttl) - write(req) - @response_processor.generic_response + opkey = quiet? ? :flushq : :flush + write(RequestFormatter.standard_request(opkey: opkey, ttl: ttl)) + @response_processor.no_body_response unless quiet? end # Noop is a keepalive operation but also used to demarcate the end of a set of pipelined commands. @@ -303,15 +311,12 @@ def stats(info = '') end def reset_stats - write_generic RequestFormatter.standard_request(opkey: :stat, key: 'reset') + write(RequestFormatter.standard_request(opkey: :stat, key: 'reset')) + @response_processor.generic_response end def version - write_generic RequestFormatter.standard_request(opkey: :version) - end - - def write_generic(bytes) - write(bytes) + write(RequestFormatter.standard_request(opkey: :version)) @response_processor.generic_response end @@ -343,7 +348,7 @@ def response_buffer end def pipeline_response(bytes_to_advance = 0) - response_buffer.process_single_response(bytes_to_advance) + response_buffer.process_single_getk_response(bytes_to_advance) end # Called after the noop response is received at the end of a set @@ -354,7 +359,7 @@ def finish_pipeline end def reconnect_on_pipeline_complete! - @connection_manager.reconnect! 'multi_response has completed' if pipeline_complete? + @connection_manager.reconnect! 'pipelined get has completed' if pipeline_complete? end def raise_memcached_down_err diff --git a/lib/dalli/protocol/binary/request_formatter.rb b/lib/dalli/protocol/binary/request_formatter.rb index f167ecf2..dd237d02 100644 --- a/lib/dalli/protocol/binary/request_formatter.rb +++ b/lib/dalli/protocol/binary/request_formatter.rb @@ -31,11 +31,13 @@ class RequestFormatter deleteq: 0x14, incrq: 0x15, decrq: 0x16, + appendq: 0x19, + prependq: 0x1A, + touch: 0x1C, + gat: 0x1D, auth_negotiation: 0x20, auth_request: 0x21, - auth_continue: 0x22, - touch: 0x1C, - gat: 0x1D + auth_continue: 0x22 }.freeze REQ_HEADER_FORMAT = 'CCnCCnNNQ' @@ -56,6 +58,8 @@ class RequestFormatter append: KEY_AND_VALUE, prepend: KEY_AND_VALUE, + appendq: KEY_AND_VALUE, + prependq: KEY_AND_VALUE, auth_request: KEY_AND_VALUE, auth_continue: KEY_AND_VALUE, @@ -68,8 +72,11 @@ class RequestFormatter incr: INCR_DECR, decr: INCR_DECR, + incrq: INCR_DECR, + decrq: INCR_DECR, flush: TTL_ONLY, + flushq: TTL_ONLY, noop: NO_BODY, auth_negotiation: NO_BODY, diff --git a/lib/dalli/protocol/binary/response_processor.rb b/lib/dalli/protocol/binary/response_processor.rb index 7ebb4dac..09a2b20e 100644 --- a/lib/dalli/protocol/binary/response_processor.rb +++ b/lib/dalli/protocol/binary/response_processor.rb @@ -67,8 +67,8 @@ def raise_on_not_ok_status!(resp_header) def generic_response(unpack: false, cache_nils: false) resp_header, body = read_response - return cache_nils ? ::Dalli::NOT_FOUND : nil if resp_header.not_found? return false if resp_header.not_stored? # Not stored, normal status for add operation + return cache_nils ? ::Dalli::NOT_FOUND : nil if resp_header.not_found? raise_on_not_ok_status!(resp_header) return true unless body @@ -76,7 +76,28 @@ def generic_response(unpack: false, cache_nils: false) unpack_response_body(resp_header.extra_len, resp_header.key_len, body, unpack).last end - def data_cas_response + ## + # Response for a storage operation. Returns the cas on success. False + # if the value wasn't stored. And raises an error on all other error + # codes from memcached. + ## + def storage_response + resp_header, = read_response + return false if resp_header.not_stored? # Not stored, normal status for add operation + + raise_on_not_ok_status!(resp_header) + resp_header.cas + end + + def no_body_response + resp_header, = read_response + return false if resp_header.not_stored? # Not stored, possible status for append/prepend + + raise_on_not_ok_status!(resp_header) + true + end + + def data_cas_response(unpack: true) resp_header, body = read_response return [nil, resp_header.cas] if resp_header.not_found? return [nil, false] if resp_header.not_stored? @@ -84,7 +105,7 @@ def data_cas_response raise_on_not_ok_status!(resp_header) return [nil, resp_header.cas] unless body - [unpack_response_body(resp_header.extra_len, resp_header.key_len, body, true).last, resp_header.cas] + [unpack_response_body(resp_header.extra_len, resp_header.key_len, body, unpack).last, resp_header.cas] end def cas_response diff --git a/lib/dalli/protocol/response_buffer.rb b/lib/dalli/protocol/response_buffer.rb index d3d94e71..b96d2ba9 100644 --- a/lib/dalli/protocol/response_buffer.rb +++ b/lib/dalli/protocol/response_buffer.rb @@ -20,7 +20,7 @@ def read # Attempts to process a single response from the buffer. Starts # by advancing the buffer to the specified start position - def process_single_response(start_position = 0) + def process_single_getk_response(start_position = 0) advance(start_position) @response_processor.getk_response_from_buffer(@buffer) end