Skip to content

Commit

Permalink
Add reap functionality (#187)
Browse files Browse the repository at this point in the history
* Add idle and reap methods to connection pool

* Change ConnectionPool#reap idle_seconds default value from 0 to 60

0 seconds is on the aggresive, setting to a more sane value of 60 seconds to prevent connection reaping/creation thrash.

* Update README with information on new ConnectionPool methods #reap and #idle

* add section to README about creating a reaper thread
  • Loading branch information
ttstarck authored Jul 27, 2024
1 parent 2b6fcaa commit 062500b
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 3 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,34 @@ cp.with { |conn| conn.get('some-count') }

Like `shutdown`, this will block until all connections are checked in and closed.

## Reap

You can reap idle connections in the ConnectionPool instance to close connections that were created but have not been used for a certain amount of time. This can be useful to run periodically in a separate thread especially if keeping the connection open is resource intensive.

You can specify how many seconds the connections have to be idle for them to be reaped.
Defaults to 60 seconds.

```ruby
cp = ConnectionPool.new { Redis.new }
cp.reap(300) { |conn| conn.close } # Reaps connections that have been idle for 300 seconds (5 minutes).
```

### Reaper Thread

You can start your own reaper thread to reap idle connections in the ConnectionPool instance on a regular interval.

```ruby
cp = ConnectionPool.new { Redis.new }

# Start a reaper thread to reap connections that have been idle for 300 seconds (5 minutes).
Thread.new do
loop do
cp.reap(300) { |conn| conn.close }
sleep 300
end
end
```

## Current State

There are several methods that return information about a pool.
Expand All @@ -109,11 +137,15 @@ There are several methods that return information about a pool.
cp = ConnectionPool.new(size: 10) { Redis.new }
cp.size # => 10
cp.available # => 10
cp.idle # => 0

cp.with do |conn|
cp.size # => 10
cp.available # => 9
cp.idle # => 0
end

cp.idle # => 1
```

Notes
Expand Down
11 changes: 11 additions & 0 deletions lib/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ def reload(&block)
@available.shutdown(reload: true, &block)
end

## Reaps idle connections that have been idle for over +idle_seconds+.
# +idle_seconds+ defaults to 60.
def reap(idle_seconds = 60, &block)
@available.reap(idle_seconds, &block)
end

# Size of this connection pool
attr_reader :size
# Automatically drop all connections after fork
Expand All @@ -169,6 +175,11 @@ def reload(&block)
def available
@available.length
end

# Number of pool entries created and idle in the pool.
def idle
@available.idle
end
end

require_relative "connection_pool/timed_stack"
Expand Down
48 changes: 46 additions & 2 deletions lib/connection_pool/timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ def shutdown(reload: false, &block)
end
end

##
# Reaps connections that were checked in more than +idle_seconds+ ago.
def reap(idle_seconds, &block)
raise ArgumentError, "reap must receive a block" unless block
raise ArgumentError, "idle_seconds must be a number" unless idle_seconds.is_a?(Numeric)

@mutex.synchronize do
reap_start_time = current_time

reap_idle_connections(idle_seconds, reap_start_time, &block)
end
end

##
# Returns +true+ if there are no available connections.

Expand All @@ -112,6 +125,12 @@ def length
@max - @created + @que.length
end

##
# The number of connections created and available on the stack.
def idle
@que.length
end

private

def current_time
Expand All @@ -133,7 +152,7 @@ def connection_stored?(options = nil)
# This method must return a connection from the stack.

def fetch_connection(options = nil)
@que.pop
@que.pop&.first
end

##
Expand All @@ -149,13 +168,38 @@ def shutdown_connections(options = nil)
@created = 0
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method iterates over the connections in the stack and reaps the oldest idle connections one at a time until
# the first connection is not idle. This requires that the stack is kept in order of checked in time (oldest first).

def reap_idle_connections(idle_seconds, reap_start_time, &reap_block)
while idle_connections?(idle_seconds, reap_start_time)
conn, _last_checked_out = @que.shift
reap_block.call(conn)
end
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# Returns true if the first connection in the stack has been idle for more than idle_seconds

def idle_connections?(idle_seconds, reap_start_time)
if connection_stored?
_conn, last_checked_out = @que.first
reap_start_time - last_checked_out > idle_seconds
end
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must return +obj+ to the stack.

def store_connection(obj, options = nil)
@que.push obj
@que.push [obj, current_time]
end

##
Expand Down
70 changes: 69 additions & 1 deletion test/test_connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def do_something_with_block
end

def respond_to?(method_id, *args)
method_id == :do_magic || super(method_id, *args)
method_id == :do_magic || super
end
end

Expand Down Expand Up @@ -467,6 +467,74 @@ def test_shutdown_is_executed_for_all_connections_in_wrapped_pool
assert_equal [["shutdown"]] * 3, recorders.map { |r| r.calls }
end

def test_reap_removes_idle_connections
recorders = []
pool = ConnectionPool.new(size: 1) do
Recorder.new.tap { |r| recorders << r }
end

pool.with { |conn| conn }

assert_equal 1, pool.idle

pool.reap(0) { |recorder| recorder.do_work("reap") }

assert_equal 0, pool.idle
assert_equal [["reap"]], recorders.map(&:calls)
end

def test_reap_removes_all_idle_connections
recorders = []
pool = ConnectionPool.new(size: 3) do
Recorder.new.tap { |r| recorders << r }
end
threads = use_pool(pool, 3)
kill_threads(threads)

assert_equal 3, pool.idle

pool.reap(0) { |recorder| recorder.do_work("reap") }

assert_equal 0, pool.idle
assert_equal [["reap"]] * 3, recorders.map(&:calls)
end

def test_reap_does_not_remove_connections_if_outside_idle_time
pool = ConnectionPool.new(size: 1) { Object.new }

pool.with { |conn| conn }

pool.reap(1000) { |conn| flunk "should not reap active connection" }
end

def test_idle_returns_number_of_idle_connections
pool = ConnectionPool.new(size: 1) { Object.new }

assert_equal 0, pool.idle

pool.checkout

assert_equal 0, pool.idle

pool.checkin

assert_equal 1, pool.idle
end

def test_idle_with_multiple_connections
pool = ConnectionPool.new(size: 3) { Object.new }

assert_equal 0, pool.idle

threads = use_pool(pool, 3)

assert_equal 0, pool.idle

kill_threads(threads)

assert_equal 3, pool.idle
end

def test_wrapper_wrapped_pool
wrapper = ConnectionPool::Wrapper.new { NetworkConnection.new }
assert_equal ConnectionPool, wrapper.wrapped_pool.class
Expand Down
131 changes: 131 additions & 0 deletions test/test_connection_pool_timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ def test_length
assert_equal 1, stack.length
end

def test_idle
stack = ConnectionPool::TimedStack.new(1) { Object.new }

assert_equal 0, stack.idle

popped = stack.pop

assert_equal 0, stack.idle

stack.push popped

assert_equal 1, stack.idle
end

def test_object_creation_fails
stack = ConnectionPool::TimedStack.new(2) { raise "failure" }

Expand Down Expand Up @@ -147,4 +161,121 @@ def test_shutdown
refute_empty called
assert_empty @stack
end

def test_reap
@stack.push Object.new

called = []

@stack.reap(0) do |object|
called << object
end

refute_empty called
assert_empty @stack
end

def test_reap_large_idle_seconds
@stack.push Object.new

called = []

@stack.reap(100) do |object|
called << object
end

assert_empty called
refute_empty @stack
end

def test_reap_no_block
assert_raises(ArgumentError) do
@stack.reap(0)
end
end

def test_reap_non_numeric_idle_seconds
assert_raises(ArgumentError) do
@stack.reap("0") { |object| object }
end
end

def test_reap_with_multiple_connections
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(1.5) do |object|
called << object
end
end

assert_equal [conn1], called
refute_empty stack
assert_equal 1, stack.idle
end

def test_reap_with_multiple_connections_and_zero_idle_seconds
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(0) do |object|
called << object
end
end

assert_equal [conn1, conn2], called
assert_empty stack
end

def test_reap_with_multiple_connections_and_idle_seconds_outside_range
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(3) do |object|
called << object
end
end

assert_empty called
assert_equal 2, stack.idle
end
end

0 comments on commit 062500b

Please sign in to comment.