Skip to content

Commit

Permalink
Add EventLoop#wait_[readable|writable] methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ysbaddaden committed Jan 27, 2025
1 parent 9e475f6 commit 0578808
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 16 deletions.
6 changes: 6 additions & 0 deletions src/crystal/event_loop/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ abstract class Crystal::EventLoop
# Returns 0 when EOF is reached.
abstract def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32

# Blocks the current fiber until the file descriptor is ready for read.
abstract def wait_readable(file_descriptor Crystal::System::FileDescriptor) : Nil

Check failure on line 13 in src/crystal/event_loop/file_descriptor.cr

View workflow job for this annotation

GitHub Actions / x86_64-windows-release / build

expecting token ')', not 'Crystal'

# Writes at least one byte from *slice* to the file descriptor.
#
# Blocks the current fiber if the file descriptor isn't ready for writing,
Expand All @@ -17,6 +20,9 @@ abstract class Crystal::EventLoop
# Returns the number of bytes written (up to `slice.size`).
abstract def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32

# Blocks the current fiber until the file descriptor is ready for write.
abstract def wait_writable(file_descriptor Crystal::System::FileDescriptor) : Nil

# Closes the file descriptor resource.
abstract def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
end
Expand Down
22 changes: 22 additions & 0 deletions src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,21 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
end.to_i32
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
raise NotImplementedError.new("Crystal::System::IOCP#wait_readable(FileDescriptor)")
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
System::IOCP.overlapped_operation(file_descriptor, "WriteFile", file_descriptor.write_timeout, writing: true) do |overlapped|
ret = LibC.WriteFile(file_descriptor.windows_handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end.to_i32
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
raise NotImplementedError.new("Crystal::System::IOCP#wait_writable(FileDescriptor)")
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
LibC.CancelIoEx(file_descriptor.windows_handle, nil) unless file_descriptor.system_blocking?
end
Expand All @@ -220,6 +228,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
bytes_read.to_i32
end

def wait_readable(socket : ::Socket) : Nil
# NOTE: Windows 10+ has `ProcessSocketNotifications` to associate sockets to
# a completion port and be notified of socket readiness. See
# <https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-socket-state-notifications>
raise NotImplementedError.new("Crystal::System::IOCP#wait_readable(Socket)")
end

def write(socket : ::Socket, slice : Bytes) : Int32
wsabuf = wsa_buffer(slice)

Expand All @@ -231,6 +246,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
bytes.to_i32
end

def wait_writable(socket : ::Socket) : Nil
# NOTE: Windows 10+ has `ProcessSocketNotifications` to associate sockets to
# a completion port and be notified of socket readiness. See
# <https://learn.microsoft.com/en-us/windows/win32/winsock/winsock-socket-state-notifications>
raise NotImplementedError.new("Crystal::System::IOCP#wait_writable(Socket)")
end

def send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address) : Int32
wsabuf = wsa_buffer(slice)
bytes_written = System::IOCP.wsa_overlapped_operation(socket, socket.fd, "WSASendTo", socket.write_timeout) do |overlapped|
Expand Down
35 changes: 31 additions & 4 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
evented_write(file_descriptor, "Error writing file_descriptor") do
LibC.write(file_descriptor.fd, slice, slice.size).tap do |return_code|
Expand All @@ -94,6 +100,12 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_close
end
Expand All @@ -104,12 +116,23 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end
end

def wait_readable(socket : ::Socket) : Nil
socket.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
evented_write(socket, "Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0).to_i32
end
end

def wait_writable(socket : ::Socket) : Nil
socket.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*)
# initialize sockaddr with the initialized family of the socket
Expand Down Expand Up @@ -142,7 +165,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
when Errno::EISCONN
return
when Errno::EINPROGRESS, Errno::EALREADY
socket.wait_writable(timeout: timeout) do
socket.evented_wait_writable(timeout: timeout) do
return IO::TimeoutError.new("connect timed out")
end
else
Expand Down Expand Up @@ -174,7 +197,7 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
if socket.closed?
return
elsif Errno.value == Errno::EAGAIN
socket.wait_readable(raise_if_closed: false) do
socket.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Accept timed out")
end
return if socket.closed?
Expand All @@ -200,7 +223,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_readable
target.evented_wait_readable do
raise IO::TimeoutError.new("Read timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand All @@ -218,7 +243,9 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
end

if Errno.value == Errno::EAGAIN
target.wait_writable
target.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
else
raise IO::Error.from_errno(errno_msg, target: target)
end
Expand Down
24 changes: 24 additions & 0 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : System::FileDescriptor) : Nil
wait_readable(file_descriptor, file_descriptor.@read_timeout) do
raise IO::TimeoutError.new
end
end

def write(file_descriptor : System::FileDescriptor, slice : Bytes) : Int32
size = evented_write(file_descriptor, slice, file_descriptor.@write_timeout)

Expand All @@ -160,6 +166,12 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : System::FileDescriptor) : Nil
wait_writable(file_descriptor, file_descriptor.@write_timeout) do
raise IO::TimeoutError.new
end
end

def close(file_descriptor : System::FileDescriptor) : Nil
evented_close(file_descriptor)
end
Expand All @@ -176,12 +188,24 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
size
end

def wait_readable(socket : ::Socket) : Nil
wait_readable(socket, socket.@read_timeout) do
raise IO::TimeoutError.new
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
size = evented_write(socket, slice, socket.@write_timeout)
raise IO::Error.from_errno("write", target: socket) if size == -1
size
end

def wait_writable(socket : ::Socket) : Nil
wait_writable(socket, socket.@write_timeout) do
raise IO::TimeoutError.new
end
end

def accept(socket : ::Socket) : ::Socket::Handle?
loop do
client_fd =
Expand Down
24 changes: 24 additions & 0 deletions src/crystal/event_loop/wasi.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_readable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32
evented_write(file_descriptor, "Error writing file_descriptor") do
LibC.write(file_descriptor.fd, slice, slice.size).tap do |return_code|
Expand All @@ -49,6 +55,12 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_writable(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_wait_writable(raise_if_closed: false) do
raise IO::TimeoutError.new("Write timed out")
end
end

def close(file_descriptor : Crystal::System::FileDescriptor) : Nil
file_descriptor.evented_close
end
Expand All @@ -59,12 +71,24 @@ class Crystal::EventLoop::Wasi < Crystal::EventLoop
end
end

def wait_readable(socket : ::Socket) : Nil
socket.evented_wait_readable do
raise IO::TimeoutError.new("Read timed out")
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
evented_write(socket, "Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0)
end
end

def wait_writable(socket : ::Socket) : Nil
socket.evented_wait_writable do
raise IO::TimeoutError.new("Write timed out")
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
raise NotImplementedError.new "Crystal::Wasi::EventLoop#receive_from"
end
Expand Down
14 changes: 2 additions & 12 deletions src/io/evented.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ module IO::Evented
end

# :nodoc:
def wait_readable(timeout = @read_timeout) : Nil
wait_readable(timeout: timeout) { raise TimeoutError.new("Read timed out") }
end

# :nodoc:
def wait_readable(timeout = @read_timeout, *, raise_if_closed = true, &) : Nil
def evented_wait_readable(timeout = @read_timeout, *, raise_if_closed = true, &) : Nil
readers = @readers.get { Deque(Fiber).new }
readers << Fiber.current
add_read_event(timeout)
Expand All @@ -59,12 +54,7 @@ module IO::Evented
end

# :nodoc:
def wait_writable(timeout = @write_timeout) : Nil
wait_writable(timeout: timeout) { raise TimeoutError.new("Write timed out") }
end

# :nodoc:
def wait_writable(timeout = @write_timeout, &) : Nil
def evented_wait_writable(timeout = @write_timeout, &) : Nil
writers = @writers.get { Deque(Fiber).new }
writers << Fiber.current
add_write_event(timeout)
Expand Down

0 comments on commit 0578808

Please sign in to comment.