-
Notifications
You must be signed in to change notification settings - Fork 375
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Formalize Barrier behavior during waiting #3464
Changes from all commits
aa67ab1
5fcde1f
bd5ded8
7d63a93
f1f889d
7f9195d
7e7be77
8106692
0434899
0c8c034
4991002
7d13ea6
dd784af
3b7a6f1
163d08c
1c46f57
8121883
d54d815
3e91f3b
0f2fb9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,60 +87,106 @@ def shutdown! | |
@worker.stop | ||
end | ||
|
||
# Barrier provides a mechanism to fence execution until a condition happens | ||
# Provides a mechanism to fence execution until a condition happens. | ||
# | ||
# The barrier is created when a lengthy process (e.g. remote | ||
# configuration retrieval over network) starts. | ||
# The barrier is initialized with an optional timeout, which is | ||
# the upper bound on how long the clients want to wait for the work | ||
# to complete. | ||
# | ||
# When work completes, the thread performing the work should call | ||
# +lift+ to lift the barrier. | ||
# | ||
# Other threads can call +wait_once+ at any time to wait for the | ||
# work to complete, up to the smaller of the barrier timeout since | ||
# the work started or the per-wait timeout since waiting began. | ||
# Once the barrier timeout elapsed since creation of the barrier, | ||
# all waits return immediately. | ||
# | ||
# @note This is an internal class. | ||
class Barrier | ||
def initialize(timeout = nil) | ||
@once = false | ||
@timeout = timeout | ||
@lifted = false | ||
@deadline = timeout && Core::Utils::Time.get_time + timeout | ||
|
||
@mutex = Mutex.new | ||
@condition = ConditionVariable.new | ||
end | ||
|
||
# Wait for first lift to happen, otherwise don't wait | ||
# Wait for first lift to happen, up to the barrier timeout since | ||
# the barrier was created. | ||
# | ||
# If timeout is provided in this call, waits up to the smaller of | ||
# the provided wait timeout and the barrier timeout since the | ||
# barrier was created. | ||
Comment on lines
+120
to
+122
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is a private class, and we never actually need to provide this second |
||
# | ||
# If neither wait timeout is provided in this call nor the | ||
# barrier timeout in the constructor, waits indefinitely until | ||
# the barrier is lifted. | ||
Comment on lines
+124
to
+126
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should remove this feature too, since again, the one user of this class doesn't actually use this functionality ;) |
||
# | ||
# Returns: | ||
# - :lift if the barrier was lifted while this method was waiting | ||
# on it | ||
# - :pass if the barrier had been lifted prior to this method | ||
# being called | ||
# - :timeout if this method waited for the maximum permitted time | ||
# and the barrier has not been lifted | ||
# - :expired if the barrier timeout had elapsed but barrier had | ||
# not yet been lifted | ||
def wait_once(timeout = nil) | ||
# TTAS (Test and Test-And-Set) optimisation | ||
# Since @once only ever goes from false to true, this is semantically valid | ||
return :pass if @once | ||
|
||
begin | ||
@mutex.lock | ||
|
||
@mutex.synchronize do | ||
return :pass if @once | ||
|
||
timeout ||= @timeout | ||
now = Core::Utils::Time.get_time | ||
deadline = [ | ||
timeout ? now + timeout : nil, | ||
@deadline, | ||
].compact.min | ||
|
||
timeout = deadline ? deadline - now : nil | ||
# workaround for rubocop & steep trying to mangle the code | ||
if timeout && timeout.public_send(:<=, 0) | ||
Comment on lines
+152
to
+153
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Rubocop is being annoying, I suggest using an inline I'm curious about the issue with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. steep refuses to permit |
||
ret = :expired | ||
return ret | ||
end | ||
|
||
# - starting with Ruby 3.2, ConditionVariable#wait returns nil on | ||
# timeout and an integer otherwise | ||
# - before Ruby 3.2, ConditionVariable returns itself | ||
# so we have to rely on @once having been set | ||
if RUBY_VERSION >= '3.2' | ||
lifted = @condition.wait(@mutex, timeout) | ||
else | ||
@condition.wait(@mutex, timeout) | ||
lifted = @once | ||
end | ||
# so we have to rely on @lifted having been set | ||
lifted = if RUBY_VERSION >= '3.2' | ||
!!@condition.wait(@mutex, timeout) | ||
else | ||
@condition.wait(@mutex, timeout) | ||
@lifted | ||
end | ||
Comment on lines
158
to
+167
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: To be honest, I'm not sure it's worth keeping a multiple line comment + 2 implementations, rather than just using the one implementation that works on all Rubies ;) |
||
|
||
if lifted | ||
:lift | ||
else | ||
@once = true | ||
:timeout | ||
end | ||
ensure | ||
@mutex.unlock | ||
end | ||
end | ||
|
||
# Release all current waiters | ||
# Lift the barrier, releasing all current waiters. | ||
# | ||
# Internally we only use Barrier to wait for one event, thus | ||
# in practice there should only ever be one call to +lift+ | ||
# per instance of Barrier. But, multiple calls to +lift+ are | ||
# technically permitted; second and subsequent calls have no | ||
# effect. | ||
def lift | ||
@mutex.lock | ||
|
||
@once ||= true | ||
@mutex.synchronize do | ||
@once ||= true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this still correct? Should this be |
||
|
||
@condition.broadcast | ||
ensure | ||
@mutex.unlock | ||
@condition.broadcast | ||
end | ||
end | ||
end | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
# frozen_string_literal: true | ||
|
||
require 'spec_helper' | ||
require 'datadog/core/remote/component' | ||
|
||
RSpec.describe Datadog::Core::Remote::Component::Barrier do | ||
let(:delay) { 1.0 } | ||
let(:record) { [] } | ||
let(:timeout) { nil } | ||
let(:instance_timeout) { nil } | ||
|
||
subject(:barrier) { described_class.new(instance_timeout) } | ||
|
||
shared_context('lifter thread') do | ||
let(:thr) do | ||
Thread.new do | ||
loop do | ||
sleep delay | ||
record << :lift | ||
barrier.lift | ||
end | ||
end | ||
end | ||
|
||
before do | ||
record | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: I believe this can be simplified by making |
||
thr.run | ||
end | ||
|
||
after do | ||
thr.kill | ||
thr.join | ||
end | ||
end | ||
|
||
describe '#initialize' do | ||
it 'accepts one argument' do | ||
expect { described_class.new(instance_timeout) }.to_not raise_error | ||
end | ||
|
||
it 'accepts zero argument' do | ||
expect { described_class.new }.to_not raise_error | ||
end | ||
end | ||
|
||
describe '#lift' do | ||
context 'without waiters' do | ||
it 'does not block' do | ||
record << :one | ||
barrier.lift | ||
record << :two | ||
|
||
expect(record).to eq [:one, :two] | ||
end | ||
end | ||
|
||
context 'with waiters' do | ||
it 'unblocks waiters' do | ||
waiter_thread = Thread.new(record) do |record| | ||
record << :one | ||
expect(barrier.wait_once).to eq :lift | ||
record << :two | ||
end.run | ||
|
||
sleep delay | ||
|
||
record << :lift | ||
barrier.lift | ||
waiter_thread.join | ||
|
||
expect(record).to eq [:one, :lift, :two] | ||
end | ||
end | ||
end | ||
|
||
describe '#wait_once' do | ||
include_context 'lifter thread' | ||
|
||
it 'blocks once' do | ||
record << :one | ||
expect(barrier.wait_once).to eq :lift | ||
record << :two | ||
|
||
expect(record).to eq [:one, :lift, :two] | ||
end | ||
|
||
it 'blocks only once' do | ||
record << :one | ||
expect(barrier.wait_once).to eq :lift | ||
record << :two | ||
expect(barrier.wait_once).to eq :pass | ||
record << :three | ||
|
||
expect(record).to eq [:one, :lift, :two, :three] | ||
end | ||
|
||
context('with a local timeout') do | ||
let(:timeout) { delay / 4 } | ||
|
||
context('shorter than lift') do | ||
it 'unblocks on timeout' do | ||
elapsed = Datadog::Core::Utils::Time.measure do | ||
record << :one | ||
expect(barrier.wait_once(timeout)).to eq :timeout | ||
record << :two | ||
end | ||
|
||
expect(record).to eq [:one, :two] | ||
|
||
# Should have waited just over the timeout. | ||
expect(elapsed).to be < delay | ||
expect(elapsed).to be < timeout * 1.1 | ||
expect(elapsed).to be > timeout | ||
end | ||
|
||
context 'when waiting repeatedly' do | ||
context 'and barrier is lifted' do | ||
it 'waits up to barrier timeout' do | ||
record << :one | ||
expect(barrier.wait_once(timeout)).to eq :timeout | ||
record << :two | ||
expect(barrier.wait_once(timeout)).to eq :timeout | ||
record << :three | ||
# Small sleep to make the tests not flaky. | ||
sleep(timeout / 2) | ||
expect(barrier.wait_once(timeout)).to eq :timeout | ||
record << :four | ||
# Due to the added sleep, the fourth wait should always exceed | ||
# the delay, thus the fourth wait should happen while the | ||
# barrier is being lifted. | ||
expect(barrier.wait_once(timeout)).to eq :lift | ||
record << :five | ||
|
||
expect(record).to eq [:one, :two, :three, :four, :lift, :five] | ||
end | ||
end | ||
|
||
context 'and barrier is not lifted' do | ||
let(:instance_timeout) { delay / 2 } | ||
|
||
it 'waits up to barrier timeout' do | ||
record << :one | ||
expect(barrier.wait_once(timeout)).to eq :timeout | ||
record << :two | ||
# This call should time out, but the barrier timeout is | ||
# passed here and subsequent waits will be expired. | ||
expect(barrier.wait_once(timeout)).to eq :timeout | ||
record << :three | ||
expect(barrier.wait_once(timeout)).to eq :expired | ||
record << :four | ||
|
||
expect(record).to eq [:one, :two, :three, :four] | ||
end | ||
end | ||
end | ||
end | ||
|
||
context('longer than lift') do | ||
let(:timeout) { delay * 2 } | ||
|
||
it 'unblocks before timeout' do | ||
elapsed = Datadog::Core::Utils::Time.measure do | ||
record << :one | ||
expect(barrier.wait_once(timeout)).to eq :lift | ||
record << :two | ||
expect(barrier.wait_once(timeout)).to eq :pass | ||
record << :three | ||
end | ||
|
||
expect(record).to eq [:one, :lift, :two, :three] | ||
|
||
# We should have waited strictly more than the delay time. | ||
expect(elapsed).to be > delay | ||
# But, the only wait should have been for the delay to pass, | ||
# i.e. the elapsed time should be only slightly greater than the | ||
# delay time | ||
expect(elapsed).to be < delay * 1.1 | ||
# And, just to verify, this is below the timeout. | ||
expect(elapsed).to be < timeout | ||
end | ||
end | ||
|
||
context('and an instance timeout') do | ||
let(:instance_timeout) { delay * 2 } | ||
|
||
it 'prefers the local timeout' do | ||
record << :one | ||
expect(barrier.wait_once(timeout)).to eq :timeout | ||
record << :two | ||
expect(barrier.wait_once(timeout)).to eq :timeout | ||
record << :three | ||
|
||
expect(record).to eq [:one, :two, :three] | ||
end | ||
end | ||
end | ||
|
||
context('with an instance timeout') do | ||
let(:instance_timeout) { delay / 4 } | ||
|
||
it 'unblocks on timeout' do | ||
record << :one | ||
expect(barrier.wait_once).to eq :timeout | ||
record << :two | ||
expect(barrier.wait_once).to eq :expired | ||
record << :three | ||
|
||
expect(record).to eq [:one, :two, :three] | ||
end | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I'm not sure this is the correct intended semantics.
From reading the code, my understanding is that the intention is that the
timeout
gets configured at component initialization time, but the actualtimeout
would only start counting later, when the worker gets lazily initialized.I guess cc @lloeki can help clarify :)