diff --git a/spec/std/crystal/fd_lock_spec.cr b/spec/std/crystal/fd_lock_spec.cr index f75a41f2d0d9..e53ba937c376 100644 --- a/spec/std/crystal/fd_lock_spec.cr +++ b/spec/std/crystal/fd_lock_spec.cr @@ -3,6 +3,90 @@ require "crystal/fd_lock" require "wait_group" describe Crystal::FdLock do + describe "#read" do + it "acquires read lock" do + lock = Crystal::FdLock.new + called = 0 + + lock.read { called += 1 } + lock.read { called += 1 } + + called.should eq(2) + end + + it "acquires exclusive lock" do + lock = Crystal::FdLock.new + increment = 0 + + WaitGroup.wait do |wg| + 10.times do + wg.spawn do + 100_000.times do |i| + lock.read do + increment += 1 + Fiber.yield if i % 1000 == 1 + end + end + end + end + end + + increment.should eq(1_000_000) + end + + it "raises when closed" do + lock = Crystal::FdLock.new + called = false + + lock.try_close? { } + expect_raises(IO::Error, "Closed") { lock.read { called = true; Fiber.yield } } + + called.should eq(false) + end + end + + describe "#write" do + it "acquires write lock" do + lock = Crystal::FdLock.new + called = 0 + + lock.write { called += 1 } + lock.write { called += 1 } + + called.should eq(2) + end + + it "acquires exclusive lock" do + lock = Crystal::FdLock.new + increment = 0 + + WaitGroup.wait do |wg| + 10.times do + wg.spawn do + 100_000.times do |i| + lock.write do + increment += 1 + Fiber.yield if i % 1000 == 1 + end + end + end + end + end + + increment.should eq(1_000_000) + end + + it "raises when closed" do + lock = Crystal::FdLock.new + called = false + + lock.try_close? 
{ }
+      expect_raises(IO::Error, "Closed") { lock.write { called = true } }
+
+      called.should eq(false)
+    end
+  end
+
   describe "#reference" do
     it "acquires" do
       lock = Crystal::FdLock.new
@@ -124,6 +208,68 @@ describe Crystal::FdLock do
         raise ex
       end
     end
+
+    it "resumes waiters" do
+      lock = Crystal::FdLock.new
+
+      ready = WaitGroup.new(8)
+      running = WaitGroup.new
+      exceptions = Channel(Exception).new(8)
+
+      # acquire locks
+      lock.read do
+        lock.write do
+          # spawn concurrent fibers
+          4.times do |i|
+            running.spawn do
+              ready.done
+              lock.read { }
+            rescue ex
+              exceptions.send(ex)
+            end
+
+            running.spawn do
+              ready.done
+              lock.write { }
+            rescue ex
+              exceptions.send(ex)
+            end
+          end
+
+          # wait for all the concurrent fibers to be trying to lock
+          ready.wait
+        end
+      end
+
+      # close, then wait for the fibers to be resumed (and fail)
+      lock.try_close? { }.should eq(true)
+      running.wait
+      exceptions.close
+
+      # fibers should have failed (unlikely: one may succeed to lock)
+      failed = 0
+      while ex = exceptions.receive?
+        failed += 1
+        ex.should be_a(IO::Error)
+        ex.message.should eq("Closed")
+      end
+      failed.should be > 0
+    end
+  end
+
+  it "locks read + write + shared reference" do
+    lock = Crystal::FdLock.new
+    called = 0
+
+    lock.read do
+      lock.write do
+        lock.reference do
+          called += 1
+        end
+      end
+    end
+
+    called.should eq(1)
   end
 
   it "#reset" do
diff --git a/src/crystal/fd_lock.cr b/src/crystal/fd_lock.cr
index 368bb07eda95..ce9d51a1ebab 100644
--- a/src/crystal/fd_lock.cr
+++ b/src/crystal/fd_lock.cr
@@ -1,26 +1,174 @@
 # The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,
 # Copyright Google):
 # https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go
+#
+# The internal details (spinlock, designated waker) of the locks are heavily
+# influenced by the nsync library (LICENSE: Apache-2.0, Copyright Google):
+# https://github.com/google/nsync
 
 # :nodoc:
 #
 # Tracks active references over a system file descriptor (fd) and serializes
 # reads and writes.
 #
-# Every access to the fd that may affect its system state or system buffers must
-# acquire a shared lock.
+# Every read on the fd must lock read.
+# Every write on the fd must lock write.
+# Other operations (fcntl, setsockopt, ...) must acquire a shared reference.
+#
+# The read and write locks are exclusive but distinct: each can only be acquired
+# once, but both can be acquired at the same time. Shared references can happen
+# at any time. Both locks also acquire a shared reference while the lock is
+# acquired.
+#
+# The fdlock can be closed at any time (at which point we can't lock for read,
+# write or acquire a shared reference anymore), but the actual system close will
+# wait until there are no more references left. This avoids potential races when
+# a thread might try to read a fd that has been closed... and has been reused by
+# the OS for example.
+#
+# NOTE: since only one attempt to read (or write) can go through, it avoids
+# situations where multiple fibers are waiting, then the first fiber is resumed but
+# doesn't consume/fill everything, and... won't resume the next fiber! The lock
+# will always resume a waiting fiber (if any).
 #
-# The fdlock can be closed at any time, but the actual system close will wait
-# until there are no more references left. This avoids potential races when a
-# thread might try to read a fd that has been closed and has been reused by the
-# OS for example.
+# Lock concepts
+#
+# Spinlock: slow-path for lock/unlock will spin until it acquires the spinlock
+# bit to add/remove waiters; the CPU is relaxed between each attempt.
+#
+# Designated waker: set on unlock to report that a waiter has been scheduled and
+# there's no need to wake another one. It's unset when a waiter acquires or
+# fails to acquire and adds itself again as a waiter. This leads to an
+# impressive performance boost when the lock is contended.
 struct Crystal::FdLock
   CLOSED = 1_u32 << 0 # the fdlock has been closed
-  REF    = 1_u32 << 1 # the reference counter increment
+  RLOCK  = 1_u32 << 1 # reader lock
+  RWAIT  = 1_u32 << 2 # reader wait bit (at least one reader)
+  RSPIN  = 1_u32 << 3 # reader spinlock (protects @readers)
+  RWAKER = 1_u32 << 4 # reader designated waker (a reader is being awoken)
+  WLOCK  = 1_u32 << 5 # writer lock
+  WWAIT  = 1_u32 << 6 # writer wait bit (at least one writer)
+  WSPIN  = 1_u32 << 7 # writer spinlock (protects @writers)
+  WWAKER = 1_u32 << 8 # writer designated waker (a writer is being awoken)
+  REF    = 1_u32 << 9 # the reference counter increment
 
   MASK = ~(REF - 1) # mask for the reference counter
 
   @m = Atomic(UInt32).new(0_u32)
   @closing : Fiber?
+  @readers = PointerLinkedList(Fiber::PointerLinkedListNode).new
+  @writers = PointerLinkedList(Fiber::PointerLinkedListNode).new
+
+  # Locks for read and increments the references by one for the duration of the
+  # block.
Raises if the fdlock is closed while trying to acquire the lock. + def read(& : -> F) : F forall F + m, success = @m.compare_and_set(0_u32, RLOCK + REF, :acquire, :relaxed) + lock_slow(RLOCK, RWAIT, RSPIN, RWAKER, pointerof(@readers)) unless success + + begin + yield + ensure + m, success = @m.compare_and_set(RLOCK + REF, 0_u32, :release, :relaxed) + m = unlock_slow(RLOCK, RWAIT, RSPIN, RWAKER, pointerof(@readers)) unless success + handle_last_ref(m) + end + end + + # Locks for write and increments the references by one for the duration of the + # block. Raises if the fdlock is closed while trying to acquire the lock. + def write(& : -> F) : F forall F + m, success = @m.compare_and_set(0_u32, WLOCK + REF, :acquire, :relaxed) + lock_slow(WLOCK, WWAIT, WSPIN, WWAKER, pointerof(@writers)) unless success + + begin + yield + ensure + m, success = @m.compare_and_set(WLOCK + REF, 0_u32, :release, :relaxed) + m = unlock_slow(WLOCK, WWAIT, WSPIN, WWAKER, pointerof(@writers)) unless success + handle_last_ref(m) + end + end + + @[NoInline] + private def lock_slow(xlock, xwait, xspin, xwaker, waiters) + waiter = Fiber::PointerLinkedListNode.new(Fiber.current) + attempts = 0 + clear = 0_u32 + + while true + m = @m.get(:relaxed) + + if (m & CLOSED) == CLOSED + # abort + raise IO::Error.new("Closed") + elsif (m & xlock) == 0_u32 + # acquire the lock + increment ref + m, success = @m.compare_and_set(m, ((m | xlock) + REF) & ~clear, :acquire, :relaxed) + return if success + elsif (m & xspin) == 0_u32 + # acquire spinlock + forward declare pending waiter + m, success = @m.compare_and_set(m, (m | xspin | xwait) & ~clear, :acquire, :relaxed) + if success + # new waiters are added to the tail, while woken waiters that failed + # to lock again are added to the head to give them some edge + if (clear & xwaker) == 0_u32 + waiters.value.push(pointerof(waiter)) + else + waiters.value.unshift(pointerof(waiter)) + end + + # release spinlock before suspending the fiber + @m.and(~xspin, 
:release) + + Fiber.suspend + + # the designated waker has woken: clear the flag + clear |= xwaker + end + end + + attempts = Thread.delay(attempts) + end + end + + @[NoInline] + private def unlock_slow(xlock, xwait, xspin, xwaker, waiters) + attempts = 0 + + while true + m = @m.get(:relaxed) + + if (m & CLOSED) == CLOSED + # decrement ref and abort + m = @m.sub(REF, :relaxed) + return m + elsif (m & xwait) == 0_u32 || (m & xwaker) != 0_u32 + # no waiter, or there is a designated waker (no need to wake another + # one): unlock & decrement ref + m, success = @m.compare_and_set(m, (m & ~xlock) - REF, :release, :relaxed) + return m if success + elsif (m & xspin) == 0_u32 + # there is a waiter and no designated waker: acquire spinlock + declare + # a designated waker + release lock & decrement ref early + m, success = @m.compare_and_set(m, ((m | xspin | xwaker) & ~xlock) - REF, :acquire_release, :relaxed) + if success + waiter = waiters.value.shift? + + # clear flags and release spinlock + clear = xspin + clear |= xwaker unless waiter # no designated waker + clear |= xwait if waiters.value.empty? # no more waiters + @m.and(~clear, :release) + + waiter.value.enqueue if waiter + + # return the m that decremented ref (for #handle_last_ref) + return m + end + end + + attempts = Thread.delay(attempts) + end + end # Borrows a reference for the duration of the block. Raises if the fdlock is # closed while trying to borrow. @@ -58,13 +206,15 @@ struct Crystal::FdLock end end - # Closes the fdlock. Blocks for as long as there are references. + # Closes the fdlock. Wakes waiting readers and writers. Blocks for as long as + # there are references. # # The *callback* block must cancel any external waiters (e.g. pending evloop # reads or writes). # - # Returns true if the fdlock has been closed: no fiber can acquire a reference - # anymore, the calling fiber fully owns the fd and can safely close it. 
+ # Returns true if the fdlock has been closed: no fiber can lock for read, + # write or acquire a reference anymore, the calling fiber fully owns the fd + # and can safely close it. # # Returns false if the fdlock has already been closed: the calling fiber # doesn't own the fd and musn't close it, as there might still be active @@ -72,6 +222,9 @@ struct Crystal::FdLock def try_close?(&callback : ->) : Bool attempts = 0 + # close + increment ref + acquire both spinlocks so we own both @readers and + # @writers; parallel attempts to acquire a spinlock will fail, notice that + # the lock is closed, and abort while true m = @m.get(:relaxed) @@ -80,8 +233,7 @@ struct Crystal::FdLock return false end - # close + increment ref - m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed) + m, success = @m.compare_and_set(m, (m + REF) | CLOSED | RSPIN | WSPIN, :acquire, :relaxed) break if success attempts = Thread.delay(attempts) @@ -90,6 +242,11 @@ struct Crystal::FdLock # set the current fiber as the closing fiber (to be resumed by the last ref) @closing = Fiber.current + # resume waiters so they can fail (the fdlock is closed); this is safe + # because we acquired the spinlocks above: + @readers.consume_each(&.value.enqueue) + @writers.consume_each(&.value.enqueue) + # decrement the last ref m = @m.sub(REF, :release) diff --git a/src/crystal/system/unix/file_descriptor.cr b/src/crystal/system/unix/file_descriptor.cr index 9dec85effe02..aacc598b67a1 100644 --- a/src/crystal/system/unix/file_descriptor.cr +++ b/src/crystal/system/unix/file_descriptor.cr @@ -398,10 +398,10 @@ module Crystal::System::FileDescriptor end private def system_read(slice : Bytes) : Int32 - @fd_lock.reference { event_loop.read(self, slice) } + @fd_lock.read { event_loop.read(self, slice) } end private def system_write(slice : Bytes) : Int32 - @fd_lock.reference { event_loop.write(self, slice) } + @fd_lock.write { event_loop.write(self, slice) } end end diff --git 
a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr index 6ff2b4ea8dfc..0fc227ee204b 100644 --- a/src/crystal/system/unix/socket.cr +++ b/src/crystal/system/unix/socket.cr @@ -51,7 +51,7 @@ module Crystal::System::Socket end private def system_accept : {Handle, Bool}? - @fd_lock.reference { event_loop.accept(self) } + @fd_lock.read { event_loop.accept(self) } end private def system_close_read @@ -359,22 +359,22 @@ module Crystal::System::Socket {% end %} private def system_send_to(bytes : Bytes, addr : ::Socket::Address) - @fd_lock.reference { event_loop.send_to(self, bytes, addr) } + @fd_lock.write { event_loop.send_to(self, bytes, addr) } end private def system_receive_from(bytes : Bytes) : Tuple(Int32, ::Socket::Address) - @fd_lock.reference { event_loop.receive_from(self, bytes) } + @fd_lock.read { event_loop.receive_from(self, bytes) } end private def system_connect(addr, timeout = nil) - @fd_lock.reference { event_loop.connect(self, addr, timeout) } + @fd_lock.write { event_loop.connect(self, addr, timeout) } end private def system_read(slice : Bytes) : Int32 - @fd_lock.reference { event_loop.read(self, slice) } + @fd_lock.read { event_loop.read(self, slice) } end private def system_write(slice : Bytes) : Int32 - @fd_lock.reference { event_loop.write(self, slice) } + @fd_lock.write { event_loop.write(self, slice) } end end