-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add Sync::Mutex and Sync::RWLock
#16399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
straight-shoota
merged 12 commits into
crystal-lang:master
from
ysbaddaden:feature/import-sync-mutex-and-rwlock
Nov 26, 2025
+912
−0
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
41b0e49
Import Sync::MU from the sync shard (undocumented)
ysbaddaden 16873d9
Add Sync::Type, Sync::Error and spec helpers
ysbaddaden 2be1837
Import Sync::Mutex from the sync shard
ysbaddaden 411f222
Import Sync::RWLock from the sync shard
ysbaddaden 6b4b29f
fixup! Import Sync::MU from the sync shard (undocumented)
ysbaddaden 935ecba
fixup: use DESIGNATED_WAKER instead of DESIG_WAKER
ysbaddaden 4137560
fixup: unused assignment
ysbaddaden 92e0053
Fix: don't use macro in Sync::Mutex spec
ysbaddaden 4dc5456
Use #owns_lock? helper
ysbaddaden 278e976
cleanup
ysbaddaden e75a005
Add src/sync to src/docs_main.cr
ysbaddaden 35a6206
Expand potentially confusing oneliner
ysbaddaden File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| require "./spec_helper" | ||
| require "sync/mutex" | ||
|
|
||
| describe Sync::Mutex do | ||
| Sync::Type.each do |type| | ||
| describe type do | ||
| it "locks and unlocks" do | ||
| state = Atomic.new(0) | ||
| m = Sync::Mutex.new(type) | ||
| m.lock | ||
|
|
||
| spawn do | ||
| state.set(1) | ||
| m.lock | ||
| state.set(2) | ||
| end | ||
|
|
||
| Sync.eventually { state.get.should eq(1) } | ||
| m.unlock | ||
| Sync.eventually { state.get.should eq(2) } | ||
| end | ||
|
|
||
| unless type.unchecked? | ||
| it "unlock raises when not locked" do | ||
| m = Sync::Mutex.new(type) | ||
| expect_raises(Sync::Error) { m.unlock } | ||
| end | ||
|
|
||
| it "unlock raises when another fiber tries to unlock" do | ||
| m = Sync::Mutex.new(:reentrant) | ||
| m.lock | ||
|
|
||
| Sync.async do | ||
| expect_raises(Sync::Error) { m.unlock } | ||
| end | ||
| end | ||
| end | ||
|
|
||
| it "synchronizes" do | ||
| m = Sync::Mutex.new(type) | ||
| counter = 0 | ||
|
|
||
| IO.pipe do |r, w| | ||
| consumer = WaitGroup.new | ||
| publishers = WaitGroup.new | ||
|
|
||
| # no races when writing to pipe (concurrency) | ||
| consumer.spawn do | ||
| c = 0 | ||
| while line = r.gets | ||
| line.to_i?.should eq(c += 1) | ||
| end | ||
| end | ||
|
|
||
| # no races when incrementing counter (parallelism) | ||
| 100.times do |i| | ||
| publishers.spawn do | ||
| 500.times do | ||
| m.synchronize do | ||
| w.puts (counter += 1).to_s | ||
| end | ||
| end | ||
| end | ||
| end | ||
|
|
||
| publishers.wait | ||
| w.close | ||
| counter.should eq(100 * 500) | ||
|
|
||
| consumer.wait | ||
| end | ||
| end | ||
| end | ||
| end | ||
|
|
||
| describe "unchecked" do | ||
| it "hangs on deadlock" do | ||
| m = Sync::Mutex.new(:unchecked) | ||
| done = started = locked = false | ||
|
|
||
| spawn do | ||
| started = true | ||
|
|
||
| m.lock | ||
| locked = true | ||
|
|
||
| m.lock # deadlock | ||
| raise "ERROR: unreachable" unless done | ||
| end | ||
|
|
||
| Sync.eventually { started.should be_true } | ||
| Sync.eventually { locked.should be_true } | ||
| sleep 10.milliseconds | ||
|
|
||
| # unlock the fiber (cleanup) | ||
| done = true | ||
| m.unlock | ||
| end | ||
|
|
||
| it "unlocks from other fiber" do | ||
| m = Sync::Mutex.new(:unchecked) | ||
| m.lock | ||
| Sync.async { m.unlock } | ||
| end | ||
| end | ||
|
|
||
| describe "checked" do | ||
| it "raises on deadlock" do | ||
| m = Sync::Mutex.new(:checked) | ||
| m.lock | ||
| expect_raises(Sync::Error::Deadlock) { m.lock } | ||
| end | ||
| end | ||
|
|
||
| describe "reentrant" do | ||
| it "re-locks" do | ||
| m = Sync::Mutex.new(:reentrant) | ||
| m.lock | ||
| m.lock # nothing raised | ||
| end | ||
|
|
||
| it "unlocks as many times as it locked" do | ||
| m = Sync::Mutex.new(:reentrant) | ||
| 100.times { m.lock } | ||
| 100.times { m.unlock } | ||
| expect_raises(Sync::Error) { m.unlock } | ||
| end | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| require "./spec_helper" | ||
| require "sync/rw_lock" | ||
|
|
||
| describe Sync::RWLock do | ||
| it "lock write waits for all read locks to be unlocked" do | ||
| done = false | ||
|
|
||
| lock = Sync::RWLock.new | ||
| lock.lock_read | ||
| lock.lock_read | ||
|
|
||
| spawn do | ||
| lock.lock_write | ||
| done = true | ||
| end | ||
|
|
||
| sleep(10.milliseconds) | ||
| done.should be_false | ||
|
|
||
| lock.unlock_read | ||
| sleep(10.milliseconds) | ||
| done.should be_false | ||
|
|
||
| lock.unlock_read | ||
| sleep(10.milliseconds) | ||
| done.should be_true | ||
| end | ||
|
|
||
| it "can't lock read while locked for write" do | ||
| done = false | ||
|
|
||
| lock = Sync::RWLock.new | ||
| lock.lock_write | ||
|
|
||
| spawn do | ||
| lock.lock_read | ||
| done = true | ||
| end | ||
|
|
||
| sleep(10.milliseconds) | ||
| done.should be_false | ||
|
|
||
| lock.unlock_write | ||
| sleep(10.milliseconds) | ||
| done.should be_true | ||
| end | ||
|
|
||
| it "synchronizes locks" do | ||
| lock = Sync::RWLock.new | ||
| wg = WaitGroup.new | ||
|
|
||
| ary = [] of Int32 | ||
| counter = Atomic(Int64).new(0) | ||
|
|
||
| # readers can run concurrently, but are mutually exclusive to writers (the | ||
| # array can be safely read from): | ||
|
|
||
| 10.times do | ||
| spawn(name: "reader") do | ||
| 100.times do | ||
| lock.read do | ||
| ary.each { counter.add(1) } | ||
| end | ||
| Fiber.yield | ||
| end | ||
| end | ||
| end | ||
|
|
||
| # writers are mutually exclusive: they can safely mutate the array | ||
|
|
||
| 5.times do | ||
| wg.spawn(name: "writer:increment") do | ||
| 100.times do | ||
| lock.write { 100.times { ary << ary.size } } | ||
| Fiber.yield | ||
| end | ||
| end | ||
| end | ||
|
|
||
| 4.times do | ||
| wg.spawn(name: "writer:decrement") do | ||
| 100.times do | ||
| lock.write { 100.times { ary.pop? } } | ||
| Fiber.yield | ||
| end | ||
| end | ||
| end | ||
|
|
||
| wg.wait | ||
|
|
||
| ary.should eq((0..(ary.size - 1)).to_a) | ||
| counter.lazy_get.should be > 0 | ||
| end | ||
|
|
||
| describe "unchecked" do | ||
| it "deadlocks on re-lock write" do | ||
| done = started = locked = false | ||
| lock = Sync::RWLock.new(:unchecked) | ||
|
|
||
| spawn do | ||
| started = true | ||
| lock.lock_write | ||
| locked = true | ||
| lock.lock_write # deadlock | ||
| raise "ERROR: unreachable" unless done | ||
| end | ||
|
|
||
| Sync.eventually { started.should be_true } | ||
| Sync.eventually { locked.should be_true } | ||
| sleep 10.milliseconds | ||
|
|
||
| # unlock the fiber (cleanup) | ||
| done = true | ||
| lock.unlock_write | ||
| end | ||
|
|
||
| it "unlocks write despite not being locked" do | ||
| lock = Sync::RWLock.new(:unchecked) | ||
| expect_raises(RuntimeError) { lock.unlock_write } # MU has a safety check | ||
| end | ||
|
|
||
| it "unlocks write from another fiber" do | ||
| lock = Sync::RWLock.new(:unchecked) | ||
| lock.lock_write | ||
| Sync.async { lock.unlock_write } # nothing raised | ||
| end | ||
| end | ||
|
|
||
| describe "checked" do | ||
| it "raises on re-kock write" do | ||
| lock = Sync::RWLock.new(:checked) | ||
| lock.lock_write | ||
| expect_raises(Sync::Error::Deadlock) { lock.lock_write } | ||
| end | ||
|
|
||
| it "raises on unlock_write when not locked" do | ||
| lock = Sync::RWLock.new(:checked) | ||
| expect_raises(Sync::Error) { lock.unlock_write } | ||
| end | ||
|
|
||
| it "raises on unlock_write from another fiber" do | ||
| lock = Sync::RWLock.new(:checked) | ||
| lock.lock_write | ||
| Sync.async do | ||
| expect_raises(Sync::Error) { lock.unlock_write } | ||
| end | ||
| end | ||
| end | ||
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| require "../spec_helper" | ||
| require "wait_group" | ||
|
|
||
| module Sync | ||
| def self.eventually(timeout : Time::Span = 1.second, &) | ||
| start = Time.monotonic | ||
|
|
||
| loop do | ||
| Fiber.yield | ||
|
|
||
| begin | ||
| yield | ||
| rescue ex | ||
| raise ex if (Time.monotonic - start) > timeout | ||
| else | ||
| break | ||
| end | ||
| end | ||
| end | ||
|
|
||
| def self.async(&block) : Nil | ||
| done = false | ||
| exception = nil | ||
|
|
||
| spawn do | ||
| block.call | ||
| rescue ex | ||
| exception = ex | ||
| ensure | ||
| done = true | ||
| end | ||
|
|
||
| eventually { done.should be_true, "Expected async fiber to have finished" } | ||
|
|
||
| if ex = exception | ||
| raise ex | ||
| end | ||
| end | ||
| end | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| module Sync | ||
| # Raised when a sync check fails. For example when trying to unlock an | ||
| # unlocked mutex. See `#message` for details. | ||
| class Error < Exception | ||
| # Raised when a lock would result in a deadlock. For example when trying to | ||
| # re-lock a checked mutex. | ||
| class Deadlock < Error | ||
| end | ||
| end | ||
| end |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.