From 7ad889d1a19386b45c670926b86567cbaaf96bac Mon Sep 17 00:00:00 2001
From: abson
Date: Tue, 20 Jun 2017 14:01:13 +0200
Subject: [PATCH] Added compression codec to HDFS filenames

---
 .../io/confluent/connect/hdfs/DataWriter.java | 17 +++++++--
 .../hdfs/HdfsSinkConnectorConstants.java      |  2 +-
 .../connect/hdfs/RecordWriterProvider.java    |  1 +
 .../connect/hdfs/TopicPartitionWriter.java    |  4 +--
 .../hdfs/avro/AvroRecordWriterProvider.java   | 12 +++++++
 .../connect/hdfs/parquet/ParquetFormat.java   | 11 +++++-
 .../parquet/ParquetRecordWriterProvider.java  | 29 ++++++++++++---
 .../connect/hdfs/FailureRecoveryTest.java     |  2 +-
 .../connect/hdfs/avro/DataWriterAvroTest.java | 17 ++++++---
 .../hdfs/parquet/DataWriterParquetTest.java   | 36 +++++++++++++++++--
 .../utils/MemoryRecordWriterProvider.java     |  3 ++
 11 files changed, 115 insertions(+), 19 deletions(-)

diff --git a/src/main/java/io/confluent/connect/hdfs/DataWriter.java b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
index dd9f3899f..06e4be461 100644
--- a/src/main/java/io/confluent/connect/hdfs/DataWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -385,6 +385,15 @@ public Map getTempFileNames(TopicPartition tp) {
     return topicPartitionWriter.getTempFiles();
   }
 
+  /**
+   * Get the file extension, including the compression codec (e.g. .gz.parquet).
+   * All partitions share the same file extension (they all share the same connectorConfig).
+   * @return the file extension, with the compression codec when one is configured
+   */
+  public String getExtension() {
+    return writerProvider.getCompressionCodecAndExtension();
+  }
+
   private void createDir(String dir) throws IOException {
     String path = url + "/" + dir;
     if (!storage.exists(path)) {
@@ -396,10 +405,14 @@ private void createDir(String dir) throws IOException {
 
   private Format getFormat() throws ClassNotFoundException, IllegalAccessException, InstantiationException,
       NoSuchMethodException, SecurityException, IllegalArgumentException, InvocationTargetException{
     final String className = connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
-    if (className.equals("io.confluent.connect.hdfs.avro.AvroFormat")) {
+
+    switch (className) {
+      case "io.confluent.connect.hdfs.avro.AvroFormat":
+      case "io.confluent.connect.hdfs.parquet.ParquetFormat":
         return (Format) Class.forName(className).getConstructor(HdfsSinkConnectorConfig.class).newInstance(new Object[] {connectorConfig});
+      default:
+        return ((Class) Class.forName(className)).newInstance();
     }
-    return ((Class) Class.forName(className)).newInstance();
   }
 
   private String getPartitionValue(String path) {
diff --git a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConstants.java b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConstants.java
index f4c8039ee..29a975f2a 100644
--- a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConstants.java
+++ b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConstants.java
@@ -22,7 +22,7 @@ public class HdfsSinkConnectorConstants {
 
   // groups: topic, partition, start offset, end offset, extension
   // Also see legalChars in Topic.scala
-  public static final Pattern COMMITTED_FILENAME_PATTERN = Pattern.compile("([a-zA-Z0-9\\._\\-]+)\\+(\\d+)\\+(\\d+)\\+(\\d+)(.\\w+)?");
+  public static final Pattern COMMITTED_FILENAME_PATTERN = Pattern.compile("([a-zA-Z0-9\\._\\-]+)\\+(\\d+)\\+(\\d+)\\+(\\d+)(.\\w+){0,2}");
   public static final int PATTERN_TOPIC_GROUP = 1;
   public static final int PATTERN_PARTITION_GROUP = 2;
   public static final int PATTERN_START_OFFSET_GROUP = 3;
diff --git a/src/main/java/io/confluent/connect/hdfs/RecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/RecordWriterProvider.java
index 6507cab1a..1beb928ed 100644
--- a/src/main/java/io/confluent/connect/hdfs/RecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/RecordWriterProvider.java
@@ -23,5 +23,6 @@
 public interface RecordWriterProvider {
   String getExtension();
+  String getCompressionCodecAndExtension();
   RecordWriter getRecordWriter(Configuration conf, String fileName, SinkRecord record, AvroData avroData)
       throws IOException;
 }
diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index 68edabd37..a8830d20c 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -158,7 +158,7 @@ public TopicPartitionWriter(
     failureTime = -1L;
     offset = -1L;
     sawInvalidOffset = false;
-    extension = writerProvider.getExtension();
+    extension = writerProvider.getCompressionCodecAndExtension();
     zeroPadOffsetFormat = "%0" +
         connectorConfig.getInt(HdfsSinkConnectorConfig.FILENAME_OFFSET_ZERO_PAD_WIDTH_CONFIG) +
@@ -405,7 +405,7 @@ public Map getTempFiles() {
   }
 
   public String getExtension() {
-    return writerProvider.getExtension();
+    return writerProvider.getCompressionCodecAndExtension();
   }
 
   private String getDirectory(String encodedPartition) {
diff --git a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
index 6baeb3bcc..21dfd748d 100644
--- a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
@@ -23,6 +23,7 @@
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.DatumWriter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -52,6 +53,17 @@ public String getExtension() {
     return EXTENSION;
   }
 
+  @Override
+  public String getCompressionCodecAndExtension() {
+    final String compressionCodec = connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG);
+
+    if (StringUtils.isBlank(compressionCodec)) {
+      return EXTENSION;
+    }
+
+    return "." + compressionCodec + EXTENSION;
+  }
+
   @Override
   public RecordWriter getRecordWriter(Configuration conf, final String fileName, SinkRecord record,
                                       final AvroData avroData)
diff --git a/src/main/java/io/confluent/connect/hdfs/parquet/ParquetFormat.java b/src/main/java/io/confluent/connect/hdfs/parquet/ParquetFormat.java
index 2d930e5aa..e206ed102 100644
--- a/src/main/java/io/confluent/connect/hdfs/parquet/ParquetFormat.java
+++ b/src/main/java/io/confluent/connect/hdfs/parquet/ParquetFormat.java
@@ -23,14 +23,23 @@
 import io.confluent.connect.hdfs.hive.HiveUtil;
 
 public class ParquetFormat implements Format {
+  HdfsSinkConnectorConfig connectorConfig;
+
+  public ParquetFormat(HdfsSinkConnectorConfig connectorConfig) {
+    this.connectorConfig = connectorConfig;
+  }
+
+  @Override
   public RecordWriterProvider getRecordWriterProvider() {
-    return new ParquetRecordWriterProvider();
+    return new ParquetRecordWriterProvider(connectorConfig);
   }
 
+  @Override
   public SchemaFileReader getSchemaFileReader(AvroData avroData) {
     return new ParquetFileReader(avroData);
   }
 
+  @Override
   public HiveUtil getHiveUtil(HdfsSinkConnectorConfig config, AvroData avroData, HiveMetaStore hiveMetaStore) {
     return new ParquetHiveUtil(config, avroData, hiveMetaStore);
   }
diff --git a/src/main/java/io/confluent/connect/hdfs/parquet/ParquetRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/parquet/ParquetRecordWriterProvider.java
index 574111b54..68a3dc0da 100644
--- a/src/main/java/io/confluent/connect/hdfs/parquet/ParquetRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/parquet/ParquetRecordWriterProvider.java
@@ -40,18 +40,37 @@ public class ParquetRecordWriterProvider implements RecordWriterProvider {
 
   private final static String EXTENSION = ".parquet";
 
+  private HdfsSinkConnectorConfig connectorConfig;
+
+  public ParquetRecordWriterProvider(HdfsSinkConnectorConfig connectorConfig) {
+    this.connectorConfig = connectorConfig;
+  }
+
   @Override
   public String getExtension() {
     return EXTENSION;
   }
 
+  @Override
+  public String getCompressionCodecAndExtension() {
+    final CompressionCodecName compressionCodecName = getCompressionCodecName();
+
+    if (compressionCodecName == CompressionCodecName.UNCOMPRESSED) {
+      return EXTENSION;
+    } else if (compressionCodecName == CompressionCodecName.GZIP) {
+      return ".gz" + EXTENSION;
+    }
+
+    return "." + compressionCodecName.toString().toLowerCase() + EXTENSION;
+  }
+
   @Override
   public RecordWriter getRecordWriter(
       Configuration conf, final String fileName, SinkRecord record, final AvroData avroData)
       throws IOException {
     final Schema avroSchema = avroData.fromConnectSchema(record.valueSchema());
-    final CompressionCodecName compressionCodecName = getCompressionCodecName(conf);
+    final CompressionCodecName compressionCodecName = getCompressionCodecName();
 
     int blockSize = 256 * 1024 * 1024;
     int pageSize = 64 * 1024;
@@ -74,15 +93,15 @@ public void close() throws IOException {
       };
   }
 
-  private CompressionCodecName getCompressionCodecName(Configuration conf) {
-    String compressionCodecClassName = conf.get(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG,
-        "snappy");
+  private CompressionCodecName getCompressionCodecName() {
+    String compressionCodecClassName = connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG);
+
     switch(compressionCodecClassName) {
       case "uncompressed": return CompressionCodecName.UNCOMPRESSED;
      case "snappy": return CompressionCodecName.SNAPPY;
      case "gzip": return CompressionCodecName.GZIP;
      case "lzo": return CompressionCodecName.LZO;
-      default: throw new ConfigException("Invalid "+HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG+" value for parquet: "+compressionCodecClassName);
+      default: return CompressionCodecName.SNAPPY; // Use Snappy as default compression codec
     }
   }
 }
diff --git a/src/test/java/io/confluent/connect/hdfs/FailureRecoveryTest.java b/src/test/java/io/confluent/connect/hdfs/FailureRecoveryTest.java
index 6e3ce5e78..fe31971ce 100644
--- a/src/test/java/io/confluent/connect/hdfs/FailureRecoveryTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/FailureRecoveryTest.java
@@ -34,7 +34,6 @@ public class FailureRecoveryTest extends HdfsSinkConnectorTestBase {
 
   private static final String ZERO_PAD_FMT = "%010d";
-  private static final String extension = "";
 
   @Before
   public void setUp() throws Exception {
@@ -128,6 +127,7 @@ public void testWriterFailureMultiPartitions() throws Exception {
     assertEquals(context.timeout(), (long) connectorConfig.getLong(HdfsSinkConnectorConfig.RETRY_BACKOFF_CONFIG));
 
+    String extension = hdfsWriter.getExtension();
     Map> data = Data.getData();
     String directory2 = TOPIC + "/" + "partition=" + String.valueOf(PARTITION2);
     long[] validOffsets = {-1, 2, 5};
diff --git a/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java b/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
index 494fc23bb..6fc2573ac 100644
--- a/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
@@ -50,7 +50,6 @@ public class DataWriterAvroTest extends TestWithMiniDFSCluster {
 
-  private static final String extension = ".avro";
   private static final String ZERO_PAD_FMT = "%010d";
   private SchemaFileReader schemaFileReader = new AvroFileReader(avroData);
 
@@ -92,6 +91,7 @@ private void testWriteRecordBase(HdfsSinkConnectorConfig connectorConfigParam) t
       sinkRecords.add(sinkRecord);
     }
     hdfsWriter.write(sinkRecords);
+    String extension = hdfsWriter.getExtension();
     hdfsWriter.close(assignment);
     hdfsWriter.stop();
 
@@ -107,6 +107,7 @@ private void testWriteRecordBase(HdfsSinkConnectorConfig connectorConfigParam) t
           new Path(FileUtils
              .committedFileName(url, topicsDir, directory, TOPIC_PARTITION, startOffset, endOffset, extension, ZERO_PAD_FMT));
+
      Collection records = schemaFileReader.readData(conf, path);
      long size = endOffset - startOffset + 1;
      assertEquals(size, records.size());
@@ -131,6 +132,8 @@ public void testRecovery() throws Exception {
     Set committedFiles = new HashSet<>();
 
     String directory = TOPIC + "/" + "partition=" + String.valueOf(PARTITION);
+    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    String extension = hdfsWriter.getExtension();
 
     for (int i = 0; i < 5; ++i) {
       long startOffset = i * 10;
@@ -145,7 +148,6 @@ public void testRecovery() throws Exception {
     wal.append(WAL.endMarker, "");
     wal.close();
 
-    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
     hdfsWriter.recover(TOPIC_PARTITION);
     Map offsets = context.offsets();
     assertTrue(offsets.containsKey(TOPIC_PARTITION));
@@ -178,6 +180,7 @@ public void testRecovery() throws Exception {
   @Test
   public void testWriteRecordMultiplePartitions() throws Exception {
     DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    String extension = hdfsWriter.getExtension();
 
     for (TopicPartition tp: assignment) {
       hdfsWriter.recover(tp);
@@ -221,6 +224,9 @@ public void testWriteRecordMultiplePartitions() throws Exception {
 
   @Test
   public void testGetPreviousOffsets() throws Exception {
+    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    String extension = hdfsWriter.getExtension();
+
     String directory = TOPIC + "/" + "partition=" + String.valueOf(PARTITION);
     long[] startOffsets = {0, 3};
     long[] endOffsets = {2, 5};
@@ -236,9 +242,7 @@ public void testGetPreviousOffsets() throws Exception {
     path = new Path(FileUtils.fileName(url, topicsDir, directory, "abcd"));
     fs.createNewFile(path);
 
-    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
     hdfsWriter.recover(TOPIC_PARTITION);
-
     Map committedOffsets = hdfsWriter.getCommittedOffsets();
     assertTrue(committedOffsets.containsKey(TOPIC_PARTITION));
@@ -252,6 +256,7 @@ public void testGetPreviousOffsets() throws Exception {
 
   @Test
   public void testWriteRecordNonZeroInitailOffset() throws Exception {
     DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    String extension = hdfsWriter.getExtension();
     Partitioner partitioner = hdfsWriter.getPartitioner();
     hdfsWriter.recover(TOPIC_PARTITION);
@@ -292,6 +297,7 @@ public void testWriteRecordNonZeroInitailOffset() throws Exception {
   @Test
   public void testRebalance() throws Exception {
     DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    String extension = hdfsWriter.getExtension();
 
     // Initial assignment is {TP1, TP2}
     for (TopicPartition tp: assignment) {
@@ -400,6 +406,7 @@ public void testProjectBackWard() throws Exception {
     HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
     DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    String extension = hdfsWriter.getExtension();
     hdfsWriter.recover(TOPIC_PARTITION);
 
     String key = "key";
@@ -456,6 +463,7 @@ public void testProjectNone() throws Exception {
     HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
     DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    String extension = hdfsWriter.getExtension();
     hdfsWriter.recover(TOPIC_PARTITION);
 
     String key = "key";
@@ -518,6 +526,7 @@ public void testProjectForward() throws Exception {
     HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
     DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
+    String extension = hdfsWriter.getExtension();
     hdfsWriter.recover(TOPIC_PARTITION);
 
String key = "key"; diff --git a/src/test/java/io/confluent/connect/hdfs/parquet/DataWriterParquetTest.java b/src/test/java/io/confluent/connect/hdfs/parquet/DataWriterParquetTest.java index 0a0c39977..a95aa2585 100644 --- a/src/test/java/io/confluent/connect/hdfs/parquet/DataWriterParquetTest.java +++ b/src/test/java/io/confluent/connect/hdfs/parquet/DataWriterParquetTest.java @@ -14,7 +14,6 @@ package io.confluent.connect.hdfs.parquet; - import org.apache.hadoop.fs.Path; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -36,7 +35,6 @@ public class DataWriterParquetTest extends TestWithMiniDFSCluster { private static final String ZERO_PAD_FMT = "%010d"; - private static final String extension = ".parquet"; private final SchemaFileReader schemaFileReader = new ParquetFileReader(avroData); @Override @@ -48,13 +46,44 @@ protected Map createProps() { @Test public void testWriteRecord() throws Exception { - DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData); + testWriteRecordBase(connectorConfig); + } + + @Test + public void testWriteSnappyCompressedRecord() throws Exception { + Map props = createProps(); + props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG, "snappy"); + HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props); + testWriteRecordBase(connectorConfig); + } + + @Test + public void testWriteGzipCompressedRecord() throws Exception { + Map props = createProps(); + props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG, "gzip"); + HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props); + testWriteRecordBase(connectorConfig); + } + + @Test + public void testWriteUncompressedRecord() throws Exception { + Map props = createProps(); + props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG, "uncompressed"); + HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props); + testWriteRecordBase(connectorConfig); + } + + // lzo compression doesn't work out of the box with current tests, skipping + + public void testWriteRecordBase(HdfsSinkConnectorConfig connectorConfigParam) throws Exception { + DataWriter hdfsWriter = new DataWriter(connectorConfigParam, context, avroData); Partitioner partitioner = hdfsWriter.getPartitioner(); hdfsWriter.recover(TOPIC_PARTITION); String key = "key"; Schema schema = createSchema(); Struct record = createRecord(schema); + String extension = hdfsWriter.getExtension(); Collection sinkRecords = new ArrayList<>(); for (long offset = 0; offset < 7; offset++) { @@ -63,6 +92,7 @@ public void testWriteRecord() throws Exception { sinkRecords.add(sinkRecord); } + hdfsWriter.write(sinkRecords); hdfsWriter.close(assignment); hdfsWriter.stop(); diff --git a/src/test/java/io/confluent/connect/hdfs/utils/MemoryRecordWriterProvider.java b/src/test/java/io/confluent/connect/hdfs/utils/MemoryRecordWriterProvider.java index d173eb526..a7554b9a9 100644 --- a/src/test/java/io/confluent/connect/hdfs/utils/MemoryRecordWriterProvider.java +++ b/src/test/java/io/confluent/connect/hdfs/utils/MemoryRecordWriterProvider.java @@ -35,6 +35,9 @@ public String getExtension() { return ""; } + @Override + public String getCompressionCodecAndExtension() { return ""; } + @Override public RecordWriter getRecordWriter( Configuration conf, final String fileName, SinkRecord record, final AvroData avroData)