Skip to content

Commit

Permalink
Add idle and reap methods to connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ttstarck committed Jul 23, 2024
1 parent 2b6fcaa commit 19f4a2b
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 3 deletions.
11 changes: 11 additions & 0 deletions lib/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ def reload(&block)
@available.shutdown(reload: true, &block)
end

## Reaps idle connections that have been idle for over +idle_seconds+.
# +idle_seconds+ defaults to 0, i.e. reap all idle connections.
def reap(idle_seconds = 0, &block)
@available.reap(idle_seconds, &block)
end

# Size of this connection pool
attr_reader :size
# Automatically drop all connections after fork
Expand All @@ -169,6 +175,11 @@ def reload(&block)
def available
@available.length
end

# Number of pool entries created and idle in the pool.
def idle
@available.idle
end
end

require_relative "connection_pool/timed_stack"
Expand Down
48 changes: 46 additions & 2 deletions lib/connection_pool/timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ def shutdown(reload: false, &block)
end
end

##
# Reaps connections that were checked in more than +idle_seconds+ ago.
def reap(idle_seconds, &block)
raise ArgumentError, "reap must receive a block" unless block
raise ArgumentError, "idle_seconds must be a number" unless idle_seconds.is_a?(Numeric)

@mutex.synchronize do
reap_start_time = current_time

reap_idle_connections(idle_seconds, reap_start_time, &block)
end
end

##
# Returns +true+ if there are no available connections.

Expand All @@ -112,6 +125,12 @@ def length
@max - @created + @que.length
end

##
# The number of connections created and available on the stack.
def idle
@que.length
end

private

def current_time
Expand All @@ -133,7 +152,7 @@ def connection_stored?(options = nil)
# This method must return a connection from the stack.

def fetch_connection(options = nil)
@que.pop
@que.pop&.first
end

##
Expand All @@ -149,13 +168,38 @@ def shutdown_connections(options = nil)
@created = 0
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method iterates over the connections in the stack and reaps the oldest idle connections one at a time until
# the first connection is not idle. This requires that the stack is kept in order of checked in time (oldest first).

def reap_idle_connections(idle_seconds, reap_start_time, &reap_block)
while idle_connections?(idle_seconds, reap_start_time)
conn, _last_checked_out = @que.shift
reap_block.call(conn)
end
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# Returns true if the first connection in the stack has been idle for more than idle_seconds

def idle_connections?(idle_seconds, reap_start_time)
if connection_stored?
_conn, last_checked_out = @que.first
reap_start_time - last_checked_out > idle_seconds
end
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must return +obj+ to the stack.

def store_connection(obj, options = nil)
@que.push obj
@que.push [obj, current_time]
end

##
Expand Down
70 changes: 69 additions & 1 deletion test/test_connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def do_something_with_block
end

def respond_to?(method_id, *args)
method_id == :do_magic || super(method_id, *args)
method_id == :do_magic || super
end
end

Expand Down Expand Up @@ -467,6 +467,74 @@ def test_shutdown_is_executed_for_all_connections_in_wrapped_pool
assert_equal [["shutdown"]] * 3, recorders.map { |r| r.calls }
end

def test_reap_removes_idle_connections
recorders = []
pool = ConnectionPool.new(size: 1) do
Recorder.new.tap { |r| recorders << r }
end

pool.with { |conn| conn }

assert_equal 1, pool.idle

pool.reap { |recorder| recorder.do_work("reap") }

assert_equal 0, pool.idle
assert_equal [["reap"]], recorders.map(&:calls)
end

def test_reap_removes_all_idle_connections
recorders = []
pool = ConnectionPool.new(size: 3) do
Recorder.new.tap { |r| recorders << r }
end
threads = use_pool(pool, 3)
kill_threads(threads)

assert_equal 3, pool.idle

pool.reap { |recorder| recorder.do_work("reap") }

assert_equal 0, pool.idle
assert_equal [["reap"]] * 3, recorders.map(&:calls)
end

def test_reap_does_not_remove_connections_if_outside_idle_time
pool = ConnectionPool.new(size: 1) { Object.new }

pool.with { |conn| conn }

pool.reap(1000) { |conn| flunk "should not reap active connection" }
end

def test_idle_returns_number_of_idle_connections
pool = ConnectionPool.new(size: 1) { Object.new }

assert_equal 0, pool.idle

pool.checkout

assert_equal 0, pool.idle

pool.checkin

assert_equal 1, pool.idle
end

def test_idle_with_multiple_connections
pool = ConnectionPool.new(size: 3) { Object.new }

assert_equal 0, pool.idle

threads = use_pool(pool, 3)

assert_equal 0, pool.idle

kill_threads(threads)

assert_equal 3, pool.idle
end

def test_wrapper_wrapped_pool
wrapper = ConnectionPool::Wrapper.new { NetworkConnection.new }
assert_equal ConnectionPool, wrapper.wrapped_pool.class
Expand Down
131 changes: 131 additions & 0 deletions test/test_connection_pool_timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ def test_length
assert_equal 1, stack.length
end

def test_idle
stack = ConnectionPool::TimedStack.new(1) { Object.new }

assert_equal 0, stack.idle

popped = stack.pop

assert_equal 0, stack.idle

stack.push popped

assert_equal 1, stack.idle
end

def test_object_creation_fails
stack = ConnectionPool::TimedStack.new(2) { raise "failure" }

Expand Down Expand Up @@ -147,4 +161,121 @@ def test_shutdown
refute_empty called
assert_empty @stack
end

def test_reap
@stack.push Object.new

called = []

@stack.reap(0) do |object|
called << object
end

refute_empty called
assert_empty @stack
end

def test_reap_large_idle_seconds
@stack.push Object.new

called = []

@stack.reap(100) do |object|
called << object
end

assert_empty called
refute_empty @stack
end

def test_reap_no_block
assert_raises(ArgumentError) do
@stack.reap(0)
end
end

def test_reap_non_numeric_idle_seconds
assert_raises(ArgumentError) do
@stack.reap("0") { |object| object }
end
end

def test_reap_with_multiple_connections
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(1.5) do |object|
called << object
end
end

assert_equal [conn1], called
refute_empty stack
assert_equal 1, stack.idle
end

def test_reap_with_multiple_connections_and_zero_idle_seconds
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(0) do |object|
called << object
end
end

assert_equal [conn1, conn2], called
assert_empty stack
end

def test_reap_with_multiple_connections_and_idle_seconds_outside_range
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(3) do |object|
called << object
end
end

assert_empty called
assert_equal 2, stack.idle
end
end

0 comments on commit 19f4a2b

Please sign in to comment.