From 389faf33d6661171309f0d075079c28b67ce133a Mon Sep 17 00:00:00 2001 From: Martin Traverse Date: Tue, 8 Jul 2025 22:12:55 +0100 Subject: [PATCH 1/7] First draft of AvroFileWriter --- .../arrow/adapter/avro/AvroFileWriter.java | 192 ++++++++++++++++++ .../adapter/avro/BufferOutputStream.java | 95 +++++++++ 2 files changed, 287 insertions(+) create mode 100644 adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java create mode 100644 adapter/avro/src/main/java/org/apache/arrow/adapter/avro/BufferOutputStream.java diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java new file mode 100644 index 0000000000..b5fbfe3c72 --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.arrow.adapter.avro;
+
+import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+
+// Writes the contents of a VectorSchemaRoot to an output stream in the layout
+// produced below: magic, metadata map, sync marker, then one block per batch
+// (row count, compressed bytes, sync marker).
+class AvroFileWriter {
+
+    // Use magic from Avro's own constants
+    private static final byte[] AVRO_MAGIC = DataFileConstants.MAGIC;
+
+    private static final String codecName = "zstandard";
+    private static final CompressionUtil.CodecType codecType = CompressionUtil.CodecType.ZSTD;
+
+    private final OutputStream stream;
+    private final Encoder encoder;
+
+    private final BufferAllocator allocator;
+    private final BufferOutputStream batchBuffer;
+    private BinaryEncoder batchEncoder;
+    private VectorSchemaRoot batch;
+
+    private final Schema avroSchema;
+    private final byte[] syncMarker;
+
+    private final CompositeAvroProducer recordProducer;
+    private final CompressionCodec compressionCodec;
+
+    // Set once the file header has been emitted
+    private boolean headerWritten;
+
+
+    // Creates a writer for the given stream
+    // Schema and producers are derived from the first batch and the dictionaries
+    public AvroFileWriter(
+            OutputStream stream,
+            VectorSchemaRoot firstBatch,
+            DictionaryProvider dictionaries)
+            throws IOException {
+
+        EncoderFactory encoderFactory = EncoderFactory.get();
+
+        this.stream = stream;
+        this.encoder = encoderFactory.binaryEncoder(stream, null);
+
+        this.allocator = firstBatch.getVector(0).getAllocator();
+        this.batchBuffer = new BufferOutputStream(allocator);
+        this.batch = firstBatch;
+
+        try {
+
+            // The batch encoder must target the in-memory batch buffer, never the output stream
+            this.batchEncoder = encoderFactory.directBinaryEncoder(batchBuffer, null);
+
+            this.avroSchema = ArrowToAvroUtils.createAvroSchema(
+                    firstBatch.getSchema().getFields(),
+                    dictionaries);
+
+            this.recordProducer = ArrowToAvroUtils.createCompositeProducer(
+                    firstBatch.getFieldVectors(),
+                    dictionaries);
+
+            this.compressionCodec = CompressionCodec.Factory.INSTANCE.createCodec(codecType);
+
+            // Generate a random sync marker
+            var random = new Random();
+            this.syncMarker = new byte[16];
+            random.nextBytes(this.syncMarker);
+        }
+        catch (Throwable e) {
+            // Do not leak the batch buffer if there are problems during setup
+            batchBuffer.close();
+            throw e;
+        }
+    }
+
+    // Sets up a default binary encoder for the channel
+    public AvroFileWriter(
+            WritableByteChannel channel,
+            VectorSchemaRoot firstBatch,
+            DictionaryProvider dictionaries)
+            throws IOException {
+
+        this(Channels.newOutputStream(channel), firstBatch, dictionaries);
+    }
+
+    // Write the Avro header (throws IllegalStateException if already written)
+    public void writeHeader() throws IOException {
+
+        if (headerWritten) {
+            throw new IllegalStateException("Avro header has already been written");
+        }
+
+        // Prepare the metadata map
+        Map<String, byte[]> metadata = new HashMap<>();
+        metadata.put("avro.schema", avroSchema.toString().getBytes(StandardCharsets.UTF_8));
+        metadata.put("avro.codec", codecName.getBytes(StandardCharsets.UTF_8));
+
+        // Avro magic
+        encoder.writeFixed(AVRO_MAGIC);
+
+        // Write the metadata map
+        encoder.writeMapStart();
+        encoder.setItemCount(metadata.size());
+        for (Map.Entry<String, byte[]> entry : metadata.entrySet()) {
+            encoder.startItem();
+            encoder.writeString(entry.getKey());
+            encoder.writeBytes(entry.getValue());
+        }
+        encoder.writeMapEnd();
+
+        // Sync marker denotes end of the header
+        encoder.writeFixed(this.syncMarker);
+        encoder.flush();
+
+        headerWritten = true;
+    }
+
+    // Write the contents of the VSR as an Avro data block
+    // Writes header if not yet written
+    // Expects new data to be in the batch (i.e. VSR can be recycled)
+    public void writeBatch() throws IOException {
+
+        if (!headerWritten) {
+            writeHeader();
+        }
+
+        // Reset batch buffer and encoder
+        batchBuffer.reset();
+        batchEncoder = EncoderFactory.get().directBinaryEncoder(batchBuffer, batchEncoder);
+
+        // Reset producers
+        recordProducer.getProducers().forEach(producer -> producer.setPosition(0));
+
+        // Produce a batch
+        for (int row = 0; row < batch.getRowCount(); row++) {
+            recordProducer.produce(batchEncoder);
+        }
+
+        batchEncoder.flush();
+
+        // Raw buffer is a view onto the stream backing buffer - do not release
+        // NOTE(review): confirm that compress() does not release its input buffer,
+        // and that its output framing matches what Avro readers expect for this codec name
+        ArrowBuf rawBuffer = batchBuffer.getBuffer();
+
+        // Compressed buffer is newly allocated and needs to be released
+        try (ArrowBuf compressedBuffer = compressionCodec.compress(allocator, rawBuffer)) {
+
+            // Write Avro block to the main encoder
+            encoder.writeLong(batch.getRowCount());
+            encoder.writeBytes(compressedBuffer.nioBuffer());
+            encoder.writeFixed(syncMarker);
+        }
+    }
+
+    // Reset vectors in all the producers
+    // Supports a stream of VSRs if source VSR is not recycled
+    void resetBatch(VectorSchemaRoot batch) {
+        recordProducer.resetProducerVectors(batch);
+        this.batch = batch;
+    }
+
+    // Push any data buffered in the main encoder to the output stream
+    public void flush() throws IOException {
+        encoder.flush();
+    }
+
+    // Flushes the encoder, then closes the stream and the batch buffer
+    // Does not close VSR or dictionary vectors
+    public void close() throws IOException {
+        try {
+            encoder.flush();
+        }
+        finally {
+            // Release the stream and the batch buffer even if the flush fails
+            try {
+                stream.close();
+            }
+            finally {
+                batchBuffer.close();
+            }
+        }
+    }
+}
diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/BufferOutputStream.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/BufferOutputStream.java new file mode 100644 index 0000000000..1e746989f0 --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/BufferOutputStream.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.avro;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+// Output stream that accumulates bytes in a growable ArrowBuf
+// The stream owns its buffer: close() releases it, reset() rewinds it for reuse
+public class BufferOutputStream extends OutputStream {
+
+    private static final int INITIAL_BUFFER_SIZE = 4096;
+
+    private ArrowBuf buffer;
+
+    public BufferOutputStream(BufferAllocator allocator) {
+        buffer = allocator.buffer(INITIAL_BUFFER_SIZE);
+    }
+
+    // Returns a slice covering the bytes written so far
+    public ArrowBuf getBuffer() {
+        return buffer.slice(0, buffer.writerIndex());
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        // Size against the bytes already written, not the current capacity,
+        // otherwise every single write would force a reallocation
+        ensureCapacity(buffer.writerIndex() + 1);
+        buffer.writeByte(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        ensureCapacity(buffer.writerIndex() + b.length);
+        buffer.writeBytes(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        ensureCapacity(buffer.writerIndex() + len);
+        buffer.writeBytes(b, off, len);
+    }
+
+    @Override
+    public void flush() {
+        // no-op
+    }
+
+    // Discards the written content so the buffer can be reused
+    public void reset() {
+        buffer.clear();
+    }
+
+    @Override
+    public void close() throws IOException {
+        buffer.close();
+    }
+
+    // Grows the backing buffer (at least doubling it) when fewer than
+    // requiredCapacity bytes are available, preserving the bytes already written
+    private void ensureCapacity(long requiredCapacity) {
+
+        if (requiredCapacity > buffer.capacity()) {
+
+            long newCapacity = Math.max(requiredCapacity, buffer.capacity() * 2);
+
+            BufferAllocator allocator = buffer.getReferenceManager().getAllocator();
+            ArrowBuf newBuffer = allocator.buffer(newCapacity);
+
+            try {
+
+                long written = buffer.writerIndex();
+
+                MemoryUtil.copyMemory(buffer.memoryAddress(), newBuffer.memoryAddress(), written);
+
+                // The raw memory copy does not move the writer index: carry it over,
+                // otherwise the next write would overwrite the copied content
+                newBuffer.writerIndex(written);
+
+                buffer.close();
+                buffer = newBuffer;
+            }
+            catch (Throwable t) {
+                newBuffer.close();
+                throw t;
+            }
+        }
+    }
+}
From 53efc2dd5e5642de2e4202c17185f974855bff19 Mon Sep 17 00:00:00 2001 From: Martin Traverse Date: Mon, 21 Jul 2025 21:19:50 +0100 Subject: [PATCH 2/7] First draft of Avro reader --- .../arrow/adapter/avro/AvroFileReader.java | 380 ++++++++++++++++++ 1 file changed, 380 insertions(+) create mode 100644 adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java new file mode 100644 index 0000000000..489a9146d6 --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.arrow.adapter.avro;
+
+import org.apache.arrow.adapter.avro.consumers.CompositeAvroConsumer;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+
+class AvroFileReader implements DictionaryProvider {
+
+    // Reader owns the input stream / decoder and will close the stream
+    // Schema / VSR / dictionaries are created when header is read
+    // VSR / dictionaries are cleaned up on close
+    // Dictionaries accessible through DictionaryProvider iface
+
+    // Use magic from Avro's own constants
+    private static final byte[] AVRO_MAGIC = DataFileConstants.MAGIC;
+    private static final int SYNC_MARKER_SIZE = 16;
+
+    private final InputStream stream;
+    private final BinaryDecoder decoder;
+    private final BufferAllocator allocator;
+    private final boolean blocking;
+
+    private org.apache.avro.Schema avroSchema;
+    private String avroCodec;
+    private final byte[] syncMarker;
+
+    private CompositeAvroConsumer recordConsumer;
+    private VectorSchemaRoot arrowBatch;
+    private Schema arrowSchema;
+    private DictionaryProvider.MapDictionaryProvider dictionaries;
+
+    private long nextBatchPosition;
+    private ByteBuffer batchBuffer;
+    private BinaryDecoder batchDecoder;
+    private final byte[] batchSyncMarker;
+
+    // Create a new AvroFileReader for the input stream
+    // In order to support non-blocking mode, the stream must support mark / reset
+    public AvroFileReader(
+            InputStream stream,
+            BufferAllocator allocator,
+            boolean blocking) {
+
+        this.stream = stream;
+        this.allocator = allocator;
+        this.blocking = blocking;
+
+        if (blocking) {
+            this.decoder = DecoderFactory.get().binaryDecoder(stream, null);
+        } else {
+            if (!stream.markSupported()) {
+                throw new IllegalArgumentException("Input stream must support mark/reset for non-blocking mode");
+            }
+            this.decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
+        }
+
+        this.syncMarker = new byte[SYNC_MARKER_SIZE];
+        this.batchSyncMarker = new byte[SYNC_MARKER_SIZE];
+    }
+
+    // Read the Avro header and set up schema / VSR / dictionaries
+    // Throws IllegalStateException if the header has already been read
+    void readHeader() throws IOException {
+
+        if (avroSchema != null) {
+            throw new IllegalStateException("Avro header has already been read");
+        }
+
+        // Keep track of the header size
+        long headerSize = 0;
+
+        // Read Avro magic
+        byte[] magic = new byte[AVRO_MAGIC.length];
+        decoder.readFixed(magic);
+        headerSize += magic.length;
+
+        // Validate Avro magic
+        int validateMagic = BinaryData.compareBytes(
+                AVRO_MAGIC, 0, AVRO_MAGIC.length,
+                magic, 0, AVRO_MAGIC.length);
+
+        if (validateMagic != 0) {
+            throw new RuntimeException("Invalid AVRO data file: The file is not an Avro file");
+        }
+
+        // Read the metadata map
+        for (long count = decoder.readMapStart(); count != 0; count = decoder.mapNext()) {
+
+            headerSize += zigzagSize(count);
+
+            for (long i = 0; i < count; i++) {
+
+                ByteBuffer keyBuffer = decoder.readBytes(null);
+                ByteBuffer valueBuffer = decoder.readBytes(null);
+
+                headerSize += zigzagSize(keyBuffer.remaining()) + keyBuffer.remaining();
+                headerSize += zigzagSize(valueBuffer.remaining()) + valueBuffer.remaining();
+
+                String key = utf8String(keyBuffer);
+
+                // Handle header entries for schema and codec
+                if ("avro.schema".equals(key)) {
+                    avroSchema = processSchema(valueBuffer);
+                } else if ("avro.codec".equals(key)) {
+                    avroCodec = processCodec(valueBuffer);
+                }
+            }
+        }
+
+        // End of map marker
+        headerSize += 1;
+
+        // Sync marker denotes end of the header
+        decoder.readFixed(syncMarker);
+        headerSize += syncMarker.length;
+
+        // Schema must always be present
+        if (avroSchema == null) {
+            throw new RuntimeException("Invalid AVRO data file: Schema missing in file header");
+        }
+
+        // Prepare read config
+        this.dictionaries = new DictionaryProvider.MapDictionaryProvider();
+        AvroToArrowConfig config = new AvroToArrowConfig(allocator, 0, dictionaries, Set.of(), false);
+
+        // Calling this method will also populate the dictionary map
+        this.recordConsumer = AvroToArrowUtils.createCompositeConsumer(avroSchema, config);
+
+        // Initialize data vectors, one per consumer
+        // The Arrow schema is not known yet at this point, it is taken from the VSR below
+        int fieldCount = recordConsumer.getConsumers().size();
+        List<FieldVector> vectors = new ArrayList<>(fieldCount);
+        for (int i = 0; i < fieldCount; i++) {
+            FieldVector vector = recordConsumer.getConsumers().get(i).getVector();
+            vectors.add(vector);
+        }
+
+        // Initialize batch and schema
+        this.arrowBatch = new VectorSchemaRoot(vectors);
+        this.arrowSchema = arrowBatch.getSchema();
+
+        // First batch starts after the header
+        this.nextBatchPosition = headerSize;
+    }
+
+    // Parse the Avro schema from the readable region of the buffer
+    private org.apache.avro.Schema processSchema(ByteBuffer buffer) throws IOException {
+
+        org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
+
+        try (InputStream schemaStream = bufferStream(buffer)) {
+            return parser.parse(schemaStream);
+        }
+    }
+
+    // Decode the codec name, an absent or empty value means the null codec
+    private String processCodec(ByteBuffer buffer) {
+
+        if (buffer != null && buffer.remaining() > 0) {
+            return utf8String(buffer);
+        }
+        else {
+            return DataFileConstants.NULL_CODEC;
+        }
+    }
+
+    // Decode the readable region of an array-backed buffer as UTF-8
+    // Honours offset / position / limit, the backing array may be larger than the content
+    private static String utf8String(ByteBuffer buffer) {
+        return new String(
+                buffer.array(),
+                buffer.arrayOffset() + buffer.position(),
+                buffer.remaining(),
+                StandardCharsets.UTF_8);
+    }
+
+    // Expose the readable region of an array-backed buffer as a stream, without copying
+    private static InputStream bufferStream(ByteBuffer buffer) {
+        return new ByteArrayInputStream(
+                buffer.array(),
+                buffer.arrayOffset() + buffer.position(),
+                buffer.remaining());
+    }
+
+    // Guard for everything that is only available after readHeader()
+    private void requireHeader() {
+        if (avroSchema == null) {
+            throw new IllegalStateException("Avro header has not been read yet");
+        }
+    }
+
+    // Schema and VSR available after readHeader()
+    Schema getSchema() {
+        requireHeader();
+        return arrowSchema;
+    }
+
+    VectorSchemaRoot getVectorSchemaRoot() {
+        requireHeader();
+        return arrowBatch;
+    }
+
+    @Override
+    public Set<Long> getDictionaryIds() {
+        requireHeader();
+        return dictionaries.getDictionaryIds();
+    }
+
+    @Override
+    public Dictionary lookup(long id) {
+        requireHeader();
+        return dictionaries.lookup(id);
+    }
+
+    // Read the next Avro block and load it into the VSR
+    // Return true if successful, false if EOS
+    // Also false in non-blocking mode if need more data
+    boolean readBatch() throws IOException {
+
+        requireHeader();
+
+        if (!hasNextBatch()) {
+            return false;
+        }
+
+        // Read Avro block from the main decoder
+        long nRows = decoder.readLong();
+        batchBuffer = decoder.readBytes(batchBuffer);
+        decoder.readFixed(batchSyncMarker);
+
+        long batchSize =
+                zigzagSize(nRows) +
+                zigzagSize(batchBuffer.remaining()) +
+                batchBuffer.remaining() +
+                SYNC_MARKER_SIZE;
+
+        // Validate sync marker - mismatch indicates a corrupt file
+        int validateMarker = BinaryData.compareBytes(
+                syncMarker, 0, SYNC_MARKER_SIZE,
+                batchSyncMarker, 0, SYNC_MARKER_SIZE);
+
+        if (validateMarker != 0) {
+            throw new RuntimeException("Invalid AVRO data file: The file is corrupted");
+        }
+
+        // The row count becomes an int below, reject values that would silently wrap
+        if (nRows < 0 || nRows > Integer.MAX_VALUE) {
+            throw new RuntimeException("Invalid AVRO data file: Unsupported block row count " + nRows);
+        }
+
+        int rowCount = (int) nRows;
+
+        // Reset consumers
+        recordConsumer.getConsumers().forEach(consumer -> ensureCapacity(consumer.getVector(), rowCount));
+        recordConsumer.getConsumers().forEach(consumer -> consumer.setPosition(0));
+
+        // Decompress the batch buffer using Avro's codecs
+        var codec = AvroCompression.getAvroCodec(avroCodec);
+        var batchDecompressed = codec.decompress(batchBuffer);
+
+        // Prepare batch stream and decoder
+        // Only the readable region is exposed: batchBuffer is reused between blocks,
+        // so its backing array can hold stale bytes beyond the current content
+        try (InputStream batchStream = bufferStream(batchDecompressed)) {
+
+            batchDecoder = DecoderFactory.get().directBinaryDecoder(batchStream, batchDecoder);
+
+            // Consume a batch, reading from the batch stream (buffer)
+            for (int row = 0; row < rowCount; row++) {
+                recordConsumer.consume(batchDecoder);
+            }
+
+            arrowBatch.setRowCount(rowCount);
+
+            // Update next batch position
+            nextBatchPosition += batchSize;
+
+            // Batch is ready
+            return true;
+        }
+    }
+
+    // NOTE(review): this only raises the initial capacity hint, confirm that the
+    // consumers grow already-allocated vectors themselves when writing values
+    private void ensureCapacity(FieldVector vector, int capacity) {
+        if (vector.getValueCapacity() < capacity) {
+            vector.setInitialCapacity(capacity);
+        }
+    }
+
+    // Check whether another Avro data block is available
+    // Provides a mechanism for non-blocking / reactive styles
+    boolean hasNextBatch() throws IOException {
+
+        requireHeader();
+
+        if (blocking) {
+            return ! decoder.isEnd();
+        }
+
+        // Non-blocking: peek a single byte and rewind
+        var in = decoder.inputStream();
+        in.mark(1);
+
+        try {
+
+            int nextByte = in.read();
+            in.reset();
+
+            return nextByte >= 0;
+        }
+        catch(EOFException e) {
+            return false;
+        }
+    }
+
+    // Position of the next block, counted from the first byte consumed by readHeader()
+    long nextBatchPosition() {
+
+        requireHeader();
+
+        return nextBatchPosition;
+    }
+
+    // Total encoded size of the next block: row count, byte count, payload and sync marker
+    // Peeks at the block header and rewinds, so it is only available in non-blocking mode
+    public long nextBatchSize() throws IOException {
+
+        requireHeader();
+
+        if (blocking) {
+            throw new IllegalStateException("Next batch size is only available in non-blocking mode");
+        }
+
+        // Two longs are read below, the mark limit of 20 allows 10 bytes for each
+        InputStream in = decoder.inputStream();
+        in.mark(20);
+
+        long nRows;
+        long nBytes;
+
+        try {
+            nRows = decoder.readLong();
+            nBytes = decoder.readLong();
+        }
+        finally {
+            // Always rewind, so a failed peek does not leave the stream mid-block
+            in.reset();
+        }
+
+        return zigzagSize(nRows) + zigzagSize(nBytes) + nBytes + SYNC_MARKER_SIZE;
+    }
+
+    // Number of bytes used by the zigzag varint encoding of n
+    private int zigzagSize(long n) {
+
+        long val = (n << 1) ^ (n >> 63); // move sign to low-order bit
+        int bytes = 1;
+
+        while ((val & ~0x7F) != 0) {
+            bytes += 1;
+            val >>>= 7;
+        }
+
+        return bytes;
+    }
+
+    // Closes the input stream
+    // Also closes VSR and dictionary vectors
+    void close() throws IOException {
+
+        try {
+            stream.close();
+        }
+        finally {
+
+            // Release Arrow memory even if closing the stream fails
+            if (arrowBatch != null) {
+                arrowBatch.close();
+            }
+
+            if (dictionaries != null) {
+                for (long dictionaryId : dictionaries.getDictionaryIds()) {
+                    Dictionary dictionary = dictionaries.lookup(dictionaryId);
+                    dictionary.getVector().close();
+                }
+            }
+        }
+    }
+}
From ff7150ad50965c9f967c217f1cd3c1f51ec98ec8 Mon Sep 17 00:00:00 2001 From: Martin Traverse Date: Mon, 21 Jul 2025 21:20:07 +0100 Subject: [PATCH 3/7] Update writer to match reader implementation --- .../arrow/adapter/avro/AvroFileWriter.java | 90 +++++++++---------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java index b5fbfe3c72..6afc7b7c70 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java @@ -18,22 +18,19 @@ package org.apache.arrow.adapter.avro; import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.compression.CompressionCodec; -import org.apache.arrow.vector.compression.CompressionUtil; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.avro.Schema; +import org.apache.avro.file.Codec; import org.apache.avro.file.DataFileConstants; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -44,15 +41,12 @@ class AvroFileWriter { // Use magic from Avro's own constants private static final byte[] AVRO_MAGIC = 
DataFileConstants.MAGIC; - - private static final String codecName = "zstandard"; - private static final CompressionUtil.CodecType codecType = CompressionUtil.CodecType.ZSTD; + private static final int SYNC_MARKER_SIZE = 16; private final OutputStream stream; private final Encoder encoder; - private final BufferAllocator allocator; - private final BufferOutputStream batchBuffer; + private final BufferOutputStream batchStream; private BinaryEncoder batchEncoder; private VectorSchemaRoot batch; @@ -60,7 +54,7 @@ class AvroFileWriter { private final byte[] syncMarker; private final CompositeAvroProducer recordProducer; - private final CompressionCodec compressionCodec; + private final Codec avroCodec; public AvroFileWriter( @@ -69,13 +63,22 @@ public AvroFileWriter( DictionaryProvider dictionaries) throws IOException { + this(stream, firstBatch, dictionaries, null); + } + + public AvroFileWriter( + OutputStream stream, + VectorSchemaRoot firstBatch, + DictionaryProvider dictionaries, + String codecName) + throws IOException { + EncoderFactory encoderFactory = EncoderFactory.get(); this.stream = stream; this.encoder = encoderFactory.binaryEncoder(stream, null); - this.allocator = firstBatch.getVector(0).getAllocator(); - this.batchBuffer = new BufferOutputStream(allocator); + this.batchStream = new BufferOutputStream(); this.batchEncoder = encoderFactory.binaryEncoder(stream, null); this.batch = firstBatch; @@ -89,43 +92,34 @@ public AvroFileWriter( firstBatch.getFieldVectors(), dictionaries); - this.compressionCodec = CompressionCodec.Factory.INSTANCE.createCodec(codecType); - // Generate a random sync marker var random = new Random(); - this.syncMarker = new byte[16]; + this.syncMarker = new byte[SYNC_MARKER_SIZE]; random.nextBytes(this.syncMarker); + + // Look up the compression codec + this.avroCodec = AvroCompression.getAvroCodec(codecName); } catch (Throwable e) { // Do not leak the batch buffer if there are problems during setup - batchBuffer.close(); + 
batchStream.close(); throw e; } } - // Sets up a defaulr binary encoder for the channel - public AvroFileWriter( - WritableByteChannel channel, - VectorSchemaRoot firstBatch, - DictionaryProvider dictionaries) - throws IOException { - - this(Channels.newOutputStream(channel), firstBatch, dictionaries); - } - // Write the Avro header (throws if already written) public void writeHeader() throws IOException { // Prepare the metadata map Map metadata = new HashMap<>(); metadata.put("avro.schema", avroSchema.toString().getBytes(StandardCharsets.UTF_8)); - metadata.put("avro.codec", codecName.getBytes(StandardCharsets.UTF_8)); + metadata.put("avro.codec", avroCodec.getName().getBytes(StandardCharsets.UTF_8)); // Avro magic encoder.writeFixed(AVRO_MAGIC); // Write the metadata map - encoder.writeMapStart(); // write metadata + encoder.writeMapStart(); encoder.setItemCount(metadata.size()); for (Map.Entry entry : metadata.entrySet()) { encoder.startItem(); @@ -144,34 +138,31 @@ public void writeHeader() throws IOException { // Expects new data to be in the batch (i.e. 
VSR can be recycled) public void writeBatch() throws IOException { - // Reset batch buffer and encoder - batchBuffer.reset(); - batchEncoder = EncoderFactory.get().directBinaryEncoder(batchBuffer, batchEncoder); + // Prepare batch stream and encoder + batchStream.reset(); + batchEncoder = EncoderFactory.get().directBinaryEncoder(batchStream, batchEncoder); // Reset producers recordProducer.getProducers().forEach(producer -> producer.setPosition(0)); - // Produce a batch + // Produce a batch, writing to the batch stream (buffer) for (int row = 0; row < batch.getRowCount(); row++) { recordProducer.produce(batchEncoder); } batchEncoder.flush(); - // Raw buffer is a view onto the stream backing buffer - do not release - ArrowBuf rawBuffer = batchBuffer.getBuffer(); - - // Compressed buffer is newly allocated and needs to be released - try (ArrowBuf compressedBuffer = compressionCodec.compress(allocator, rawBuffer)) { + // Compress the batch buffer using Avro's codecs + ByteBuffer batchBuffer = ByteBuffer.wrap(batchStream.internalBuffer(), 0, batchStream.size()); // wrap only the bytes written, the backing array also holds spare capacity and stale data + ByteBuffer batchCompressed = avroCodec.compress(batchBuffer); - // Write Avro block to the main encoder - encoder.writeLong(batch.getRowCount()); - encoder.writeBytes(compressedBuffer.nioBuffer()); - encoder.writeFixed(syncMarker); - } + // Write Avro block to the main encoder + encoder.writeLong(batch.getRowCount()); + encoder.writeBytes(batchCompressed); + encoder.writeFixed(syncMarker); } - // Reset vectors in all the producders + // Reset vectors in all the producers // Supports a stream of VSRs if source VSR is not recycled void resetBatch(VectorSchemaRoot batch) { recordProducer.resetProducerVectors(batch); @@ -187,6 +178,13 @@ public void flush() throws IOException { public void close() throws IOException { encoder.flush(); stream.close(); - batchBuffer.close(); + batchStream.close(); + } + + private static final class BufferOutputStream extends ByteArrayOutputStream { + + byte[] internalBuffer() { + return buf; + } } } From 
a2b1c49f696ab333e14be72e2c8753b56037501e Mon Sep 17 00:00:00 2001 From: Martin Traverse Date: Mon, 21 Jul 2025 21:20:30 +0100 Subject: [PATCH 4/7] Remove unused stream implementation --- .../adapter/avro/BufferOutputStream.java | 95 ------------------- 1 file changed, 95 deletions(-) delete mode 100644 adapter/avro/src/main/java/org/apache/arrow/adapter/avro/BufferOutputStream.java diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/BufferOutputStream.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/BufferOutputStream.java deleted file mode 100644 index 1e746989f0..0000000000 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/BufferOutputStream.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.arrow.adapter.avro; - -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.util.MemoryUtil; - -import java.io.IOException; -import java.io.OutputStream; - -public class BufferOutputStream extends OutputStream { - - private static final int INITIAL_BUFFER_SIZE = 4096; - - private ArrowBuf buffer; - - public BufferOutputStream(BufferAllocator allocator) { - buffer = allocator.buffer(INITIAL_BUFFER_SIZE); - } - - public ArrowBuf getBuffer() { - return buffer.slice(0, buffer.writerIndex()); - } - - @Override - public void write(int b) throws IOException { - ensureCapacity(buffer.capacity() + 1); - buffer.writeByte(b); - } - - @Override - public void write(byte[] b) throws IOException { - ensureCapacity(buffer.capacity() + b.length); - buffer.writeBytes(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - ensureCapacity(buffer.capacity() + len); - buffer.writeBytes(b, off, len); - } - - @Override - public void flush() { - // no-op - } - - public void reset() { - buffer.clear(); - } - - @Override - public void close() throws IOException { - buffer.close(); - } - - private void ensureCapacity(long capacity) { - - if (capacity > buffer.capacity()) { - - long newCapacity = Math.max(capacity, buffer.capacity() * 2); - - BufferAllocator allocator = buffer.getReferenceManager().getAllocator(); - ArrowBuf newBuffer = allocator.buffer(newCapacity); - - try { - - MemoryUtil.copyMemory(buffer.memoryAddress(), newBuffer.memoryAddress(), buffer.writerIndex()); - - buffer.close(); - buffer = newBuffer; - } - catch (Throwable t) { - newBuffer.close(); - throw t; - } - } - } -} From 2704c14c750cbdc98aee1e61db587459403eb799 Mon Sep 17 00:00:00 2001 From: Martin Traverse Date: Mon, 21 Jul 2025 21:25:48 +0100 Subject: [PATCH 5/7] Class to give out Avro compression codecs --- .../arrow/adapter/avro/AvroCompression.java | 77 +++++++++++++++++++ 1 file 
changed, 77 insertions(+) create mode 100644 adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroCompression.java diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroCompression.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroCompression.java new file mode 100644 index 0000000000..91667c2b2a --- /dev/null +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroCompression.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.adapter.avro; + +import org.apache.avro.file.*; + +import java.nio.ByteBuffer; + + +public class AvroCompression { + + public static Codec getAvroCodec(String codecName) { + + if (codecName == null || DataFileConstants.NULL_CODEC.equals(codecName)) { + return new NullCodec(); + } + + switch (codecName) { + case DataFileConstants.DEFLATE_CODEC: + return new DeflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL); + case DataFileConstants.BZIP2_CODEC: + return new BZip2Codec(); + case DataFileConstants.XZ_CODEC: + return new XZCodec(CodecFactory.DEFAULT_XZ_LEVEL); + case DataFileConstants.ZSTANDARD_CODEC: + return new ZstandardCodec(CodecFactory.DEFAULT_ZSTANDARD_LEVEL, false, false); + } + + throw new IllegalArgumentException("Unsupported codec: " + codecName); + } + + private static class NullCodec extends Codec { + + @Override + public String getName() { + return DataFileConstants.NULL_CODEC; + } + + @Override + public ByteBuffer compress(ByteBuffer buffer) { + return buffer; + } + + @Override + public ByteBuffer decompress(ByteBuffer buffer) { + return buffer; + } + + @Override + public boolean equals(Object other) { + if (this == other) + return true; + return (other != null && other.getClass() == getClass()); + } + + @Override + public int hashCode() { + return 2; + } + } + +} \ No newline at end of file From f6c0397086117c1eb082c2678924a62b5b892444 Mon Sep 17 00:00:00 2001 From: Martin Traverse Date: Mon, 21 Jul 2025 22:02:31 +0100 Subject: [PATCH 6/7] Fix one comment --- .../main/java/org/apache/arrow/adapter/avro/AvroFileReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java index 489a9146d6..a06b8a01e5 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java +++ 
b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java @@ -241,13 +241,13 @@ boolean readBatch() throws IOException { batchBuffer = decoder.readBytes(batchBuffer); decoder.readFixed(batchSyncMarker); - // Validate sync marker - mismatch indicates a corrupt file long batchSize = zigzagSize(nRows) + zigzagSize(batchBuffer.remaining()) + batchBuffer.remaining() + SYNC_MARKER_SIZE; + // Validate sync marker - mismatch indicates a corrupt file int validateMarker = BinaryData.compareBytes( syncMarker, 0, SYNC_MARKER_SIZE, batchSyncMarker, 0, SYNC_MARKER_SIZE); From 621707ed671fc2560a1cb5f67a0c4b5d32af7995 Mon Sep 17 00:00:00 2001 From: Martin Traverse Date: Mon, 21 Jul 2025 22:29:48 +0100 Subject: [PATCH 7/7] Apply spotless --- .../arrow/adapter/avro/AvroCompression.java | 11 +--- .../arrow/adapter/avro/AvroFileReader.java | 58 +++++++++---------- .../arrow/adapter/avro/AvroFileWriter.java | 55 ++++++++---------- 3 files changed, 52 insertions(+), 72 deletions(-) diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroCompression.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroCompression.java index 91667c2b2a..7579fe6c06 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroCompression.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroCompression.java @@ -14,13 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.arrow.adapter.avro; -import org.apache.avro.file.*; - import java.nio.ByteBuffer; - +import org.apache.avro.file.*; public class AvroCompression { @@ -63,8 +60,7 @@ public ByteBuffer decompress(ByteBuffer buffer) { @Override public boolean equals(Object other) { - if (this == other) - return true; + if (this == other) return true; return (other != null && other.getClass() == getClass()); } @@ -73,5 +69,4 @@ public int hashCode() { return 2; } } - -} \ No newline at end of file +} diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java index a06b8a01e5..7210312e16 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileReader.java @@ -14,9 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.arrow.adapter.avro; +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; import org.apache.arrow.adapter.avro.consumers.CompositeAvroConsumer; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; @@ -29,14 +34,6 @@ import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; -import java.io.*; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - - class AvroFileReader implements DictionaryProvider { // Writer owns a channel / decoder and will close them @@ -69,12 +66,9 @@ class AvroFileReader implements DictionaryProvider { // Create a new AvroFileReader for the input stream // In order to support non-blocking mode, the stream must support mark / reset - public AvroFileReader( - InputStream stream, - BufferAllocator allocator, - boolean blocking) { + public AvroFileReader(InputStream stream, BufferAllocator allocator, boolean blocking) { - this.stream =stream; + this.stream = stream; this.allocator = allocator; this.blocking = blocking; @@ -82,7 +76,8 @@ public AvroFileReader( this.decoder = DecoderFactory.get().binaryDecoder(stream, null); } else { if (!stream.markSupported()) { - throw new IllegalArgumentException("Input stream must support mark/reset for non-blocking mode"); + throw new IllegalArgumentException( + "Input stream must support mark/reset for non-blocking mode"); } this.decoder = DecoderFactory.get().directBinaryDecoder(stream, null); } @@ -107,9 +102,8 @@ void readHeader() throws IOException { headerSize += magic.length; // Validate Avro magic - int validateMagic = BinaryData.compareBytes( - AVRO_MAGIC, 0, AVRO_MAGIC.length, - magic, 0, AVRO_MAGIC.length); + int validateMagic = + BinaryData.compareBytes(AVRO_MAGIC, 0, AVRO_MAGIC.length, magic, 0, 
AVRO_MAGIC.length); if (validateMagic != 0) { throw new RuntimeException("Invalid AVRO data file: The file is not an Avro file"); @@ -126,7 +120,7 @@ void readHeader() throws IOException { ByteBuffer valueBuffer = decoder.readBytes(null); headerSize += zigzagSize(keyBuffer.remaining()) + keyBuffer.remaining(); - headerSize += zigzagSize(valueBuffer.remaining()) + valueBuffer.remaining(); + headerSize += zigzagSize(valueBuffer.remaining()) + valueBuffer.remaining(); String key = new String(keyBuffer.array(), StandardCharsets.UTF_8); @@ -186,8 +180,7 @@ private String processCodec(ByteBuffer buffer) { if (buffer != null && buffer.remaining() > 0) { return new String(buffer.array(), StandardCharsets.UTF_8); - } - else { + } else { return DataFileConstants.NULL_CODEC; } } @@ -242,22 +235,24 @@ boolean readBatch() throws IOException { decoder.readFixed(batchSyncMarker); long batchSize = - zigzagSize(nRows) + - zigzagSize(batchBuffer.remaining()) + - batchBuffer.remaining() + - SYNC_MARKER_SIZE; + zigzagSize(nRows) + + zigzagSize(batchBuffer.remaining()) + + batchBuffer.remaining() + + SYNC_MARKER_SIZE; // Validate sync marker - mismatch indicates a corrupt file - int validateMarker = BinaryData.compareBytes( - syncMarker, 0, SYNC_MARKER_SIZE, - batchSyncMarker, 0, SYNC_MARKER_SIZE); + int validateMarker = + BinaryData.compareBytes( + syncMarker, 0, SYNC_MARKER_SIZE, batchSyncMarker, 0, SYNC_MARKER_SIZE); if (validateMarker != 0) { throw new RuntimeException("Invalid AVRO data file: The file is corrupted"); } // Reset producers - recordConsumer.getConsumers().forEach(consumer -> ensureCapacity(consumer.getVector(), (int) nRows)); + recordConsumer + .getConsumers() + .forEach(consumer -> ensureCapacity(consumer.getVector(), (int) nRows)); recordConsumer.getConsumers().forEach(consumer -> consumer.setPosition(0)); // Decompress the batch buffer using Avro's codecs @@ -299,7 +294,7 @@ boolean hasNextBatch() throws IOException { } if (blocking) { - return ! 
decoder.isEnd(); + return !decoder.isEnd(); } var in = decoder.inputStream(); @@ -311,8 +306,7 @@ boolean hasNextBatch() throws IOException { in.reset(); return nextByte >= 0; - } - catch(EOFException e) { + } catch (EOFException e) { return false; } } diff --git a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java index 6afc7b7c70..1c4ef89440 100644 --- a/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java +++ b/adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroFileWriter.java @@ -14,9 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.arrow.adapter.avro; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; @@ -27,16 +34,6 @@ import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - - class AvroFileWriter { // Use magic from Avro's own constants @@ -56,11 +53,8 @@ class AvroFileWriter { private final CompositeAvroProducer recordProducer; private final Codec avroCodec; - public AvroFileWriter( - OutputStream stream, - VectorSchemaRoot firstBatch, - DictionaryProvider dictionaries) + OutputStream stream, VectorSchemaRoot firstBatch, DictionaryProvider dictionaries) throws IOException { this(stream, firstBatch, 
dictionaries, null); @@ -73,24 +67,22 @@ public AvroFileWriter( String codecName) throws IOException { - EncoderFactory encoderFactory = EncoderFactory.get(); + EncoderFactory encoderFactory = EncoderFactory.get(); - this.stream = stream; - this.encoder = encoderFactory.binaryEncoder(stream, null); + this.stream = stream; + this.encoder = encoderFactory.binaryEncoder(stream, null); - this.batchStream = new BufferOutputStream(); - this.batchEncoder = encoderFactory.binaryEncoder(stream, null); - this.batch = firstBatch; + this.batchStream = new BufferOutputStream(); + this.batchEncoder = encoderFactory.binaryEncoder(stream, null); + this.batch = firstBatch; try { - this.avroSchema = ArrowToAvroUtils.createAvroSchema( - firstBatch.getSchema().getFields(), - dictionaries); + this.avroSchema = + ArrowToAvroUtils.createAvroSchema(firstBatch.getSchema().getFields(), dictionaries); - this.recordProducer = ArrowToAvroUtils.createCompositeProducer( - firstBatch.getFieldVectors(), - dictionaries); + this.recordProducer = + ArrowToAvroUtils.createCompositeProducer(firstBatch.getFieldVectors(), dictionaries); // Generate a random sync marker var random = new Random(); @@ -99,11 +91,10 @@ public AvroFileWriter( // Look up the compression codec this.avroCodec = AvroCompression.getAvroCodec(codecName); - } - catch (Throwable e) { - // Do not leak the batch buffer if there are problems during setup - batchStream.close(); - throw e; + } catch (Throwable e) { + // Do not leak the batch buffer if there are problems during setup + batchStream.close(); + throw e; } }