From b7e32f0d94ffc85655b045bac28404ab11c3a0aa Mon Sep 17 00:00:00 2001 From: Tomasz Gieniusz Date: Fri, 5 Jan 2024 09:46:40 +1100 Subject: [PATCH 1/4] Wrap job invocation in Rails executor when using ActiveRecord --- lib/que/active_record/connection.rb | 30 ++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/lib/que/active_record/connection.rb b/lib/que/active_record/connection.rb index aafeb878..c00d30eb 100644 --- a/lib/que/active_record/connection.rb +++ b/lib/que/active_record/connection.rb @@ -2,35 +2,39 @@ module Que module ActiveRecord + class << self + # Use Rails' executor (if present) to make sure that the connection + # we're using isn't taken from us while the block runs. See + # https://github.com/que-rb/que/issues/166#issuecomment-274218910 + def wrap_in_rails_executor(&block) + if defined?(::Rails.application.executor) + ::Rails.application.executor.wrap(&block) + else + yield + end + end + end + module Connection class << self private # Check out a PG::Connection object from ActiveRecord's pool. def checkout - wrap_in_rails_executor do + Que::ActiveRecord.wrap_in_rails_executor do ::ActiveRecord::Base.connection_pool.with_connection do |conn| yield conn.raw_connection end end end - - # Use Rails' executor (if present) to make sure that the connection - # we're using isn't taken from us while the block runs. See - # https://github.com/que-rb/que/issues/166#issuecomment-274218910 - def wrap_in_rails_executor(&block) - if defined?(::Rails.application.executor) - ::Rails.application.executor.wrap(&block) - else - yield - end - end end module JobMiddleware class << self def call(job) - yield + Que::ActiveRecord.wrap_in_rails_executor do + yield + end # ActiveRecord will check out connections to the current thread when # queries are executed and not return them to the pool until From e675fee8af9280c61d76f66a4afd01b139569c47 Mon Sep 17 00:00:00 2001 From: Tomasz Gieniusz Date: Fri, 5 Jan 2024 10:33:54 +1100 Subject: [PATCH 2/4] Add comment --- lib/que/active_record/connection.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/que/active_record/connection.rb b/lib/que/active_record/connection.rb index c00d30eb..c6be8a18 100644 --- a/lib/que/active_record/connection.rb +++ b/lib/que/active_record/connection.rb @@ -3,9 +3,6 @@ module Que module ActiveRecord class << self - # Use Rails' executor (if present) to make sure that the connection - # we're using isn't taken from us while the block runs. See - # https://github.com/que-rb/que/issues/166#issuecomment-274218910 def wrap_in_rails_executor(&block) if defined?(::Rails.application.executor) ::Rails.application.executor.wrap(&block) @@ -21,6 +18,9 @@ class << self # Check out a PG::Connection object from ActiveRecord's pool. def checkout + # Use Rails' executor (if present) to make sure that the connection + # we're using isn't taken from us while the block runs. See + # https://github.com/que-rb/que/issues/166#issuecomment-274218910 Que::ActiveRecord.wrap_in_rails_executor do ::ActiveRecord::Base.connection_pool.with_connection do |conn| yield conn.raw_connection @@ -32,6 +32,9 @@ def checkout module JobMiddleware class << self def call(job) + # Use Rails' executor (if present) to make sure that the connection + # used by the job isn't returned to the pool prematurely. See + # https://github.com/que-rb/que/issues/411 Que::ActiveRecord.wrap_in_rails_executor do yield end From f24adb21fd3b80bf28eb681033c32a0390ee6332 Mon Sep 17 00:00:00 2001 From: Tomasz Gieniusz Date: Thu, 27 Jun 2024 20:53:18 +1000 Subject: [PATCH 3/4] Don't clear ActiveRecord connections when in nested job --- lib/que/active_record/connection.rb | 41 +++++++++++++++++------ spec/que/active_record/connection_spec.rb | 19 +++++++++++ 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/lib/que/active_record/connection.rb b/lib/que/active_record/connection.rb index c6be8a18..55ef4965 100644 --- a/lib/que/active_record/connection.rb +++ b/lib/que/active_record/connection.rb @@ -3,6 +3,10 @@ module Que module ActiveRecord class << self + def active_rails_executor? + defined?(::Rails.application.executor) && ::Rails.application.executor.active? + end + def wrap_in_rails_executor(&block) if defined?(::Rails.application.executor) ::Rails.application.executor.wrap(&block) @@ -39,17 +43,32 @@ def call(job) yield end - # ActiveRecord will check out connections to the current thread when - # queries are executed and not return them to the pool until - # explicitly requested to. I'm not wild about this API design, and - # it doesn't pose a problem for the typical case of workers using a - # single PG connection (since we ensure that connection is checked - # in and checked out responsibly), but since ActiveRecord supports - # connections to multiple databases, it's easy for people using that - # feature to unknowingly leak connections to other databases. So, - # take the additional step of telling ActiveRecord to check in all - # of the current thread's connections after each job is run. - ::ActiveRecord::Base.clear_active_connections! unless job.class.resolve_que_setting(:run_synchronously) + clear_active_connections_if_needed!(job) + end + + private + + # ActiveRecord will check out connections to the current thread when + # queries are executed and not return them to the pool until + # explicitly requested to. I'm not wild about this API design, and + # it doesn't pose a problem for the typical case of workers using a + # single PG connection (since we ensure that connection is checked + # in and checked out responsibly), but since ActiveRecord supports + # connections to multiple databases, it's easy for people using that + # feature to unknowingly leak connections to other databases. So, + # take the additional step of telling ActiveRecord to check in all + # of the current thread's connections after each job is run. + def clear_active_connections_if_needed!(job) + # don't clean in synchronous mode + # see https://github.com/que-rb/que/pull/393 + return if job.class.resolve_que_setting(:run_synchronously) + + # don't clear connections in nested jobs, + # i.e. clear only if this is the outermost instance of + # Que::ActiveRecord::Connection::JobMiddleware + return if Que::ActiveRecord.active_rails_executor? + + ::ActiveRecord::Base.clear_active_connections! end end end diff --git a/spec/que/active_record/connection_spec.rb b/spec/que/active_record/connection_spec.rb index 21458d92..ed541632 100644 --- a/spec/que/active_record/connection_spec.rb +++ b/spec/que/active_record/connection_spec.rb @@ -62,5 +62,24 @@ def run(*args) assert SecondDatabaseModel.connection_handler.active_connections? end + + it "shouldn't clear connections to secondary DBs if within an active rails executor" do + # This is a hacky spec, but it's better than requiring Rails. + rails, application, executor = 3.times.map { Object.new } + application.define_singleton_method(:executor) { executor } + rails.define_singleton_method(:application) { application } + executor.define_singleton_method(:wrap) { |&block| block.call } + executor.define_singleton_method(:active?) { true } + + refute defined?(::Rails) + ::Rails = rails + + SecondDatabaseModelJob.run + + assert SecondDatabaseModel.connection_handler.active_connections? + + Object.send :remove_const, :Rails + refute defined?(::Rails) + end end end From ea71a078862e0d839cfd30a3d4785ef16ef4ccef Mon Sep 17 00:00:00 2001 From: Tomasz Gieniusz Date: Thu, 27 Jun 2024 21:47:04 +1000 Subject: [PATCH 4/4] Improve comment --- lib/que/active_record/connection.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/que/active_record/connection.rb b/lib/que/active_record/connection.rb index 55ef4965..6212c8b9 100644 --- a/lib/que/active_record/connection.rb +++ b/lib/que/active_record/connection.rb @@ -63,9 +63,9 @@ def clear_active_connections_if_needed!(job) # see https://github.com/que-rb/que/pull/393 return if job.class.resolve_que_setting(:run_synchronously) - # don't clear connections in nested jobs, - # i.e. clear only if this is the outermost instance of - # Que::ActiveRecord::Connection::JobMiddleware + # don't clear connections in nested jobs executed synchronously + # i.e. while we're still inside of the rails executor + # see https://github.com/que-rb/que/pull/412#issuecomment-2194412783 return if Que::ActiveRecord.active_rails_executor? ::ActiveRecord::Base.clear_active_connections!