diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fd102a..87d2a55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,25 @@ ## [Unreleased] +## [1.16.0] - 2025-10-19 +### Added +- `#claer_locks_of_acquirer` (alias: `#release_locks_of_acquirer`) - remove locks of the concrete acquirer and drop this acquirer form all lock queues; +- `#clear_locks_of_host` (alias: `#release_locks_of_host`) - remove locks of the concrete host; +- Instrumentation events: + - added `:rel_req_cnt` (the count of removed lock-requests from lock queues) to `"redis_queued_locks.release_locks_of"` instrumentation event payload; +- Added new methods to `RedisQueuedLocks::Resource`: + - `.acquirer_host` - extract host identifier from acquirer identifier; + - `.acquirer_pattern_from_host` - extract acquirer search pattern from any host identifier needed for `ZSCAN` command; + - `.extract_identity` - extract **identity** part from acquirer identifier (or from host indetifier) needed for `ZSCAN` command; + - `.extract_non_identified_part` - extract the substring from acquirer identifier (or from host identifier) excluding the **identity** part need for `ZSCAN` command; +### Changed +- the process-ractor-thread-fiber order of the acquirer identifier and the host identifier has changed: + - the order is consider the object scope priority: + - before: **process_id** -> **thread_id** -> **fiber_id** -> **ractor_id** -> **identity** + - after: **process_id** -> **ractor_id** -> **thread_id** -> **fiber_id** -> **identity** + - current patterns of acquire_identifier and host_identifier: + - **acquirer**: `"rql:acq:#{process_id}/#{ractor_id}/#{thread_id}/#{fiber_id}/#{identity}"` + - **host**: `"rql:hst:#{process_id}/#{ractor_id}/#{thread_id}/#{identity}"` + ## [1.15.0] - 2025-10-17 ### Changed - `"redis_queud_locks.release_locks_of"` instrumentation event payload now includes `hst_id` and `acq_id`; diff --git a/Gemfile.lock b/Gemfile.lock index faad87a..fa1638a 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - redis_queued_locks (1.15.0) + redis_queued_locks (1.16.0) redis-client (~> 0.20) GEM diff --git a/README.md b/README.md index e90a2b2..e931c50 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,8 @@ Provides flexible invocation flow, parametrized limits (lock request ttl, lock t - [clear_locks](#clear_locks---release-all-locks-and-lock-queues) (aka `release_locks`) - [clear_locks_of](#clear_locks_of) (aka `release_locks_of`) - [clear_current_locks](#clear_current_locks) (aka `release_current_locks`) + - [clear_locks_of_acquirer](#clear_locks_of_acquirer) (aka `release_locks_of_acquirer`) + - [clear_locks_of_host](#clear_locks_of) (aka `release_locks_of_host`) - [extend_lock_ttl](#extend_lock_ttl) - [locks](#locks---get-list-of-obtained-locks) - [queues](#queues---get-list-of-lock-request-queues) @@ -72,6 +74,8 @@ Provides flexible invocation flow, parametrized limits (lock request ttl, lock t - ["redis_queued_locks.explicit_lock_release"](#redis_queued_locksexplicit_lock_release) - ["redis_queued_locks.explicit_all_locks_release"](#redis_queued_locksexplicit_all_locks_release) - ["redis_queued_locks.release_locks_of"](#redis_queued_locksrelease_locks_of) + - ["redis_queued_locks.release_locks_of_acquirer"](#redis_queued_locksrelease_locks_of_acquirer) + - ["redis_queued_locks.release_locks_of_host"](#redis_queued_locksrelease_locks_of_host) - [Roadmap](#roadmap) - [Build and Develop](#build-and-develop) - [Contributing](#contributing) @@ -253,6 +257,10 @@ client = RedisQueuedLocks::Client.new(redis_client) do |config| # - affects the performance of your Redis (configure thoughtfully); config['clear_locks_of__queue_scan_size'] = 300 + # (default: 100) + # - ? + config['clear_locks_of_host__queue_cleanup_cursor_count'] = 100 + # (default: 500) # - how many items should be extracted from redis during the #locks, #queues, #keys # #locks_info, and #queues_info operations (uses SCAN); @@ -1333,6 +1341,19 @@ rql.release_locks_of(host_id: rql.current_host_id, acquirer_id: rql.current_acqu --- +#### clear_locks_of_acquirer + +\[[back to top](#usage)\] + +--- + + +#### clear_locks_of_host + +\[[back to top](#usage)\] + +--- + #### #extend_lock_ttl \[[back to top](#usage)\] @@ -2194,6 +2215,8 @@ List of instrumentation events - ["redis_queued_locks.explicit_lock_release"](#redis_queued_locksexplicit_lock_release) - ["redis_queued_locks.explicit_all_locks_release"](#redis_queued_locksexplicit_all_locks_release) - ["redis_queued_locks.release_locks_of"](#redis_queued_locksrelease_locks_of) +- ["redis_queued_locks.release_locks_of_acquirer"](#redis_queued_locksrelease_locks_of_acquirer) +- ["redis_queued_locks.release_locks_of_host"](#redis_queued_locksrelease_locks_of_host) Detalized event semantics and payload structure: @@ -2286,7 +2309,7 @@ Detalized event semantics and payload structure: ##### `"redis_queued_locks.release_locks_of"` - \[[back to the list](#instrumentation-events)\] -- an event signalizes about the released locks (and removement from lock queues) of the concrete host and acquirer; +- an event signalizes about the released locks (and removement of all lock requests from lock queues) of the concrete host and acquirer; - raised from `#clear_locks_of` and `#clear_current_locks` (`#release_locks_of` and `#release_current_locks` respectively); - payload: - `:rel_time` - `float`/`milliseconds` - time spent on "release locks of" operation; @@ -2294,8 +2317,33 @@ Detalized event semantics and payload structure: - `:acq_id` - `string` - refused acquirer identifier; - `:hst_id` - `string` - refused host identifier; - `:rel_key_cnt` - `integer` - released locks count; + - `:rel_req_cnt` - `integer` - the count of removed lock requests from all related lock-queues; - `:tch_queue_cnt` - `:integer` - the number of queues from which the concrete host/acquirer was removed; +#### `"redis_queued_locks.release_locks_of_acquirer"` +- \[[back to the list](#instrumentation-events)\] +- an event signalizes about the released locks (and removement of all lock requests from lock queues) of the concrete acquirer; +- raised from `#clear_locks_of_acquirer` (and `#release_locks_of_acquirer` respectively); +- payload: + - `:rel_time` - `float`/`milliseconds` - time spent on "release locks of" operation; + - `:at` - `float`/`epoch` - the time when the opertaion has ended; + - `:acq_id` - `string` - refused acquirer identifier; + - `:rel_key_cnt` - `integer` - released locks count; + - `:rel_req_cnt` - `integer` - the count of removed lock requests from all related lock-queues; + - `:tch_queue_cnt` - `:integer` - the number of queues from which the concrete acquirer was removed; + +#### `"redis_queued_locks.release_locks_of_host"` +- \[[back to the list](#instrumentation-events)\] +- an event signalizes about the released locks (and removement of all lock requests from lock queues) of the concrete host; +- raised from `#clear_locks_of_host` (and `#release_locks_of_host` respectively); +- payload: + - `:rel_time` - `float`/`milliseconds` - time spent on "release locks of" operation; + - `:at` - `float`/`epoch` - the time when the opertaion has ended; + - `:hst_id` - `string` - refused host identifier; + - `:rel_key_cnt` - `integer` - released locks count; + - `:rel_req_cnt` - `integer` - the count of removed lock requests from all related lock-queues; + - `:tch_queue_cnt` - `:integer` - the number of queues from which the concrete host was removed; + --- ## Roadmap @@ -2318,7 +2366,6 @@ Detalized event semantics and payload structure: ```ruby rql.lock_series('lock_a', 'lock_b', 'lock_c') { puts 'locked' } ``` - - an ability to release all locks and all requests of the concrete acquirer id or host id (or both in validation-orianted combination); - detailed lock informotion inside the error object in cases of exceptions (at the moment we have this info inside the error message only that hard to analyze in work); - a convenient way to mark any `lock` invocation as "non-instrumentable" / "non-loggable" (as an alternative to `VoidNotifier` and to `VoidLogger`); - `Read`/`Write` semantics: you can mark your locks as `read` or `write` lock in order to simulate `read`/`write` lock behavior: diff --git a/github_ci/ruby3.3.gemfile.lock b/github_ci/ruby3.3.gemfile.lock index ba81221..d27ebdd 100644 --- a/github_ci/ruby3.3.gemfile.lock +++ b/github_ci/ruby3.3.gemfile.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - redis_queued_locks (1.15.0) + redis_queued_locks (1.16.0) redis-client (~> 0.20) GEM diff --git a/lib/redis_queued_locks/acquirer/acquire_lock.rb b/lib/redis_queued_locks/acquirer/acquire_lock.rb index 14647f2..70f14ef 100644 --- a/lib/redis_queued_locks/acquirer/acquire_lock.rb +++ b/lib/redis_queued_locks/acquirer/acquire_lock.rb @@ -159,7 +159,7 @@ class << self # # @api private # @since 1.0.0 - # @version 1.14.0 + # @version 1.16.0 def acquire_lock( redis, lock_name, @@ -232,15 +232,15 @@ def acquire_lock( # Step 1: prepare lock requirements (generate lock name, calc lock ttl, etc). acquirer_id = RedisQueuedLocks::Resource.acquirer_identifier( process_id, + ractor_id, thread_id, fiber_id, - ractor_id, identity ) host_id = RedisQueuedLocks::Resource.host_identifier( process_id, - thread_id, ractor_id, + thread_id, identity ) lock_ttl = ttl diff --git a/lib/redis_queued_locks/acquirer/release_locks_of.rb b/lib/redis_queued_locks/acquirer/release_locks_of.rb index b45ec74..79ea603 100644 --- a/lib/redis_queued_locks/acquirer/release_locks_of.rb +++ b/lib/redis_queued_locks/acquirer/release_locks_of.rb @@ -2,6 +2,8 @@ # @api private # @since 1.14.0 +# @version 1.16.0 +# rubocop:disable Metrics/ModuleLength module RedisQueuedLocks::Acquirer::ReleaseLocksOf # @since 1.14.0 extend RedisQueuedLocks::Utilities @@ -77,6 +79,7 @@ class << self # ok: true, # result: { # rel_key_cnt: Integer, + # rel_req_cnt: Integer, # tch_queue_cnt: Integer, # rel_time: Numeric # } @@ -84,7 +87,7 @@ class << self # # @api private # @since 1.14.0 - # @version 1.15.0 + # @version 1.16.0 # rubocop:disable Metrics/MethodLength def release_locks_of( refused_host_id, @@ -137,6 +140,7 @@ def release_locks_of( acq_id: refused_acquirer_id, rel_time: rel_time, rel_key_cnt: result[:rel_key_cnt], + rel_req_cnt: result[:rel_req_cnt], tch_queue_cnt: result[:tch_queue_cnt] }) end if instr_sampled @@ -145,6 +149,7 @@ def release_locks_of( ok: true, result: { rel_key_cnt: result[:rel_key_cnt], + rel_req_cnt: result[:rel_req_cnt], tch_queue_cnt: result[:tch_queue_cnt], rel_time: rel_time } @@ -160,10 +165,11 @@ def release_locks_of( # @param lock_scan_size [Integer] # @param queue_scan_size [Integer] # @return [Hash>] - # - Example: { ok: true, result: { rel_key_cnt: 12345, tch_queue_cnt: 321 } } + # Example: { ok: true, result: { rel_key_cnt: 12345, tch_queue_cnt: 321, rel_req_cnt: 123 } } # # @api private # @since 1.14.0 + # @version 1.16.0 # rubocop:disable Metrics/MethodLength def fully_release_locks_of( refused_host_id, @@ -174,6 +180,7 @@ def fully_release_locks_of( ) # TODO: some indexing approach isntead of rel_key_cnt = 0 + rel_req_cnt = 0 tch_queue_cnt = 0 redis.with do |rconn| @@ -210,12 +217,14 @@ def fully_release_locks_of( count: queue_scan_size ) do |lock_queue| res = rconn.call('ZREM', lock_queue, refused_acquirer_id) + rel_req_cnt += res tch_queue_cnt += 1 if res != 0 end end - { ok: true, result: { rel_key_cnt:, tch_queue_cnt: } } + { ok: true, result: { rel_key_cnt:, tch_queue_cnt:, rel_req_cnt: } } end end # rubocop:enable Metrics/MethodLength end +# rubocop:enable Metrics/ModuleLength diff --git a/lib/redis_queued_locks/acquirer/release_locks_of_acquirer.rb b/lib/redis_queued_locks/acquirer/release_locks_of_acquirer.rb new file mode 100644 index 0000000..330902d --- /dev/null +++ b/lib/redis_queued_locks/acquirer/release_locks_of_acquirer.rb @@ -0,0 +1,157 @@ +# frozen_string_literal: true + +# @api private +# @since 1.16.0 +module RedisQueuedLocks::Acquirer::ReleaseLocksOfAcquirer + # @since 1.16.0 + extend RedisQueuedLocks::Utilities + + class << self + # @param refused_acquirer_id [String] + # @param redis [RedisClient] + # @param lock_scan_size [Integer] + # @param queue_scan_size [Integer] + # @param logger [::Logger,#debug] + # @param isntrumenter [#notify] + # @param instrument [NilClass,Any] + # @param log_sampling_enabled [Boolean] + # @param log_sampling_percent [Integer] + # @param log_sampler [#sampling_happened?,Module] + # @param log_sample_this [Boolean] + # @param instr_sampling_enabled [Boolean] + # @param instr_sampling_percent [Integer] + # @param instr_sampler [#sampling_happened?,Module] + # @param instr_sample_this [Boolean] + # @return [Hash>] + # + # @api private + # @since 1.16.0 + # rubocop:disable Metrics/MethodLength + def release_locks_of_acquirer( + refused_acquirer_id, + redis, + lock_scan_size, + queue_scan_size, + logger, + instrumenter, + instrument, + log_sampling_enabled, + log_sampling_percent, + log_sampler, + log_sample_this, + instr_sampling_enabled, + instr_sampling_percent, + instr_sampler, + instr_sample_this + ) + rel_start_time = clock_gettime + + fully_release_locks_of_acquirer( + refused_acquirer_id, + redis, + lock_scan_size, + queue_scan_size + ) => { ok:, result: } + + # @type var ok: bool + # @type var result: Hash[Symbol,Integer] + + time_at = Time.now.to_f + rel_end_time = clock_gettime + rel_time = ((rel_end_time - rel_start_time) / 1_000.0).ceil(2) + + instr_sampled = RedisQueuedLocks::Instrument.should_instrument?( + instr_sampling_enabled, + instr_sample_this, + instr_sampling_percent, + instr_sampler + ) + + run_non_critical do + instrumenter.notify('redis_queued_locks.release_locks_of_acquirer', { + at: time_at, + acq_id: refused_acquirer_id, + rel_time: rel_time, + rel_key_cnt: result[:rel_key_cnt], + rel_req_cnt: result[:rel_req_cnt], + tch_queue_cnt: result[:tch_queue_cnt] + }) + end if instr_sampled + + { + ok: true, + result: { + rel_key_cnt: result[:rel_key_cnt], + rel_req_cnt: result[:rel_req_cnt], + tch_queue_cnt: result[:tch_queue_cnt], + rel_time: rel_time + } + } + end + # rubocop:enable Metrics/MethodLength + + private + + # @param refused_acquirer_id [String] + # @param redis [RedisClient] + # @param lock_scan_size [Integer] + # @param queue_scan_size [Integer] + # @return [Hash>] + # + # @api private + # @since 1.16.0 + # rubocop:disable Metrics/MethodLength + def fully_release_locks_of_acquirer( + refused_acquirer_id, + redis, + lock_scan_size, + queue_scan_size + ) + rel_key_cnt = 0 + tch_queue_cnt = 0 + rel_req_cnt = 0 + + redis.with do |rconn| + # Step A: drop locks of the passed acquirer + refused_locks = Set.new #: Set[String] + rconn.scan( + 'MATCH', + RedisQueuedLocks::Resource::LOCK_PATTERN, + count: lock_scan_size + ) do |lock_key| + acquirer_id = rconn.call('HMGET', lock_key, 'acq_id') + if refused_acquirer_id == acquirer_id + refused_locks << lock_key + end + + if refused_locks.size >= lock_scan_size + # NOTE: steep can not recognize the `*`-splat operator on Set objects + rconn.call('DEL', *refused_locks) # steep:ignore + rel_key_cnt += refused_locks.size + refused_locks.clear + end + end + + if refused_locks.any? + # NOTE: steep can not recognize the `*`-splat operator on Set objects + rconn.call('DEL', *refused_locks) # steep:ignore + rel_key_cnt += refused_locks.size + end + + # Step B: drop passed acquirer from lock queues + rconn.scan( + 'MATCH', + RedisQueuedLocks::Resource::LOCK_QUEUE_PATTERN, + count: queue_scan_size + ) do |lock_queue| + res = rconn.call('ZREM', lock_queue, refused_acquirer_id) + rel_req_cnt += res + tch_queue_cnt += 1 if res != 0 + end + end + + { ok: true, result: { rel_key_cnt:, tch_queue_cnt:, rel_req_cnt: } } + end + # rubocop:enable Metrics/MethodLength + end +end diff --git a/lib/redis_queued_locks/acquirer/release_locks_of_host.rb b/lib/redis_queued_locks/acquirer/release_locks_of_host.rb new file mode 100644 index 0000000..7d703c0 --- /dev/null +++ b/lib/redis_queued_locks/acquirer/release_locks_of_host.rb @@ -0,0 +1,199 @@ +# frozen_string_literal: true + +# @api private +# @since 1.16.0 +# rubocop:disable Metrics/ModuleLength +module RedisQueuedLocks::Acquirer::ReleaseLocksOfHost + # @since 1.16.0 + extend RedisQueuedLocks::Utilities + + # rubocop:disable Metrics/ClassLength + class << self + # @param refused_host_id [String] + # @param redis [RedisClient] + # @param lock_scan_size [Integer] + # @param queue_scan_size [Integer] + # @param queue_cleanup_cursor_count [Integer] + # @param logger [::Logger,#debug] + # @param isntrumenter [#notify] + # @param instrument [NilClass,Any] + # @param log_sampling_enabled [Boolean] + # @param log_sampling_percent [Integer] + # @param log_sampler [#sampling_happened?,Module] + # @param log_sample_this [Boolean] + # @param instr_sampling_enabled [Boolean] + # @param instr_sampling_percent [Integer] + # @param instr_sampler [#sampling_happened?,Module] + # @param instr_sample_this [Boolean] + # @return [Hash>] + # + # @api private + # @since 1.16.0 + # rubocop:disable Metrics/MethodLength + def release_locks_of_host( + refused_host_id, + redis, + lock_scan_size, + queue_scan_size, + queue_cleanup_cursor_count, + logger, + instrumenter, + instrument, + log_sampling_enabled, + log_sampling_percent, + log_sampler, + log_sample_this, + instr_sampling_enabled, + instr_sampling_percent, + instr_sampler, + instr_sample_this + ) + rel_start_time = clock_gettime + + fully_release_locks_of_host( + refused_host_id, + redis, + lock_scan_size, + queue_scan_size, + queue_cleanup_cursor_count + ) => { ok:, result: } + + # @type var ok: bool + # @type var result: Hash[Symbol,Integer] + + time_at = Time.now.to_f + rel_end_time = clock_gettime + rel_time = ((rel_end_time - rel_start_time) / 1_000.0).ceil(2) + + instr_sampled = RedisQueuedLocks::Instrument.should_instrument?( + instr_sampling_enabled, + instr_sample_this, + instr_sampling_percent, + instr_sampler + ) + + run_non_critical do + instrumenter.notify('redis_queued_locks.release_locks_of_host', { + at: time_at, + hst_id: refused_host_id, + rel_time: rel_time, + rel_key_cnt: result[:rel_key_cnt], + rel_req_cnt: result[:rel_req_cnt], + tch_queue_cnt: result[:tch_queue_cnt] + }) + end if instr_sampled + + { + ok: true, + result: { + rel_key_cnt: result[:rel_key_cnt], + rel_req_cnt: result[:rel_req_cnt], + tch_queue_cnt: result[:tch_queue_cnt], + rel_time: rel_time + } + } + end + # rubocop:enable Metrics/MethodLength + + private + + # @param refused_host_id [String] + # @param redis [RedisClient] + # @param lock_scan_size [Integer] + # @param queue_scan_size [Integer] + # @param queue_cleanup_cursor_count [Integer] + # @return [Hash>] + # + # @api private + # @since 1.16.0 + # rubocop:disable Metrics/MethodLength + def fully_release_locks_of_host( + refused_host_id, + redis, + lock_scan_size, + queue_scan_size, + queue_cleanup_cursor_count + ) + # TODO: some indexing approach isntead of + rel_key_cnt = 0 + tch_queue_cnt = 0 + rel_req_cnt = 0 + + redis.with do |rconn| + # Step A: drop locks of the passed host/acquirer + refused_locks = Set.new #: Set[String] + rconn.scan( + 'MATCH', + RedisQueuedLocks::Resource::LOCK_PATTERN, + count: lock_scan_size + ) do |lock_key| + host_id = rconn.call('HMGET', lock_key, 'hst_id') + if refused_host_id == host_id + refused_locks << lock_key + end + + if refused_locks.size >= lock_scan_size + # NOTE: steep can not recognize the `*`-splat operator on Set objects + rconn.call('DEL', *refused_locks) # steep:ignore + rel_key_cnt += refused_locks.size + refused_locks.clear + end + end + + if refused_locks.any? + # NOTE: steep can not recognize the `*`-splat operator on Set objects + rconn.call('DEL', *refused_locks) # steep:ignore + rel_key_cnt += refused_locks.size + end + + host_acquirers_pattern = RedisQueuedLocks::Resource.acquirer_pattern_from_host( + refused_host_id + ) + + # Step B: drop passed host/acquirer from lock queues + rconn.scan( + 'MATCH', + RedisQueuedLocks::Resource::LOCK_QUEUE_PATTERN, + count: queue_scan_size + ) do |lock_queue| + lock_queue_is_mutated = false + current_cursor = '0' + + # rubocop:disable Layout/LineLength + # EXAMPLE: (in redis) + # ZSCAN "rql:lock_queue:some_lock" 0 MATCH "rql:acq:1234/1234/1234/*/idenity" COUNT 123 + # ZSCAN MATCH COUNT + # rubocop:enable Layout/LineLength + + loop do + current_cursor, acquirer_scores = rconn.call( + 'ZSCAN', lock_queue, current_cursor, + 'MATCH', host_acquirers_pattern, + 'COUNT', queue_cleanup_cursor_count + ) + + # NOTE: we should stop any iteration if no data found; + break if acquirer_scores.empty? + + acquirer_members = acquirer_scores.select { |member| member.start_with?('rql:acq:') } + released_requests = rconn.call('ZREM', lock_queue, *acquirer_members) + lock_queue_is_mutated = true + rel_req_cnt += released_requests + + # NOTE: + # - we should stop any iteration if data extraction is finished; + # - (if the returned cursor is 0, a full iteration is complete); + break if current_cursor == '0' + end + + tch_queue_cnt += 1 if lock_queue_is_mutated + end + end + + { ok: true, result: { rel_key_cnt:, tch_queue_cnt:, rel_req_cnt: } } + end + # rubocop:enable Metrics/MethodLength + end + # rubocop:enable Metrics/ClassLength +end +# rubocop:enable Metrics/ModuleLength diff --git a/lib/redis_queued_locks/client.rb b/lib/redis_queued_locks/client.rb index a347fbc..685b11a 100644 --- a/lib/redis_queued_locks/client.rb +++ b/lib/redis_queued_locks/client.rb @@ -2,7 +2,7 @@ # @api public # @since 1.0.0 -# @version 1.14.0 +# @version 1.16.0 # rubocop:disable Metrics/ClassLength class RedisQueuedLocks::Client # @return [RedisClient] @@ -548,9 +548,9 @@ def queue_info(lock_name) # Retrun the current acquirer identifier. # # @option process_id [Integer,Any] Process identifier. + # @option ractor_id [Integer,Any] Ractor identifier. # @option thread_id [Integer,Any] Thread identifier. # @option fiber_id [Integer,Any] Fiber identifier. - # @option ractor_id [Integer,Any] Ractor identifier. # @option identity [String] Unique per-process string. See `config['uniq_identifier']`. # @return [String] # @@ -563,18 +563,19 @@ def queue_info(lock_name) # # @api public # @since 1.8.0 + # @version 1.16.0 def current_acquirer_id( process_id: RedisQueuedLocks::Resource.get_process_id, + ractor_id: RedisQueuedLocks::Resource.get_ractor_id, thread_id: RedisQueuedLocks::Resource.get_thread_id, fiber_id: RedisQueuedLocks::Resource.get_fiber_id, - ractor_id: RedisQueuedLocks::Resource.get_ractor_id, identity: uniq_identity ) RedisQueuedLocks::Resource.acquirer_identifier( process_id, + ractor_id, thread_id, fiber_id, - ractor_id, identity ) end @@ -598,16 +599,17 @@ def current_acquirer_id( # # @api public # @since 1.9.0 + # @version 1.16.0 def current_host_id( process_id: RedisQueuedLocks::Resource.get_process_id, - thread_id: RedisQueuedLocks::Resource.get_thread_id, ractor_id: RedisQueuedLocks::Resource.get_ractor_id, + thread_id: RedisQueuedLocks::Resource.get_thread_id, identity: uniq_identity ) RedisQueuedLocks::Resource.host_identifier( process_id, - thread_id, ractor_id, + thread_id, identity ) end @@ -772,7 +774,16 @@ def clear_locks( # @option instr_sampler [#sampling_happened?,Module] # @option instr_sample_this [Boolean] # @return [Hash>] - # Example: { ok: true, result: { rel_key_cnt: 100, tch_queue_cnt: 2, rel_time: 0.01 } } + # Example: + # { + # ok: true, + # result: { + # rel_key_cnt: 100, + # rel_req_cnt: 123, + # tch_queue_cnt: 2, + # rel_time: 0.01 + # } + # } # # @example Release locks of the current process: # client.clear_locks_of( @@ -835,6 +846,119 @@ def clear_locks_of( end alias_method :release_locks_of, :clear_locks_of + # @option host_id [String] + # @option lock_scan_size [Integer] + # @option queue_scan_size [Integer] + # @option queue_cleanup_cursor_count [Integer] + # @option logger [::Logger,#debug] + # @option instrumenter [#notify] See `config['instrumenter']` docs for details. + # @option instrument [NilClass,Any] + # @option log_sampling_enabled [Boolean] + # @option log_sampling_percent [Integer] + # @option log_sampler [#sampling_happened?,Module] + # @option log_sample_this [Boolean] + # @option instr_sampling_enabled [Boolean] + # @option instr_sampling_percent [Integer] + # @option instr_sampler [#sampling_happened?,Module] + # @option instr_sample_this [Boolean] + # @return [Hash>] + # + # @api public + # @since 1.16.0 + # rubocop:disable Layout/LineLength + def clear_locks_of_host( + host_id, + lock_scan_size: config['clear_locks_of__lock_scan_size'], # steep:ignore + queue_scan_size: config['clear_locks_of__queue_scan_size'], # steep:ingore + queue_cleanup_cursor_count: config['clear_locks_of_host__queue_cleanup_cursor_count'], # steep:ingore + logger: config['logger'], # steep:ignore + instrumenter: config['instrumenter'], # steep:ignore + instrument: nil, + log_sampling_enabled: config['log_sampling_enabled'], # steep:ignore + log_sampling_percent: config['log_sampling_percent'], # steep:ignore + log_sampler: config['log_sampler'], # steep:ignore + log_sample_this: false, + instr_sampling_enabled: config['instr_sampling_enabled'], # steep:ignore + instr_sampling_percent: config['instr_sampling_percent'], # steep:ignore + instr_sampler: config['instr_sampler'], # steep:ignore + instr_sample_this: false + ) + RedisQueuedLocks::Acquirer::ReleaseLocksOfHost.release_locks_of_host( + host_id, + redis_client, + lock_scan_size, + queue_scan_size, + queue_cleanup_cursor_count, + logger, + instrumenter, + instrument, + log_sampling_enabled, + log_sampling_percent, + log_sampler, + log_sample_this, + instr_sampling_enabled, + instr_sampling_percent, + instr_sampler, + instr_sample_this + ) + end + alias_method :release_locks_of_host, :clear_locks_of_host + # rubocop:enable Layout/LineLength + + # @option acquirer_id [String] + # @option lock_scan_size [Integer] + # @option queue_scan_size [Integer] + # @option logger [::Logger,#debug] + # @option instrumenter [#notify] See `config['instrumenter']` docs for details. + # @option instrument [NilClass,Any] + # @option log_sampling_enabled [Boolean] + # @option log_sampling_percent [Integer] + # @option log_sampler [#sampling_happened?,Module] + # @option log_sample_this [Boolean] + # @option instr_sampling_enabled [Boolean] + # @option instr_sampling_percent [Integer] + # @option instr_sampler [#sampling_happened?,Module] + # @option instr_sample_this [Boolean] + # @return [Hash>] + # + # @api public + # @since 1.16.0 + def clear_locks_of_acquirer( + acquirer_id, + lock_scan_size: config['clear_locks_of__lock_scan_size'], # steep:ignore + queue_scan_size: config['clear_locks_of__queue_scan_size'], # steep:ingore + logger: config['logger'], # steep:ignore + instrumenter: config['instrumenter'], # steep:ignore + instrument: nil, + log_sampling_enabled: config['log_sampling_enabled'], # steep:ignore + log_sampling_percent: config['log_sampling_percent'], # steep:ignore + log_sampler: config['log_sampler'], # steep:ignore + log_sample_this: false, + instr_sampling_enabled: config['instr_sampling_enabled'], # steep:ignore + instr_sampling_percent: config['instr_sampling_percent'], # steep:ignore + instr_sampler: config['instr_sampler'], # steep:ignore + instr_sample_this: false + ) + RedisQueuedLocks::Acquirer::ReleaseLocksOfAcquirer.release_locks_of_acquirer( + acquirer_id, + redis_client, + lock_scan_size, + queue_scan_size, + logger, + instrumenter, + instrument, + log_sampling_enabled, + log_sampling_percent, + log_sampler, + log_sample_this, + instr_sampling_enabled, + instr_sampling_percent, + instr_sampler, + instr_sample_this + ) + end + alias_method :release_locks_of_acquirer, :clear_locks_of_acquirer + # Release all locks of the current acquirer/host and # remove the current acquirer/host from all queues; # diff --git a/lib/redis_queued_locks/config.rb b/lib/redis_queued_locks/config.rb index 6303809..1c9d4b3 100644 --- a/lib/redis_queued_locks/config.rb +++ b/lib/redis_queued_locks/config.rb @@ -21,6 +21,7 @@ class RedisQueuedLocks::Config setting('lock_release_batch_size', 100) setting('clear_locks_of__lock_scan_size', 300) setting('clear_locks_of__queue_scan_size', 300) + setting('clear_locks_of_host__queue_cleanup_cursor_count', 300) setting('key_extraction_batch_size', 500) setting('instrumenter', RedisQueuedLocks::Instrument::VoidNotifier) setting('uniq_identifier', -> { RedisQueuedLocks::Resource.calc_uniq_identity }) @@ -84,6 +85,7 @@ class RedisQueuedLocks::Config validate('lock_release_batch_size') { |val| val.is_a?(Integer) } validate('clear_locks_of__lock_scan_size') { |val| val.is_a?(Integer) } validate('clear_locks_of__queue_scan_size') { |val| val.is_a?(Integer) } + validate('clear_locks_of_host__queue_cleanup_cursor_count') { |val| val.is_a?(Integer) } validate('instrumenter') { |val| RedisQueuedLocks::Instrument.valid_interface?(val) } validate('uniq_identifier') { |val| val.is_a?(Proc) } validate('logger') { |val| RedisQueuedLocks::Logging.valid_interface?(val) } diff --git a/lib/redis_queued_locks/resource.rb b/lib/redis_queued_locks/resource.rb index b80a568..8e33b14 100644 --- a/lib/redis_queued_locks/resource.rb +++ b/lib/redis_queued_locks/resource.rb @@ -2,6 +2,7 @@ # @api private # @since 1.0.0 +# @version 1.16.0 module RedisQueuedLocks::Resource # @return [String] # @@ -67,8 +68,9 @@ def calc_uniq_identity # # @api private # @since 1.0.0 - def acquirer_identifier(process_id, thread_id, fiber_id, ractor_id, identity) - "rql:acq:#{process_id}/#{thread_id}/#{fiber_id}/#{ractor_id}/#{identity}" + # @version 1.16.0 + def acquirer_identifier(process_id, ractor_id, thread_id, fiber_id, identity) + "rql:acq:#{process_id}/#{ractor_id}/#{thread_id}/#{fiber_id}/#{identity}" end # @param process_id [Integer,String] @@ -79,12 +81,82 @@ def acquirer_identifier(process_id, thread_id, fiber_id, ractor_id, identity) # # @api private # @since 1.9.0 - def host_identifier(process_id, thread_id, ractor_id, identity) + # @version 1.16.0 + def host_identifier(process_id, ractor_id, thread_id, identity) # NOTE: # - fiber's object_id is not used cuz we can't analyze fiber objects via ObjectSpace # after the any new Ractor object is created in the current process # (ObjectSpace no longer sees objects of Thread and Fiber classes after that); - "rql:hst:#{process_id}/#{thread_id}/#{ractor_id}/#{identity}" + "rql:hst:#{process_id}/#{ractor_id}/#{thread_id}/#{identity}" + end + + # Сформировать host_id, который был под конкретным acuirer'ом (из acq_id). + # + # @param acq_id [String] + # @return [String] + # + # @api private + # @since 1.16.0 + def acquirer_host(acq_id) + # Step №1. extract an identity string + last_delimeter_position = acq_id.rindex('/') #: Integer + identity_part = acq_id[(last_delimeter_position + 1)...] #: String + + # TODO: rewrite with String#scan + process_id_ending_position = acq_id.index('/') #: Integer + ractor_id_ending_position = acq_id.index('/', process_id_ending_position + 1) #: Integer + thread_id_ending_position = acq_id.index('/', ractor_id_ending_position + 1) #: Integer + + # Step №2. extract host identifier part ("process_id/ractor_id/thread_id") + # - (9) is "rql:acq:" part of the acquirer identifier string + # - (-1) is "string part without `/` symbol at the end of the string" + host_identifier_parts = acq_id[9...(thread_id_ending_position - 1)] #: String + + # Step №3. build host identifier from extracted parts + "rql:hst:#{host_identifier_parts}/#{identity_part}" + end + + # Получаем MATCH-паттерн для Redis-команд для всех acquirer'ов, сооветтвующих + # кокнретному host_id. + # + # @param host_id [String] + # @return [String] + # + # @api private + # @since 1.16.0 + def acquirer_pattern_from_host(host_id) + host_identity = extract_identity(host_id) + non_identified_part = extract_non_identified_part(host_id) + + "rql:acq:#{non_identified_part}/*/#{host_identity}" + end + + # Вытащить identity-стрингу из имени acquirer'а или host'а. + # + # @param host_or_acquirer_id [String] + # @return [String] + # + # @api private + # @since 1.16.0 + def extract_identity(host_or_acquirer_id) + last_delimeter_position = host_or_acquirer_id.rindex('/') #: Integer + host_or_acquirer_id[(last_delimeter_position + 1)...] #: String + end + + # Вытащить из acquirer'а или host'а всю часть идентификатора без identity-части. + # Нужно для скана acquirer'ов по очередям, когда мы дропаем аквиреров из очередей + # только по host_id (не зная и не используя acquirer_id). + # 9 - это значит пропустить "rql:acq:" и "rql:hst:" с начала строки. + # + # + # @param host_or_acquirer_id [String] + # @return [String] + # + # @api private + # @since 1.16.0 + def extract_non_identified_part(host_or_acquirer_id) + last_delimeter_position = host_or_acquirer_id.rindex('/') #: Integer + host_or_acquirer_id[9...(last_delimeter_position - 1)] #: String end # @param lock_name [String] @@ -209,6 +281,7 @@ def get_process_id # # @api private # @since 1.9.0 + # @version 1.16.0 def possible_host_identifiers(identity) # NOTE №1: we can not use ObjectSpace.each_object for Thread and Fiber cuz after the any # ractor creation the ObjectSpace stops seeing ::Thread and ::Fiber objects: each_object @@ -219,10 +292,10 @@ def possible_host_identifiers(identity) # @type var current_process_id: Integer current_process_id = get_process_id - # @type var current_threads: Array[Thread] - current_threads = ::Thread.list # @type var current_ractor_id: Integer current_ractor_id = get_ractor_id + # @type var current_threads: Array[Thread] + current_threads = ::Thread.list # NOTE: steep can't resolve a type of dynamic `[]` literal mutated via inline tap; # steep:ignore:start @@ -231,8 +304,8 @@ def possible_host_identifiers(identity) current_threads.each do |thread| acquirers << host_identifier( current_process_id, - thread.object_id, current_ractor_id, + thread.object_id, identity ) end diff --git a/lib/redis_queued_locks/version.rb b/lib/redis_queued_locks/version.rb index 2b43dd3..1b41c33 100644 --- a/lib/redis_queued_locks/version.rb +++ b/lib/redis_queued_locks/version.rb @@ -5,6 +5,6 @@ module RedisQueuedLocks # # @api public # @since 0.0.1 - # @version 1.15.0 - VERSION = '1.15.0' + # @version 1.16.0 + VERSION = '1.16.0' end diff --git a/sig/redis_queued_locks/acquirer/release_lock.rbs b/sig/redis_queued_locks/acquirer/release_lock.rbs index 01439b5..c42607c 100644 --- a/sig/redis_queued_locks/acquirer/release_lock.rbs +++ b/sig/redis_queued_locks/acquirer/release_lock.rbs @@ -22,7 +22,7 @@ module RedisQueuedLocks RQL::instrObj instrumenter, RQL::loggerObj logger, bool log_sampling_enabled, - Integer log_samplingPercent, + Integer log_sampling_percent, RQL::Logging::samplerObj log_sampler, bool log_sample_this, bool instr_sampling_enabled, diff --git a/sig/redis_queued_locks/acquirer/release_locks_of.rbs b/sig/redis_queued_locks/acquirer/release_locks_of.rbs index 5dbca76..b160acf 100644 --- a/sig/redis_queued_locks/acquirer/release_locks_of.rbs +++ b/sig/redis_queued_locks/acquirer/release_locks_of.rbs @@ -10,6 +10,7 @@ module RedisQueuedLocks ok: bool, result: { rel_key_cnt: Integer, + rel_req_cnt: Integer, tch_queue_cnt: Integer, rel_time: Integer|Float } @@ -33,7 +34,14 @@ module RedisQueuedLocks bool instr_sample_this ) -> releaseResult - type fullyReleaseAllLocksResult = { ok: bool, result: { rel_key_cnt: Integer, tch_queue_cnt: Integer } } + type fullyReleaseAllLocksResult = { + ok: bool, + result: { + rel_key_cnt: Integer, + rel_req_cnt: Integer, + tch_queue_cnt: Integer + } + } private def self.fully_release_locks_of: ( String refused_host_id, String refused_acquirer_id, diff --git a/sig/redis_queued_locks/acquirer/release_locks_of_acquirer.rbs b/sig/redis_queued_locks/acquirer/release_locks_of_acquirer.rbs new file mode 100644 index 0000000..f42db3f --- /dev/null +++ b/sig/redis_queued_locks/acquirer/release_locks_of_acquirer.rbs @@ -0,0 +1,52 @@ +use RedisQueuedLocks as RQL +use RedisClient as RC + +module RedisQueuedLocks + module Acquirer + module ReleaseLocksOfAcquirer + extend RQL::Utilities + + type releaseResult = { + ok: bool, + result: { + rel_key_cnt: Integer, + rel_req_cnt: Integer, + tch_queue_cnt: Integer, + rel_time: Integer|Float + } + } + def self.release_locks_of_acquirer: ( + String acquirer_id, + RC::client redis, + Integer lock_scan_size, + Integer queue_scan_size, + RQL::loggerObj logger, + RQL::instrObj instrumenter, + untyped instrument, + bool log_sampling_enabled, + Integer log_sampling_percent, + RQL::Logging::samplerObj log_sampler, + bool log_sample_this, + bool instr_sampling_enabled, + Integer instr_sampling_percent, + RQL::Instrument::samplerObj instr_sampler, + bool instr_sample_this + ) -> releaseResult + + type fullyReleaseLocksOfAcquirerResult = { + ok: bool, + result: { + rel_key_cnt: Integer, + rel_req_cnt: Integer, + tch_queue_cnt: Integer + } + } + private def self.fully_release_locks_of_acquirer: ( + String refused_acquirer_id, + RC::client redis, + Integer lock_scan_size, + Integer queue_scan_size + ) -> fullyReleaseLocksOfAcquirerResult + end + end +end diff --git a/sig/redis_queued_locks/acquirer/release_locks_of_host.rbs b/sig/redis_queued_locks/acquirer/release_locks_of_host.rbs new file mode 100644 index 0000000..9b83739 --- /dev/null +++ b/sig/redis_queued_locks/acquirer/release_locks_of_host.rbs @@ -0,0 +1,54 @@ +use RedisQueuedLocks as RQL +use RedisClient as RC + +module RedisQueuedLocks + module Acquirer + module ReleaseLocksOfHost + extend RQL::Utilities + + type releaseResult = { + ok: bool, + result: { + rel_key_cnt: Integer, + rel_req_cnt: Integer, + tch_queue_cnt: Integer, + rel_time: Integer|Float + } + } + def self.release_locks_of_host: ( + String host_id, + RC::client redis, + Integer lock_scan_size, + Integer queue_scan_size, + Integer queue_cleanup_cursor_count, + RQL::loggerObj logger, + RQL::instrObj instrumenter, + untyped instrument, + bool log_sampling_enabled, + Integer log_sampling_percent, + RQL::Logging::samplerObj log_sampler, + bool log_sample_this, + bool instr_sampling_enabled, + Integer instr_sampling_percent, + RQL::Instrument::samplerObj instr_sampler, + bool instr_sample_this + ) -> releaseResult + + type fullyReleaseLocksOfHostResult = { + ok: bool, + result: { + rel_key_cnt: Integer, + rel_req_cnt: Integer, + tch_queue_cnt: Integer + } + } + private def self.fully_release_locks_of_host: ( + String refused_host_id, + RC::client redis, + Integer lock_scan_size, + Integer queue_scan_size, + Integer queue_cleanup_cursor_count + ) -> fullyReleaseLocksOfHostResult + end + end +end diff --git a/sig/redis_queued_locks/client.rbs b/sig/redis_queued_locks/client.rbs index f4c05a9..e677fdd 100644 --- a/sig/redis_queued_locks/client.rbs +++ b/sig/redis_queued_locks/client.rbs @@ -186,6 +186,43 @@ module RedisQueuedLocks ) -> RQL::Acquirer::ReleaseLocksOf::releaseResult alias release_locks_of clear_locks_of + def clear_locks_of_host: ( + String host_id, + ?lock_scan_size: Integer, + ?queue_scan_size: Integer, + ?queue_cleanup_cursor_count: Integer, + ?logger: RQL::loggerObj, + ?instrumenter: RQL::instrObj, + ?instrument: untyped?, + ?log_sampling_enabled: bool, + ?log_sampling_percent: Integer, + ?log_sampler: RQL::Logging::samplerObj, + ?log_sample_this: bool, + ?instr_sampling_enabled: bool, + ?instr_sampling_percent: Integer, + ?instr_sampler: RQL::Instrument::samplerObj, + ?instr_sample_this: bool + ) -> RQL::Acquirer::ReleaseLocksOfHost::releaseResult + alias release_locks_of_host clear_locks_of_host + + def clear_locks_of_acquirer: ( + String acquirer_id, + ?lock_scan_size: Integer, + ?queue_scan_size: Integer, + ?logger: RQL::loggerObj, + ?instrumenter: RQL::instrObj, + ?instrument: untyped?, + ?log_sampling_enabled: bool, + ?log_sampling_percent: Integer, + ?log_sampler: RQL::Logging::samplerObj, + ?log_sample_this: bool, + ?instr_sampling_enabled: bool, + ?instr_sampling_percent: Integer, + ?instr_sampler: RQL::Instrument::samplerObj, + ?instr_sample_this: bool + ) -> RQL::Acquirer::ReleaseLocksOfHost::releaseResult + alias release_locks_of_acquirer clear_locks_of_acquirer + def clear_current_locks: ( ?lock_scan_size: Integer, ?queue_scan_size: Integer, diff --git a/sig/redis_queued_locks/resource.rbs b/sig/redis_queued_locks/resource.rbs index 90290af..79183b5 100644 --- a/sig/redis_queued_locks/resource.rbs +++ b/sig/redis_queued_locks/resource.rbs @@ -11,19 +11,24 @@ module RedisQueuedLocks def self.calc_uniq_identity: () -> String def self.acquirer_identifier: ( Integer|String process_id, + Integer|String ractor_id, Integer|String thread_id, Integer|String fiber_id, - Integer|String ractor_id, String identity ) -> String def self.host_identifier: ( Integer|String process_id, - Integer|String thread_id, Integer|string ractor_id, + Integer|String thread_id, String identity ) -> String + def self.acquirer_host: (String acq_id) -> String + def self.acquirer_pattern_from_host: (String host_id) -> String + def self.extract_identity: (String host_or_acquirer_id) -> String + def self.extract_non_identified_part: (String host_or_acquirer_id) -> String + def self.prepare_lock_key: (String lock_name) -> String def self.prepare_lock_queue: (String lock_name) -> String def self.prepare_read_lock_queue: (String lock_name) -> String diff --git a/sig/vendor/semantic_logger.rbs b/sig/vendor/semantic_logger.rbs index f6d1b02..606d762 100644 --- a/sig/vendor/semantic_logger.rbs +++ b/sig/vendor/semantic_logger.rbs @@ -1,4 +1,5 @@ module SemanticLogger class Logger + def debug: (*untyped, **untyped) { (?) -> untyped } -> untyped end end diff --git a/spec/redis_queued_locks_spec.rb b/spec/redis_queued_locks_spec.rb index 14b736f..2b30300 100644 --- a/spec/redis_queued_locks_spec.rb +++ b/spec/redis_queued_locks_spec.rb @@ -41,12 +41,12 @@ # verify current host expect(client.current_host_id).to eq( - "rql:hst:#{current_process_id}/#{current_thread_id}/#{current_ractor_id}/#{uniq_identity}" + "rql:hst:#{current_process_id}/#{current_ractor_id}/#{current_thread_id}/#{uniq_identity}" ) # collect possible hosts expected_hosts = current_thread_list.map do |thread| - "rql:hst:#{current_process_id}/#{thread.object_id}/#{current_ractor_id}/#{uniq_identity}" + "rql:hst:#{current_process_id}/#{current_ractor_id}/#{thread.object_id}/#{uniq_identity}" end expect(client.possible_host_ids).to contain_exactly(*expected_hosts) @@ -55,9 +55,9 @@ new_thread1 = Thread.new { loop { sleep(0.5) } } new_thread2 = Thread.new { loop { sleep(0.5) } } new_thread1_host_id = - "rql:hst:#{current_process_id}/#{new_thread1.object_id}/#{current_ractor_id}/#{uniq_identity}" + "rql:hst:#{current_process_id}/#{current_ractor_id}/#{new_thread1.object_id}/#{uniq_identity}" new_thread2_host_id = - "rql:hst:#{current_process_id}/#{new_thread2.object_id}/#{current_ractor_id}/#{uniq_identity}" + "rql:hst:#{current_process_id}/#{current_ractor_id}/#{new_thread2.object_id}/#{uniq_identity}" expected_hosts << new_thread1_host_id expected_hosts << new_thread2_host_id expect(client.possible_host_ids).to contain_exactly(*expected_hosts) @@ -68,7 +68,7 @@ new_thread2.tap(&:kill).tap(&:join) new_current_thread_list = Thread.list expected_hosts = new_current_thread_list.map do |thread| - "rql:hst:#{current_process_id}/#{thread.object_id}/#{current_ractor_id}/#{uniq_identity}" + "rql:hst:#{current_process_id}/#{current_ractor_id}/#{thread.object_id}/#{uniq_identity}" end last_possible_host_ids = client.possible_host_ids expect(last_possible_host_ids).to contain_exactly(*expected_hosts) @@ -2061,6 +2061,7 @@ def notify(event, payload = {}) ok: true, result: match({ rel_key_cnt: 0, # <-- no any key is cleared for side__thread + rel_req_cnt: 1, # <-- one lock request is removed (request from side_thread-acquirer) tch_queue_cnt: 1, # <-- side_thread-acquirer is removed from the queue rel_time: be_a(Numeric) }) @@ -2089,6 +2090,7 @@ def notify(event, payload = {}) ok: true, result: match({ rel_key_cnt: 1, # <-- removed locks from in_fork__main_thread + rel_req_cnt: 0, # <-- no any requests are dropped from lock queues tch_queue_cnt: 0, # <-- no any queue is "touched" cuz main_thread "is not inside the any queue" rel_time: be_a(Numeric) }) @@ -2120,6 +2122,7 @@ def notify(event, payload = {}) ok: true, result: match({ rel_key_cnt: 1, # <-- removed locks from current process + rel_req_cnt: 0, # <-- no any requests are dropped from lock queues tch_queue_cnt: 0, # <-- no any queue is "touched" cuz current process "is not inside the any queue" rel_time: be_a(Numeric) }) @@ -2164,6 +2167,7 @@ def notify(event, payload = {}) ok: true, result: match({ rel_key_cnt: 1, # <-- removed locks from current process + rel_req_cnt: 0, # <-- no any requests are dropped from lock queues tch_queue_cnt: 0, # <-- no any queue is "touched" cuz current process "is not inside the any queue" rel_time: be_a(Numeric) }) @@ -2185,6 +2189,7 @@ def notify(event, payload = {}) 'acq_id' => be_a(String), 'rel_time' => be_a(Numeric), 'rel_key_cnt' => be_a(Integer), + 'rel_req_cnt' => be_a(Integer), 'tch_queue_cnt' => be_a(Integer) }) expect(test_notifier.last_in_memory_rlo_event).to match({ @@ -2193,6 +2198,7 @@ def notify(event, payload = {}) acq_id: client.acq_id, # (String) last lock is released from the current thread rel_time: be_a(Numeric), rel_key_cnt: be_a(Integer), + rel_req_cnt: be_a(Integer), tch_queue_cnt: be_a(Integer) }) end