diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml index e5682777..928fbbc0 100644 --- a/.rubocop_todo.yml +++ b/.rubocop_todo.yml @@ -11,12 +11,12 @@ Metrics/AbcSize: Max: 19 -# Offense count: 4 +# Offense count: 5 # Configuration parameters: CountComments, CountAsOne. Metrics/ClassLength: - Max: 387 + Max: 237 -# Offense count: 6 +# Offense count: 5 # Configuration parameters: CountComments, CountAsOne, ExcludedMethods, IgnoredMethods. Metrics/MethodLength: Exclude: diff --git a/History.md b/History.md index 6c373b9c..1e1382d9 100644 --- a/History.md +++ b/History.md @@ -4,6 +4,8 @@ Dalli Changelog Unreleased ========== +- Add quiet support for incr, decr, append, depend, and flush (petergoldstein) +- Additional refactoring to allow reuse of connection behavior (petergoldstein) - Fix issue in flush such that it wasn't passing the delay argument to memcached (petergoldstein) 3.1.0 diff --git a/lib/dalli.rb b/lib/dalli.rb index a2fc672f..6055e662 100644 --- a/lib/dalli.rb +++ b/lib/dalli.rb @@ -31,7 +31,7 @@ class NotPermittedMultiOpError < DalliError; end class NilObject; end # rubocop:disable Lint/EmptyClass NOT_FOUND = NilObject.new - MULTI_KEY = :dalli_multi + QUIET = :dalli_multi def self.logger @logger ||= (rails_logger || default_logger) @@ -63,6 +63,7 @@ def self.logger=(logger) require_relative 'dalli/ring' require_relative 'dalli/protocol' require_relative 'dalli/protocol/binary' +require_relative 'dalli/protocol/connection_manager' require_relative 'dalli/protocol/response_buffer' require_relative 'dalli/protocol/server_config_parser' require_relative 'dalli/protocol/ttl_sanitizer' diff --git a/lib/dalli/client.rb b/lib/dalli/client.rb index d0cebfce..13f41b19 100644 --- a/lib/dalli/client.rb +++ b/lib/dalli/client.rb @@ -49,7 +49,7 @@ class Client def initialize(servers = nil, options = {}) @servers = ::Dalli::ServersArgNormalizer.normalize_servers(servers) @options = normalize_options(options) - @key_manager = ::Dalli::KeyManager.new(options) + @key_manager = ::Dalli::KeyManager.new(@options) @ring = nil end @@ -58,24 +58,39 @@ def initialize(servers = nil, options = {}) # ## - # Turn on quiet aka noreply support. - # All relevant operations within this block will be effectively - # pipelined as Dalli will use 'quiet' operations where possible. - # Currently supports the set, add, replace and delete operations. - def multi - old = Thread.current[::Dalli::MULTI_KEY] - Thread.current[::Dalli::MULTI_KEY] = true - yield - ensure - @ring&.pipeline_consume_and_ignore_responses - Thread.current[::Dalli::MULTI_KEY] = old + # Get the value associated with the key. + # If a value is not found, then +nil+ is returned. + def get(key, req_options = nil) + perform(:get, key, req_options) end ## - # Get the value associated with the key. + # Gat (get and touch) fetch an item and simultaneously update its expiration time. + # # If a value is not found, then +nil+ is returned. - def get(key, options = nil) - perform(:get, key, options) + def gat(key, ttl = nil) + perform(:gat, key, ttl_or_default(ttl)) + end + + ## + # Touch updates expiration time for a given key. + # + # Returns true if key exists, otherwise nil. + def touch(key, ttl = nil) + resp = perform(:touch, key, ttl_or_default(ttl)) + resp.nil? ? nil : true + end + + ## + # Get the value and CAS ID associated with the key. If a block is provided, + # value and CAS will be passed to the block. + def get_cas(key) + (value, cas) = perform(:cas, key) + # TODO: This is odd. Confirm this is working as expected. + value = nil if !value || value == 'Not found' + return [value, cas] unless block_given? + + yield value, cas end ## @@ -97,7 +112,21 @@ def get_multi(*keys) end end - CACHE_NILS = { cache_nils: true }.freeze + ## + # Fetch multiple keys efficiently, including available metadata such as CAS. + # If a block is given, yields key/data pairs one a time. Data is an array: + # [value, cas_id] + # If no block is given, returns a hash of + # { 'key' => [value, cas_id] } + def get_multi_cas(*keys) + if block_given? + pipelined_getter.process(keys) { |*args| yield(*args) } + else + {}.tap do |hash| + pipelined_getter.process(keys) { |k, data| hash[k] = data } + end + end + end # Fetch the value associated with the key. # If a value is found, then it is returned. @@ -110,19 +139,11 @@ def get_multi(*keys) def fetch(key, ttl = nil, req_options = nil) req_options = req_options.nil? ? CACHE_NILS : req_options.merge(CACHE_NILS) if cache_nils val = get(key, req_options) - if not_found?(val) && block_given? - val = yield - add(key, val, ttl_or_default(ttl), req_options) - end - val - end - - def not_found?(val) - cache_nils ? val == ::Dalli::NOT_FOUND : val.nil? - end + return val unless block_given? && not_found?(val) - def cache_nils - @options[:cache_nils] + new_val = yield + add(key, new_val, ttl_or_default(ttl), req_options) + new_val end ## @@ -136,8 +157,8 @@ def cache_nils # - nil if the key did not exist. # - false if the value was changed by someone else. # - true if the value was successfully updated. - def cas(key, ttl = nil, options = nil, &block) - cas_core(key, false, ttl, options, &block) + def cas(key, ttl = nil, req_options = nil, &block) + cas_core(key, false, ttl, req_options, &block) end ## @@ -147,30 +168,78 @@ def cas(key, ttl = nil, options = nil, &block) # Returns: # - false if the value was changed by someone else. # - true if the value was successfully updated. - def cas!(key, ttl = nil, options = nil, &block) - cas_core(key, true, ttl, options, &block) + def cas!(key, ttl = nil, req_options = nil, &block) + cas_core(key, true, ttl, req_options, &block) end - def set(key, value, ttl = nil, options = nil) - perform(:set, key, value, ttl_or_default(ttl), 0, options) + ## + # Turn on quiet aka noreply support for a number of + # memcached operations. + # + # All relevant operations within this block will be effectively + # pipelined as Dalli will use 'quiet' versions. The invoked methods + # will all return nil, rather than their usual response. Method + # latency will be substantially lower, as the caller will not be + # blocking on responses. + # + # Currently supports storage (set, add, replace, append, prepend), + # arithmetic (incr, decr), flush and delete operations. Use of + # unsupported operations inside a block will raise an error. + # + # Any error replies will be discarded at the end of the block, and + # Dalli client methods invoked inside the block will not + # have return values + def quiet + old = Thread.current[::Dalli::QUIET] + Thread.current[::Dalli::QUIET] = true + yield + ensure + @ring&.pipeline_consume_and_ignore_responses + Thread.current[::Dalli::QUIET] = old + end + alias multi quiet + + def set(key, value, ttl = nil, req_options = nil) + set_cas(key, value, 0, ttl, req_options) + end + + ## + # Set the key-value pair, verifying existing CAS. + # Returns the resulting CAS value if succeeded, and falsy otherwise. + def set_cas(key, value, cas, ttl = nil, req_options = nil) + perform(:set, key, value, ttl_or_default(ttl), cas, req_options) end ## # Conditionally add a key/value pair, if the key does not already exist # on the server. Returns truthy if the operation succeeded. - def add(key, value, ttl = nil, options = nil) - perform(:add, key, value, ttl_or_default(ttl), options) + def add(key, value, ttl = nil, req_options = nil) + perform(:add, key, value, ttl_or_default(ttl), req_options) end ## # Conditionally add a key/value pair, only if the key already exists # on the server. Returns truthy if the operation succeeded. - def replace(key, value, ttl = nil, options = nil) - perform(:replace, key, value, ttl_or_default(ttl), 0, options) + def replace(key, value, ttl = nil, req_options = nil) + replace_cas(key, value, 0, ttl, req_options) + end + + ## + # Conditionally add a key/value pair, verifying existing CAS, only if the + # key already exists on the server. Returns the new CAS value if the + # operation succeeded, or falsy otherwise. + def replace_cas(key, value, cas, ttl = nil, req_options = nil) + perform(:replace, key, value, ttl_or_default(ttl), cas, req_options) + end + + # Delete a key/value pair, verifying existing CAS. + # Returns true if succeeded, and falsy otherwise. + def delete_cas(key, cas = 0) + perform(:delete, key, cas) end def delete(key) - perform(:delete, key, 0) + delete_cas(key, 0) end ## @@ -187,16 +256,6 @@ def prepend(key, value) perform(:prepend, key, value.to_s) end - ## - # Flush the memcached server, at 'delay' seconds in the future. - # Delay defaults to zero seconds, which means an immediate flush. - ## - def flush(delay = 0) - ring.servers.map { |s| s.request(:flush, delay) } - end - - alias flush_all flush - ## # Incr adds the given amount to the counter on the memcached server. # Amt must be a positive integer value. @@ -208,8 +267,10 @@ def flush(delay = 0) # Note that the ttl will only apply if the counter does not already # exist. To increase an existing counter and update its TTL, use # #cas. + # + # If the value already exists, it must have been set with raw: true def incr(key, amt = 1, ttl = nil, default = nil) - raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + check_positive!(amt) perform(:incr, key, amt.to_i, ttl_or_default(ttl), default) end @@ -228,35 +289,31 @@ def incr(key, amt = 1, ttl = nil, default = nil) # Note that the ttl will only apply if the counter does not already # exist. To decrease an existing counter and update its TTL, use # #cas. + # + # If the value already exists, it must have been set with raw: true def decr(key, amt = 1, ttl = nil, default = nil) - raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + check_positive!(amt) perform(:decr, key, amt.to_i, ttl_or_default(ttl), default) end ## - # Touch updates expiration time for a given key. - # - # Returns true if key exists, otherwise nil. - def touch(key, ttl = nil) - resp = perform(:touch, key, ttl_or_default(ttl)) - resp.nil? ? nil : true - end - + # Flush the memcached server, at 'delay' seconds in the future. + # Delay defaults to zero seconds, which means an immediate flush. ## - # Gat (get and touch) fetch an item and simultaneously update its expiration time. - # - # If a value is not found, then +nil+ is returned. - def gat(key, ttl = nil) - perform(:gat, key, ttl_or_default(ttl)) + def flush(delay = 0) + ring.servers.map { |s| s.request(:flush, delay) } end + alias flush_all flush + + ALLOWED_STAT_KEYS = %i[items slabs settings].freeze ## # Collect the stats for each server. # You can optionally pass a type including :items, :slabs or :settings to get specific stats # Returns a hash like { 'hostname:port' => { 'stat1' => 'value1', ... }, 'hostname2:port' => { ... } } def stats(type = nil) - type = nil unless [nil, :items, :slabs, :settings].include? type + type = nil unless ALLOWED_STAT_KEYS.include? type values = {} ring.servers.each do |server| values[server.name.to_s] = server.alive? ? server.request(:stats, type.to_s) : nil @@ -272,12 +329,6 @@ def reset_stats end end - ## - ## Make sure memcache servers are alive, or raise an Dalli::RingError - def alive! - ring.server_for_key('') - end - ## ## Version of the memcache servers. def version @@ -289,55 +340,9 @@ def version end ## - # Get the value and CAS ID associated with the key. If a block is provided, - # value and CAS will be passed to the block. - def get_cas(key) - (value, cas) = perform(:cas, key) - value = nil if !value || value == 'Not found' - if block_given? - yield value, cas - else - [value, cas] - end - end - - ## - # Fetch multiple keys efficiently, including available metadata such as CAS. - # If a block is given, yields key/data pairs one a time. Data is an array: - # [value, cas_id] - # If no block is given, returns a hash of - # { 'key' => [value, cas_id] } - def get_multi_cas(*keys) - if block_given? - pipelined_getter.process(keys) { |*args| yield(*args) } - else - {}.tap do |hash| - pipelined_getter.process(keys) { |k, data| hash[k] = data } - end - end - end - - ## - # Set the key-value pair, verifying existing CAS. - # Returns the resulting CAS value if succeeded, and falsy otherwise. - def set_cas(key, value, cas, ttl = nil, options = nil) - ttl ||= @options[:expires_in].to_i - perform(:set, key, value, ttl, cas, options) - end - - ## - # Conditionally add a key/value pair, verifying existing CAS, only if the - # key already exists on the server. Returns the new CAS value if the - # operation succeeded, or falsy otherwise. - def replace_cas(key, value, cas, ttl = nil, options = nil) - ttl ||= @options[:expires_in].to_i - perform(:replace, key, value, ttl, cas, options) - end - - # Delete a key/value pair, verifying existing CAS. - # Returns true if succeeded, and falsy otherwise. - def delete_cas(key, cas = 0) - perform(:delete, key, cas) + ## Make sure memcache servers are alive, or raise an Dalli::RingError + def alive! + ring.server_for_key('') end ## @@ -349,6 +354,16 @@ def close end alias reset close + CACHE_NILS = { cache_nils: true }.freeze + + def not_found?(val) + cache_nils ? val == ::Dalli::NOT_FOUND : val.nil? + end + + def cache_nils + @options[:cache_nils] + end + # Stub method so a bare Dalli client can pretend to be a connection pool. def with yield self @@ -356,15 +371,23 @@ def with private - def cas_core(key, always_set, ttl = nil, options = nil) + def check_positive!(amt) + raise ArgumentError, "Positive values only: #{amt}" if amt.negative? + end + + def cas_core(key, always_set, ttl = nil, req_options = nil) (value, cas) = perform(:cas, key) value = nil if !value || value == 'Not found' return if value.nil? && !always_set newvalue = yield(value) - perform(:set, key, newvalue, ttl_or_default(ttl), cas, options) + perform(:set, key, newvalue, ttl_or_default(ttl), cas, req_options) end + ## + # Uses the argument TTL or the client-wide default. Ensures + # that the value is an integer + ## def ttl_or_default(ttl) (ttl || @options[:expires_in]).to_i rescue NoMethodError @@ -385,7 +408,16 @@ def protocol_implementation @protocol_implementation ||= @options.fetch(:protocol_implementation, Dalli::Protocol::Binary) end - # Chokepoint method for instrumentation + ## + # Chokepoint method for memcached methods with a key argument. + # Validates the key, resolves the key to the appropriate server + # instance, and invokes the memcached method on the appropriate + # server. + # + # This method also forces retries on network errors - when + # a particular memcached instance becomes unreachable, or the + # operational times out. + ## def perform(*all_args) return yield if block_given? diff --git a/lib/dalli/options.rb b/lib/dalli/options.rb index 1e33b6a6..a48cd4ba 100644 --- a/lib/dalli/options.rb +++ b/lib/dalli/options.rb @@ -31,19 +31,19 @@ def close end end - def pipeline_response_start + def pipeline_response_setup @lock.synchronize do super end end - def process_outstanding_pipeline_requests + def pipeline_next_responses @lock.synchronize do super end end - def pipeline_response_abort + def pipeline_abort @lock.synchronize do super end diff --git a/lib/dalli/pipelined_getter.rb b/lib/dalli/pipelined_getter.rb index 31d870ac..801885cb 100644 --- a/lib/dalli/pipelined_getter.rb +++ b/lib/dalli/pipelined_getter.rb @@ -19,13 +19,7 @@ def process(keys, &block) @ring.lock do servers = setup_requests(keys) start_time = Time.now - loop do - # Remove any servers which are not connected - servers.delete_if { |s| !s.connected? } - break if servers.empty? - - servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) - end + servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty? end rescue NetworkError => e Dalli.logger.debug { e.inspect } @@ -44,6 +38,10 @@ def setup_requests(keys) ## # Loop through the server-grouped sets of keys, writing # the corresponding getkq requests to the appropriate servers + # + # It's worth noting that we could potentially reduce bytes + # on the wire by switching from getkq to getq, and using + # the opaque value to match requests to responses. ## def make_getkq_requests(groups) groups.each do |server, keys_for_server| @@ -80,7 +78,7 @@ def finish_queries(servers) end def finish_query_for_server(server) - server.pipeline_response_start + server.pipeline_response_setup rescue Dalli::NetworkError raise rescue Dalli::DalliError => e @@ -91,10 +89,14 @@ def finish_query_for_server(server) # Swallows Dalli::NetworkError def abort_without_timeout(servers) - servers.each(&:pipeline_response_abort) + servers.each(&:pipeline_abort) end def fetch_responses(servers, start_time, timeout, &block) + # Remove any servers which are not connected + servers.delete_if { |s| !s.connected? } + return [] if servers.empty? + time_left = remaining_time(start_time, timeout) readable_servers = servers_with_response(servers, time_left) if readable_servers.empty? @@ -135,11 +137,11 @@ def abort_with_timeout(servers) # Processes responses from a server. Returns true if there are no # additional responses from this server. def process_server(server) - server.process_outstanding_pipeline_requests.each_pair do |key, value_list| + server.pipeline_next_responses.each_pair do |key, value_list| yield @key_manager.key_without_namespace(key), value_list end - server.pipeline_response_completed? + server.pipeline_complete? end def servers_with_response(servers, timeout) diff --git a/lib/dalli/protocol/binary.rb b/lib/dalli/protocol/binary.rb index fbc453e6..5e412ec3 100644 --- a/lib/dalli/protocol/binary.rb +++ b/lib/dalli/protocol/binary.rb @@ -1,6 +1,5 @@ # frozen_string_literal: true -require 'English' require 'forwardable' require 'socket' require 'timeout' @@ -20,62 +19,30 @@ module Protocol class Binary extend Forwardable - attr_accessor :hostname, :port, :weight, :options - attr_reader :sock, :socket_type + attr_accessor :weight, :options def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default? + def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout, + :socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error - DEFAULTS = { - # seconds between trying to contact a remote server - down_retry_delay: 30, - # connect/read/write timeout for socket operations - socket_timeout: 1, - # times a socket operation may fail before considering the server dead - socket_max_failures: 2, - # amount of time to sleep between retries when a failure occurs - socket_failure_delay: 0.1 - }.freeze - - def initialize(attribs, options = {}) - @hostname, @port, @weight, @socket_type, options = ServerConfigParser.parse(attribs, options) - @options = DEFAULTS.merge(options) + def initialize(attribs, client_options = {}) + hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs) + @options = client_options.merge(user_creds) @value_marshaller = ValueMarshaller.new(@options) - @response_processor = ResponseProcessor.new(self, @value_marshaller) - @response_buffer = ResponseBuffer.new(self, @response_processor) - - reset_down_info - @sock = nil - @pid = nil - @request_in_progress = false - end - - def response_buffer - @response_buffer ||= ResponseBuffer.new(self, @response_processor) - end - - def name - if socket_type == :unix - hostname - else - "#{hostname}:#{port}" - end + @connection_manager = ConnectionManager.new(hostname, port, socket_type, @options) + @response_processor = ResponseProcessor.new(@connection_manager, @value_marshaller) end # Chokepoint method for error handling and ensuring liveness def request(opkey, *args) verify_state(opkey) - # The alive? call has the side effect of connecting the underlying - # socket if it is not connected, or there's been a disconnect - # because of timeout or other error. Method raises an error - # if it can't connect - raise_memcached_down_err unless alive? begin send(opkey, *args) rescue Dalli::MarshalError => e - log_marshall_err(args.first, e) + log_marshal_err(args.first, e) raise - rescue Dalli::DalliError, Dalli::NetworkError, Dalli::ValueOverMaxSize, Timeout::Error + rescue Dalli::DalliError raise rescue StandardError => e log_unexpected_err(e) @@ -83,63 +50,17 @@ def request(opkey, *args) end end - def raise_memcached_down_err - raise Dalli::NetworkError, - "#{name} is down: #{@error} #{@msg}. If you are sure it is running, "\ - "ensure memcached version is > #{::Dalli::MIN_SUPPORTED_MEMCACHED_VERSION}." - end - - def log_marshall_err(key, err) - Dalli.logger.error "Marshalling error for key '#{key}': #{err.message}" - Dalli.logger.error 'You are trying to cache a Ruby object which cannot be serialized to memcached.' - end - - def log_unexpected_err(err) - Dalli.logger.error "Unexpected exception during Dalli request: #{err.class.name}: #{err.message}" - Dalli.logger.error err.backtrace.join("\n\t") - end - - # The socket connection to the underlying server is initialized as a side - # effect of this call. In fact, this is the ONLY place where that - # socket connection is initialized. + ## + # Boolean method used by clients of this class to determine if this + # particular memcached instance is available for use. def alive? - return true if @sock - return false unless reconnect_down_server? - - connect - !!@sock + ensure_connected! rescue Dalli::NetworkError + # ensure_connected! raises a NetworkError if connection fails. We + # want to capture that error and convert it to a boolean value here. false end - def reconnect_down_server? - return true unless @last_down_at - - time_to_next_reconnect = @last_down_at + options[:down_retry_delay] - Time.now - return true unless time_to_next_reconnect.positive? - - Dalli.logger.debug do - format('down_retry_delay not reached for %s (%