diff --git a/base/stream.jl b/base/stream.jl index e81f65685df72..33d884018d5ad 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -1559,6 +1559,64 @@ function wait_readnb(s::BufferStream, nb::Int) end end +function readavailable(this::BufferStream) + bytes = lock(this.cond) do + wait_readnb(this, 1) + buf = this.buffer + @assert buf.seekable == false + take!(buf) + end + return bytes +end + +function read(stream::BufferStream) + bytes = lock(stream.cond) do + wait_close(stream) + take!(stream.buffer) + end + return bytes +end + +function readbytes!(s::BufferStream, a::Vector{UInt8}, nb::Int) + sbuf = s.buffer + @assert sbuf.seekable == false + @assert sbuf.maxsize >= nb + + function wait_locked(s, buf, nb) + while bytesavailable(buf) < nb + s.readerror === nothing || throw(s.readerror) + isopen(s) || break + s.status != StatusEOF || break + wait_readnb(s, nb) + end + end + + bytes = lock(s.cond) do + if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer + wait_locked(s, sbuf, nb) + end + if bytesavailable(sbuf) >= nb + nread = readbytes!(sbuf, a, nb) + else + initsize = length(a) + newbuf = PipeBuffer(a, maxsize=nb) + newbuf.size = newbuf.offset # reset the write pointer to the beginning + nread = try + s.buffer = newbuf + write(newbuf, sbuf) + wait_locked(s, newbuf, nb) + bytesavailable(newbuf) + finally + s.buffer = sbuf + end + _take!(a, _unsafe_take!(newbuf)) + length(a) >= initsize || resize!(a, initsize) + end + return nread + end + return bytes +end + show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")") function readuntil(s::BufferStream, c::UInt8; keep::Bool=false)