From 32814ccf2283c8f03718cde7c8b97ea98c31859a Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 14 Oct 2025 14:29:13 +0200 Subject: [PATCH 1/2] Add Crystal::FdLock to count system fd references This patch implements a reference counted lock to protect IO objects that depend on a reusable system fd (IO::FileDescriptor, File and Socket) to protect them against thread safety issues around close: - Thread 1 wants to read from fd 123; - The OS preempts Thread 1; - Thread 2 closes fd 123; - Thread 2 opens something else and the OS reuses fd 123; - The OS resumes Thread 1; - Thread 1 reads from the reused fd 123!!! The issue arises for any operation that would mutate the fd: write, fchown, ftruncate, setsockopt, ... as they risk affecting a reused fd instead of the expected one. --- spec/std/crystal/fd_lock_spec.cr | 137 +++++++++++++++++++++++++++++++ src/crystal/fd_lock.cr | 5 ++ src/crystal/fd_lock_mt.cr | 117 ++++++++++++++++++++++++++ src/crystal/fd_lock_no_mt.cr | 35 ++++++++ 4 files changed, 294 insertions(+) create mode 100644 spec/std/crystal/fd_lock_spec.cr create mode 100644 src/crystal/fd_lock.cr create mode 100644 src/crystal/fd_lock_mt.cr create mode 100644 src/crystal/fd_lock_no_mt.cr diff --git a/spec/std/crystal/fd_lock_spec.cr b/spec/std/crystal/fd_lock_spec.cr new file mode 100644 index 000000000000..99e15fa73e1b --- /dev/null +++ b/spec/std/crystal/fd_lock_spec.cr @@ -0,0 +1,137 @@ +require "spec" +require "../../support/interpreted.cr" +require "crystal/fd_lock" +require "wait_group" + +describe Crystal::FdLock do + describe "#reference" do + it "acquires" do + lock = Crystal::FdLock.new + called = 0 + + lock.reference { called += 1 } + lock.reference { called += 1 } + + called.should eq(2) + end + + it "allows reentrancy (side effect)" do + lock = Crystal::FdLock.new + called = 0 + + lock.reference { called += 1 } + lock.reference do + lock.reference { called += 1 } + end + + called.should eq(2) + end + + it "acquires shared reference" do + lock = Crystal::FdLock.new + + ready = WaitGroup.new(1) + release = Channel(String).new + + spawn do + lock.reference do + ready.done + + select + when release.send("ok") + when timeout(1.second) + release.send("timeout") + end + end + end + + ready.wait + lock.reference { } + + release.receive.should eq("ok") + end + + it "raises when closed" do + lock = Crystal::FdLock.new + lock.try_close? { } + + called = false + expect_raises(IO::Error, "Closed") do + lock.reference { called = true } + end + + called.should be_false + end + end + + describe "#try_close?" do + it "closes" do + lock = Crystal::FdLock.new + lock.closed?.should be_false + + called = false + lock.try_close? { called = true }.should be_true + lock.closed?.should be_true + called.should be_true + end + + it "closes once" do + lock = Crystal::FdLock.new + + called = 0 + + WaitGroup.wait do |wg| + 10.times do + wg.spawn do + lock.try_close? { called += 1 } + lock.try_close? { called += 1 } + end + end + end + + called.should eq(1) + end + + # FIXME: the interpreter segfaults while running this spec (NULL pointer) + pending_interpreted "waits for all references to return" do + lock = Crystal::FdLock.new + + ready = WaitGroup.new(10) + exceptions = Channel(Exception).new(10) + + WaitGroup.wait do |wg| + 10.times do + wg.spawn do + begin + lock.reference do + ready.done + Fiber.yield + end + rescue ex + exceptions.send(ex) + end + end + end + + ready.wait + + called = false + lock.try_close? { called = true }.should be_true + lock.closed?.should be_true + called.should be_true + end + exceptions.close + + if ex = exceptions.receive? + raise ex + end + end + end + + it "#reset" do + lock = Crystal::FdLock.new + lock.try_close? { } + lock.reset + lock.try_close? { }.should eq(true) + end +end diff --git a/src/crystal/fd_lock.cr b/src/crystal/fd_lock.cr new file mode 100644 index 000000000000..0a8c329f9704 --- /dev/null +++ b/src/crystal/fd_lock.cr @@ -0,0 +1,5 @@ +{% if flag?(:preview_mt) %} + require "./fd_lock_mt" +{% else %} + require "./fd_lock_no_mt" +{% end %} diff --git a/src/crystal/fd_lock_mt.cr b/src/crystal/fd_lock_mt.cr new file mode 100644 index 000000000000..c6789fda596f --- /dev/null +++ b/src/crystal/fd_lock_mt.cr @@ -0,0 +1,117 @@ +# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause, +# Copyright Google): +# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go + +# :nodoc: +# +# Tracks active references over a system file descriptor (fd) and serializes +# reads and writes. +# +# Every access to the fd that may affect its system state or system buffers must +# acquire a shared lock. +# +# The fdlock can be closed at any time, but the actual system close will wait +# until there are no more references left. This avoids potential races when a +# thread might try to read a fd that has been closed and has been reused by the +# OS for example. +# +# FIXME: the interpreter segfaults when interpreted code uses this type; for now +# it uses the thread unsafe alternative (fd_lock_no_mt). +struct Crystal::FdLock + CLOSED = 1_u32 << 0 # the fdlock has been closed + REF = 1_u32 << 1 # the reference counter increment + MASK = ~(REF - 1) # mask for the reference counter + + @m = Atomic(UInt32).new(0_u32) + @closing : Fiber? + + # Borrows a reference for the duration of the block. Raises if the fdlock is + # closed while trying to borrow. + def reference(& : -> F) : F forall F + m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed) + increment_slow(m) unless success + + begin + yield + ensure + m = @m.sub(REF, :release) + handle_last_ref(m) + end + end + + private def increment_slow(m) + while true + if (m & CLOSED) == CLOSED + raise IO::Error.new("Closed") + end + m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed) + break if success + end + end + + private def handle_last_ref(m) + return unless (m & CLOSED) == CLOSED # is closed? + return unless (m & MASK) == REF # was the last ref? + + # the last ref after close is responsible to resume the closing fiber + @closing.not_nil!("BUG: expected a closing fiber to resume.").enqueue + end + + # Closes the fdlock. Blocks for as long as there are references. + # + # The *callback* block must cancel any external waiters (e.g. pending evloop + # reads or writes). + # + # Returns true if the fdlock has been closed: no fiber can acquire a reference + # anymore, the calling fiber fully owns the fd and can safely close it. + # + # Returns false if the fdlock has already been closed: the calling fiber + # doesn't own the fd and musn't close it, as there might still be active + # references and another fiber will close anyway. + def try_close?(&callback : ->) : Bool + attempts = 0 + + while true + m = @m.get(:relaxed) + + if (m & CLOSED) == CLOSED + # already closed: abort + return false + end + + # close + increment ref + m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed) + break if success + + attempts = Thread.delay(attempts) + end + + # set the current fiber as the closing fiber (to be resumed by the last ref) + @closing = Fiber.current + + # decrement the last ref + m = @m.sub(REF, :release) + + begin + yield + ensure + # wait for the last ref... unless we're the last ref! + Fiber.suspend unless (m & MASK) == REF + end + + @closing = nil + true + end + + # Resets the fdlock back to its pristine state so it can be used again. + # Assumes the caller owns the fdlock. This is required by + # `TCPSocket#initialize`. + def reset : Nil + @m.lazy_set(0_u32) + @closing = nil + end + + def closed? : Bool + (@m.get(:relaxed) & CLOSED) == CLOSED + end +end diff --git a/src/crystal/fd_lock_no_mt.cr b/src/crystal/fd_lock_no_mt.cr new file mode 100644 index 000000000000..14a3cad6720b --- /dev/null +++ b/src/crystal/fd_lock_no_mt.cr @@ -0,0 +1,35 @@ +# :nodoc: +# +# Simpler, but thread unsafe, alternative to Crystal::FdLock that only +# serializes reads and writes and otherwise doesn't count references or waits +# for references before closing. This is mostly needed for the interpreter that +# happens to segfault with the thread safe alternative (see fd_lock_mt). +struct Crystal::FdLock + CLOSED = 1_u8 << 0 + + @m = 0_u8 + + def reference(&) + raise IO::Error.new("Closed") if closed? + yield + end + + def reset : Nil + @m = 0_u8 + end + + def closed? + (@m & CLOSED) == CLOSED + end + + def try_close?(&) + if closed? + false + else + @m |= CLOSED + + yield + true + end + end +end From 315077200c6aaa4abc823d2058dcf846dbd8a58f Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 2 Oct 2025 15:13:10 +0200 Subject: [PATCH 2/2] Integrate Crystal::FdLock into IO::FileDescriptor, File and Socket Only operations that can affect the file descriptor are counted, for example read or write, truncating a file or changing file permissions. Mere queries with no side effects go through normally because at worst they will fail (they would have anyway). --- spec/std/crystal/fd_lock_spec.cr | 4 +- src/crystal/fd_lock.cr | 123 ++++++++++++++++++++- src/crystal/fd_lock_mt.cr | 117 -------------------- src/crystal/fd_lock_no_mt.cr | 35 ------ src/crystal/system/unix/file.cr | 10 +- src/crystal/system/unix/file_descriptor.cr | 69 +++++++----- src/crystal/system/unix/socket.cr | 51 +++++++-- src/crystal/system/wasi/socket.cr | 4 + 8 files changed, 212 insertions(+), 201 deletions(-) delete mode 100644 src/crystal/fd_lock_mt.cr delete mode 100644 src/crystal/fd_lock_no_mt.cr diff --git a/spec/std/crystal/fd_lock_spec.cr b/spec/std/crystal/fd_lock_spec.cr index 99e15fa73e1b..f75a41f2d0d9 100644 --- a/spec/std/crystal/fd_lock_spec.cr +++ b/spec/std/crystal/fd_lock_spec.cr @@ -1,5 +1,4 @@ require "spec" -require "../../support/interpreted.cr" require "crystal/fd_lock" require "wait_group" @@ -92,8 +91,7 @@ describe Crystal::FdLock do called.should eq(1) end - # FIXME: the interpreter segfaults while running this spec (NULL pointer) - pending_interpreted "waits for all references to return" do + it "waits for all references to return" do lock = Crystal::FdLock.new ready = WaitGroup.new(10) diff --git a/src/crystal/fd_lock.cr b/src/crystal/fd_lock.cr index 0a8c329f9704..368bb07eda95 100644 --- a/src/crystal/fd_lock.cr +++ b/src/crystal/fd_lock.cr @@ -1,5 +1,118 @@ -{% if flag?(:preview_mt) %} - require "./fd_lock_mt" -{% else %} - require "./fd_lock_no_mt" -{% end %} +# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause, +# Copyright Google): +# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go + +# :nodoc: +# +# Tracks active references over a system file descriptor (fd) and serializes +# reads and writes. +# +# Every access to the fd that may affect its system state or system buffers must +# acquire a shared lock. +# +# The fdlock can be closed at any time, but the actual system close will wait +# until there are no more references left. This avoids potential races when a +# thread might try to read a fd that has been closed and has been reused by the +# OS for example. +struct Crystal::FdLock + CLOSED = 1_u32 << 0 # the fdlock has been closed + REF = 1_u32 << 1 # the reference counter increment + MASK = ~(REF - 1) # mask for the reference counter + + @m = Atomic(UInt32).new(0_u32) + @closing : Fiber? + + # Borrows a reference for the duration of the block. Raises if the fdlock is + # closed while trying to borrow. + def reference(& : -> F) : F forall F + m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed) + increment_slow(m) unless success + + begin + yield + ensure + m = @m.sub(REF, :release) + handle_last_ref(m) + end + end + + private def increment_slow(m) + while true + if (m & CLOSED) == CLOSED + raise IO::Error.new("Closed") + end + m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed) + break if success + end + end + + private def handle_last_ref(m) + return unless (m & CLOSED) == CLOSED # is closed? + return unless (m & MASK) == REF # was the last ref? + + # the last ref after close is responsible to resume the closing fiber + if fiber = @closing + fiber.enqueue + else + raise NilAssertionError.new("BUG: expected a closing fiber to resume.") + end + end + + # Closes the fdlock. Blocks for as long as there are references. + # + # The *callback* block must cancel any external waiters (e.g. pending evloop + # reads or writes). + # + # Returns true if the fdlock has been closed: no fiber can acquire a reference + # anymore, the calling fiber fully owns the fd and can safely close it. + # + # Returns false if the fdlock has already been closed: the calling fiber + # doesn't own the fd and musn't close it, as there might still be active + # references and another fiber will close anyway. + def try_close?(&callback : ->) : Bool + attempts = 0 + + while true + m = @m.get(:relaxed) + + if (m & CLOSED) == CLOSED + # already closed: abort + return false + end + + # close + increment ref + m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed) + break if success + + attempts = Thread.delay(attempts) + end + + # set the current fiber as the closing fiber (to be resumed by the last ref) + @closing = Fiber.current + + # decrement the last ref + m = @m.sub(REF, :release) + + begin + yield + ensure + # wait for the last ref... unless we're the last ref! + Fiber.suspend unless (m & MASK) == REF + end + + @closing = nil + true + end + + # Resets the fdlock back to its pristine state so it can be used again. + # Assumes the caller owns the fdlock. This is required by + # `TCPSocket#initialize`. + def reset : Nil + @m.lazy_set(0_u32) + @closing = nil + end + + def closed? : Bool + (@m.get(:relaxed) & CLOSED) == CLOSED + end +end diff --git a/src/crystal/fd_lock_mt.cr b/src/crystal/fd_lock_mt.cr deleted file mode 100644 index c6789fda596f..000000000000 --- a/src/crystal/fd_lock_mt.cr +++ /dev/null @@ -1,117 +0,0 @@ -# The general design is influenced by fdMutex in Go (LICENSE: BSD 3-Clause, -# Copyright Google): -# https://github.com/golang/go/blob/go1.25.1/src/internal/poll/fd_mutex.go - -# :nodoc: -# -# Tracks active references over a system file descriptor (fd) and serializes -# reads and writes. -# -# Every access to the fd that may affect its system state or system buffers must -# acquire a shared lock. -# -# The fdlock can be closed at any time, but the actual system close will wait -# until there are no more references left. This avoids potential races when a -# thread might try to read a fd that has been closed and has been reused by the -# OS for example. -# -# FIXME: the interpreter segfaults when interpreted code uses this type; for now -# it uses the thread unsafe alternative (fd_lock_no_mt). -struct Crystal::FdLock - CLOSED = 1_u32 << 0 # the fdlock has been closed - REF = 1_u32 << 1 # the reference counter increment - MASK = ~(REF - 1) # mask for the reference counter - - @m = Atomic(UInt32).new(0_u32) - @closing : Fiber? - - # Borrows a reference for the duration of the block. Raises if the fdlock is - # closed while trying to borrow. - def reference(& : -> F) : F forall F - m, success = @m.compare_and_set(0_u32, REF, :acquire, :relaxed) - increment_slow(m) unless success - - begin - yield - ensure - m = @m.sub(REF, :release) - handle_last_ref(m) - end - end - - private def increment_slow(m) - while true - if (m & CLOSED) == CLOSED - raise IO::Error.new("Closed") - end - m, success = @m.compare_and_set(m, m + REF, :acquire, :relaxed) - break if success - end - end - - private def handle_last_ref(m) - return unless (m & CLOSED) == CLOSED # is closed? - return unless (m & MASK) == REF # was the last ref? - - # the last ref after close is responsible to resume the closing fiber - @closing.not_nil!("BUG: expected a closing fiber to resume.").enqueue - end - - # Closes the fdlock. Blocks for as long as there are references. - # - # The *callback* block must cancel any external waiters (e.g. pending evloop - # reads or writes). - # - # Returns true if the fdlock has been closed: no fiber can acquire a reference - # anymore, the calling fiber fully owns the fd and can safely close it. - # - # Returns false if the fdlock has already been closed: the calling fiber - # doesn't own the fd and musn't close it, as there might still be active - # references and another fiber will close anyway. - def try_close?(&callback : ->) : Bool - attempts = 0 - - while true - m = @m.get(:relaxed) - - if (m & CLOSED) == CLOSED - # already closed: abort - return false - end - - # close + increment ref - m, success = @m.compare_and_set(m, (m + REF) | CLOSED, :acquire, :relaxed) - break if success - - attempts = Thread.delay(attempts) - end - - # set the current fiber as the closing fiber (to be resumed by the last ref) - @closing = Fiber.current - - # decrement the last ref - m = @m.sub(REF, :release) - - begin - yield - ensure - # wait for the last ref... unless we're the last ref! - Fiber.suspend unless (m & MASK) == REF - end - - @closing = nil - true - end - - # Resets the fdlock back to its pristine state so it can be used again. - # Assumes the caller owns the fdlock. This is required by - # `TCPSocket#initialize`. - def reset : Nil - @m.lazy_set(0_u32) - @closing = nil - end - - def closed? : Bool - (@m.get(:relaxed) & CLOSED) == CLOSED - end -end diff --git a/src/crystal/fd_lock_no_mt.cr b/src/crystal/fd_lock_no_mt.cr deleted file mode 100644 index 14a3cad6720b..000000000000 --- a/src/crystal/fd_lock_no_mt.cr +++ /dev/null @@ -1,35 +0,0 @@ -# :nodoc: -# -# Simpler, but thread unsafe, alternative to Crystal::FdLock that only -# serializes reads and writes and otherwise doesn't count references or waits -# for references before closing. This is mostly needed for the interpreter that -# happens to segfault with the thread safe alternative (see fd_lock_mt). -struct Crystal::FdLock - CLOSED = 1_u8 << 0 - - @m = 0_u8 - - def reference(&) - raise IO::Error.new("Closed") if closed? - yield - end - - def reset : Nil - @m = 0_u8 - end - - def closed? - (@m & CLOSED) == CLOSED - end - - def try_close?(&) - if closed? - false - else - @m |= CLOSED - - yield - true - end - end -end diff --git a/src/crystal/system/unix/file.cr b/src/crystal/system/unix/file.cr index 2dae08579727..ac8ef8edcc94 100644 --- a/src/crystal/system/unix/file.cr +++ b/src/crystal/system/unix/file.cr @@ -109,7 +109,7 @@ module Crystal::System::File end private def system_chown(uid : Int, gid : Int) - ret = LibC.fchown(fd, uid, gid) + ret = @fd_lock.reference { LibC.fchown(fd, uid, gid) } raise ::File::Error.from_errno("Error changing owner", file: path) if ret == -1 end @@ -120,7 +120,7 @@ module Crystal::System::File end private def system_chmod(mode) - if LibC.fchmod(fd, mode) == -1 + if @fd_lock.reference { LibC.fchmod(fd, mode) } == -1 raise ::File::Error.from_errno("Error changing permissions", file: path) end end @@ -201,12 +201,12 @@ module Crystal::System::File timespecs = uninitialized LibC::Timespec[2] timespecs[0] = Crystal::System::Time.to_timespec(atime) timespecs[1] = Crystal::System::Time.to_timespec(mtime) - LibC.futimens(fd, timespecs) + @fd_lock.reference { LibC.futimens(fd, timespecs) } {% elsif LibC.has_method?("futimes") %} timevals = uninitialized LibC::Timeval[2] timevals[0] = Crystal::System::Time.to_timeval(atime) timevals[1] = Crystal::System::Time.to_timeval(mtime) - LibC.futimes(fd, timevals) + @fd_lock.reference { LibC.futimes(fd, timevals) } {% else %} {% raise "Missing futimens & futimes" %} {% end %} @@ -218,7 +218,7 @@ module Crystal::System::File private def system_truncate(size) : Nil flush - code = LibC.ftruncate(fd, size) + code = @fd_lock.reference { LibC.ftruncate(fd, size) } if code != 0 raise ::File::Error.from_errno("Error truncating file", file: path) end diff --git a/src/crystal/system/unix/file_descriptor.cr b/src/crystal/system/unix/file_descriptor.cr index 003d036985e6..9dec85effe02 100644 --- a/src/crystal/system/unix/file_descriptor.cr +++ b/src/crystal/system/unix/file_descriptor.cr @@ -3,6 +3,7 @@ require "termios" {% if flag?(:android) && LibC::ANDROID_API < 28 %} require "c/sys/ioctl" {% end %} +require "crystal/fd_lock" # :nodoc: module Crystal::System::FileDescriptor @@ -18,13 +19,15 @@ module Crystal::System::FileDescriptor STDOUT_HANDLE = 1 STDERR_HANDLE = 2 + @fd_lock = FdLock.new + private def system_blocking? - flags = system_fcntl(LibC::F_GETFL) + flags = FileDescriptor.fcntl(fd, LibC::F_GETFL) !flags.bits_set? LibC::O_NONBLOCK end private def system_blocking=(value) - FileDescriptor.set_blocking(fd, value) + @fd_lock.reference { FileDescriptor.set_blocking(fd, value) } end protected def self.get_blocking(fd : Handle) @@ -56,7 +59,7 @@ module Crystal::System::FileDescriptor end private def system_close_on_exec? - flags = system_fcntl(LibC::F_GETFD) + flags = FileDescriptor.fcntl(fd, LibC::F_GETFD) flags.bits_set? LibC::FD_CLOEXEC end @@ -76,7 +79,7 @@ module Crystal::System::FileDescriptor end private def system_fcntl(cmd, arg = 0) - FileDescriptor.fcntl(fd, cmd, arg) + @fd_lock.reference { FileDescriptor.fcntl(fd, cmd, arg) } end def self.system_info(fd) @@ -91,11 +94,11 @@ module Crystal::System::FileDescriptor end private def system_info - FileDescriptor.system_info fd + @fd_lock.reference { FileDescriptor.system_info(fd) } end private def system_seek(offset, whence : IO::Seek) : Nil - seek_value = LibC.lseek(fd, offset, whence) + seek_value = @fd_lock.reference { LibC.lseek(fd, offset, whence) } if seek_value == -1 raise IO::Error.from_errno "Unable to seek", target: self @@ -113,19 +116,23 @@ module Crystal::System::FileDescriptor end private def system_reopen(other : IO::FileDescriptor) - {% if LibC.has_method?(:dup3) %} - flags = other.close_on_exec? ? LibC::O_CLOEXEC : 0 - if LibC.dup3(other.fd, fd, flags) == -1 - raise IO::Error.from_errno("Could not reopen file descriptor") - end - {% else %} - Process.lock_read do - if LibC.dup2(other.fd, fd) == -1 - raise IO::Error.from_errno("Could not reopen file descriptor") - end - self.close_on_exec = other.close_on_exec? + other.@fd_lock.reference do + @fd_lock.reference do + {% if LibC.has_method?(:dup3) %} + flags = other.close_on_exec? ? LibC::O_CLOEXEC : 0 + if LibC.dup3(other.fd, fd, flags) == -1 + raise IO::Error.from_errno("Could not reopen file descriptor") + end + {% else %} + Process.lock_read do + if LibC.dup2(other.fd, fd) == -1 + raise IO::Error.from_errno("Could not reopen file descriptor") + end + self.close_on_exec = other.close_on_exec? + end + {% end %} end - {% end %} + end # Mark the handle open, since we had to have dup'd a live handle. @closed = false @@ -134,8 +141,9 @@ module Crystal::System::FileDescriptor end private def system_close - event_loop.shutdown(self) - event_loop.close(self) + if @fd_lock.try_close? { event_loop.shutdown(self) } + event_loop.close(self) + end end def file_descriptor_close(&) : Nil @@ -196,7 +204,7 @@ module Crystal::System::FileDescriptor end private def flock(op) : Bool - if 0 == LibC.flock(fd, op) + if 0 == @fd_lock.reference { LibC.flock(fd, op) } true else errno = Errno.value @@ -209,7 +217,7 @@ module Crystal::System::FileDescriptor end private def system_fsync(flush_metadata = true) : Nil - ret = + ret = @fd_lock.reference do if flush_metadata LibC.fsync(fd) else @@ -219,6 +227,7 @@ module Crystal::System::FileDescriptor LibC.fdatasync(fd) {% end %} end + end if ret != 0 raise IO::Error.from_errno("Error syncing file", target: self) @@ -246,7 +255,9 @@ module Crystal::System::FileDescriptor end def self.pread(file, buffer, offset) - bytes_read = LibC.pread(file.fd, buffer, buffer.size, offset).to_i64 + bytes_read = file.@fd_lock.reference do + LibC.pread(file.fd, buffer, buffer.size, offset).to_i64 + end if bytes_read == -1 raise IO::Error.from_errno("Error reading file", target: file) @@ -351,7 +362,7 @@ module Crystal::System::FileDescriptor @[AlwaysInline] private def system_tcsetattr(optional_actions, termios_p) {% if LibC.has_method?(:tcsetattr) %} - LibC.tcsetattr(fd, optional_actions, termios_p) + @fd_lock.reference { LibC.tcsetattr(fd, optional_actions, termios_p) } {% else %} optional_actions = optional_actions.value if optional_actions.is_a?(Termios::LineControl) cmd = case optional_actions @@ -366,7 +377,7 @@ module Crystal::System::FileDescriptor return LibC::Int.new(-1) end - LibC.ioctl(fd, cmd, termios_p) + @fd_lock.reference { LibC.ioctl(fd, cmd, termios_p) } {% end %} end @@ -385,4 +396,12 @@ module Crystal::System::FileDescriptor {% end %} termios end + + private def system_read(slice : Bytes) : Int32 + @fd_lock.reference { event_loop.read(self, slice) } + end + + private def system_write(slice : Bytes) : Int32 + @fd_lock.reference { event_loop.write(self, slice) } + end end diff --git a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr index f0205930b1c1..6ff2b4ea8dfc 100644 --- a/src/crystal/system/unix/socket.cr +++ b/src/crystal/system/unix/socket.cr @@ -1,6 +1,7 @@ require "c/netdb" require "c/netinet/tcp" require "c/sys/socket" +require "crystal/fd_lock" module Crystal::System::Socket {% if IO.has_constant?(:Evented) %} @@ -9,6 +10,8 @@ module Crystal::System::Socket alias Handle = Int32 + @fd_lock = FdLock.new + def self.socket(family, type, protocol, blocking) : Handle {% if LibC.has_constant?(:SOCK_CLOEXEC) %} flags = type.value | LibC::SOCK_CLOEXEC @@ -36,29 +39,29 @@ module Crystal::System::Socket # Tries to bind the socket to a local address. # Yields an `Socket::BindError` if the binding failed. private def system_bind(addr, addrstr, &) - unless LibC.bind(fd, addr, addr.size) == 0 + unless @fd_lock.reference { LibC.bind(fd, addr, addr.size) } == 0 yield ::Socket::BindError.from_errno("Could not bind to '#{addrstr}'") end end private def system_listen(backlog, &) - unless LibC.listen(fd, backlog) == 0 + unless @fd_lock.reference { LibC.listen(fd, backlog) } == 0 yield ::Socket::Error.from_errno("Listen failed") end end private def system_accept : {Handle, Bool}? - event_loop.accept(self) + @fd_lock.reference { event_loop.accept(self) } end private def system_close_read - if LibC.shutdown(fd, LibC::SHUT_RD) != 0 + if @fd_lock.reference { LibC.shutdown(fd, LibC::SHUT_RD) } != 0 raise ::Socket::Error.from_errno("shutdown read") end end private def system_close_write - if LibC.shutdown(fd, LibC::SHUT_WR) != 0 + if @fd_lock.reference { LibC.shutdown(fd, LibC::SHUT_WR) } != 0 raise ::Socket::Error.from_errno("shutdown write") end end @@ -154,7 +157,9 @@ module Crystal::System::Socket private def system_setsockopt(optname, optval, level = LibC::SOL_SOCKET) optsize = LibC::SocklenT.new(sizeof(typeof(optval))) - ret = LibC.setsockopt(fd, level, optname, pointerof(optval), optsize) + ret = @fd_lock.reference do + LibC.setsockopt(fd, level, optname, pointerof(optval), optsize) + end raise ::Socket::Error.from_errno("setsockopt #{optname}") if ret == -1 ret end @@ -164,7 +169,9 @@ module Crystal::System::Socket end private def system_blocking=(value) - FileDescriptor.set_blocking(fd, value) + @fd_lock.reference do + FileDescriptor.set_blocking(fd, value) + end end def self.get_blocking(fd : Handle) @@ -176,7 +183,7 @@ module Crystal::System::Socket end private def system_close_on_exec? - flags = system_fcntl(LibC::F_GETFD) + flags = FileDescriptor.fcntl(fd, LibC::F_GETFD) (flags & LibC::FD_CLOEXEC) == LibC::FD_CLOEXEC end @@ -190,7 +197,7 @@ module Crystal::System::Socket end private def system_fcntl(cmd, arg = 0) - FileDescriptor.fcntl(fd, cmd, arg) + @fd_lock.reference { FileDescriptor.fcntl(fd, cmd, arg) } end def self.socketpair(type : ::Socket::Type, protocol : ::Socket::Protocol, blocking : Bool) : {Handle, Handle} @@ -224,8 +231,10 @@ module Crystal::System::Socket end private def system_close - event_loop.shutdown(self) - event_loop.close(self) + if @fd_lock.try_close? { event_loop.shutdown(self) } + event_loop.close(self) + @fd_lock.reset + end end def socket_close(&) @@ -348,4 +357,24 @@ module Crystal::System::Socket val end {% end %} + + private def system_send_to(bytes : Bytes, addr : ::Socket::Address) + @fd_lock.reference { event_loop.send_to(self, bytes, addr) } + end + + private def system_receive_from(bytes : Bytes) : Tuple(Int32, ::Socket::Address) + @fd_lock.reference { event_loop.receive_from(self, bytes) } + end + + private def system_connect(addr, timeout = nil) + @fd_lock.reference { event_loop.connect(self, addr, timeout) } + end + + private def system_read(slice : Bytes) : Int32 + @fd_lock.reference { event_loop.read(self, slice) } + end + + private def system_write(slice : Bytes) : Int32 + @fd_lock.reference { event_loop.write(self, slice) } + end end diff --git a/src/crystal/system/wasi/socket.cr b/src/crystal/system/wasi/socket.cr index baf5cbfb5414..f8a0576c7085 100644 --- a/src/crystal/system/wasi/socket.cr +++ b/src/crystal/system/wasi/socket.cr @@ -37,6 +37,10 @@ module Crystal::System::Socket end end + private def system_accept : {::Socket::Handle, Bool}? + event_loop.accept(self) + end + private def system_send_buffer_size : Int raise NotImplementedError.new "Crystal::System::Socket#system_send_buffer_size" end