Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,64 @@ function wait_readnb(s::BufferStream, nb::Int)
end
end

function readavailable(this::BufferStream)
bytes = lock(this.cond) do
wait_readnb(this, 1)
buf = this.buffer
@assert buf.seekable == false
take!(buf)
end
return bytes
end

function read(stream::BufferStream)
bytes = lock(stream.cond) do
wait_close(stream)
take!(stream.buffer)
end
return bytes
end

function readbytes!(s::BufferStream, a::Vector{UInt8}, nb::Int)
sbuf = s.buffer
@assert sbuf.seekable == false
@assert sbuf.maxsize >= nb

function wait_locked(s, buf, nb)
while bytesavailable(buf) < nb
s.readerror === nothing || throw(s.readerror)
isopen(s) || break
s.status != StatusEOF || break
wait_readnb(s, nb)
end
end

bytes = lock(s.cond) do
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
wait_locked(s, sbuf, nb)
end
if bytesavailable(sbuf) >= nb
nread = readbytes!(sbuf, a, nb)
else
initsize = length(a)
newbuf = PipeBuffer(a, maxsize=nb)
newbuf.size = newbuf.offset # reset the write pointer to the beginning
nread = try
s.buffer = newbuf
write(newbuf, sbuf)
wait_locked(s, newbuf, nb)
bytesavailable(newbuf)
finally
s.buffer = sbuf
end
_take!(a, _unsafe_take!(newbuf))
length(a) >= initsize || resize!(a, initsize)
end
return nread
end
return bytes
end

show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")")

function readuntil(s::BufferStream, c::UInt8; keep::Bool=false)
Expand Down