diff --git a/fluentd.gemspec b/fluentd.gemspec
index c7c03911ad..de94bd83cc 100644
--- a/fluentd.gemspec
+++ b/fluentd.gemspec
@@ -33,6 +33,7 @@ Gem::Specification.new do |gem|
   gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
   gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"])
   gem.add_runtime_dependency("webrick", ["~> 1.4"])
+  gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"])
 
   # gems that aren't default gems as of Ruby 3.4
   gem.add_runtime_dependency("base64", ["~> 0.2"])
diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb
index bcb06998ed..9c2ffbbf8e 100644
--- a/lib/fluent/event.rb
+++ b/lib/fluent/event.rb
@@ -62,9 +62,9 @@ def to_msgpack_stream(time_int: false, packer: nil)
       out.full_pack
     end
 
-    def to_compressed_msgpack_stream(time_int: false, packer: nil)
+    def to_compressed_msgpack_stream(time_int: false, packer: nil, type: :gzip)
       packed = to_msgpack_stream(time_int: time_int, packer: packer)
-      compress(packed)
+      compress(packed, type: type)
     end
 
     def to_msgpack_stream_forced_integer(packer: nil)
@@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil)
   end
 
   class CompressedMessagePackEventStream < MessagePackEventStream
-    def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
-      super
+    def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
+      super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)
       @decompressed_data = nil
       @compressed_data = data
+      @type = compress
     end
 
     def empty?
@@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil)
 
     def ensure_decompressed!
       return if @decompressed_data
-      @data = @decompressed_data = decompress(@data)
+      @data = @decompressed_data = decompress(@data, type: @type)
     end
   end
 
diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 80709c12bb..6ba36c4318 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -64,7 +64,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
       config_param :queued_chunks_limit_size, :integer, default: nil
 
       desc 'Compress buffered data.'
-      config_param :compress, :enum, list: [:text, :gzip], default: :text
+      config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text
 
       desc 'If true, chunks are thrown away when unrecoverable error happens'
       config_param :disable_chunk_backup, :bool, default: false
diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb
index 01506a1b9b..30707e6811 100644
--- a/lib/fluent/plugin/buffer/chunk.rb
+++ b/lib/fluent/plugin/buffer/chunk.rb
@@ -59,8 +59,11 @@ def initialize(metadata, compress: :text)
          @size = 0
          @created_at = Fluent::Clock.real_now
          @modified_at = Fluent::Clock.real_now
-
-          extend Decompressable if compress == :gzip
+          if compress == :gzip
+            extend GzipDecompressable
+          elsif compress == :zstd
+            extend ZstdDecompressable
+          end
         end
 
         attr_reader :unique_id, :metadata, :state
@@ -85,7 +88,7 @@ def modified_at
 
         # data is array of formatted record string
         def append(data, **kwargs)
-          raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
+          raise ArgumentError, "`compress: #{kwargs[:compress]}` can be used for Compressable module" if kwargs[:compress] == :gzip || kwargs[:compress] == :zstd
           begin
             adding = data.join.force_encoding(Encoding::ASCII_8BIT)
           rescue
@@ -172,23 +175,23 @@ def purge
         end
 
         def read(**kwargs)
-          raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
+          raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
           raise NotImplementedError, "Implement this method in child class"
         end
 
         def open(**kwargs, &block)
-          raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
+          raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
           raise NotImplementedError, "Implement this method in child class"
         end
 
         def write_to(io, **kwargs)
-          raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
+          raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
           open do |i|
             IO.copy_stream(i, io)
           end
         end
 
-        module Decompressable
+        module GzipDecompressable
           include Fluent::Plugin::Compressable
 
           def append(data, **kwargs)
@@ -241,6 +244,60 @@ def write_to(io, **kwargs)
            end
          end
         end
+
+        module ZstdDecompressable
+          include Fluent::Plugin::Compressable
+
+          def append(data, **kwargs)
+            if kwargs[:compress] == :zstd
+              io = StringIO.new
+              stream = Zstd::StreamWriter.new(io)
+              data.each do |d|
+                stream.write(d)
+              end
+              stream.finish
+              concat(io.string, data.size)
+            else
+              super
+            end
+          end
+
+          def open(**kwargs, &block)
+            if kwargs[:compressed] == :zstd
+              super
+            else
+              super(**kwargs) do |chunk_io|
+                output_io = if chunk_io.is_a?(StringIO)
+                              StringIO.new
+                            else
+                              Tempfile.new('decompressed-data')
+                            end
+                output_io.binmode if output_io.is_a?(Tempfile)
+                decompress(input_io: chunk_io, output_io: output_io, type: :zstd)
+                output_io.seek(0, IO::SEEK_SET)
+                yield output_io
+              end
+            end
+          end
+
+          def read(**kwargs)
+            if kwargs[:compressed] == :zstd
+              super
+            else
+              decompress(super,type: :zstd)
+            end
+          end
+
+          def write_to(io, **kwargs)
+            open(compressed: :zstd) do |chunk_io|
+              if kwargs[:compressed] == :zstd
+                IO.copy_stream(chunk_io, io)
+              else
+                decompress(input_io: chunk_io,
output_io: io, type: :zstd) + end + end + end + end end end end diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb index aa242478fe..00aa915ba5 100644 --- a/lib/fluent/plugin/compressable.rb +++ b/lib/fluent/plugin/compressable.rb @@ -16,29 +16,35 @@ require 'stringio' require 'zlib' +require 'zstd-ruby' module Fluent module Plugin module Compressable - def compress(data, **kwargs) + def compress(data, type: :gzip, **kwargs) output_io = kwargs[:output_io] io = output_io || StringIO.new - Zlib::GzipWriter.wrap(io) do |gz| - gz.write data + if type == :gzip + writer = Zlib::GzipWriter.new(io) + elsif type == :zstd + writer = Zstd::StreamWriter.new(io) + else + raise ArgumentError, "Unknown compression type: #{type}" end - + writer.write(data) + writer.finish output_io || io.string end # compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)` # https://www.ruby-forum.com/topic/971591#979503 - def decompress(compressed_data = nil, output_io: nil, input_io: nil) + def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip) case when input_io && output_io - io_decompress(input_io, output_io) + io_decompress(input_io, output_io, type) when input_io output_io = StringIO.new - io = io_decompress(input_io, output_io) + io = io_decompress(input_io, output_io, type) io.string when compressed_data.nil? || compressed_data.empty? # check compressed_data(String) is 0 length @@ -46,51 +52,91 @@ def decompress(compressed_data = nil, output_io: nil, input_io: nil) when output_io # execute after checking compressed_data is empty or not io = StringIO.new(compressed_data) - io_decompress(io, output_io) + io_decompress(io, output_io, type) else - string_decompress(compressed_data) + string_decompress(compressed_data, type) end end private - def string_decompress(compressed_data) + def string_decompress_gzip(compressed_data) io = StringIO.new(compressed_data) - out = '' loop do - gz = Zlib::GzipReader.new(io) - out << gz.read - unused = gz.unused - gz.finish - + reader = Zlib::GzipReader.new(io) + out << reader.read + unused = reader.unused + reader.finish unless unused.nil? adjust = unused.length io.pos -= adjust end break if io.eof? end + out + end + def string_decompress_zstd(compressed_data) + io = StringIO.new(compressed_data) + out = '' + loop do + reader = Zstd::StreamReader.new(io) + # Zstd::StreamReader needs to specify the size of the buffer + out << reader.read(1024) + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + break if io.eof? + end out end - def io_decompress(input, output) + def string_decompress(compressed_data, type = :gzip) + if type == :gzip + string_decompress_gzip(compressed_data) + elsif type == :zstd + string_decompress_zstd(compressed_data) + else + raise ArgumentError, "Unknown compression type: #{type}" + end + end + + def io_decompress_gzip(input, output) loop do - gz = Zlib::GzipReader.new(input) - v = gz.read + reader = Zlib::GzipReader.new(input) + v = reader.read output.write(v) - unused = gz.unused - gz.finish - + unused = reader.unused + reader.finish unless unused.nil? adjust = unused.length input.pos -= adjust end break if input.eof? 
end + output + end + def io_decompress_zstd(input, output) + loop do + reader = Zstd::StreamReader.new(input) + # Zstd::StreamReader needs to specify the size of the buffer + v = reader.read(1024) + output.write(v) + # Zstd::StreamReader doesn't provide unused data, so we have to manually adjust the position + break if input.eof? + end output end + + def io_decompress(input, output, type = :gzip) + if type == :gzip + io_decompress_gzip(input, output) + elsif type == :zstd + io_decompress_zstd(input, output) + else + raise ArgumentError, "Unknown compression type: #{type}" + end + end end end end diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index eb8b3c629e..0c2216883c 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -307,10 +307,14 @@ def on_message(msg, chunk_size, conn) case entries when String # PackedForward - option = msg[2] - size = (option && option['size']) || 0 - es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream - es = es_class.new(entries, nil, size.to_i) + option = msg[2] || {} + size = option['size'] || 0 + + if option['compressed'] && option['compressed'] != 'text' + es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym) + else + es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i) + end es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event if @enable_field_injection es = add_source_info(es, conn) diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index ae47114a20..1598cc7c3f 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -29,11 +29,12 @@ class FileOutput < Output helpers :formatter, :inject, :compat_parameters - SUPPORTED_COMPRESS = [:text, :gz, :gzip] + SUPPORTED_COMPRESS = [:text, :gz, :gzip, :zstd] SUPPORTED_COMPRESS_MAP = { text: nil, gz: :gzip, gzip: :gzip, + zstd: :zstd, } DEFAULT_TIMEKEY = 60 * 60 * 24 @@ -184,6 +185,10 @@ def configure(conf) @buffer.symlink_path = @symlink_path @buffer.output_plugin_for_symlink = self end + + if @compress != :text && @buffer.compress != :text && @buffer.compress != @compress_method + raise Fluent::ConfigError, "You cannot specify different compression formats for Buffer (Buffer: #{@buffer.compress}, Self: #{@compress})" + end end @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION @@ -205,17 +210,17 @@ def write(chunk) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm writer = case - when @compress_method.nil? - method(:write_without_compression) - when @compress_method == :gzip - if @buffer.compress != :gzip || @recompress - method(:write_gzip_with_compression) - else - method(:write_gzip_from_gzipped_chunk) - end - else - raise "BUG: unknown compression method #{@compress_method}" - end + when @compress_method.nil? 
+ method(:write_without_compression) + when @compress_method != :text + if @buffer.compress == :text || @recompress + method(:write_with_compression).curry.call(@compress_method) + else + method(:write_from_compressed_chunk).curry.call(@compress_method) + end + else + raise "BUG: unknown compression method #{@compress_method}" + end if @append if @need_lock @@ -241,17 +246,22 @@ def write_without_compression(path, chunk) end end - def write_gzip_with_compression(path, chunk) + def write_with_compression(type, path, chunk) File.open(path, "ab", @file_perm) do |f| - gz = Zlib::GzipWriter.new(f) + gz = nil + if type == :gzip + gz = Zlib::GzipWriter.new(f) + elsif type == :zstd + gz = Zstd::StreamWriter.new(f) + end chunk.write_to(gz, compressed: :text) gz.close end end - def write_gzip_from_gzipped_chunk(path, chunk) + def write_from_compressed_chunk(type, path, chunk) File.open(path, "ab", @file_perm) do |f| - chunk.write_to(f, compressed: :gzip) + chunk.write_to(f, compressed: type) end end @@ -268,6 +278,7 @@ def timekey_to_timeformat(timekey) def compression_suffix(compress) case compress when :gzip then '.gz' + when :zstd then '.zstd' when nil then '' else raise ArgumentError, "unknown compression type #{compress}" diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 9a07acbe22..d8df7a201d 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -87,7 +87,7 @@ class ForwardOutput < Output config_param :verify_connection_at_startup, :bool, default: false desc 'Compress buffered data.' - config_param :compress, :enum, list: [:text, :gzip], default: :text + config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text desc 'The default version of TLS transport.' config_param :tls_version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: Fluent::TLS::DEFAULT_VERSION @@ -251,10 +251,14 @@ def configure(conf) end unless @as_secondary - if @compress == :gzip && @buffer.compress == :text - @buffer.compress = :gzip - elsif @compress == :text && @buffer.compress == :gzip - log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in " + if @buffer.compress == :text + @buffer.compress = @compress + else + if @compress == :text + log.info "buffer is compressed. 
If you also want to save the bandwidth of a network, Add `compress` configuration in " + elsif @compress != @buffer.compress + raise Fluent::ConfigError, "You cannot specify different compression formats for Buffer (Buffer: #{@buffer.compress}, Self: #{@compress})" + end end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index d2107330ba..a987937b67 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1014,13 +1014,17 @@ def write_guard(&block) end FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } - FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) } FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } - FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer, type: :zstd) } def generate_format_proc if @buffer && @buffer.compress == :gzip - @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM + @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP : FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP + elsif @buffer && @buffer.compress == :zstd + @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD : FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD else @time_as_integer ? 
FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM end diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 14d33af9c7..d1de2f415f 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -1443,7 +1443,7 @@ def create_chunk_es(metadata, es) test 'create decompressable chunk' do chunk = @p.generate_chunk(create_metadata) - assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable) + assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::GzipDecompressable) end test '#write compressed data which exceeds chunk_limit_size, it raises BufferChunkOverflowError' do diff --git a/test/plugin/test_buffer_chunk.rb b/test/plugin/test_buffer_chunk.rb index 535b0acd1a..446751d685 100644 --- a/test/plugin/test_buffer_chunk.rb +++ b/test/plugin/test_buffer_chunk.rb @@ -77,6 +77,16 @@ class BufferChunkTest < Test::Unit::TestCase assert_raise(ArgumentError){ chunk.write_to(nil, compressed: :gzip) } assert_raise(ArgumentError){ chunk.append(nil, compress: :gzip) } end + + test 'some methods raise ArgumentError with an option of `compressed: :zstd` and without extending Compressble`' do + meta = Object.new + chunk = Fluent::Plugin::Buffer::Chunk.new(meta) + + assert_raise(ArgumentError){ chunk.read(compressed: :zstd) } + assert_raise(ArgumentError){ chunk.open(compressed: :zstd){} } + assert_raise(ArgumentError){ chunk.write_to(nil, compressed: :zstd) } + assert_raise(ArgumentError){ chunk.append(nil, compress: :zstd) } + end end class TestChunk < Fluent::Plugin::Buffer::Chunk @@ -203,7 +213,15 @@ def open(**kwargs) test 'create decompressable chunk' do meta = Object.new chunk = Fluent::Plugin::Buffer::Chunk.new(meta, compress: :gzip) - assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable) + assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::GzipDecompressable) + end + end + + sub_test_case 'when compress is zstd' do + test 'create decompressable chunk' do + meta = Object.new + chunk = Fluent::Plugin::Buffer::Chunk.new(meta, compress: :zstd) + assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::ZstdDecompressable) end end end diff --git a/test/plugin/test_buffer_file_chunk.rb b/test/plugin/test_buffer_file_chunk.rb index 00bd4fc153..755869a7ae 100644 --- a/test/plugin/test_buffer_file_chunk.rb +++ b/test/plugin/test_buffer_file_chunk.rb @@ -802,6 +802,7 @@ def gen_chunk_path(prefix, unique_id) setup do @src = 'text data for compressing' * 5 @gzipped_src = compress(@src) + @zstded_src = compress(@src, type: :zstd) end test '#append with compress option writes compressed data to chunk when compress is gzip' do @@ -867,5 +868,69 @@ def gen_chunk_path(prefix, unique_id) c.write_to(io, compressed: :gzip) assert_equal @gzipped_src, io.string end + + test '#append with compress option writes compressed data to chunk when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :zstd) + c.append([@src, @src], compress: :zstd) + c.commit + + # check chunk is compressed + assert c.read(compressed: :zstd).size < [@src, @src].join("").size + + assert_equal @src + @src, c.read + end + + test '#open passes io object having decompressed data to a block when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + decomressed_data = c.open do |io| + v = io.read + assert_equal 
@src, v + v + end + assert_equal @src, decomressed_data + end + + test '#open with compressed option passes io object having decompressed data to a block when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + comressed_data = c.open(compressed: :zstd) do |io| + v = io.read + assert_equal @zstded_src, v + v + end + assert_equal @zstded_src, comressed_data + end + + test '#write_to writes decompressed data when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + assert_equal @src, c.read + assert_equal @zstded_src, c.read(compressed: :zstd) + + io = StringIO.new + c.write_to(io) + assert_equal @src, io.string + end + + test '#write_to with compressed option writes compressed data when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + assert_equal @src, c.read + assert_equal @zstded_src, c.read(compressed: :zstd) + + io = StringIO.new + io.set_encoding(Encoding::ASCII_8BIT) + c.write_to(io, compressed: :zstd) + assert_equal @zstded_src, io.string + end end end diff --git a/test/plugin/test_buffer_file_single_chunk.rb b/test/plugin/test_buffer_file_single_chunk.rb index db96092b3b..0a6d804d70 100644 --- a/test/plugin/test_buffer_file_single_chunk.rb +++ b/test/plugin/test_buffer_file_single_chunk.rb @@ -542,6 +542,7 @@ def gen_chunk_path(prefix, unique_id) setup do @src = 'text data for compressing' * 5 @gzipped_src = compress(@src) + @zstded_src = compress(@src, type: :zstd) end test '#append with compress option writes compressed data to chunk when compress is gzip' do @@ -607,5 +608,69 @@ def gen_chunk_path(prefix, unique_id) c.write_to(io, compressed: :gzip) assert_equal @gzipped_src, io.string end + + test '#append with compress option writes compressed data to chunk when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'fsb.*.buf'), :create, nil, compress: :zstd) + c.append([@src, @src], compress: :zstd) + c.commit + + # check chunk is compressed + assert c.read(compressed: :zstd).size < [@src, @src].join("").size + + assert_equal @src + @src, c.read + end + + test '#open passes io object having decompressed data to a block when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'fsb.*.buf'), :create, nil, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + decomressed_data = c.open do |io| + v = io.read + assert_equal @src, v + v + end + assert_equal @src, decomressed_data + end + + test '#open with compressed option passes io object having decompressed data to a block when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'fsb.*.buf'), :create, nil, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + comressed_data = c.open(compressed: :zstd) do |io| + v = io.read + assert_equal @zstded_src, v + v + end + assert_equal @zstded_src, comressed_data + end + + test '#write_to writes decompressed data when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'fsb.*.buf'), :create, nil, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + assert_equal @src, c.read + assert_equal @zstded_src, c.read(compressed: :zstd) + + io = StringIO.new + c.write_to(io) + assert_equal @src, io.string + end + + test '#write_to with compressed option 
writes compressed data when compress is zstd' do + c = @klass.new(gen_metadata, File.join(@chunkdir,'fsb.*.buf'), :create, nil, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + assert_equal @src, c.read + assert_equal @zstded_src, c.read(compressed: :zstd) + + io = StringIO.new + io.set_encoding(Encoding::ASCII_8BIT) + c.write_to(io, compressed: :zstd) + assert_equal @zstded_src, io.string + end end end diff --git a/test/plugin/test_buffer_memory_chunk.rb b/test/plugin/test_buffer_memory_chunk.rb index 03f2dda9f1..6479124b0e 100644 --- a/test/plugin/test_buffer_memory_chunk.rb +++ b/test/plugin/test_buffer_memory_chunk.rb @@ -270,6 +270,7 @@ class BufferMemoryChunkTest < Test::Unit::TestCase setup do @src = 'text data for compressing' * 5 @gzipped_src = compress(@src) + @zstded_src = compress(@src, type: :zstd) end test '#append with compress option writes compressed data to chunk when compress is gzip' do @@ -335,5 +336,69 @@ class BufferMemoryChunkTest < Test::Unit::TestCase c.write_to(io, compressed: :gzip) assert_equal @gzipped_src, io.string end + + test '#append with compress option writes compressed data to chunk when compress is zstd' do + c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :zstd) + c.append([@src, @src], compress: :zstd) + c.commit + + # check chunk is compressed + assert c.read(compressed: :zstd).size < [@src, @src].join("").size + + assert_equal @src + @src, c.read + end + + test '#open passes io object having decompressed data to a block when compress is zstd' do + c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + decomressed_data = c.open do |io| + v = io.read + assert_equal @src, v + v + end + assert_equal @src, decomressed_data + end + + test '#open with compressed option passes io object having decompressed data to a block when compress is zstd' do + c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + comressed_data = c.open(compressed: :zstd) do |io| + v = io.read + assert_equal @zstded_src, v + v + end + assert_equal @zstded_src, comressed_data + end + + test '#write_to writes decompressed data when compress is zstd' do + c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + assert_equal @src, c.read + assert_equal @zstded_src, c.read(compressed: :zstd) + + io = StringIO.new + c.write_to(io) + assert_equal @src, io.string + end + + test '#write_to with compressed option writes compressed data when compress is zstd' do + c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :zstd) + c.concat(@zstded_src, @src.size) + c.commit + + assert_equal @src, c.read + assert_equal @zstded_src, c.read(compressed: :zstd) + + io = StringIO.new + io.set_encoding(Encoding::ASCII_8BIT) + c.write_to(io, compressed: :zstd) + assert_equal @zstded_src, io.string + end end end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index cbebe91593..b3f5a1e07d 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -509,7 +509,7 @@ def create_driver(conf=base_config) end sub_test_case 'compressed packed forward' do - test 'set_compress_to_option' do + test 'set_compress_to_option_gzip' do @d = d = create_driver time_i = event_time("2011-01-02 13:14:15 UTC").to_i @@ -536,6 +536,33 @@ def create_driver(conf=base_config) assert_equal events, d.events end + test 
'set_compress_to_option_zstd' do + @d = d = create_driver + + time_i = event_time("2011-01-02 13:14:15 UTC").to_i + events = [ + ["tag1", time_i, {"a"=>1}], + ["tag1", time_i, {"a"=>2}] + ] + + # create compressed entries + entries = '' + events.each do |_tag, _time, record| + v = [_time, record].to_msgpack + entries << compress(v, type: :zstd) + end + chunk = ["tag1", entries, { 'compressed' => 'zstd' }].to_msgpack + + d.run do + Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| + option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK) + assert_equal 'zstd', option['compressed'] + end + end + + assert_equal events, d.events + end + test 'create_CompressedMessagePackEventStream_with_gzip_compress_option' do @d = d = create_driver @@ -554,7 +581,7 @@ def create_driver(conf=base_config) chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack # check CompressedMessagePackEventStream is created - mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0) + mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :gzip) d.run do Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| @@ -563,6 +590,34 @@ def create_driver(conf=base_config) end end end + + test 'create_CompressedMessagePackEventStream_with_zstd_compress_option' do + @d = d = create_driver + + time_i = event_time("2011-01-02 13:14:15 UTC").to_i + events = [ + ["tag1", time_i, {"a"=>1}], + ["tag1", time_i, {"a"=>2}] + ] + + # create compressed entries + entries = '' + events.each do |_tag, _time, record| + v = [_time, record].to_msgpack + entries << compress(v) + end + chunk = ["tag1", entries, { 'compressed' => 'zstd' }].to_msgpack + + # check CompressedMessagePackEventStream is created + mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :zstd) + + d.run do + Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj| + option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK) + assert_equal 'zstd', option['compressed'] + end + end + end end sub_test_case 'warning' do diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index 062ba374d7..b78f8f62ce 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -5,6 +5,7 @@ require 'time' require 'timecop' require 'zlib' +require 'zstd-ruby' require 'fluent/file_wrapper' class FileOutputTest < Test::Unit::TestCase @@ -397,20 +398,32 @@ def create_driver(conf = CONFIG, opts = {}) end end - def check_gzipped_result(path, expect) + def check_zipped_result(path, expect, type: :gzip) # Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790 # Following code from https://www.ruby-forum.com/topic/971591#979520 result = '' - File.open(path, "rb") { |io| - loop do - gzr = Zlib::GzipReader.new(StringIO.new(io.read)) - result << gzr.read - unused = gzr.unused - gzr.finish - break if unused.nil? - io.pos -= unused.length - end - } + if type == :gzip || type == :gz + File.open(path, "rb") { |io| + loop do + gzr = Zlib::GzipReader.new(StringIO.new(io.read)) + result << gzr.read + unused = gzr.unused + gzr.finish + break if unused.nil? + io.pos -= unused.length + end + } + elsif type == :zstd + File.open(path, "rb") { |io| + loop do + reader = Zstd::StreamReader.new(StringIO.new(io.read)) + result << reader.read(1024) + break if io.eof? 
+ end + } + else + raise "Invalid compression type to check" + end assert_equal expect, result end @@ -421,7 +434,7 @@ def check_result(path, expect) end sub_test_case 'write' do - test 'basic case' do + test 'basic case with gz' do d = create_driver assert_false File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") @@ -433,7 +446,29 @@ def check_result(path, expect) end assert File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") - check_gzipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}]) + check_zipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}]) + end + + test 'write with zstd compression' do + d = create_driver %[ + path #{TMP_DIR}/out_file_test + compress zstd + utc + + timekey_use_utc true + + ] + + assert_false File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.zstd") + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + + assert File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.zstd") + check_zipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.zstd", %[2011-01-02T13:14:15Z\ttest\t{"a":1}#{@default_newline}] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}#{@default_newline}], type: :zstd) end end @@ -481,7 +516,7 @@ def parse_system(text) assert File.exist?("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz") - check_gzipped_result("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) + check_zipped_result("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) dir_mode = "%o" % File::stat(TMP_DIR_WITH_SYSTEM).mode assert_equal(OVERRIDE_DIR_PERMISSION, dir_mode[-3, 3].to_i) file_mode = "%o" % File::stat("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz").mode @@ -500,7 +535,7 @@ def parse_system(text) end path = d.instance.last_written_path - check_gzipped_result(path, %[#{Yajl.dump({"a" => 1, 'time' => time.to_i})}#{@default_newline}] + %[#{Yajl.dump({"a" => 2, 'time' => time.to_i})}#{@default_newline}]) + check_zipped_result(path, %[#{Yajl.dump({"a" => 1, 'time' => time.to_i})}#{@default_newline}] + %[#{Yajl.dump({"a" => 2, 'time' => time.to_i})}#{@default_newline}]) end test 'ltsv' do @@ -513,7 +548,7 @@ def parse_system(text) end path = d.instance.last_written_path - check_gzipped_result(path, %[a:1\ttime:2011-01-02T13:14:15Z#{@default_newline}] + %[a:2\ttime:2011-01-02T13:14:15Z#{@default_newline}]) + check_zipped_result(path, %[a:1\ttime:2011-01-02T13:14:15Z#{@default_newline}] + %[a:2\ttime:2011-01-02T13:14:15Z#{@default_newline}]) end test 'single_value' do @@ -526,7 +561,7 @@ def parse_system(text) end path = d.instance.last_written_path - check_gzipped_result(path, %[1#{@default_newline}] + %[2#{@default_newline}]) + check_zipped_result(path, %[1#{@default_newline}] + %[2#{@default_newline}]) end end @@ -547,23 +582,24 @@ def parse_system(text) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102_0.log.gz", path - check_gzipped_result(path, formatted_lines) + check_zipped_result(path, formatted_lines) assert_equal 1, Dir.glob("#{TMP_DIR}/out_file_test.*").size path = write_once.call assert_equal 
"#{TMP_DIR}/out_file_test.20110102_1.log.gz", path - check_gzipped_result(path, formatted_lines) + check_zipped_result(path, formatted_lines) assert_equal 2, Dir.glob("#{TMP_DIR}/out_file_test.*").size path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102_2.log.gz", path - check_gzipped_result(path, formatted_lines) + check_zipped_result(path, formatted_lines) assert_equal 3, Dir.glob("#{TMP_DIR}/out_file_test.*").size end data( - "with compression" => true, - "without compression" => false, + "without compression" => "text", + "with gzip compression" => "gz", + "with zstd compression" => "zstd" ) test 'append' do |compression| time = event_time("2011-01-02 13:14:15 UTC") @@ -578,8 +614,8 @@ def parse_system(text) timekey_use_utc true ] - if compression - config << " compress gz" + if compression != :text + config << " compress #{compression}" end d = create_driver(config) d.run(default_tag: 'test'){ @@ -590,16 +626,16 @@ def parse_system(text) } log_file_name = "out_file_test.20110102.log" - if compression - log_file_name << ".gz" + if compression != "text" + log_file_name << ".#{compression}" end 1.upto(3) do |i| path = write_once.call assert_equal "#{TMP_DIR}/#{log_file_name}", path expect = formatted_lines * i - if compression - check_gzipped_result(path, expect) + if compression != "text" + check_zipped_result(path, expect, type: compression.to_sym) else check_result(path, expect) end @@ -630,15 +666,15 @@ def parse_system(text) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path - check_gzipped_result(path, formatted_lines) + check_zipped_result(path, formatted_lines) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path - check_gzipped_result(path, formatted_lines * 2) + check_zipped_result(path, formatted_lines * 2) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path - check_gzipped_result(path, formatted_lines * 3) + check_zipped_result(path, formatted_lines * 3) end end @@ -667,15 +703,15 @@ def parse_system(text) path = write_once.call # Rotated at 2011-01-02 17:00:00+02:00 assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path - check_gzipped_result(path, formatted_lines) + check_zipped_result(path, formatted_lines) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path - check_gzipped_result(path, formatted_lines * 2) + check_zipped_result(path, formatted_lines * 2) path = write_once.call assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path - check_gzipped_result(path, formatted_lines * 3) + check_zipped_result(path, formatted_lines * 3) end end @@ -871,6 +907,10 @@ def run_and_check(d, symlink_path) test 'returns .gz for gzip' do assert_equal '.gz', @i.compression_suffix(:gzip) end + + test 'returns .zstd for zstd' do + assert_equal '.zstd', @i.compression_suffix(:zstd) + end end sub_test_case '#generate_path_template' do diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index c2442b631a..45d665a10e 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -343,6 +343,15 @@ def try_write(chunk) assert_equal :gzip, node.instance_variable_get(:@compress) end + test 'set_compress_is_zstd' do + @d = d = create_driver(config + %[compress zstd]) + assert_equal :zstd, d.instance.compress + assert_equal :zstd, d.instance.buffer.compress + + node = d.instance.nodes.first + assert_equal :zstd, node.instance_variable_get(:@compress) + end + test 
'set_compress_is_gzip_in_buffer_section' do mock = flexmock($log) mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in ") @@ -360,6 +369,23 @@ def try_write(chunk) assert_equal :text, node.instance_variable_get(:@compress) end + test 'set_compress_is_zstd_in_buffer_section' do + mock = flexmock($log) + mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in ") + + @d = d = create_driver(config + %[ + + type memory + compress zstd + + ]) + assert_equal :text, d.instance.compress + assert_equal :zstd, d.instance.buffer.compress + + node = d.instance.nodes.first + assert_equal :text, node.instance_variable_get(:@compress) + end + test 'phi_failure_detector disabled' do @d = d = create_driver(config + %[phi_failure_detector false \n phi_threshold 0]) node = d.instance.nodes.first @@ -550,6 +576,36 @@ def try_write(chunk) assert_equal ['test', time, records[1]], events[1] end + test 'send_comprssed_message_pack_stream_if_compress_is_zstd' do + target_input_driver = create_target_input_driver + + @d = d = create_driver(config + %[ + flush_interval 1s + compress zstd + ]) + + time = event_time('2011-01-02 13:14:15 UTC') + + records = [ + {"a" => 1}, + {"a" => 2} + ] + target_input_driver.run(expect_records: 2) do + d.run(default_tag: 'test') do + records.each do |record| + d.feed(time, record) + end + end + end + + event_streams = target_input_driver.event_streams + assert_true event_streams[0][1].is_a?(Fluent::CompressedMessagePackEventStream) + + events = target_input_driver.events + assert_equal ['test', time, records[0]], events[0] + assert_equal ['test', time, records[1]], events[1] + end + test 'send_to_a_node_supporting_responses' do target_input_driver = create_target_input_driver diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index c9cc649f0a..5923ee0937 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -1010,13 +1010,25 @@ def waiting(seconds) test 'when output has and compress is gzip' do i = create_output(:buffered) i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'gzip'})])) - assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, i.generate_format_proc + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_GZIP, i.generate_format_proc end test 'when output has and compress is gzip and time_as_integer is true' do i = create_output(:buffered) i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'gzip'})])) - assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, i.generate_format_proc + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_GZIP, i.generate_format_proc + end + + test 'when output has and compress is zstd' do + i = create_output(:buffered) + i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {'compress' => 'zstd'})])) + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_ZSTD, i.generate_format_proc + end + + test 'when output has and compress is zstd and time_as_integer is true' do + i = create_output(:buffered) + i.configure(config_element('ROOT', '', {'time_as_integer' => true}, [config_element('buffer', '', {'compress' => 'zstd'})])) + assert_equal Fluent::Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT_ZSTD, 
i.generate_format_proc end test 'when output has and compress is text' do
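
Usage reference for the API change above: the patch extends Fluent::Plugin::Compressable#compress and #decompress with a `type:` keyword (default :gzip, new :zstd path). The following round-trip sketch is illustrative only and not part of the patch; the CompressionCheck class is hypothetical, and it assumes this branch plus the zstd-ruby gem added to the gemspec are installed.

# Minimal sketch (hypothetical helper class), exercising the new `type:` keyword.
require 'fluent/plugin/compressable'

class CompressionCheck
  include Fluent::Plugin::Compressable
end

check = CompressionCheck.new
data  = "sample log line\n" * 10

gzipped = check.compress(data)               # default stays :gzip
zstded  = check.compress(data, type: :zstd)  # new code path added by this patch

# decompress() dispatches on the same :type keyword
raise 'gzip round-trip failed' unless check.decompress(gzipped) == data
raise 'zstd round-trip failed' unless check.decompress(zstded, type: :zstd) == data
puts 'gzip and zstd round-trips OK'

In configuration terms, this is the switch that `compress zstd` selects for out_file and for <buffer>/out_forward sections, alongside the existing `text` and `gzip` values exercised in the tests above.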