Skip to content

Commit

Permalink
PARQUET-480: Update for Cascading 3.0
Browse files Browse the repository at this point in the history
The code in parquet-cascading is adapted to the API as of Cascading 2.5.3

Some incompatible changes were introduced in Cascading 3.0. This patch forks the parquet-cascading module to also provide a parquet-cascading3 module, which is about identical save for overloads which changed from requiring a Foo<JobConf> to requiring a Foo<? extends JobConf>

Author: Cyrille Chépélov (TP12) <[email protected]>

Closes apache#284 from cchepelov/try_cascading3 and squashes the following commits:

e7d1304 [Cyrille Chépélov (TP12)] Adding a @deprecated notice on parquet-cascading's remaining classes
05a417d [Cyrille Chépélov (TP12)] cascading2/3: share back TupleWriteSupport.java (accidentally unmerged)
7fff2d4 [Cyrille Chépélov (TP12)] cascading/cascading3: remove duplicates, push common files into parquet-cascading-common23
338a416 [Cyrille Chépélov (TP12)] Removing unwanted file (what?!) + .gitignoring this kind of files
d9f0455 [Cyrille Chépélov (TP12)] TupleEntry#get is now TupleEntry#getObject
a7f490a [Cyrille Chépélov (TP12)] Revert "Missing test conversion to Cascading 3.0"
cc8b870 [Cyrille Chépélov (TP12)] Missing test conversion to Cascading 3.0
2d73512 [Cyrille Chépélov (TP12)] conflicting values can come in one order or the other. Accept both.
33355d5 [Cyrille Chépélov (TP12)] Fix version mismatch (duh!)
7128639 [Cyrille Chépélov (TP12)] non-C locale can break tests implementation (decimal formats)
53aa2f9 [Cyrille Chépélov (TP12)] Adding a parquet-cascading3 module (forking the parquet-cascading module and accounting for API changes)
  • Loading branch information
Cyrille Chépélov (TP12) authored and julienledem committed Feb 1, 2016
1 parent af9fd05 commit 5769479
Show file tree
Hide file tree
Showing 24 changed files with 949 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ dependency-reduced-pom.xml
parquet-scrooge/.cache
.idea/*
target/
.cache
*~
mvn_install.log
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ sudo make install
Once protobuf and thrift are available in your path, you can build the project by running:

```
mvn clean install
LC_ALL=C mvn clean install
```

## Features
Expand Down
47 changes: 47 additions & 0 deletions parquet-cascading/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,51 @@

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>../parquet-cascading-common23/src/main/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>../parquet-cascading-common23/src/test/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-resource</id>
<phase>generate-test-resources</phase>
<goals>
<goal>add-test-resource</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>../parquet-cascading-common23/src/test/resources</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
Expand All @@ -115,6 +160,8 @@
<version>0.1.10</version>
<configuration>
<thriftExecutable>${thrift.executable}</thriftExecutable>
<thriftSourceRoot>../parquet-cascading-common23/src/main/thrift</thriftSourceRoot>
<thriftTestSourceRoot>../parquet-cascading-common23/src/test/thrift</thriftTestSourceRoot>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.parquet.hadoop.thrift.TBaseWriteSupport;
import org.apache.parquet.thrift.TBaseRecordConverter;

@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x
public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {

// In the case of reads, we can read the thrift class from the file metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* @author Avi Bryant
*/

@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x
public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{

private static final long serialVersionUID = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
* This is an abstract class; implementations are expected to set up their Input/Output Formats
* correctly in the respective Init methods.
*/
@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x
public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{

public static final class Config<T> implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@
import java.util.HashMap;
import java.util.Map;

@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x
public class TestParquetTBaseScheme {
final String txtInputPath = "src/test/resources/names.txt";
final String txtInputPath = "target/test-classes/names.txt";
final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in";
final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out";
final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out";
Expand Down
27 changes: 27 additions & 0 deletions parquet-cascading3/REVIEWERS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

The following reviewers had reviewed the parquet-cascading (pre-Cascading 3.0) project:

| Name | Apache Id | github id |
|--------------------|------------|-------------|
| Dmitriy Ryaboy | dvryaboy | dvryaboy |
| Tianshuo Deng | tianshuo | tsdeng |


178 changes: 178 additions & 0 deletions parquet-cascading3/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.8.2-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>parquet-cascading3</artifactId>
<packaging>jar</packaging>

<name>Apache Parquet Cascading (for Cascading 3.0 onwards)</name>
<url>https://parquet.apache.org</url>

<repositories>
<repository>
<id>conjars.org</id>
<url>http://conjars.org/repo</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-thrift</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cascading</groupId>
<artifactId>cascading-hadoop</artifactId> <!-- building against cascading-hadoop for Hadoop1, but will use against any backend -->
<version>${cascading3.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- TEMPORARY UNTIL AFTER previous.version &gt;= 1.8.2
(enforcer checks against the API in 1.7.0, this module did not exist back then, therefore it can't succeed)
-->
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<phase>none</phase>
</execution>
</executions>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

<!-- /TEMPORARY -->

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>../parquet-cascading-common23/src/main/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>../parquet-cascading-common23/src/test/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-resource</id>
<phase>generate-test-resources</phase>
<goals>
<goal>add-test-resource</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>../parquet-cascading-common23/src/test/resources</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.thrift.tools</groupId>
<artifactId>maven-thrift-plugin</artifactId>
<version>0.1.10</version>
<configuration>
<thriftExecutable>${thrift.executable}</thriftExecutable>
<thriftSourceRoot>../parquet-cascading-common23/src/main/thrift</thriftSourceRoot>
<thriftTestSourceRoot>../parquet-cascading-common23/src/test/thrift</thriftTestSourceRoot>
</configuration>
<executions>
<execution>
<id>thrift-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.cascading;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.thrift.TBase;

import cascading.flow.FlowProcess;
import cascading.tap.Tap;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
import org.apache.parquet.hadoop.thrift.TBaseWriteSupport;
import org.apache.parquet.thrift.TBaseRecordConverter;

public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {

// In the case of reads, we can read the thrift class from the file metadata
public ParquetTBaseScheme() {
this(new Config<T>());
}

public ParquetTBaseScheme(Class<T> thriftClass) {
this(new Config<T>().withRecordClass(thriftClass));
}

public ParquetTBaseScheme(FilterPredicate filterPredicate) {
this(new Config<T>().withFilterPredicate(filterPredicate));
}

public ParquetTBaseScheme(FilterPredicate filterPredicate, Class<T> thriftClass) {
this(new Config<T>().withRecordClass(thriftClass).withFilterPredicate(filterPredicate));
}

public ParquetTBaseScheme(Config<T> config) {
super(config);
}

@Override
public void sourceConfInit(FlowProcess<? extends JobConf> fp,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
super.sourceConfInit(fp, tap, jobConf);
jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class);
ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class);
}

@Override
public void sinkConfInit(FlowProcess<? extends JobConf> fp,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {

if (this.config.getKlass() == null) {
throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
}

DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
}
}
Loading

0 comments on commit 5769479

Please sign in to comment.