From 41b0e49ef4a013ef1e3b5b494eb3877233eb4bd7 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 21 Nov 2025 16:33:35 +0100 Subject: [PATCH 01/12] Import Sync::MU from the sync shard (undocumented) This is the foundation type for building the actual Mutex and RWLock types that add some runtime checks (who owns the lock), reentrancy. --- src/sync/mu.cr | 380 +++++++++++++++++++++++++++++++++++++++++++++ src/sync/waiter.cr | 47 ++++++ 2 files changed, 427 insertions(+) create mode 100644 src/sync/mu.cr create mode 100644 src/sync/waiter.cr diff --git a/src/sync/mu.cr b/src/sync/mu.cr new file mode 100644 index 000000000000..a2db2f7d9c56 --- /dev/null +++ b/src/sync/mu.cr @@ -0,0 +1,380 @@ +# Crystal adaptation of "mu" from the "nsync" library with adaptations by +# Justine Alexandra Roberts Tunney in the "cosmopolitan" C library. +# +# Copyright 2016 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# References: +# - +# - + +require "./waiter" + +module Sync + # :nodoc: + struct MU + UNLOCKED = 0_u32 + WLOCK = 1_u32 + SPINLOCK = 2_u32 + WAITING = 4_u32 + WRITER_WAITING = 8_u32 + LONG_WAIT = 16_u32 + DESIG_WAKER = 32_u32 + RLOCK = 256_u32 + + RMASK = ~(RLOCK - 1_u32) + ANY_LOCK = WLOCK | RMASK + + LONG_WAIT_THRESHOLD = 30 + + def initialize + @word = Atomic(UInt32).new(UNLOCKED) + @waiters = Crystal::PointerLinkedList(Waiter).new + end + + def synchronize(&) : Nil + lock + begin + yield + ensure + unlock + end + end + + def try_lock? : Bool + # uncontended + word, success = @word.compare_and_set(UNLOCKED, WLOCK, :acquire, :relaxed) + return true if success + + if (word & (ANY_LOCK | LONG_WAIT)) == 0 + # unlocked (no writer, no readers), no long waiter, try quick lock + _, success = @word.compare_and_set(word, word + WLOCK, :acquire, :relaxed) + success + else + false + end + end + + def try_rlock? : Bool + # uncontended + word, success = @word.compare_and_set(UNLOCKED, RLOCK, :release, :relaxed) + return true if success + + if (word & (WLOCK | WRITER_WAITING | LONG_WAIT)) == 0 + # no locked writer, no writer waiting, no long waiter, try quick lock + _, success = @word.compare_and_set(word, word + RLOCK, :acquire, :relaxed) + success + else + false + end + end + + def lock : Nil + lock_slow unless try_lock? + end + + def rlock : Nil + rlock_slow unless try_rlock? + end + + def lock_slow + waiter = Waiter.new(:writer) + + lock_slow_impl(pointerof(waiter), + zero_to_acquire: ANY_LOCK, + add_on_acquire: WLOCK, + set_on_waiting: WRITER_WAITING, + clear_on_acquire: WRITER_WAITING) + end + + def rlock_slow + waiter = Waiter.new(:reader) + + lock_slow_impl(pointerof(waiter), + zero_to_acquire: WLOCK | WRITER_WAITING, + add_on_acquire: RLOCK) + end + + protected def lock_slow(waiter : Pointer(Waiter), clear : UInt32) + if waiter.value.writer? 
+        zero_to_acquire = ANY_LOCK
+        add_on_acquire = WLOCK
+        set_on_waiting = WRITER_WAITING
+        clear_on_acquire = WRITER_WAITING
+      else
+        zero_to_acquire = WLOCK | WRITER_WAITING
+        add_on_acquire = RLOCK
+        set_on_waiting = 0_u32
+        clear_on_acquire = 0_u32
+      end
+      lock_slow_impl(waiter, zero_to_acquire, add_on_acquire, set_on_waiting, clear_on_acquire, clear)
+    end
+
+    private def lock_slow_impl(waiter, zero_to_acquire, add_on_acquire, set_on_waiting = 0_u32, clear_on_acquire = 0_u32, clear = 0_u32) : Nil
+      long_wait = 0_u32
+      zero_to_acquire |= LONG_WAIT
+      set_on_waiting |= WAITING
+
+      attempts = 0
+      wait_count = 0
+
+      while true
+        word = @word.get(:relaxed)
+
+        if (word & zero_to_acquire) == 0
+          # unlocked, no long waiter, try to lock
+          word, success = @word.compare_and_set(word, (word + add_on_acquire) & ~(long_wait | clear | clear_on_acquire), :acquire, :relaxed)
+          return if success
+        elsif (word & SPINLOCK) == 0
+          # locked by another fiber or there is a long waiter, spinlock is
+          # available, try to acquire spinlock
+          _, success = @word.compare_and_set(word, (word | SPINLOCK | set_on_waiting | long_wait) & ~clear, :acquire, :relaxed)
+          if success
+            waiter.value.waiting!
+
+            if wait_count == 0
+              # first wait goes to the tail
+              @waiters.push(waiter)
+            else
+              # subsequent ones go to the head
+              @waiters.unshift(waiter)
+            end
+            release_spinlock
+
+            # wait...
+            waiter.value.wait
+            # ...resumed
+
+            attempts = 0
+            wait_count += 1
+
+            if wait_count == LONG_WAIT_THRESHOLD
+              long_wait = LONG_WAIT
+            end
+
+            # woken fiber doesn't care about long wait or a writer waiting, and
+            # must clear the designated waker flag
+            zero_to_acquire &= ~(LONG_WAIT | WRITER_WAITING)
+            clear = DESIG_WAKER
+          end
+        end
+
+        # yield the thread, not the fiber, because the above CAS are fighting
+        # against fibers running in parallel threads, trying to (spin)lock /
+        # unlock.
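For readers following the bit arithmetic in this loop, a small standalone sketch (not taken from the patch; the describe helper and its output format are illustrative only) of how the MU word packs its state. The low bits are flags and everything from RLOCK upward is the reader count; the constant values are the ones declared at the top of mu.cr, before the DESIG_WAKER rename later in this series.

# Constant values mirror the ones at the top of mu.cr.
WLOCK          =   1_u32 # a writer holds the lock
SPINLOCK       =   2_u32 # the waiter list is being mutated
WAITING        =   4_u32 # at least one waiter is queued
WRITER_WAITING =   8_u32 # a queued writer blocks new readers
LONG_WAIT      =  16_u32 # some waiter reached LONG_WAIT_THRESHOLD
DESIG_WAKER    =  32_u32 # a woken fiber is already racing to acquire
RLOCK          = 256_u32 # each reader adds RLOCK to the word

# Illustrative helper (not from the patch): decode a word into readable form.
def describe(word : UInt32) : String
  readers = word // RLOCK # the reader count lives in the bits above the flags
  flags = [] of String
  flags << "WLOCK" if (word & WLOCK) != 0
  flags << "WAITING" if (word & WAITING) != 0
  flags << "WRITER_WAITING" if (word & WRITER_WAITING) != 0
  "readers=#{readers} flags=[#{flags.join(", ")}]"
end

describe(3_u32 * RLOCK | WAITING | WRITER_WAITING)
# => "readers=3 flags=[WAITING, WRITER_WAITING]"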
+ attempts = Thread.delay(attempts) + end + end + + def unlock : Nil + # uncontended + word, success = @word.compare_and_set(WLOCK, UNLOCKED, :acquire, :relaxed) + return true if success + + # sanity check + if (word & WLOCK) == 0 + raise RuntimeError.new("Can't unlock Sync::MU that isn't held") + end + + if (word & WAITING) == 0 && (word & DESIG_WAKER) != 0 + # no waiters, or there is a designated waker already (no need to wake + # another one), try quick unlock + _, success = @word.compare_and_set(word, word &- WLOCK, :release, :relaxed) + return if success + end + + # must try to wakeup a waiter + unlock_slow + end + + def runlock : Nil + # uncontended + word, success = @word.compare_and_set(RLOCK, UNLOCKED, :release, :relaxed) + return if success + + # sanity check + if (word & RMASK) == 0 + raise RuntimeError.new("Can't runlock Sync::MU that isn't held") + end + + if (word & WAITING) == 0 && (word & DESIG_WAKER) != 0 && (word & RMASK) > RLOCK + # no waiters, there is a designated waker already (no need to wake + # another one), and there are still readers, try quick unlock + _, success = @word.compare_and_set(word, word &- RLOCK, :release, :relaxed) + return if success + end + + # must try to wakeup a waiter + runlock_slow + end + + def unlock_slow : Nil + unlock_slow_impl(sub_on_release: WLOCK) + end + + def runlock_slow : Nil + unlock_slow_impl(sub_on_release: RLOCK) + end + + private def unlock_slow_impl(sub_on_release) : Nil + attempts = 0 + + while true + word = @word.get(:relaxed) + + if (word & WAITING) == 0 || (word & DESIG_WAKER) != 0 || (word & RMASK) > RLOCK + # no waiters, there is a designated waker (no need to wake another + # one), or there are still readers, try release lock + word, success = @word.compare_and_set(word, word - sub_on_release, :release, :relaxed) + return if success + elsif (word & SPINLOCK) == 0 + # there might be a waiter, and no designated waker, try to acquire + # spinlock, and release the lock (early) + _, success = @word.compare_and_set(word, (word | SPINLOCK | DESIG_WAKER) &- sub_on_release, :acquire_release, :relaxed) + if success + # spinlock is held, resume a single writer, or resume all readers + wake = Crystal::PointerLinkedList(Waiter).new + writer_waiting = 0_u32 + + if first_waiter = @waiters.shift? + wake.push(first_waiter) + + if first_waiter.value.reader? + @waiters.each do |waiter| + if waiter.value.reader? + @waiters.delete(waiter) + wake.push(waiter) + else + # found a writer, prevent new readers from locking + writer_waiting = WRITER_WAITING + end + end + end + end + + # update flags + clear = 0_u32 + clear |= DESIG_WAKER if wake.empty? # nothing to wake => no designated waker + clear |= WAITING if @waiters.empty? # no more waiters => nothing waiting + + release_spinlock(set: writer_waiting, clear: clear) + + wake.consume_each do |waiter| + waiter.value.wake + end + + return + end + end + + attempts = Thread.delay(attempts) + end + end + + def held? : Bool + word = @word.get(:relaxed) + (word & WLOCK) != 0 + end + + def rheld? : Bool + word = @word.get(:relaxed) + (word & RMASK) != 0 + end + + private def release_spinlock(set = 0_u32, clear = 0_u32) + word = @word.get(:relaxed) + + while true + word, success = @word.compare_and_set(word, (word | set) & ~(SPINLOCK | clear), :release, :relaxed) + return if success + end + end + + protected def try_transfer(wake : Pointer(Crystal::PointerLinkedList(Waiter)), first_waiter : Pointer(Waiter), all_readers : Bool) : Nil + first_is_writer = first_waiter.value.writer? 
+ next_waiter = wake.value.next?(first_waiter) + zero_to_acquire = first_is_writer ? ANY_LOCK : WLOCK | WRITER_WAITING + + # there's no pointerof(self), so we make do: + mu = first_waiter.value.cv_mu + + old_word = @word.get(:relaxed) + first_cant_acquire = (old_word & zero_to_acquire) != 0 + + # We will transfer elements of *wake* to @waiters if all of: + # - some thread holds the lock, and + # - the spinlock is not held, and + # - mu cannot be acquired in the mode of the first waiter, or there's more + # than one thread on wake and not all are readers, and + # - we acquire the spinlock on the first try. + # + # The requirement that some thread holds the lock ensures that at least + # one of the transferred waiters will be woken. + if ((old_word & ANY_LOCK) != 0 && + (old_word & SPINLOCK) == 0 && + (first_cant_acquire || (!next_waiter.null? && !all_readers))) + # acquire the spinlock + mark mu as having waiters + _, success = @word.compare_and_set(old_word, old_word | SPINLOCK | WAITING, :acquire, :relaxed) + return unless success + + set_on_release = 0_u32 + transferred_a_writer = false + woke_a_reader = false + + if first_cant_acquire + transfer(first_waiter, from: wake) + transferred_a_writer = first_is_writer + else + woke_a_reader = !first_is_writer + end + + wake.value.each do |waiter| + is_writer = waiter.value.writer? + + # we transfer this waiter if any of: + # - the first waiter can't acquire mu, + # - the first waiter is a writer, or + # - this element is a writer + if waiter.value.cv_mu != mu + # edge case: waiter doesn't wait for this mu, wake it + elsif first_cant_acquire || first_is_writer || is_writer + transfer(waiter, from: wake) + transferred_a_writer ||= is_writer + else + woke_a_reader ||= !is_writer + end + end + + # claim a waiting writer if we transferred one, except if we woke + # readers, in which case we want those readers to be able to acquire + # immediately + if transferred_a_writer && !woke_a_reader + set_on_release |= WRITER_WAITING + end + + # release spinlock (WAITING has already been set on acquire) + release_spinlock(set_on_release) + end + end + + private def transfer(waiter, from) + from.value.delete(waiter) + @waiters.push(waiter) + + # no need to set waiting (it's already true) but we must tell CV#wait + # that the waiter has been transferred and is no longer a CV waiter + waiter.value.cv_mu = Pointer(MU).null + end + end +end diff --git a/src/sync/waiter.cr b/src/sync/waiter.cr new file mode 100644 index 000000000000..9a17c3b5b1db --- /dev/null +++ b/src/sync/waiter.cr @@ -0,0 +1,47 @@ +require "crystal/pointer_linked_list" + +module Sync + # :nodoc: + struct Waiter + enum Type + Reader + Writer + end + + include Crystal::PointerLinkedList::Node + + def initialize(@type : Type) + # protects against spurious wakeups (invalid manual fiber enqueues) that + # could lead to insert a waiter in the list a second time (oops) or keep + # the waiter in the list while the caller returned + @waiting = Atomic(Bool).new(true) + @fiber = Fiber.current + end + + def reader? : Bool + @type.reader? + end + + def writer? : Bool + @type.writer? + end + + def waiting! 
: Nil + @waiting.set(true, :relaxed) + end + + def wait : Nil + # we could avoid suspending the fiber if @waiting is already true but + # #wake ALWAYS enqueues the fiber, so #wait MUST suspend + while true + Fiber.suspend + break unless @waiting.get(:relaxed) + end + end + + def wake : Nil + @waiting.set(false, :relaxed) + @fiber.enqueue + end + end +end From 16873d9820eebf380a107d78e1d0b3d170f8c181 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 21 Nov 2025 18:09:36 +0100 Subject: [PATCH 02/12] Add Sync::Type, Sync::Error and spec helpers --- spec/std/sync/spec_helper.cr | 39 ++++++++++++++++++++++++++++++++++++ src/sync/errors.cr | 10 +++++++++ src/sync/type.cr | 17 ++++++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 spec/std/sync/spec_helper.cr create mode 100644 src/sync/errors.cr create mode 100644 src/sync/type.cr diff --git a/spec/std/sync/spec_helper.cr b/spec/std/sync/spec_helper.cr new file mode 100644 index 000000000000..a7a6c420acb3 --- /dev/null +++ b/spec/std/sync/spec_helper.cr @@ -0,0 +1,39 @@ +require "../spec_helper" +require "wait_group" + +module Sync + def self.eventually(timeout : Time::Span = 1.second, &) + start = Time.monotonic + + loop do + Fiber.yield + + begin + yield + rescue ex + raise ex if (Time.monotonic - start) > timeout + else + break + end + end + end + + def self.async(&block) : Nil + done = false + exception = nil + + spawn do + block.call + rescue ex + exception = ex + ensure + done = true + end + + eventually { done.should be_true, "Expected async fiber to have finished" } + + if ex = exception + raise ex + end + end +end diff --git a/src/sync/errors.cr b/src/sync/errors.cr new file mode 100644 index 000000000000..beb51f76d580 --- /dev/null +++ b/src/sync/errors.cr @@ -0,0 +1,10 @@ +module Sync + # Raised when a sync check fails. For example when trying to unlock an + # unlocked mutex. See `#message` for details. + class Error < Exception + # Raised when a lock would result in a deadlock. For example when trying to + # re-lock a checked mutex. + class Deadlock < Error + end + end +end diff --git a/src/sync/type.cr b/src/sync/type.cr new file mode 100644 index 000000000000..588e1d821927 --- /dev/null +++ b/src/sync/type.cr @@ -0,0 +1,17 @@ +module Sync + enum Type + # The lock doesn't do any checks. Trying to relock will cause a deadlock, + # unlocking from any fiber is undefined behavior. + Unchecked + + # The lock checks whether the current fiber owns the lock. Trying to + # relock will raise a `Error::Deadlock` exception, unlocking when unlocked + # or while another fiber holds the lock will raise an `Error`. + Checked + + # Same as `Checked` with the difference that the lock allows the same + # fiber to re-lock as many times as needed, then must be unlocked as many + # times as it was re-locked. 
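To make the three modes concrete, a short sketch (not part of the patch) based on the `Sync::Mutex` type introduced later in this series:

require "sync/mutex"

# Checked: re-locking from the owning fiber raises instead of deadlocking.
checked = Sync::Mutex.new(:checked)
checked.lock
# checked.lock  # => raises Sync::Error::Deadlock

# Reentrant: nested locks are counted and must be matched by unlocks.
reentrant = Sync::Mutex.new(:reentrant)
reentrant.lock
reentrant.lock   # same fiber, counter is now 2
reentrant.unlock # still locked
reentrant.unlock # released

# Unchecked: no bookkeeping at all; re-locking simply deadlocks the fiber.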
+ Reentrant + end +end From 2be18372c9fc52a6591a299492bedde995a3ff57 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 21 Nov 2025 18:09:55 +0100 Subject: [PATCH 03/12] Import Sync::Mutex from the sync shard --- spec/std/sync/mutex_spec.cr | 129 ++++++++++++++++++++++++++++++++++++ src/sync/mutex.cr | 80 ++++++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100644 spec/std/sync/mutex_spec.cr create mode 100644 src/sync/mutex.cr diff --git a/spec/std/sync/mutex_spec.cr b/spec/std/sync/mutex_spec.cr new file mode 100644 index 000000000000..21d82c7b62bd --- /dev/null +++ b/spec/std/sync/mutex_spec.cr @@ -0,0 +1,129 @@ +require "./spec_helper" +require "sync/mutex" + +describe Sync::Mutex do + {% for type in %i[checked unchecked reentrant] %} + describe {{type}} do + it "locks and unlocks" do + state = Atomic.new(0) + m = Sync::Mutex.new({{type}}) + m.lock + + ::spawn do + state.set(1) + m.lock + state.set(2) + end + + Sync.eventually { state.get.should eq(1) } + m.unlock + Sync.eventually { state.get.should eq(2) } + end + + {% unless type == :unchecked %} + it "unlock raises when not locked" do + m = Sync::Mutex.new({{type}}) + expect_raises(Sync::Error) { m.unlock } + end + + it "unlock raises when another fiber tries to unlock" do + m = Sync::Mutex.new(:reentrant) + m.lock + + Sync.async do + expect_raises(Sync::Error) { m.unlock } + end + end + {% end %} + + it "synchronizes" do + m = Sync::Mutex.new({{type}}) + counter = 0 + + IO.pipe do |r, w| + consumer = WaitGroup.new + publishers = WaitGroup.new + + # no races when writing to pipe (concurrency) + consumer.spawn do + c = 0 + while line = r.gets + line.to_i?.should eq(c += 1) + end + end + + # no races when incrementing counter (parallelism) + 100.times do |i| + publishers.spawn do + 500.times do + m.synchronize do + w.puts (counter += 1).to_s + end + end + end + end + + publishers.wait + w.close + counter.should eq(100 * 500) + + consumer.wait + end + end + end + {% end %} + + describe "unchecked" do + it "hangs on deadlock" do + m = Sync::Mutex.new(:unchecked) + done = started = locked = false + + fiber = ::spawn do + started = true + + m.lock + locked = true + + m.lock # deadlock + raise "ERROR: unreachable" unless done + end + + Sync.eventually { started.should be_true } + Sync.eventually { locked.should be_true } + sleep 10.milliseconds + + # unlock the fiber (cleanup) + done = true + m.unlock + end + + it "unlocks from other fiber" do + m = Sync::Mutex.new(:unchecked) + m.lock + Sync.async { m.unlock } + end + end + + describe "checked" do + it "raises on deadlock" do + m = Sync::Mutex.new(:checked) + m.lock + expect_raises(Sync::Error::Deadlock) { m.lock } + end + end + + describe "reentrant" do + it "re-locks" do + m = Sync::Mutex.new(:reentrant) + m.lock + m.lock # nothing raised + end + + it "unlocks as many times as it locked" do + m = Sync::Mutex.new(:reentrant) + 100.times { m.lock } + 100.times { m.unlock } + expect_raises(Sync::Error) { m.unlock } + end + end +end diff --git a/src/sync/mutex.cr b/src/sync/mutex.cr new file mode 100644 index 000000000000..34782abe04cd --- /dev/null +++ b/src/sync/mutex.cr @@ -0,0 +1,80 @@ +require "./mu" +require "./type" +require "./errors" + +module Sync + # A mutual exclusion lock to protect critical sections. + # + # A single fiber can acquire the lock at a time. No other fiber can acquire + # the lock while a fiber holds it. 
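A minimal usage sketch (not part of the patch), mirroring what the spec above exercises; the counter is the shared state being protected and the WaitGroup only waits for the spawned fibers:

require "wait_group"
require "sync/mutex"

mutex = Sync::Mutex.new
counter = 0
wg = WaitGroup.new

10.times do
  wg.spawn do
    1_000.times do
      # only one fiber at a time runs this critical section
      mutex.synchronize { counter += 1 }
    end
  end
end

wg.wait
counter # => 10_000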
+ # + # This lock can for example be used to protect the access to some resources, + # with the guarantee that only one section of code can ever read, write or + # mutate said resources. + class Mutex + def initialize(@type : Type = :checked) + @counter = 0 + @mu = MU.new + end + + # Acquires the exclusive lock for the duration of the block. The lock will + # be released automatically before returning, or if the block raises an + # exception. + def synchronize(& : -> U) : U forall U + lock + begin + yield + ensure + unlock + end + end + + # Acquires the exclusive lock. + def lock : Nil + unless @mu.try_lock? + unless @type.unchecked? + if @locked_by == Fiber.current + raise Error::Deadlock.new unless @type.reentrant? + @counter += 1 + return + end + end + @mu.lock_slow + end + + unless @type.unchecked? + @locked_by = Fiber.current + @counter = 1 if @type.reentrant? + end + end + + # Releases the exclusive lock. + def unlock : Nil + unless @type.unchecked? + unless owns_lock? + message = + if @locked_by + "Can't unlock Sync::Mutex locked by another fiber" + else + "Can't unlock Sync::Mutex that isn't locked" + end + raise Error.new(message) + end + if @type.reentrant? + return unless (@counter -= 1) == 0 + end + @locked_by = nil + end + @mu.unlock + end + + protected def owns_lock? : Bool + @locked_by == Fiber.current + end + + # :nodoc: + def dup + {% raise "Can't dup {{@type}}" %} + end + end +end From 411f2229c63d62bde23560c1961eff3182f58ca1 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 21 Nov 2025 18:10:16 +0100 Subject: [PATCH 04/12] Import Sync::RWLock from the sync shard --- spec/std/sync/rw_lock_spec.cr | 149 ++++++++++++++++++++++++++++++++++ src/sync/rw_lock.cr | 132 ++++++++++++++++++++++++++++++ 2 files changed, 281 insertions(+) create mode 100644 spec/std/sync/rw_lock_spec.cr create mode 100644 src/sync/rw_lock.cr diff --git a/spec/std/sync/rw_lock_spec.cr b/spec/std/sync/rw_lock_spec.cr new file mode 100644 index 000000000000..db459f3fe726 --- /dev/null +++ b/spec/std/sync/rw_lock_spec.cr @@ -0,0 +1,149 @@ +require "./spec_helper" +require "sync/rw_lock" + +describe Sync::RWLock do + it "lock write waits for all read locks to be unlocked" do + done = false + + lock = Sync::RWLock.new + lock.lock_read + lock.lock_read + + spawn do + lock.lock_write + done = true + end + + sleep(10.milliseconds) + done.should be_false + + lock.unlock_read + sleep(10.milliseconds) + done.should be_false + + lock.unlock_read + sleep(10.milliseconds) + done.should be_true + end + + it "can't lock read while locked for write" do + done = false + + lock = Sync::RWLock.new + lock.lock_write + + spawn do + lock.lock_read + done = true + end + + sleep(10.milliseconds) + done.should be_false + + lock.unlock_write + sleep(10.milliseconds) + done.should be_true + end + + it "synchronizes locks" do + lock = Sync::RWLock.new + wg = WaitGroup.new + + ary = [] of Int32 + counter = Atomic(Int64).new(0) + + # readers can run concurrently, but are mutually exclusive to writers (the + # array can be safely read from): + + 10.times do + spawn(name: "reader") do + 100.times do + lock.read do + ary.each { counter.add(1) } + end + Fiber.yield + end + end + end + + # writers are mutually exclusive: they can safely mutate the array + + 5.times do + wg.spawn(name: "writer:increment") do + 100.times do + lock.write { 100.times { ary << ary.size } } + Fiber.yield + end + end + end + + 4.times do + wg.spawn(name: "writer:decrement") do + 100.times do + lock.write { 100.times { ary.pop? 
} } + Fiber.yield + end + end + end + + wg.wait + + ary.should eq((0..(ary.size - 1)).to_a) + counter.lazy_get.should be > 0 + end + + describe "unchecked" do + it "deadlocks on re-lock write" do + done = started = locked = false + lock = Sync::RWLock.new(:unchecked) + + fiber = spawn do + started = true + lock.lock_write + locked = true + lock.lock_write # deadlock + raise "ERROR: unreachable" unless done + end + + Sync.eventually { started.should be_true } + Sync.eventually { locked.should be_true } + sleep 10.milliseconds + + # unlock the fiber (cleanup) + done = true + lock.unlock_write + end + + it "unlocks write despite not being locked" do + lock = Sync::RWLock.new(:unchecked) + expect_raises(RuntimeError) { lock.unlock_write } # MU has a safety check + end + + it "unlocks write from another fiber" do + lock = Sync::RWLock.new(:unchecked) + lock.lock_write + Sync.async { lock.unlock_write } # nothing raised + end + end + + describe "checked" do + it "raises on re-kock write" do + lock = Sync::RWLock.new(:checked) + lock.lock_write + expect_raises(Sync::Error::Deadlock) { lock.lock_write } + end + + it "raises on unlock_write when not locked" do + lock = Sync::RWLock.new(:checked) + expect_raises(Sync::Error) { lock.unlock_write } + end + + it "raises on unlock_write from another fiber" do + lock = Sync::RWLock.new(:checked) + lock.lock_write + Sync.async do + expect_raises(Sync::Error) { lock.unlock_write } + end + end + end +end diff --git a/src/sync/rw_lock.cr b/src/sync/rw_lock.cr new file mode 100644 index 000000000000..cb575b8e27ea --- /dev/null +++ b/src/sync/rw_lock.cr @@ -0,0 +1,132 @@ +require "./mu" +require "./type" +require "./errors" + +module Sync + # A multiple readers and exclusive writer lock to protect critical sections. + # + # Multiple fibers can acquire the shared lock (read) to allow some critical + # sections to run concurrently. However a single fiber can acquire the + # exclusive lock at a time to protect a single critical section to ever run in + # parallel. When the lock has been acquired in exclusive mode, no other fiber + # can lock it, be it in shared or exclusive mode. + # + # For example, the shared mode can allow to read one or many resources, albeit + # the resources must be safe to be accessed in such manner, while the + # exclusive mode allows to safely replace or mutate the resources with the + # guarantee that nothing else is accessing said resources. + # + # The implementation doesn't favor readers or writers in particular. + class RWLock + def initialize(@type : Type = :checked) + @counter = 0 + @mu = MU.new + end + + # Acquires the shared (read) lock for the duration of the block. + # + # Multiple fibers can acquire the shared (read) lock at the same time. The + # block will never run concurrently to an exclusive (write) lock. + def read(& : -> U) : U forall U + lock_read + begin + yield + ensure + unlock_read + end + end + + # Tries to acquire the shared (read) lock without blocking. Returns true + # when acquired, otherwise returns false immediately. + def try_lock_read? : Bool + @mu.try_rlock? + end + + # Acquires the shared (read) lock. + # + # The shared lock is always reentrant, multiple fibers can lock it multiple + # times each, and never checked. Blocks the calling fiber while the + # exclusive (write) lock is held. + def lock_read : Nil + @mu.rlock + end + + # Releases the shared (read) lock. + # + # Every fiber that locked must unlock to actually release the reader lock + # (so a writer can lock). 
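A small sketch of the intended read/write pattern (not part of the patch; the hash and its key are purely illustrative), reusing the `WaitGroup` helper the specs in this series already rely on:

require "wait_group"
require "sync/rw_lock"

lock = Sync::RWLock.new
settings = {"level" => 1}
wg = WaitGroup.new

# readers may run concurrently with one another
5.times do
  wg.spawn do
    lock.read { settings["level"]? }
  end
end

# the writer waits for every reader to unlock, then runs exclusively
wg.spawn do
  lock.write { settings["level"] = 2 }
end

wg.wait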
If a fiber locked multiple times (reentrant + # behavior) then it must unlock that many times. + def unlock_read : Nil + @mu.runlock + end + + # Acquires the exclusive (write) lock for the duration of the block. + # + # Only one fiber can acquire the exclusive (write) lock at the same time. + # The block will never run concurrently to a shared (read) lock or another + # exclusive (write) lock. + def write(& : -> U) : U forall U + lock_write + begin + yield + ensure + unlock_write + end + end + + # Tries to acquire the exclusive (write) lock without blocking. Returns true + # when acquired, otherwise returns false immediately. + def try_lock_write? : Bool + @mu.try_lock? + end + + # Acquires the exclusive (write) lock. Blocks the calling fiber while the + # shared or exclusive (write) lock is held. + def lock_write : Nil + unless @mu.try_lock? + unless @type.unchecked? + if @locked_by == Fiber.current + raise Error::Deadlock.new unless @type.reentrant? + @counter += 1 + return + end + end + @mu.lock_slow + end + + unless @type.unchecked? + @locked_by = Fiber.current + @counter = 1 if @type.reentrant? + end + end + + # Releases the exclusive (write) lock. + def unlock_write : Nil + unless @type.unchecked? + unless owns_lock? + message = + if @locked_by + "Can't unlock Sync::RWLock locked by another fiber" + else + "Can't unlock Sync::RWLock that isn't locked" + end + raise Error.new(message) + end + if @type.reentrant? + return unless (@counter -= 1) == 0 + end + @locked_by = nil + end + @mu.unlock + end + + protected def owns_lock? : Bool + @locked_by == Fiber.current + end + + # :nodoc: + def dup + {% raise "Can't dup {{@type}}" %} + end + end +end From 6b4b29f30a217af1f93b4f63bac55a79e2dab015 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 21 Nov 2025 19:03:16 +0100 Subject: [PATCH 05/12] fixup! Import Sync::MU from the sync shard (undocumented) --- src/sync/mu.cr | 76 -------------------------------------------------- 1 file changed, 76 deletions(-) diff --git a/src/sync/mu.cr b/src/sync/mu.cr index a2db2f7d9c56..27d4f8fced90 100644 --- a/src/sync/mu.cr +++ b/src/sync/mu.cr @@ -300,81 +300,5 @@ module Sync return if success end end - - protected def try_transfer(wake : Pointer(Crystal::PointerLinkedList(Waiter)), first_waiter : Pointer(Waiter), all_readers : Bool) : Nil - first_is_writer = first_waiter.value.writer? - next_waiter = wake.value.next?(first_waiter) - zero_to_acquire = first_is_writer ? ANY_LOCK : WLOCK | WRITER_WAITING - - # there's no pointerof(self), so we make do: - mu = first_waiter.value.cv_mu - - old_word = @word.get(:relaxed) - first_cant_acquire = (old_word & zero_to_acquire) != 0 - - # We will transfer elements of *wake* to @waiters if all of: - # - some thread holds the lock, and - # - the spinlock is not held, and - # - mu cannot be acquired in the mode of the first waiter, or there's more - # than one thread on wake and not all are readers, and - # - we acquire the spinlock on the first try. - # - # The requirement that some thread holds the lock ensures that at least - # one of the transferred waiters will be woken. - if ((old_word & ANY_LOCK) != 0 && - (old_word & SPINLOCK) == 0 && - (first_cant_acquire || (!next_waiter.null? 
&& !all_readers))) - # acquire the spinlock + mark mu as having waiters - _, success = @word.compare_and_set(old_word, old_word | SPINLOCK | WAITING, :acquire, :relaxed) - return unless success - - set_on_release = 0_u32 - transferred_a_writer = false - woke_a_reader = false - - if first_cant_acquire - transfer(first_waiter, from: wake) - transferred_a_writer = first_is_writer - else - woke_a_reader = !first_is_writer - end - - wake.value.each do |waiter| - is_writer = waiter.value.writer? - - # we transfer this waiter if any of: - # - the first waiter can't acquire mu, - # - the first waiter is a writer, or - # - this element is a writer - if waiter.value.cv_mu != mu - # edge case: waiter doesn't wait for this mu, wake it - elsif first_cant_acquire || first_is_writer || is_writer - transfer(waiter, from: wake) - transferred_a_writer ||= is_writer - else - woke_a_reader ||= !is_writer - end - end - - # claim a waiting writer if we transferred one, except if we woke - # readers, in which case we want those readers to be able to acquire - # immediately - if transferred_a_writer && !woke_a_reader - set_on_release |= WRITER_WAITING - end - - # release spinlock (WAITING has already been set on acquire) - release_spinlock(set_on_release) - end - end - - private def transfer(waiter, from) - from.value.delete(waiter) - @waiters.push(waiter) - - # no need to set waiting (it's already true) but we must tell CV#wait - # that the waiter has been transferred and is no longer a CV waiter - waiter.value.cv_mu = Pointer(MU).null - end end end From 935ecba23d16be73bb6db1e4bbd3608762c88184 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 21 Nov 2025 19:05:57 +0100 Subject: [PATCH 06/12] fixup: use DESIGNATED_WAKER instead of DESIG_WAKER --- src/sync/mu.cr | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/sync/mu.cr b/src/sync/mu.cr index 27d4f8fced90..f4552dbfeadc 100644 --- a/src/sync/mu.cr +++ b/src/sync/mu.cr @@ -24,14 +24,14 @@ require "./waiter" module Sync # :nodoc: struct MU - UNLOCKED = 0_u32 - WLOCK = 1_u32 - SPINLOCK = 2_u32 - WAITING = 4_u32 - WRITER_WAITING = 8_u32 - LONG_WAIT = 16_u32 - DESIG_WAKER = 32_u32 - RLOCK = 256_u32 + UNLOCKED = 0_u32 + WLOCK = 1_u32 + SPINLOCK = 2_u32 + WAITING = 4_u32 + WRITER_WAITING = 8_u32 + LONG_WAIT = 16_u32 + DESIGNATED_WAKER = 32_u32 + RLOCK = 256_u32 RMASK = ~(RLOCK - 1_u32) ANY_LOCK = WLOCK | RMASK @@ -166,7 +166,7 @@ module Sync # woken fiber doesn't care about long wait or a writer waiting, and # must clear the designated waker flag zero_to_acquire &= ~(LONG_WAIT | WRITER_WAITING) - clear = DESIG_WAKER + clear = DESIGNATED_WAKER end end @@ -187,7 +187,7 @@ module Sync raise RuntimeError.new("Can't unlock Sync::MU that isn't held") end - if (word & WAITING) == 0 && (word & DESIG_WAKER) != 0 + if (word & WAITING) == 0 && (word & DESIGNATED_WAKER) != 0 # no waiters, or there is a designated waker already (no need to wake # another one), try quick unlock _, success = @word.compare_and_set(word, word &- WLOCK, :release, :relaxed) @@ -208,7 +208,7 @@ module Sync raise RuntimeError.new("Can't runlock Sync::MU that isn't held") end - if (word & WAITING) == 0 && (word & DESIG_WAKER) != 0 && (word & RMASK) > RLOCK + if (word & WAITING) == 0 && (word & DESIGNATED_WAKER) != 0 && (word & RMASK) > RLOCK # no waiters, there is a designated waker already (no need to wake # another one), and there are still readers, try quick unlock _, success = @word.compare_and_set(word, word &- RLOCK, :release, :relaxed) @@ 
-233,7 +233,7 @@ module Sync while true word = @word.get(:relaxed) - if (word & WAITING) == 0 || (word & DESIG_WAKER) != 0 || (word & RMASK) > RLOCK + if (word & WAITING) == 0 || (word & DESIGNATED_WAKER) != 0 || (word & RMASK) > RLOCK # no waiters, there is a designated waker (no need to wake another # one), or there are still readers, try release lock word, success = @word.compare_and_set(word, word - sub_on_release, :release, :relaxed) @@ -241,7 +241,7 @@ module Sync elsif (word & SPINLOCK) == 0 # there might be a waiter, and no designated waker, try to acquire # spinlock, and release the lock (early) - _, success = @word.compare_and_set(word, (word | SPINLOCK | DESIG_WAKER) &- sub_on_release, :acquire_release, :relaxed) + _, success = @word.compare_and_set(word, (word | SPINLOCK | DESIGNATED_WAKER) &- sub_on_release, :acquire_release, :relaxed) if success # spinlock is held, resume a single writer, or resume all readers wake = Crystal::PointerLinkedList(Waiter).new @@ -265,8 +265,8 @@ module Sync # update flags clear = 0_u32 - clear |= DESIG_WAKER if wake.empty? # nothing to wake => no designated waker - clear |= WAITING if @waiters.empty? # no more waiters => nothing waiting + clear |= DESIGNATED_WAKER if wake.empty? # nothing to wake => no designated waker + clear |= WAITING if @waiters.empty? # no more waiters => nothing waiting release_spinlock(set: writer_waiting, clear: clear) From 4137560ba9d9c6dbe4ac908f7388bf35f51d6c74 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 21 Nov 2025 19:06:16 +0100 Subject: [PATCH 07/12] fixup: unused assignment --- spec/std/sync/rw_lock_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/std/sync/rw_lock_spec.cr b/spec/std/sync/rw_lock_spec.cr index db459f3fe726..9164df628d21 100644 --- a/spec/std/sync/rw_lock_spec.cr +++ b/spec/std/sync/rw_lock_spec.cr @@ -97,7 +97,7 @@ describe Sync::RWLock do done = started = locked = false lock = Sync::RWLock.new(:unchecked) - fiber = spawn do + spawn do started = true lock.lock_write locked = true From 92e005337aa32118c93b6bba054e920d0661ba6d Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 24 Nov 2025 15:15:27 +0100 Subject: [PATCH 08/12] Fix: don't use macro in Sync::Mutex spec --- spec/std/sync/mutex_spec.cr | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/spec/std/sync/mutex_spec.cr b/spec/std/sync/mutex_spec.cr index 21d82c7b62bd..a83b0928fcc8 100644 --- a/spec/std/sync/mutex_spec.cr +++ b/spec/std/sync/mutex_spec.cr @@ -2,11 +2,11 @@ require "./spec_helper" require "sync/mutex" describe Sync::Mutex do - {% for type in %i[checked unchecked reentrant] %} - describe {{type}} do + Sync::Type.each do |type| + describe type do it "locks and unlocks" do state = Atomic.new(0) - m = Sync::Mutex.new({{type}}) + m = Sync::Mutex.new(type) m.lock ::spawn do @@ -20,9 +20,9 @@ describe Sync::Mutex do Sync.eventually { state.get.should eq(2) } end - {% unless type == :unchecked %} + unless type.unchecked? 
it "unlock raises when not locked" do - m = Sync::Mutex.new({{type}}) + m = Sync::Mutex.new(type) expect_raises(Sync::Error) { m.unlock } end @@ -34,10 +34,10 @@ describe Sync::Mutex do expect_raises(Sync::Error) { m.unlock } end end - {% end %} + end it "synchronizes" do - m = Sync::Mutex.new({{type}}) + m = Sync::Mutex.new(type) counter = 0 IO.pipe do |r, w| @@ -71,7 +71,7 @@ describe Sync::Mutex do end end end - {% end %} + end describe "unchecked" do it "hangs on deadlock" do From 4dc54564b199e84dc12adb75d9905dfa552a5247 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 24 Nov 2025 15:20:53 +0100 Subject: [PATCH 09/12] Use #owns_lock? helper --- src/sync/mutex.cr | 2 +- src/sync/rw_lock.cr | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sync/mutex.cr b/src/sync/mutex.cr index 34782abe04cd..b0683adec099 100644 --- a/src/sync/mutex.cr +++ b/src/sync/mutex.cr @@ -33,7 +33,7 @@ module Sync def lock : Nil unless @mu.try_lock? unless @type.unchecked? - if @locked_by == Fiber.current + if owns_lock? raise Error::Deadlock.new unless @type.reentrant? @counter += 1 return diff --git a/src/sync/rw_lock.cr b/src/sync/rw_lock.cr index cb575b8e27ea..9d1e04417b4b 100644 --- a/src/sync/rw_lock.cr +++ b/src/sync/rw_lock.cr @@ -85,7 +85,7 @@ module Sync def lock_write : Nil unless @mu.try_lock? unless @type.unchecked? - if @locked_by == Fiber.current + if owns_lock? raise Error::Deadlock.new unless @type.reentrant? @counter += 1 return From 278e9762330025efd79cd9b6f27d09c68ef8c97b Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 24 Nov 2025 16:15:58 +0100 Subject: [PATCH 10/12] cleanup --- spec/std/sync/mutex_spec.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/std/sync/mutex_spec.cr b/spec/std/sync/mutex_spec.cr index a83b0928fcc8..6f9aae63d5be 100644 --- a/spec/std/sync/mutex_spec.cr +++ b/spec/std/sync/mutex_spec.cr @@ -9,7 +9,7 @@ describe Sync::Mutex do m = Sync::Mutex.new(type) m.lock - ::spawn do + spawn do state.set(1) m.lock state.set(2) @@ -78,7 +78,7 @@ describe Sync::Mutex do m = Sync::Mutex.new(:unchecked) done = started = locked = false - fiber = ::spawn do + spawn do started = true m.lock From e75a0058534229427bae22b63f897d0179ab7636 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 24 Nov 2025 18:45:10 +0100 Subject: [PATCH 11/12] Add src/sync to src/docs_main.cr --- src/docs_main.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/docs_main.cr b/src/docs_main.cr index a2fe130c068b..6112ad1fb6d1 100644 --- a/src/docs_main.cr +++ b/src/docs_main.cr @@ -20,6 +20,7 @@ require "./math/**" require "./random/**" require "./spec/helpers/**" require "./string/**" +require "./sync/**" require "./system/**" require "./uri/**" require "./uuid/**" From 35a62062b6730654e353a4262c80e042f6fd9281 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 25 Nov 2025 11:01:34 +0100 Subject: [PATCH 12/12] Expand potentially confusing oneliner --- src/sync/mu.cr | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/sync/mu.cr b/src/sync/mu.cr index f4552dbfeadc..899d8978ef2d 100644 --- a/src/sync/mu.cr +++ b/src/sync/mu.cr @@ -81,11 +81,15 @@ module Sync end def lock : Nil - lock_slow unless try_lock? + unless try_lock? + lock_slow + end end def rlock : Nil - rlock_slow unless try_rlock? + unless try_rlock? + rlock_slow + end end def lock_slow