From 057880898e4f14b7b1109ef34f598749aa6b4e8b Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 27 Jan 2025 13:42:50 +0100 Subject: [PATCH 1/5] Add EventLoop#wait_[readable|writable] methods --- src/crystal/event_loop/file_descriptor.cr | 6 ++++ src/crystal/event_loop/iocp.cr | 22 ++++++++++++++ src/crystal/event_loop/libevent.cr | 35 ++++++++++++++++++++--- src/crystal/event_loop/polling.cr | 24 ++++++++++++++++ src/crystal/event_loop/wasi.cr | 24 ++++++++++++++++ src/io/evented.cr | 14 ++------- 6 files changed, 109 insertions(+), 16 deletions(-) diff --git a/src/crystal/event_loop/file_descriptor.cr b/src/crystal/event_loop/file_descriptor.cr index 0304f7d9b969..75a455a89c9a 100644 --- a/src/crystal/event_loop/file_descriptor.cr +++ b/src/crystal/event_loop/file_descriptor.cr @@ -9,6 +9,9 @@ abstract class Crystal::EventLoop # Returns 0 when EOF is reached. abstract def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 + # Blocks the current fiber until the file descriptor is ready for read. + abstract def wait_readable(file_descriptor Crystal::System::FileDescriptor) : Nil + # Writes at least one byte from *slice* to the file descriptor. # # Blocks the current fiber if the file descriptor isn't ready for writing, @@ -17,6 +20,9 @@ abstract class Crystal::EventLoop # Returns the number of bytes written (up to `slice.size`). abstract def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 + # Blocks the current fiber until the file descriptor is ready for write. + abstract def wait_writable(file_descriptor Crystal::System::FileDescriptor) : Nil + # Closes the file descriptor resource. abstract def close(file_descriptor : Crystal::System::FileDescriptor) : Nil end diff --git a/src/crystal/event_loop/iocp.cr b/src/crystal/event_loop/iocp.cr index 6e4175e3daee..da827079312a 100644 --- a/src/crystal/event_loop/iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -190,6 +190,10 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop end.to_i32 end + def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil + raise NotImplementedError.new("Crystal::System::IOCP#wait_readable(FileDescriptor)") + end + def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 System::IOCP.overlapped_operation(file_descriptor, "WriteFile", file_descriptor.write_timeout, writing: true) do |overlapped| ret = LibC.WriteFile(file_descriptor.windows_handle, slice, slice.size, out byte_count, overlapped) @@ -197,6 +201,10 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop end.to_i32 end + def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil + raise NotImplementedError.new("Crystal::System::IOCP#wait_writable(FileDescriptor)") + end + def close(file_descriptor : Crystal::System::FileDescriptor) : Nil LibC.CancelIoEx(file_descriptor.windows_handle, nil) unless file_descriptor.system_blocking? end @@ -220,6 +228,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop bytes_read.to_i32 end + def wait_readable(socket : ::Socket) : Nil + # NOTE: Windows 10+ has `ProcessSocketNotifications` to associate sockets to + # a completion port and be notified of socket readiness. See + # + raise NotImplementedError.new("Crystal::System::IOCP#wait_readable(Socket)") + end + def write(socket : ::Socket, slice : Bytes) : Int32 wsabuf = wsa_buffer(slice) @@ -231,6 +246,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop bytes.to_i32 end + def wait_writable(socket : ::Socket) : Nil + # NOTE: Windows 10+ has `ProcessSocketNotifications` to associate sockets to + # a completion port and be notified of socket readiness. See + # + raise NotImplementedError.new("Crystal::System::IOCP#wait_writable(Socket)") + end + def send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address) : Int32 wsabuf = wsa_buffer(slice) bytes_written = System::IOCP.wsa_overlapped_operation(socket, socket.fd, "WSASendTo", socket.write_timeout) do |overlapped| diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index 9c0b3d33b15c..f57685a41959 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -84,6 +84,12 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop end end + def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil + file_descriptor.evented_wait_readable(raise_if_closed: false) do + raise IO::TimeoutError.new("Read timed out") + end + end + def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 evented_write(file_descriptor, "Error writing file_descriptor") do LibC.write(file_descriptor.fd, slice, slice.size).tap do |return_code| @@ -94,6 +100,12 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop end end + def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil + file_descriptor.evented_wait_writable do + raise IO::TimeoutError.new("Write timed out") + end + end + def close(file_descriptor : Crystal::System::FileDescriptor) : Nil file_descriptor.evented_close end @@ -104,12 +116,23 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop end end + def wait_readable(socket : ::Socket) : Nil + socket.evented_wait_readable(raise_if_closed: false) do + raise IO::TimeoutError.new("Read timed out") + end + end + def write(socket : ::Socket, slice : Bytes) : Int32 evented_write(socket, "Error writing to socket") do LibC.send(socket.fd, slice, slice.size, 0).to_i32 end end + def wait_writable(socket : ::Socket) : Nil + socket.evented_wait_writable do + raise IO::TimeoutError.new("Write timed out") + end + def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address) sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*) # initialize sockaddr with the initialized family of the socket @@ -142,7 +165,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop when Errno::EISCONN return when Errno::EINPROGRESS, Errno::EALREADY - socket.wait_writable(timeout: timeout) do + socket.evented_wait_writable(timeout: timeout) do return IO::TimeoutError.new("connect timed out") end else @@ -174,7 +197,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop if socket.closed? return elsif Errno.value == Errno::EAGAIN - socket.wait_readable(raise_if_closed: false) do + socket.evented_wait_readable(raise_if_closed: false) do raise IO::TimeoutError.new("Accept timed out") end return if socket.closed? @@ -200,7 +223,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop end if Errno.value == Errno::EAGAIN - target.wait_readable + target.evented_wait_readable do + raise IO::TimeoutError.new("Read timed out") + end else raise IO::Error.from_errno(errno_msg, target: target) end @@ -218,7 +243,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop end if Errno.value == Errno::EAGAIN - target.wait_writable + target.evented_wait_writable do + raise IO::TimeoutError.new("Write timed out") + end else raise IO::Error.from_errno(errno_msg, target: target) end diff --git a/src/crystal/event_loop/polling.cr b/src/crystal/event_loop/polling.cr index 3eb17c0e313e..4df9eff7bc8e 100644 --- a/src/crystal/event_loop/polling.cr +++ b/src/crystal/event_loop/polling.cr @@ -146,6 +146,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop end end + def wait_readable(file_descriptor : System::FileDescriptor) : Nil + wait_readable(file_descriptor, file_descriptor.@read_timeout) do + raise IO::TimeoutError.new + end + end + def write(file_descriptor : System::FileDescriptor, slice : Bytes) : Int32 size = evented_write(file_descriptor, slice, file_descriptor.@write_timeout) @@ -160,6 +166,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop end end + def wait_writable(file_descriptor : System::FileDescriptor) : Nil + wait_writable(file_descriptor, file_descriptor.@write_timeout) do + raise IO::TimeoutError.new + end + end + def close(file_descriptor : System::FileDescriptor) : Nil evented_close(file_descriptor) end @@ -176,12 +188,24 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop size end + def wait_readable(socket : ::Socket) : Nil + wait_readable(socket, socket.@read_timeout) do + raise IO::TimeoutError.new + end + end + def write(socket : ::Socket, slice : Bytes) : Int32 size = evented_write(socket, slice, socket.@write_timeout) raise IO::Error.from_errno("write", target: socket) if size == -1 size end + def wait_writable(socket : ::Socket) : Nil + wait_writable(socket, socket.@write_timeout) do + raise IO::TimeoutError.new + end + end + def accept(socket : ::Socket) : ::Socket::Handle? loop do client_fd = diff --git a/src/crystal/event_loop/wasi.cr b/src/crystal/event_loop/wasi.cr index 08781b4fb950..1465dbf880ba 100644 --- a/src/crystal/event_loop/wasi.cr +++ b/src/crystal/event_loop/wasi.cr @@ -39,6 +39,12 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop end end + def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil + file_descriptor.evented_wait_readable(raise_if_closed: false) do + raise IO::TimeoutError.new("Read timed out") + end + end + def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 evented_write(file_descriptor, "Error writing file_descriptor") do LibC.write(file_descriptor.fd, slice, slice.size).tap do |return_code| @@ -49,6 +55,12 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop end end + def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil + file_descriptor.evented_wait_writable(raise_if_closed: false) do + raise IO::TimeoutError.new("Write timed out") + end + end + def close(file_descriptor : Crystal::System::FileDescriptor) : Nil file_descriptor.evented_close end @@ -59,12 +71,24 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop end end + def wait_readable(socket : ::Socket) : Nil + socket.evented_wait_readable do + raise IO::TimeoutError.new("Read timed out") + end + end + def write(socket : ::Socket, slice : Bytes) : Int32 evented_write(socket, "Error writing to socket") do LibC.send(socket.fd, slice, slice.size, 0) end end + def wait_writable(socket : ::Socket) : Nil + socket.evented_wait_writable do + raise IO::TimeoutError.new("Write timed out") + end + end + def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address) raise NotImplementedError.new "Crystal::Wasi::EventLoop#receive_from" end diff --git a/src/io/evented.cr b/src/io/evented.cr index 1f95d1870b0b..635c399d9239 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -34,12 +34,7 @@ module IO::Evented end # :nodoc: - def wait_readable(timeout = @read_timeout) : Nil - wait_readable(timeout: timeout) { raise TimeoutError.new("Read timed out") } - end - - # :nodoc: - def wait_readable(timeout = @read_timeout, *, raise_if_closed = true, &) : Nil + def evented_wait_readable(timeout = @read_timeout, *, raise_if_closed = true, &) : Nil readers = @readers.get { Deque(Fiber).new } readers << Fiber.current add_read_event(timeout) @@ -59,12 +54,7 @@ module IO::Evented end # :nodoc: - def wait_writable(timeout = @write_timeout) : Nil - wait_writable(timeout: timeout) { raise TimeoutError.new("Write timed out") } - end - - # :nodoc: - def wait_writable(timeout = @write_timeout, &) : Nil + def evented_wait_writable(timeout = @write_timeout, &) : Nil writers = @writers.get { Deque(Fiber).new } writers << Fiber.current add_write_event(timeout) From ea32e8b3e3edaa02462fb0b8d7b23600e0d1d913 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 27 Jan 2025 14:58:53 +0100 Subject: [PATCH 2/5] Fix: syntax errors --- src/crystal/event_loop/file_descriptor.cr | 4 ++-- src/crystal/event_loop/libevent.cr | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/crystal/event_loop/file_descriptor.cr b/src/crystal/event_loop/file_descriptor.cr index 75a455a89c9a..633fa180db68 100644 --- a/src/crystal/event_loop/file_descriptor.cr +++ b/src/crystal/event_loop/file_descriptor.cr @@ -10,7 +10,7 @@ abstract class Crystal::EventLoop abstract def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 # Blocks the current fiber until the file descriptor is ready for read. - abstract def wait_readable(file_descriptor Crystal::System::FileDescriptor) : Nil + abstract def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil # Writes at least one byte from *slice* to the file descriptor. # @@ -21,7 +21,7 @@ abstract class Crystal::EventLoop abstract def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 # Blocks the current fiber until the file descriptor is ready for write. - abstract def wait_writable(file_descriptor Crystal::System::FileDescriptor) : Nil + abstract def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil # Closes the file descriptor resource. abstract def close(file_descriptor : Crystal::System::FileDescriptor) : Nil diff --git a/src/crystal/event_loop/libevent.cr b/src/crystal/event_loop/libevent.cr index f57685a41959..636d01331624 100644 --- a/src/crystal/event_loop/libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -132,6 +132,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop socket.evented_wait_writable do raise IO::TimeoutError.new("Write timed out") end + end def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address) sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*) From 930d52151c7fb6e3e3be5a9d31ee0900f343e044 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 27 Jan 2025 13:44:40 +0100 Subject: [PATCH 3/5] Add #wait_[readable|writable] to IO::FileDescriptor and Socket --- src/crystal/event_loop/socket.cr | 6 ++++++ src/crystal/system/file_descriptor.cr | 8 ++++++++ src/io/file_descriptor.cr | 14 ++++++++++++++ src/socket.cr | 14 ++++++++++++++ 4 files changed, 42 insertions(+) diff --git a/src/crystal/event_loop/socket.cr b/src/crystal/event_loop/socket.cr index 1f4fc629d8ca..756592794b08 100644 --- a/src/crystal/event_loop/socket.cr +++ b/src/crystal/event_loop/socket.cr @@ -15,6 +15,9 @@ abstract class Crystal::EventLoop # Use `#receive_from` for capturing the source address of a message. abstract def read(socket : ::Socket, slice : Bytes) : Int32 + # Blocks the current fiber until the socket is ready for read. + abstract def wait_readable(socket : ::Socket) : Nil + # Writes at least one byte from *slice* to the socket. # # Blocks the current fiber if the socket is not ready for writing, @@ -25,6 +28,9 @@ abstract class Crystal::EventLoop # Use `#send_to` for sending a message to a specific target address. abstract def write(socket : ::Socket, slice : Bytes) : Int32 + # Blocks the current fiber until the file descriptor is ready for write. + abstract def wait_writable(socket : ::Socket) : Nil + # Accepts an incoming TCP connection on the socket. # # Blocks the current fiber if no connection is waiting, continuing when one diff --git a/src/crystal/system/file_descriptor.cr b/src/crystal/system/file_descriptor.cr index 03868bc07034..5aa0003962fc 100644 --- a/src/crystal/system/file_descriptor.cr +++ b/src/crystal/system/file_descriptor.cr @@ -39,6 +39,14 @@ module Crystal::System::FileDescriptor event_loop.write(self, slice) end + private def system_wait_readable : Nil + event_loop.wait_readable(self) + end + + private def system_wait_writable : Nil + event_loop.wait_writable(self) + end + private def event_loop? : Crystal::EventLoop::FileDescriptor? Crystal::EventLoop.current? end diff --git a/src/io/file_descriptor.cr b/src/io/file_descriptor.cr index 82c2b8ac232f..ad64cf480776 100644 --- a/src/io/file_descriptor.cr +++ b/src/io/file_descriptor.cr @@ -315,4 +315,18 @@ class IO::FileDescriptor < IO private def unbuffered_flush : Nil # Nothing end + + # Blocks the current fiber until the socket is ready for read. Raises + # `TimeoutError` when the configured `#read_timeout` has elapsed, otherwise + # returns normally. + def wait_readable : Nil + system_wait_readable + end + + # Blocks the current fiber until the socket is ready for write. Raises + # `TimeoutError` when the configured `#write_timeout` has elapsed, otherwise + # returns normally. + def wait_writable : Nil + system_wait_writable + end end diff --git a/src/socket.cr b/src/socket.cr index b862c30e2f9e..b053ccff0db1 100644 --- a/src/socket.cr +++ b/src/socket.cr @@ -467,6 +467,20 @@ class Socket < IO private def unbuffered_flush : Nil # Nothing end + + # Blocks the current fiber until the socket is ready for read. Raises + # `TimeoutError` when the configured `#read_timeout` has elapsed, otherwise + # returns normally. + def wait_readable : Nil + system_wait_readable + end + + # Blocks the current fiber until the socket is ready for write. Raises + # `TimeoutError` when the configured `#write_timeout` has elapsed, otherwise + # returns normally. + def wait_writable : Nil + system_wait_writable + end end require "./socket/*" From a15f3234931c983f176d8abb425d20d27eb311e4 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 20 Feb 2025 12:28:24 +0100 Subject: [PATCH 4/5] Fix: missing System::Socket#wait_* methods --- src/crystal/system/socket.cr | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/crystal/system/socket.cr b/src/crystal/system/socket.cr index 54648f17f7db..ce52e288e5cd 100644 --- a/src/crystal/system/socket.cr +++ b/src/crystal/system/socket.cr @@ -89,6 +89,14 @@ module Crystal::System::Socket event_loop.write(self, slice) end + private def system_wait_readable : Nil + event_loop.wait_readable(self) + end + + private def system_wait_writable : Nil + event_loop.wait_writable(self) + end + # private def system_close # Closes the internal handle without notifying the event loop. From 6c65c43e66e4b3800bfc2bcfc5bfe73a69711fa9 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 20 Feb 2025 12:28:43 +0100 Subject: [PATCH 5/5] Fix: docs for IO::FileDescriptor#wait_* methods --- src/io/file_descriptor.cr | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/io/file_descriptor.cr b/src/io/file_descriptor.cr index ad64cf480776..4494dffec0fa 100644 --- a/src/io/file_descriptor.cr +++ b/src/io/file_descriptor.cr @@ -316,16 +316,16 @@ class IO::FileDescriptor < IO # Nothing end - # Blocks the current fiber until the socket is ready for read. Raises - # `TimeoutError` when the configured `#read_timeout` has elapsed, otherwise - # returns normally. + # Blocks the current fiber until the file descriptor is ready for read. + # Raises `TimeoutError` when the configured `#read_timeout` has elapsed, + # otherwise returns normally. def wait_readable : Nil system_wait_readable end - # Blocks the current fiber until the socket is ready for write. Raises - # `TimeoutError` when the configured `#write_timeout` has elapsed, otherwise - # returns normally. + # Blocks the current fiber until the file descriptor is ready for write. + # Raises `TimeoutError` when the configured `#write_timeout` has elapsed, + # otherwise returns normally. def wait_writable : Nil system_wait_writable end