diff --git a/Project.toml b/Project.toml index 61de90e..40a0a59 100644 --- a/Project.toml +++ b/Project.toml @@ -4,5 +4,9 @@ license = "MIT" authors = ["Kenta Sato "] version = "0.11.3" +[deps] +ChunkCodecCore = "0b6fb165-00bc-4d37-ab8b-79f91016dbe1" + [compat] +ChunkCodecCore = "1" julia = "1.6" diff --git a/src/TranscodingStreams.jl b/src/TranscodingStreams.jl index 7ebf353..5015e57 100644 --- a/src/TranscodingStreams.jl +++ b/src/TranscodingStreams.jl @@ -16,5 +16,6 @@ include("stream.jl") include("io.jl") include("noop.jl") include("transcode.jl") +include("chunkcodecs.jl") end # module diff --git a/src/chunkcodecs.jl b/src/chunkcodecs.jl new file mode 100644 index 0000000..877df87 --- /dev/null +++ b/src/chunkcodecs.jl @@ -0,0 +1,128 @@ +# Allow a `TranscodingStreams.Codec` to be used as a decoder. + +using ChunkCodecCore: + check_in_range, + check_contiguous, + grow_dst!, + MaybeSize, + NOT_SIZE +import ChunkCodecCore: + try_decode!, + try_resize_decode!, + try_find_decoded_size + +# `Codec` subtypes may want to specialize `try_find_decoded_size` +function try_find_decoded_size(::Codec, src::AbstractVector{UInt8})::Nothing + nothing +end + +function try_decode!(codec::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}; kwargs...)::MaybeSize + try_resize_decode!(codec, dst, src, Int64(length(dst))) +end + +function try_resize_decode!(codec::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}, max_size::Int64; kwargs...)::MaybeSize + dst_size::Int64 = length(dst) + src_size::Int64 = length(src) + check_contiguous(dst) + check_contiguous(src) + cconv_src = Base.cconvert(Ptr{UInt8}, src) + if dst_size < max_size + # Do the equivalent of `_default_output_buffer(codec, input)` in `transcode` + # by resizing `dst` + expected_dst_size::Int64 = GC.@preserve(cconv_src, min(initial_output_size( + codec, + Memory(Base.unsafe_convert(Ptr{UInt8}, cconv_src), src_size), + ), max_size)) + if expected_dst_size > dst_size + resize!(dst, expected_dst_size) 
+ dst_size = expected_dst_size + end + end + src_left::Int64 = src_size + dst_left::Int64 = dst_size + err = Error() + # Outer loop to decode a concatenation of multiple compressed streams. + while true + if startproc(codec, :write, err) === :error + @goto handle_error + end + if pledgeinsize(codec, src_left, err) === :error + @goto handle_error + end + # The process loop + while true + GC.@preserve cconv_src begin + local src_p = Base.unsafe_convert(Ptr{UInt8}, cconv_src) + (src_size - src_left) + local input = Memory(src_p, src_left) + # ensure minoutsize is provided + local dst_space_needed = minoutsize(codec, input) + while dst_space_needed > dst_left + local next_size = grow_dst!(dst, max_size) + if isnothing(next_size) + break # reached max_size limit + end + dst_left += next_size - dst_size + dst_size = next_size + @assert dst_left > 0 + end + cconv_dst = Base.cconvert(Ptr{UInt8}, dst) + GC.@preserve cconv_dst begin + local dst_p = Base.unsafe_convert(Ptr{UInt8}, cconv_dst) + (dst_size - dst_left) + if dst_space_needed > dst_left + # Try to do the decoding into a scratch buffer + # The scratch buffer should typically be just a few bytes. + # This enables handling the `return NOT_SIZE` case + # while respecting the `minoutsize` restrictions. 
+ local scratch_dst = zeros(UInt8, dst_space_needed) + local cconv_scratch_dst = Base.cconvert(Ptr{UInt8}, scratch_dst) + GC.@preserve cconv_scratch_dst let + local scratch_p = Base.unsafe_convert(Ptr{UInt8}, cconv_scratch_dst) + local output = Memory(scratch_p, dst_space_needed) + local Δin, Δout, code = process(codec, input, output, err) + if code === :error + @goto handle_error + end + local valid_Δout = min(Δout, dst_left) + src_left -= Δin + unsafe_copyto!(dst_p, scratch_p, valid_Δout) + if Δout > dst_left + # Ran out of dst space + return NOT_SIZE + end + dst_left -= Δout + if code === :end + if src_left > 0 + break # out of the process loop to decode next stream + else + # Done + return dst_size - dst_left + end + end + end + else + local output = Memory(dst_p, dst_left) + local Δin, Δout, code = process(codec, input, output, err) + if code === :error + @goto handle_error + end + src_left -= Δin + dst_left -= Δout + if code === :end + if src_left > 0 + break # out of the process loop to decode next stream + else + # Done + return dst_size - dst_left + end + end + end + end + end + end + end + @label handle_error + if !haserror(err) + set_default_error!(err) + end + throw(err[]) +end diff --git a/test/Project.toml b/test/Project.toml index afed488..cb8ebfc 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -1,5 +1,7 @@ [deps] Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" +ChunkCodecCore = "0b6fb165-00bc-4d37-ab8b-79f91016dbe1" +ChunkCodecTests = "06b1ce50-b741-4199-b118-ba5fe1a70fa7" FillArrays = "1a297f60-69ca-5386-bcde-b61e274b549b" OffsetArrays = "6fe1bfb0-de20-5000-8ca7-80f57d26f881" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" @@ -8,5 +10,5 @@ TestsForCodecPackages = "c2e61002-3542-480d-8b3c-5f05cc4f8554" TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa" [sources] -TranscodingStreams = {path = ".."} TestsForCodecPackages = {path = "../lib/TestsForCodecPackages"} +TranscodingStreams = {path = ".."} diff --git a/test/codecdoubleframe.jl 
b/test/codecdoubleframe.jl index 385dd15..ea3ea14 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -15,6 +15,10 @@ using TestsForCodecPackages: test_chunked_read, test_chunked_write, test_reuse_encoder +import ChunkCodecCore +using ChunkCodecTests: + ChunkCodecTests, + test_encoder_decoder # An insane codec for testing the codec APIs. struct DoubleFrameEncoder <: TranscodingStreams.Codec @@ -454,6 +458,27 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD close(stream) end + @testset "chunk codec tests" begin + # create a encoder + function ChunkCodecCore.try_encode!(e::DoubleFrameEncoder, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}; kwargs...) + ChunkCodecCore.try_decode!(e, dst, src) + end + function ChunkCodecCore.decoded_size_range(e::DoubleFrameEncoder) + Int64(0):Int64(1):typemax(Int64)-Int64(1) + end + function ChunkCodecCore.encode_bound(e::DoubleFrameEncoder, src_size::Int64)::Int64 + clamp(widen(clamp(widemul(src_size, Int64(2)), Int64)) + widen(Int64(4)), Int64) + end + ChunkCodecCore.can_concatenate(::DoubleFrameDecoder) = true + encoder = DoubleFrameEncoder() + TranscodingStreams.initialize(encoder) + decoder = DoubleFrameDecoder() + TranscodingStreams.initialize(decoder) + test_encoder_decoder(encoder, decoder; trials=10) + TranscodingStreams.finalize(encoder) + TranscodingStreams.finalize(decoder) + end + test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_write(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_lines(DoubleFrameEncoderStream, DoubleFrameDecoderStream) diff --git a/test/codecnoop.jl b/test/codecnoop.jl index 4ba9674..ca0bdc2 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -1,5 +1,10 @@ using OffsetArrays: OffsetArray using FillArrays: Zeros +using TranscodingStreams: + TranscodingStreams, + TranscodingStream, + Noop, + NoopStream using TestsForCodecPackages: test_roundtrip_read, test_roundtrip_write, @@ -9,6 
+14,11 @@ using TestsForCodecPackages: test_roundtrip_fileio, test_chunked_read, test_chunked_write +import ChunkCodecCore +using ChunkCodecTests: + ChunkCodecTests, + test_encoder_decoder +using Test @testset "Noop Codec" begin source = IOBuffer("") @@ -584,4 +594,12 @@ using TestsForCodecPackages: close(stream) end + @testset "chunk codec tests" begin + # create an encoder + encoder = ChunkCodecCore.NoopEncodeOptions() + decoder = Noop() + TranscodingStreams.initialize(decoder) + test_encoder_decoder(encoder, decoder; trials=100) + TranscodingStreams.finalize(decoder) + end end