Skip to content

Commit

Permalink
PARQUET-229 Add a strict thrift projection API with backwards compat …
Browse files Browse the repository at this point in the history
…support

Currently, the thrift projection API accepts strings in a very general glob format that supports not only wildcards like `*` and `?` and expansions (`{x,y,z}`) but also character classes `[abc]`, and negation.
Because of this flexibility, it's hard to give users good error reporting, for example letting them know that when they requested columns `foo.bar.{a,b,c}` there is actually no such column `foo.bar.c`.

This PR introduces a new syntax that supports a more restrictive form of glob syntax and enforces that all **expansions** of a glob match a column, not just that all globs match a column. The new syntax is very simple and only has four special characters: `{`,`}`,`,`, and `*`

It supports glob expansion, for example:
`home.{phone,address}` or `org.apache{-incubator,}.foo`

It also supports the wildcard `*`, which is treated the same way a Java regex treats `(.*)`, for example:
`home.*` or `org.apache*.foo`

In the new syntax glob paths mean "keep all the child fields of the field matched by this glob", just like variable access would work in a programming language. For example: `x.y.z` means keep field `z` and all of its children (if any). So it's not necessary to do `x.y.z.*`. However, `x.y.z` would not keep field `x.y.zoo`. If that was desired, then `x.y.z*` could be used instead.

Setting `"parquet.thrift.column.filter"` will result in the same behavior that it does currently in master, though a deprecation warning will be logged. The classes that implement the current behavior have been marked as deprecated, and using them will log a warning.

Setting `"parquet.thrift.column.projection.globs"` will instead use this new syntax, and entry points for it have been added to the various Builders in the codebase as well.

This PR does a little bit of cleanup as well, moving some shared methods to a `Strings` class and simplifying some of the class hierarchy in `ThriftSchemaConverterVisitor`. There are a few `// TODO Why?` added as well that I wanted to ask about.

Author: Alex Levenson <[email protected]>

Closes apache#150 from isnotinvain/alexlevenson/strict-projection and squashes the following commits:

6c58e1c [Alex Levenson] clean up docs
1aab666 [Alex Levenson] Merge branch 'master' into alexlevenson/strict-projection
92b6ba6 [Alex Levenson] Merge branch 'master' into alexlevenson/strict-projection
ceaf6cd [Alex Levenson] update packages
a28dc19 [Alex Levenson] Merge branch 'master' into alexlevenson/strict-projection
ebc4761 [Alex Levenson] Remove unneeded TODO
c2e12c5 [Alex Levenson] Update docs
eecf5f3 [Alex Levenson] Merge branch 'master' into alexlevenson/strict-projection
671f0b5 [Alex Levenson] Merge branch 'master' into alexlevenson/strict-projection
298cad8 [Alex Levenson] Add warning
8b7e4bb [Alex Levenson] Add more comments to StrictFieldProjectionFilter
8f65ed2 [Alex Levenson] Add tests for strict projection filter
c81d9e1 [Alex Levenson] Docs and cleanup for FieldProjectionFilter
71139a7 [Alex Levenson] Add tests for FieldsPath
7d17068 [Alex Levenson] Tests for WildcardPath
8a3d2af [Alex Levenson] Add some tests
f3fd931 [Alex Levenson] More docs
0b190c3 [Alex Levenson] Add more comments
6e67df5 [Alex Levenson] Add a strict thrift projection API with backwards support for the current API
  • Loading branch information
isnotinvain committed May 1, 2015
1 parent 22c6d08 commit 7fc7998
Show file tree
Hide file tree
Showing 24 changed files with 1,858 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,42 +51,56 @@ public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader

/**
 * Serializable holder for the read-side settings of a scheme: an optional filter
 * predicate, a deprecated projection string, a strict projection string and the
 * record class. All fields are final; each {@code with*} method returns a new
 * Config that copies the other fields unchanged.
 */
public static final class Config<T> implements Serializable {
private final FilterPredicate filterPredicate;
private final String projectionString;
private final String deprecatedProjectionString;
private final String strictProjectionString;
private final Class<T> klass;
private Config(Class<T> klass, FilterPredicate filterPredicate, String projectionString) {

// Stores the four settings as given; no validation happens here.
private Config(Class<T> klass, FilterPredicate filterPredicate, String deprecatedProjectionString, String strictProjectionString) {
this.filterPredicate = filterPredicate;
this.projectionString = projectionString;
this.deprecatedProjectionString = deprecatedProjectionString;
this.strictProjectionString = strictProjectionString;
this.klass = klass;
}

/**
 * Creates a Config in which every setting is null.
 */
public Config() {
filterPredicate = null;
projectionString = null;
deprecatedProjectionString = null;
strictProjectionString = null;
klass = null;
}

/**
 * @return the configured filter predicate, or null if none was set
 */
public FilterPredicate getFilterPredicate() {
return filterPredicate;
}

/**
 * @return the projection string set through the deprecated API, or null if none was set
 */
@Deprecated
public String getProjectionString() {
return projectionString;
return deprecatedProjectionString;
}

/**
 * @return the strict projection string, or null if none was set
 */
public String getStrictProjectionString() {
return strictProjectionString;
}

/**
 * @return the configured record class, or null if none was set
 */
public Class<T> getKlass() {
return klass;
}

/**
 * Returns a copy of this Config with the filter predicate replaced by f.
 * f is passed through checkNotNull (presumably rejects null -- confirm against its definition).
 */
public Config<T> withFilterPredicate(FilterPredicate f) {
return new Config<T>(this.klass, checkNotNull(f, "filterPredicate"), this.projectionString);
return new Config<T>(this.klass, checkNotNull(f, "filterPredicate"), this.deprecatedProjectionString, this.strictProjectionString);
}

/**
 * Returns a copy of this Config with the deprecated projection string replaced by p.
 * p is passed through checkNotNull (presumably rejects null -- confirm against its definition).
 */
@Deprecated
public Config<T> withProjectionString(String p) {
return new Config<T>(this.klass, this.filterPredicate, checkNotNull(p, "projectionFilter"));
return new Config<T>(this.klass, this.filterPredicate, checkNotNull(p, "projectionString"), this.strictProjectionString);
}

/**
 * Returns a copy of this Config with the strict projection string replaced by p.
 * p is passed through checkNotNull (presumably rejects null -- confirm against its definition).
 */
public Config<T> withStrictProjectionString(String p) {
// NOTE(review): the label given to checkNotNull is "projectionString", the same
// label the deprecated setter uses -- confirm this is intended.
return new Config<T>(this.klass, this.filterPredicate, this.deprecatedProjectionString, checkNotNull(p, "projectionString"));
}

/**
 * Returns a copy of this Config with the record class replaced by klass.
 * klass is passed through checkNotNull (presumably rejects null -- confirm against its definition).
 */
public Config<T> withRecordClass(Class<T> klass) {
return new Config<T>(checkNotNull(klass, "recordClass"), this.filterPredicate, this.projectionString);
return new Config<T>(checkNotNull(klass, "recordClass"), this.filterPredicate, this.deprecatedProjectionString, this.strictProjectionString);
}
}

Expand All @@ -105,9 +119,16 @@ public ParquetValueScheme(Config<T> config) {
this.config = config;
}

/**
 * Forwards the deprecated projection string held by the config to
 * ThriftReadSupport.setProjectionPushdown for the given job configuration.
 * Does nothing when no such string is configured (the field is null).
 *
 * @param jobConf the job configuration handed to ThriftReadSupport
 */
@Deprecated
private void setProjectionPushdown(JobConf jobConf) {
if (this.config.projectionString!= null) {
ThriftReadSupport.setProjectionPushdown(jobConf, this.config.projectionString);
if (this.config.deprecatedProjectionString != null) {
ThriftReadSupport.setProjectionPushdown(jobConf, this.config.deprecatedProjectionString);
}
}

/**
 * Hands the strict projection string held by the config to
 * ThriftReadSupport.setStrictFieldProjectionFilter for the given job configuration.
 * Nothing is done when no strict projection string is configured.
 *
 * @param jobConf the job configuration handed to ThriftReadSupport
 */
private void setStrictProjectionPushdown(JobConf jobConf) {
  String strictGlobs = this.config.strictProjectionString;
  if (strictGlobs == null) {
    // no strict projection requested, leave the job configuration untouched
    return;
  }
  ThriftReadSupport.setStrictFieldProjectionFilter(jobConf, strictGlobs);
}

Expand All @@ -120,6 +141,7 @@ private void setPredicatePushdown(JobConf jobConf) {
/**
 * Prepares the job configuration for reading by invoking, in order,
 * setPredicatePushdown, setProjectionPushdown, setStrictProjectionPushdown
 * and setRecordClass on it. The flow process and tap arguments are not used here.
 */
public void sourceConfInit(FlowProcess<JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) {
setPredicatePushdown(jobConf);
// both the deprecated and the strict projection setters run; each is a no-op
// when its own projection string is null
setProjectionPushdown(jobConf);
setStrictProjectionPushdown(jobConf);
setRecordClass(jobConf);
}

Expand Down
110 changes: 110 additions & 0 deletions parquet-common/src/main/java/org/apache/parquet/Strings.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.parquet.glob.GlobExpander;
import org.apache.parquet.glob.WildcardPath;

/**
 * Static helpers for joining strings, testing for null/empty strings and
 * expanding glob patterns. This class cannot be instantiated.
 */
public final class Strings {
  private Strings() { }

  /**
   * Join an Iterable of Strings into a single string with a delimiter.
   * For example, join(Arrays.asList("foo","","bar","x"), "|") would return
   * "foo||bar|x"
   *
   * An empty iterable yields the empty string. The delimiter is only placed
   * between elements, never before the first or after the last one.
   *
   * @param s an iterable of strings
   * @param on the delimiter
   * @return a single joined string
   */
  public static String join(Iterable<String> s, String on) {
    Iterator<String> iter = s.iterator();
    StringBuilder sb = new StringBuilder();
    while (iter.hasNext()) {
      sb.append(iter.next());
      // only emit the delimiter when another element follows
      if (iter.hasNext()) {
        sb.append(on);
      }
    }
    return sb.toString();
  }

  /**
   * Join an array of Strings into a single string with a delimiter.
   * For example, join(new String[] {"foo","","bar","x"}, "|") would return
   * "foo||bar|x"
   *
   * @param s an array of strings
   * @param on the delimiter
   * @return a single joined string
   */
  public static String join(String[] s, String on) {
    return join(Arrays.asList(s), on);
  }

  /**
   * Returns true if s == null or s.isEmpty()
   *
   * @param s the string to test, may be null
   * @return true when s is null or has length zero
   */
  public static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  /**
   * Expands a string with braces ("{}") into all of its possible expansions.
   * We call anything inside of {} braces a "one-of" group.
   *
   * The only special characters in this glob syntax are '}', '{' and ','
   *
   * The top-level pattern must not contain any commas, but a "one-of" group separates
   * its elements with commas, and a one-of group may contain sub one-of groups.
   *
   * For example:
   * start{a,b,c}end -> startaend, startbend, startcend
   * start{a,{b,c},d} -> starta, startb, startc, startd
   * {a,b,c} -> a, b, c
   * start{a,b{x,y}} -> starta, startbx, startby
   *
   * @param globPattern a string in the format described above
   * @return a list of all the strings that would satisfy globPattern, including duplicates
   */
  public static List<String> expandGlob(String globPattern) {
    return GlobExpander.expand(globPattern);
  }

  /**
   * Expands a string according to {@link #expandGlob(String)}, and then constructs a {@link WildcardPath}
   * for each expanded result which can be used to match strings as described in {@link WildcardPath}.
   *
   * @param globPattern a String to be passed to {@link #expandGlob(String)}
   * @param delim the delimiter used by {@link WildcardPath}
   * @return one {@link WildcardPath} per expansion of globPattern, in expansion order
   */
  public static List<WildcardPath> expandGlobToWildCardPaths(String globPattern, char delim) {
    List<String> expandedGlobs = Strings.expandGlob(globPattern);
    // the result has exactly one entry per expansion, so size it up front
    List<WildcardPath> ret = new ArrayList<WildcardPath>(expandedGlobs.size());
    for (String expandedGlob : expandedGlobs) {
      // each path receives both the original pattern and its own expansion
      ret.add(new WildcardPath(globPattern, expandedGlob, delim));
    }
    return ret;
  }
}
114 changes: 114 additions & 0 deletions parquet-common/src/main/java/org/apache/parquet/glob/GlobExpander.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.glob;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.parquet.glob.GlobNode.Atom;
import org.apache.parquet.glob.GlobNode.GlobNodeSequence;
import org.apache.parquet.glob.GlobNode.OneOf;

/**
 * Implementation of {@link org.apache.parquet.Strings#expandGlob(String)}
 */
public final class GlobExpander {
  private GlobExpander() { }

  /**
   * Parses globPattern and expands the parsed tree into strings.
   * See {@link org.apache.parquet.Strings#expandGlob(String)} for docs.
   */
  public static List<String> expand(String globPattern) {
    GlobNode parsed = GlobParser.parse(globPattern);
    return GlobExpanderImpl.expand(parsed);
  }

  /**
   * Visitor that turns a tree of {@link GlobNode} into the list of all the
   * strings that the tree stands for.
   */
  private static final class GlobExpanderImpl implements GlobNode.Visitor<List<String>> {
    private static final GlobExpanderImpl INSTANCE = new GlobExpanderImpl();

    private GlobExpanderImpl() {}

    public static List<String> expand(GlobNode node) {
      return node.accept(INSTANCE);
    }

    @Override
    public List<String> visit(Atom atom) {
      // base case: an atom expands to exactly one string
      return Arrays.asList(atom.get());
    }

    @Override
    public List<String> visit(OneOf oneOf) {
      // a OneOf expands to the concatenation of the expansion lists
      // of its alternatives, kept in order
      List<String> union = new ArrayList<String>();
      for (GlobNode alternative : oneOf.getChildren()) {
        List<String> expansions = alternative.accept(this);
        union.addAll(expansions);
      }
      return union;
    }

    @Override
    public List<String> visit(GlobNodeSequence seq) {
      // a sequence expands to the in-order cross product of the
      // expansions of each of its children
      List<String> accumulated = new ArrayList<String>();
      for (GlobNode part : seq.getChildren()) {
        List<String> expansions = part.accept(this);
        accumulated = crossOrTakeNonEmpty(accumulated, expansions);
      }
      return accumulated;
    }

    /**
     * Computes the cross product of two lists by appending each string in list2 to each string in list1.
     * If one of the lists is empty, a copy of the other list is returned.
     * If both are empty, an empty list is returned.
     */
    public static List<String> crossOrTakeNonEmpty(List<String> list1, List<String> list2) {
      if (list1.isEmpty() || list2.isEmpty()) {
        // when list1 is empty the answer is list2 (which may itself be empty),
        // otherwise list2 is the empty one and the answer is list1
        List<String> survivor = list1.isEmpty() ? list2 : list1;
        return new ArrayList<String>(survivor);
      }

      List<String> product = new ArrayList<String>(list1.size() * list2.size());
      for (String prefix : list1) {
        for (String suffix : list2) {
          product.add(prefix + suffix);
        }
      }
      return product;
    }
  }
}
Loading

0 comments on commit 7fc7998

Please sign in to comment.