17 changes: 15 additions & 2 deletions src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -385,6 +385,15 @@ public Map<String, String> getTempFileNames(TopicPartition tp) {
return topicPartitionWriter.getTempFiles();
}

/**
* Get the file extension including the compression codec (e.g. .gz.parquet).
* All partitions share the same file extension, since they all share the same connectorConfig.
* @return the file extension, prefixed with the compression codec when one is configured
*/
public String getExtension() {
return writerProvider.getCompressionCodecAndExtension();
}

private void createDir(String dir) throws IOException {
String path = url + "/" + dir;
if (!storage.exists(path)) {
@@ -396,10 +405,14 @@ private void createDir(String dir) throws IOException {
private Format getFormat() throws ClassNotFoundException, IllegalAccessException, InstantiationException,
NoSuchMethodException, SecurityException, IllegalArgumentException, InvocationTargetException{
final String className = connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
if (className.equals("io.confluent.connect.hdfs.avro.AvroFormat")) {

switch (className) {
case "io.confluent.connect.hdfs.avro.AvroFormat":
case "io.confluent.connect.hdfs.parquet.ParquetFormat":
return (Format) Class.forName(className).getConstructor(HdfsSinkConnectorConfig.class).newInstance(new Object[] {connectorConfig});
default:
return ((Class<Format>) Class.forName(className)).newInstance();
}
return ((Class<Format>) Class.forName(className)).newInstance();
}

private String getPartitionValue(String path) {
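
As an aside, a minimal usage sketch of the new DataWriter.getExtension() accessor; this is illustrative only and reuses names (connectorConfig, context, avroData, FileUtils.committedFileName, ZERO_PAD_FMT) that appear in the test changes further down:

// Hypothetical usage; mirrors the test code below.
DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
// With ParquetFormat and "gzip" compression this returns ".gz.parquet";
// with AvroFormat and no codec configured it returns ".avro".
String extension = hdfsWriter.getExtension();
String committedFile = FileUtils.committedFileName(url, topicsDir, directory,
    TOPIC_PARTITION, startOffset, endOffset, extension, ZERO_PAD_FMT);
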
@@ -22,7 +22,7 @@ public class HdfsSinkConnectorConstants {

// groups: topic, partition, start offset, end offset, extension
// Also see legalChars in Topic.scala
public static final Pattern COMMITTED_FILENAME_PATTERN = Pattern.compile("([a-zA-Z0-9\\._\\-]+)\\+(\\d+)\\+(\\d+)\\+(\\d+)(.\\w+)?");
public static final Pattern COMMITTED_FILENAME_PATTERN = Pattern.compile("([a-zA-Z0-9\\._\\-]+)\\+(\\d+)\\+(\\d+)\\+(\\d+)(.\\w+){0,2}");
public static final int PATTERN_TOPIC_GROUP = 1;
public static final int PATTERN_PARTITION_GROUP = 2;
public static final int PATTERN_START_OFFSET_GROUP = 3;
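
For illustration, a short sketch of what the widened pattern now accepts; the file name is made up, but the compound extension corresponds to the .gz.parquet case introduced by this change:

// Hypothetical match against the updated pattern.
java.util.regex.Matcher m = HdfsSinkConnectorConstants.COMMITTED_FILENAME_PATTERN
    .matcher("test-topic+0+0000000000+0000000009.gz.parquet");
boolean ok = m.matches();  // true; the previous (.\w+)? suffix rejected two-part extensions
String topic = m.group(HdfsSinkConnectorConstants.PATTERN_TOPIC_GROUP);          // "test-topic"
String partition = m.group(HdfsSinkConnectorConstants.PATTERN_PARTITION_GROUP);  // "0"
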
@@ -23,5 +23,6 @@

public interface RecordWriterProvider {
String getExtension();
String getCompressionCodecAndExtension();
RecordWriter<SinkRecord> getRecordWriter(Configuration conf, String fileName, SinkRecord record, AvroData avroData) throws IOException;
}
@@ -158,7 +158,7 @@ public TopicPartitionWriter(
failureTime = -1L;
offset = -1L;
sawInvalidOffset = false;
extension = writerProvider.getExtension();
extension = writerProvider.getCompressionCodecAndExtension();
zeroPadOffsetFormat
= "%0" +
connectorConfig.getInt(HdfsSinkConnectorConfig.FILENAME_OFFSET_ZERO_PAD_WIDTH_CONFIG) +
@@ -405,7 +405,7 @@ public Map<String, String> getTempFiles() {
}

public String getExtension() {
return writerProvider.getExtension();
return writerProvider.getCompressionCodecAndExtension();
}

private String getDirectory(String encodedPartition) {
@@ -23,6 +23,7 @@
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -52,6 +53,17 @@ public String getExtension() {
return EXTENSION;
}

@Override
public String getCompressionCodecAndExtension() {
final String compressionCodec = connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG);

if (StringUtils.isBlank(compressionCodec)) {
return EXTENSION;
}

return "." + compressionCodec + EXTENSION;
}

@Override
public RecordWriter<SinkRecord> getRecordWriter(Configuration conf, final String fileName,
SinkRecord record, final AvroData avroData)
@@ -23,14 +23,23 @@
import io.confluent.connect.hdfs.hive.HiveUtil;

public class ParquetFormat implements Format {
HdfsSinkConnectorConfig connectorConfig;

public ParquetFormat(HdfsSinkConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}

@Override
public RecordWriterProvider getRecordWriterProvider() {
return new ParquetRecordWriterProvider();
return new ParquetRecordWriterProvider(connectorConfig);
}

@Override
public SchemaFileReader getSchemaFileReader(AvroData avroData) {
return new ParquetFileReader(avroData);
}

@Override
public HiveUtil getHiveUtil(HdfsSinkConnectorConfig config, AvroData avroData, HiveMetaStore hiveMetaStore) {
return new ParquetHiveUtil(config, avroData, hiveMetaStore);
}
@@ -40,18 +40,37 @@ public class ParquetRecordWriterProvider implements RecordWriterProvider {

private final static String EXTENSION = ".parquet";

private HdfsSinkConnectorConfig connectorConfig;

public ParquetRecordWriterProvider(HdfsSinkConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}

@Override
public String getExtension() {
return EXTENSION;
}

@Override
public String getCompressionCodecAndExtension() {
final CompressionCodecName compressionCodecName = getCompressionCodecName();

if (compressionCodecName == CompressionCodecName.UNCOMPRESSED) {
return EXTENSION;
} else if (compressionCodecName == CompressionCodecName.GZIP) {
return ".gz" + EXTENSION;
}

return "." + compressionCodecName.toString().toLowerCase() + EXTENSION;
}

@Override
public RecordWriter<SinkRecord> getRecordWriter(
Configuration conf, final String fileName, SinkRecord record, final AvroData avroData)
throws IOException {
final Schema avroSchema = avroData.fromConnectSchema(record.valueSchema());

final CompressionCodecName compressionCodecName = getCompressionCodecName(conf);
final CompressionCodecName compressionCodecName = getCompressionCodecName();

int blockSize = 256 * 1024 * 1024;
int pageSize = 64 * 1024;
@@ -74,15 +93,15 @@ public void close() throws IOException {
};
}

private CompressionCodecName getCompressionCodecName(Configuration conf) {
String compressionCodecClassName = conf.get(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG,
"snappy");
private CompressionCodecName getCompressionCodecName() {
String compressionCodecClassName = connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG);

switch(compressionCodecClassName) {
case "uncompressed": return CompressionCodecName.UNCOMPRESSED;
case "snappy": return CompressionCodecName.SNAPPY;
case "gzip": return CompressionCodecName.GZIP;
case "lzo": return CompressionCodecName.LZO;
default: throw new ConfigException("Invalid "+HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG+" value for parquet: "+compressionCodecClassName);
default: return CompressionCodecName.SNAPPY; // Use Snappy as default compression codec
}
}
}
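
For reference, a hedged sketch of how the compression setting maps to the committed-file extension under the logic above; the setup mirrors createProps() from the tests below and is illustrative only:

// Mapping implied by getCompressionCodecName() and getCompressionCodecAndExtension():
//   "uncompressed" -> ".parquet"
//   "gzip"         -> ".gz.parquet"
//   "snappy"       -> ".snappy.parquet"
//   "lzo"          -> ".lzo.parquet"
//   anything else  -> ".snappy.parquet" (Snappy is the fallback default)
Map<String, String> props = createProps();
props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG, "gzip");
HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(props);
ParquetRecordWriterProvider provider = new ParquetRecordWriterProvider(config);
String extension = provider.getCompressionCodecAndExtension();  // ".gz.parquet"
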
@@ -34,7 +34,6 @@

public class FailureRecoveryTest extends HdfsSinkConnectorTestBase {
private static final String ZERO_PAD_FMT = "%010d";
private static final String extension = "";

@Before
public void setUp() throws Exception {
@@ -128,6 +127,7 @@ public void testWriterFailureMultiPartitions() throws Exception {

assertEquals(context.timeout(), (long) connectorConfig.getLong(HdfsSinkConnectorConfig.RETRY_BACKOFF_CONFIG));

String extension = hdfsWriter.getExtension();
Map<String, List<Object>> data = Data.getData();
String directory2 = TOPIC + "/" + "partition=" + String.valueOf(PARTITION2);
long[] validOffsets = {-1, 2, 5};
@@ -50,7 +50,6 @@

public class DataWriterAvroTest extends TestWithMiniDFSCluster {

private static final String extension = ".avro";
private static final String ZERO_PAD_FMT = "%010d";
private SchemaFileReader schemaFileReader = new AvroFileReader(avroData);

@@ -92,6 +91,7 @@ private void testWriteRecordBase(HdfsSinkConnectorConfig connectorConfigParam) t
sinkRecords.add(sinkRecord);
}
hdfsWriter.write(sinkRecords);
String extension = hdfsWriter.getExtension();
hdfsWriter.close(assignment);
hdfsWriter.stop();

@@ -107,6 +107,7 @@ private void testWriteRecordBase(HdfsSinkConnectorConfig connectorConfigParam) t
new Path(FileUtils
.committedFileName(url, topicsDir, directory, TOPIC_PARTITION, startOffset,
endOffset, extension, ZERO_PAD_FMT));

Collection<Object> records = schemaFileReader.readData(conf, path);
long size = endOffset - startOffset + 1;
assertEquals(size, records.size());
@@ -131,6 +132,8 @@ public void testRecovery() throws Exception {
Set<String> committedFiles = new HashSet<>();

String directory = TOPIC + "/" + "partition=" + String.valueOf(PARTITION);
DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
String extension = hdfsWriter.getExtension();

for (int i = 0; i < 5; ++i) {
long startOffset = i * 10;
@@ -145,7 +148,6 @@ public void testRecovery() throws Exception {
wal.append(WAL.endMarker, "");
wal.close();

DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
hdfsWriter.recover(TOPIC_PARTITION);
Map<TopicPartition, Long> offsets = context.offsets();
assertTrue(offsets.containsKey(TOPIC_PARTITION));
@@ -178,6 +180,7 @@ public void testRecovery() throws Exception {
@Test
public void testWriteRecordMultiplePartitions() throws Exception {
DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
String extension = hdfsWriter.getExtension();

for (TopicPartition tp: assignment) {
hdfsWriter.recover(tp);
@@ -221,6 +224,9 @@ public void testWriteRecordMultiplePartitions() throws Exception {

@Test
public void testGetPreviousOffsets() throws Exception {
DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
String extension = hdfsWriter.getExtension();

String directory = TOPIC + "/" + "partition=" + String.valueOf(PARTITION);
long[] startOffsets = {0, 3};
long[] endOffsets = {2, 5};
@@ -236,9 +242,7 @@ public void testGetPreviousOffsets() throws Exception {
path = new Path(FileUtils.fileName(url, topicsDir, directory, "abcd"));
fs.createNewFile(path);

DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
hdfsWriter.recover(TOPIC_PARTITION);

Map<TopicPartition, Long> committedOffsets = hdfsWriter.getCommittedOffsets();

assertTrue(committedOffsets.containsKey(TOPIC_PARTITION));
@@ -252,6 +256,7 @@ public void testGetPreviousOffsets() throws Exception {
@Test
public void testWriteRecordNonZeroInitailOffset() throws Exception {
DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
String extension = hdfsWriter.getExtension();
Partitioner partitioner = hdfsWriter.getPartitioner();
hdfsWriter.recover(TOPIC_PARTITION);

@@ -292,6 +297,7 @@ public void testWriteRecordNonZeroInitailOffset() throws Exception {
@Test
public void testRebalance() throws Exception {
DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
String extension = hdfsWriter.getExtension();

// Initial assignment is {TP1, TP2}
for (TopicPartition tp: assignment) {
@@ -400,6 +406,7 @@ public void testProjectBackWard() throws Exception {
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);

DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
String extension = hdfsWriter.getExtension();
hdfsWriter.recover(TOPIC_PARTITION);

String key = "key";
@@ -456,6 +463,7 @@ public void testProjectNone() throws Exception {
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);

DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
String extension = hdfsWriter.getExtension();
hdfsWriter.recover(TOPIC_PARTITION);

String key = "key";
@@ -518,6 +526,7 @@ public void testProjectForward() throws Exception {
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);

DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
String extension = hdfsWriter.getExtension();
hdfsWriter.recover(TOPIC_PARTITION);

String key = "key";
@@ -14,7 +14,6 @@

package io.confluent.connect.hdfs.parquet;


import org.apache.hadoop.fs.Path;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@@ -36,7 +35,6 @@

public class DataWriterParquetTest extends TestWithMiniDFSCluster {
private static final String ZERO_PAD_FMT = "%010d";
private static final String extension = ".parquet";
private final SchemaFileReader schemaFileReader = new ParquetFileReader(avroData);

@Override
@@ -48,13 +46,44 @@ protected Map<String, String> createProps() {

@Test
public void testWriteRecord() throws Exception {
DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData);
testWriteRecordBase(connectorConfig);
}

@Test
public void testWriteSnappyCompressedRecord() throws Exception {
Map<String, String> props = createProps();
props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG, "snappy");
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
testWriteRecordBase(connectorConfig);
}

@Test
public void testWriteGzipCompressedRecord() throws Exception {
Map<String, String> props = createProps();
props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG, "gzip");
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
testWriteRecordBase(connectorConfig);
}

@Test
public void testWriteUncompressedRecord() throws Exception {
Map<String, String> props = createProps();
props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_COMPRESSION_CONFIG, "uncompressed");
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
testWriteRecordBase(connectorConfig);
}

// lzo compression doesn't work out of the box with current tests, skipping

public void testWriteRecordBase(HdfsSinkConnectorConfig connectorConfigParam) throws Exception {
DataWriter hdfsWriter = new DataWriter(connectorConfigParam, context, avroData);
Partitioner partitioner = hdfsWriter.getPartitioner();
hdfsWriter.recover(TOPIC_PARTITION);

String key = "key";
Schema schema = createSchema();
Struct record = createRecord(schema);
String extension = hdfsWriter.getExtension();

Collection<SinkRecord> sinkRecords = new ArrayList<>();
for (long offset = 0; offset < 7; offset++) {
@@ -63,6 +92,7 @@ public void testWriteRecord() throws Exception {

sinkRecords.add(sinkRecord);
}

hdfsWriter.write(sinkRecords);
hdfsWriter.close(assignment);
hdfsWriter.stop();
@@ -35,6 +35,9 @@ public String getExtension() {
return "";
}

@Override
public String getCompressionCodecAndExtension() { return ""; }

@Override
public RecordWriter<SinkRecord> getRecordWriter(
Configuration conf, final String fileName, SinkRecord record, final AvroData avroData)