Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions spec/std/crystal/fd_lock_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
require "spec"
require "crystal/fd_lock"
require "wait_group"

describe Crystal::FdLock do
describe "#reference" do
it "acquires" do
lock = Crystal::FdLock.new
called = 0

lock.reference { called += 1 }
lock.reference { called += 1 }

called.should eq(2)
end

it "allows reentrancy (side effect)" do
lock = Crystal::FdLock.new
called = 0

lock.reference { called += 1 }
lock.reference do
lock.reference { called += 1 }
end

called.should eq(2)
end

it "acquires shared reference" do
lock = Crystal::FdLock.new

ready = WaitGroup.new(1)
release = Channel(String).new

spawn do
lock.reference do
ready.done

select
when release.send("ok")
when timeout(1.second)
release.send("timeout")
end
end
end

ready.wait
lock.reference { }

release.receive.should eq("ok")
end

it "raises when closed" do
lock = Crystal::FdLock.new
lock.try_close? { }

called = false
expect_raises(IO::Error, "Closed") do
lock.reference { called = true }
end

called.should be_false
end
end

describe "#try_close?" do
it "closes" do
lock = Crystal::FdLock.new
lock.closed?.should be_false

called = false
lock.try_close? { called = true }.should be_true
lock.closed?.should be_true
called.should be_true
end

it "closes once" do
lock = Crystal::FdLock.new

called = 0

WaitGroup.wait do |wg|
10.times do
wg.spawn do
lock.try_close? { called += 1 }
lock.try_close? { called += 1 }
end
end
end

called.should eq(1)
end

it "waits for all references to return" do
lock = Crystal::FdLock.new

ready = WaitGroup.new(10)
exceptions = Channel(Exception).new(10)

WaitGroup.wait do |wg|
10.times do
wg.spawn do
begin
lock.reference do
ready.done
Fiber.yield
end
rescue ex
exceptions.send(ex)
end
end
end

ready.wait

called = false
lock.try_close? { called = true }.should be_true
lock.closed?.should be_true
called.should be_true
end
exceptions.close

if ex = exceptions.receive?
raise ex
end
end
end

it "#reset" do
lock = Crystal::FdLock.new
lock.try_close? { }
lock.reset
lock.try_close? { }.should eq(true)
end
end
118 changes: 118 additions & 0 deletions src/crystal/fd_lock.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause,
# Copyright Google):
# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go

# :nodoc:
#
# Tracks active references over a system file descriptor (fd) and serializes
# reads and writes.
#
# Every access to the fd that may affect its system state or system buffers must
# acquire a shared lock.
#
# The fdlock can be closed at any time, but the actual system close will wait
# until there are no more references left. This avoids potential races when a
# thread might try to read a fd that has been closed and has been reused by the
# OS for example.
struct Crystal::FdLock
CLOSED = 1_u32 << 0 # the fdlock has been closed
REF = 1_u32 << 1 # the reference counter increment
MASK = ~(REF - 1) # mask for the reference counter

@m = Atomic(UInt32).new(0_u32)
@closing : Fiber?

# Borrows a reference for the duration of the block. Raises if the fdlock is
# closed while trying to borrow.
def reference(& : -> F) : F forall F
m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed)
increment_slow(m) unless success

begin
yield
ensure
m = @m.sub(REF, :release)
handle_last_ref(m)
end
end

private def increment_slow(m)
while true
if (m & CLOSED) == CLOSED
raise IO::Error.new("Closed")
end
m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed)
break if success
end
end

private def handle_last_ref(m)
return unless (m & CLOSED) == CLOSED # is closed?
return unless (m & MASK) == REF # was the last ref?

# the last ref after close is responsible to resume the closing fiber
if fiber = @closing
fiber.enqueue
else
raise NilAssertionError.new("BUG: expected a closing fiber to resume.")
end
end

# Closes the fdlock. Blocks for as long as there are references.
#
# The *callback* block must cancel any external waiters (e.g. pending evloop
# reads or writes).
#
# Returns true if the fdlock has been closed: no fiber can acquire a reference
# anymore, the calling fiber fully owns the fd and can safely close it.
#
# Returns false if the fdlock has already been closed: the calling fiber
# doesn't own the fd and musn't close it, as there might still be active
# references and another fiber will close anyway.
def try_close?(&callback : ->) : Bool
attempts = 0

while true
m = @m.get(:relaxed)

if (m & CLOSED) == CLOSED
# already closed: abort
return false
end

# close + increment ref
m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed)
break if success

attempts = Thread.delay(attempts)
end

# set the current fiber as the closing fiber (to be resumed by the last ref)
@closing = Fiber.current

# decrement the last ref
m = @m.sub(REF, :release)

begin
yield
ensure
# wait for the last ref... unless we're the last ref!
Fiber.suspend unless (m & MASK) == REF
end

@closing = nil
true
end

# Resets the fdlock back to its pristine state so it can be used again.
# Assumes the caller owns the fdlock. This is required by
# `TCPSocket#initialize`.
def reset : Nil
@m.lazy_set(0_u32)
@closing = nil
end

def closed? : Bool
(@m.get(:relaxed) & CLOSED) == CLOSED
end
end
10 changes: 5 additions & 5 deletions src/crystal/system/unix/file.cr
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ module Crystal::System::File
end

private def system_chown(uid : Int, gid : Int)
ret = LibC.fchown(fd, uid, gid)
ret = @fd_lock.reference { LibC.fchown(fd, uid, gid) }
raise ::File::Error.from_errno("Error changing owner", file: path) if ret == -1
end

Expand All @@ -120,7 +120,7 @@ module Crystal::System::File
end

private def system_chmod(mode)
if LibC.fchmod(fd, mode) == -1
if @fd_lock.reference { LibC.fchmod(fd, mode) } == -1
raise ::File::Error.from_errno("Error changing permissions", file: path)
end
end
Expand Down Expand Up @@ -201,12 +201,12 @@ module Crystal::System::File
timespecs = uninitialized LibC::Timespec[2]
timespecs[0] = Crystal::System::Time.to_timespec(atime)
timespecs[1] = Crystal::System::Time.to_timespec(mtime)
LibC.futimens(fd, timespecs)
@fd_lock.reference { LibC.futimens(fd, timespecs) }
{% elsif LibC.has_method?("futimes") %}
timevals = uninitialized LibC::Timeval[2]
timevals[0] = Crystal::System::Time.to_timeval(atime)
timevals[1] = Crystal::System::Time.to_timeval(mtime)
LibC.futimes(fd, timevals)
@fd_lock.reference { LibC.futimes(fd, timevals) }
{% else %}
{% raise "Missing futimens & futimes" %}
{% end %}
Expand All @@ -217,7 +217,7 @@ module Crystal::System::File
end

private def system_truncate(size) : Nil
code = LibC.ftruncate(fd, size)
code = @fd_lock.reference { LibC.ftruncate(fd, size) }
if code != 0
raise ::File::Error.from_errno("Error truncating file", file: path)
end
Expand Down
Loading