Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions spec/std/crystal/fd_lock_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,90 @@ require "crystal/fd_lock"
require "wait_group"

describe Crystal::FdLock do
describe "#read" do
  it "acquires read lock" do
    lock = Crystal::FdLock.new
    count = 0

    # two sequential acquisitions must both run their block
    2.times { lock.read { count += 1 } }

    count.should eq(2)
  end

  it "acquires exclusive lock" do
    lock = Crystal::FdLock.new
    counter = 0

    # 10 fibers x 100k unsynchronized increments: only mutual exclusion
    # inside #read can make the final count exact
    WaitGroup.wait do |wg|
      10.times do
        wg.spawn do
          100_000.times do |iteration|
            lock.read do
              counter += 1
              Fiber.yield if iteration % 1000 == 1
            end
          end
        end
      end
    end

    counter.should eq(1_000_000)
  end

  it "raises when closed" do
    lock = Crystal::FdLock.new
    entered = false

    lock.try_close? { }

    expect_raises(IO::Error, "Closed") do
      lock.read do
        entered = true
        Fiber.yield
      end
    end

    entered.should eq(false)
  end
end

describe "#write" do
  it "acquires write lock" do
    lock = Crystal::FdLock.new
    called = 0

    lock.write { called += 1 }
    lock.write { called += 1 }

    called.should eq(2)
  end

  it "acquires exclusive lock" do
    lock = Crystal::FdLock.new
    increment = 0

    WaitGroup.wait do |wg|
      10.times do
        wg.spawn do
          100_000.times do |i|
            lock.write do
              increment += 1
              Fiber.yield if i % 1000 == 1
            end
          end
        end
      end
    end

    increment.should eq(1_000_000)
  end

  it "raises when closed" do
    lock = Crystal::FdLock.new
    called = false

    lock.try_close? { }
    # exercise #write (not #read, which is covered by its own describe):
    # a closed fdlock must also reject new write locks
    expect_raises(IO::Error, "Closed") { lock.write { called = true } }

    called.should eq(false)
  end
end

describe "#reference" do
it "acquires" do
lock = Crystal::FdLock.new
Expand Down Expand Up @@ -124,6 +208,68 @@ describe Crystal::FdLock do
raise ex
end
end

it "resumes waiters" do
  lock = Crystal::FdLock.new

  ready = WaitGroup.new(8)
  running = WaitGroup.new
  exceptions = Channel(Exception).new(8)

  # acquire locks
  lock.read do
    lock.write do
      # spawn concurrent fibers: 4 readers + 4 writers (block parameter was
      # unused, so it is dropped)
      4.times do
        running.spawn do
          ready.done
          lock.read { }
        rescue ex
          exceptions.send(ex)
        end

        running.spawn do
          ready.done
          lock.write { }
        rescue ex
          exceptions.send(ex)
        end
      end

      # wait for all the concurrent fibers to be trying to lock
      ready.wait
    end
  end

  # close, then wait for the fibers to be resumed (and fail)
  lock.try_close? { }.should eq(true)
  running.wait
  exceptions.close

  # fibers should have failed (unlikely: one may succeed to lock)
  failed = 0
  while ex = exceptions.receive?
    failed += 1
    ex.should be_a(IO::Error)
    ex.message.should eq("Closed")
  end
  failed.should be > 0
end
end

it "locks read + write + shared reference" do
  lock = Crystal::FdLock.new
  executed = 0

  # nest all three acquisitions: read, write and a shared reference must be
  # able to coexist on the same fiber without deadlocking
  lock.read do
    lock.write do
      lock.reference { executed += 1 }
    end
  end

  executed.should eq(1)
end

it "#reset" do
Expand Down
181 changes: 169 additions & 12 deletions src/crystal/fd_lock.cr
Original file line number Diff line number Diff line change
@@ -1,26 +1,174 @@
# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,
# Copyright Google):
# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go
#
# The internal details (spinlock, designated waker) of the locks are heavily
# influenced by the nsync library (LICENSE: Apache-2.0, Copyright Google):
# https://github.com/google/nsync

# :nodoc:
#
# Tracks active references over a system file descriptor (fd) and serializes
# reads and writes.
#
# Every access to the fd that may affect its system state or system buffers must
# acquire a shared lock.
# Every read on the fd must lock read.
# Every write on the fd must lock write.
# Other operations (fcntl, setsockopt, ...) must acquire a shared reference.
#
# The read and write locks are exclusive but distinct: each can only be acquired
# once, but both can be acquired at the same time. Shared references can happen
# at any time. Both locks also acquire a shared reference while the lock is
# acquired.
#
# The fdlock can be closed at any time (at which point we can't lock for read,
# write or acquire a shared reference anymore), but the actual system close will
# wait until there are no more references left. This avoids potential races when
# a thread might try to read a fd that has been closed... and has been reused by
# the OS for example.
#
# NOTE: since only one attempt to read (or write) can go through, it avoids
# situations where multiple fibers are waiting, then the first fiber is resumed but
# doesn't consume/fill everything, and... won't resume the next fiber! The lock
# will always resume a waiting fiber (if any).
#
# The fdlock can be closed at any time, but the actual system close will wait
# until there are no more references left. This avoids potential races when a
# thread might try to read a fd that has been closed and has been reused by the
# OS for example.
# Lock concepts
#
# Spinlock: slow-path for lock/unlock will spin until it acquires the spinlock
# bit to add/remove waiters; the CPU is relaxed between each attempt.
#
# Designated waker: set on unlock to report that a waiter has been scheduled and
# there's no need to wake another one. It's unset when a waiter acquires or
# fails to acquire and adds itself again as a waiter. This leads to an
# impressive performance boost when the lock is contended.
struct Crystal::FdLock
CLOSED = 1_u32 << 0 # the fdlock has been closed
REF = 1_u32 << 1 # the reference counter increment
RLOCK = 1_u32 << 1 # reader lock
RWAIT = 1_u32 << 2 # reader wait bit (at least one reader)
RSPIN = 1_u32 << 3 # reader spinlock (protects @readers)
RWAKER = 1_u32 << 4 # reader designated waker (a reader is being awoken)
WLOCK = 1_u32 << 5 # writer lock
WWAIT = 1_u32 << 6 # writer wait bit (at least one writer)
WSPIN = 1_u32 << 7 # writer spinlock (protects @writers)
WWAKER = 1_u32 << 8 # writer designated waker (a writer is being awoken)
REF = 1_u32 << 9 # the reference counter increment
MASK = ~(REF - 1) # mask for the reference counter

@m = Atomic(UInt32).new(0_u32)
@closing : Fiber?
@readers = PointerLinkedList(Fiber::PointerLinkedListNode).new
@writers = PointerLinkedList(Fiber::PointerLinkedListNode).new

# Locks for read and increments the references by one for the duration of the
# block. Raises if the fdlock is closed while trying to acquire the lock.
def read(& : -> F) : F forall F
  # fast path: uncontended (m == 0) — acquire RLOCK and one reference in a
  # single CAS; otherwise fall back to the slow path (spin/suspend)
  m, success = @m.compare_and_set(0_u32, RLOCK + REF, :acquire, :relaxed)
  lock_slow(RLOCK, RWAIT, RSPIN, RWAKER, pointerof(@readers)) unless success

  begin
    yield
  ensure
    # fast path: no waiters or flags set — release RLOCK and the reference in
    # a single CAS; the slow path also wakes a waiting reader if needed
    m, success = @m.compare_and_set(RLOCK + REF, 0_u32, :release, :relaxed)
    m = unlock_slow(RLOCK, RWAIT, RSPIN, RWAKER, pointerof(@readers)) unless success
    # hand the observed counter to #handle_last_ref so the closing fiber can
    # be resumed when this was the last reference (see #try_close?)
    handle_last_ref(m)
  end
end

# Locks for write and increments the references by one for the duration of the
# block. Raises if the fdlock is closed while trying to acquire the lock.
def write(& : -> F) : F forall F
  # fast path: uncontended (m == 0) — acquire WLOCK and one reference in a
  # single CAS; otherwise fall back to the slow path (spin/suspend)
  m, success = @m.compare_and_set(0_u32, WLOCK + REF, :acquire, :relaxed)
  lock_slow(WLOCK, WWAIT, WSPIN, WWAKER, pointerof(@writers)) unless success

  begin
    yield
  ensure
    # fast path: no waiters or flags set — release WLOCK and the reference in
    # a single CAS; the slow path also wakes a waiting writer if needed
    m, success = @m.compare_and_set(WLOCK + REF, 0_u32, :release, :relaxed)
    m = unlock_slow(WLOCK, WWAIT, WSPIN, WWAKER, pointerof(@writers)) unless success
    # hand the observed counter to #handle_last_ref so the closing fiber can
    # be resumed when this was the last reference (see #try_close?)
    handle_last_ref(m)
  end
end

# Slow path shared by #read and #write. *xlock*, *xwait*, *xspin* and *xwaker*
# are the bit constants for the lock being acquired (RLOCK/RWAIT/RSPIN/RWAKER
# or the W equivalents); *waiters* points to the matching wait list.
#
# Loops until the lock is acquired (also incrementing the reference counter),
# suspending the current fiber while the lock is held by another fiber.
# Raises IO::Error if the fdlock is (or gets) closed.
@[NoInline]
private def lock_slow(xlock, xwait, xspin, xwaker, waiters)
  waiter = Fiber::PointerLinkedListNode.new(Fiber.current)
  attempts = 0
  # bits to clear on the next successful CAS: becomes xwaker after this fiber
  # has been resumed as the designated waker (see the struct documentation)
  clear = 0_u32

  while true
    m = @m.get(:relaxed)

    if (m & CLOSED) == CLOSED
      # abort
      raise IO::Error.new("Closed")
    elsif (m & xlock) == 0_u32
      # acquire the lock + increment ref
      m, success = @m.compare_and_set(m, ((m | xlock) + REF) & ~clear, :acquire, :relaxed)
      return if success
    elsif (m & xspin) == 0_u32
      # acquire spinlock + forward declare pending waiter
      m, success = @m.compare_and_set(m, (m | xspin | xwait) & ~clear, :acquire, :relaxed)
      if success
        # new waiters are added to the tail, while woken waiters that failed
        # to lock again are added to the head to give them some edge
        if (clear & xwaker) == 0_u32
          waiters.value.push(pointerof(waiter))
        else
          waiters.value.unshift(pointerof(waiter))
        end

        # release spinlock before suspending the fiber
        @m.and(~xspin, :release)

        Fiber.suspend

        # the designated waker has woken: clear the flag
        clear |= xwaker
      end
    end

    # relax the CPU between attempts (backoff grows with attempts)
    attempts = Thread.delay(attempts)
  end
end

# Slow path shared by #read and #write unlock. Releases *xlock*, decrements
# the reference counter and, when there is a waiter but no designated waker
# yet, dequeues one waiter and enqueues its fiber to be resumed.
#
# Returns the observed @m value from the operation that decremented the
# reference, so the caller can pass it to #handle_last_ref.
@[NoInline]
private def unlock_slow(xlock, xwait, xspin, xwaker, waiters)
  attempts = 0

  while true
    m = @m.get(:relaxed)

    if (m & CLOSED) == CLOSED
      # decrement ref and abort
      m = @m.sub(REF, :relaxed)
      return m
    elsif (m & xwait) == 0_u32 || (m & xwaker) != 0_u32
      # no waiter, or there is a designated waker (no need to wake another
      # one): unlock & decrement ref
      m, success = @m.compare_and_set(m, (m & ~xlock) - REF, :release, :relaxed)
      return m if success
    elsif (m & xspin) == 0_u32
      # there is a waiter and no designated waker: acquire spinlock + declare
      # a designated waker + release lock & decrement ref early
      m, success = @m.compare_and_set(m, ((m | xspin | xwaker) & ~xlock) - REF, :acquire_release, :relaxed)
      if success
        waiter = waiters.value.shift?

        # clear flags and release spinlock
        clear = xspin
        clear |= xwaker unless waiter # no designated waker
        clear |= xwait if waiters.value.empty? # no more waiters
        @m.and(~clear, :release)

        waiter.value.enqueue if waiter

        # return the m that decremented ref (for #handle_last_ref)
        return m
      end
    end

    # relax the CPU between attempts (backoff grows with attempts)
    attempts = Thread.delay(attempts)
  end
end

# Borrows a reference for the duration of the block. Raises if the fdlock is
# closed while trying to borrow.
Expand Down Expand Up @@ -58,20 +206,25 @@ struct Crystal::FdLock
end
end

# Closes the fdlock. Blocks for as long as there are references.
# Closes the fdlock. Wakes waiting readers and writers. Blocks for as long as
# there are references.
#
# The *callback* block must cancel any external waiters (e.g. pending evloop
# reads or writes).
#
# Returns true if the fdlock has been closed: no fiber can acquire a reference
# anymore, the calling fiber fully owns the fd and can safely close it.
# Returns true if the fdlock has been closed: no fiber can lock for read,
# write or acquire a reference anymore, the calling fiber fully owns the fd
# and can safely close it.
#
# Returns false if the fdlock has already been closed: the calling fiber
# doesn't own the fd and mustn't close it, as there might still be active
# references and another fiber will close anyway.
def try_close?(&callback : ->) : Bool
attempts = 0

# close + increment ref + acquire both spinlocks so we own both @readers and
# @writers; parallel attempts to acquire a spinlock will fail, notice that
# the lock is closed, and abort
while true
m = @m.get(:relaxed)

Expand All @@ -80,8 +233,7 @@ struct Crystal::FdLock
return false
end

# close + increment ref
m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed)
m, success = @m.compare_and_set(m, (m + REF) | CLOSED | RSPIN | WSPIN, :acquire, :relaxed)
break if success

attempts = Thread.delay(attempts)
Expand All @@ -90,6 +242,11 @@ struct Crystal::FdLock
# set the current fiber as the closing fiber (to be resumed by the last ref)
@closing = Fiber.current

# resume waiters so they can fail (the fdlock is closed); this is safe
# because we acquired the spinlocks above:
@readers.consume_each(&.value.enqueue)
@writers.consume_each(&.value.enqueue)

# decrement the last ref
m = @m.sub(REF, :release)

Expand Down
Loading