Fix InSet filter pushdown for large queries. Disable record-level filtering (#74)
pwoody authored Dec 2, 2016
1 parent 784072a commit a1a17b5
Showing 5 changed files with 85 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -134,7 +134,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.12.1.1</derby.version>
<parquet.version>1.9.0-palantir3</parquet.version>
<parquet.version>1.9.0-palantir4</parquet.version>
<jetty.version>9.2.16.v20160414</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
<chill.version>0.8.0</chill.version>
SpecificParquetRecordReaderBase.java
@@ -22,6 +22,7 @@
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;

import com.google.common.collect.ImmutableList;
@@ -107,8 +108,14 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
FilterCompat.Filter filter = getFilter(configuration);
this.reader = ParquetFileReader.open(configuration, file, footer);
List<RowGroupFilter.FilterLevel> filterLevels =
ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS);
if (configuration.getBoolean(DICTIONARY_FILTERING_ENABLED, false)) {
filterLevels = ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS,
RowGroupFilter.FilterLevel.DICTIONARY);
}
blocks = filterRowGroups(
ImmutableList.of(RowGroupFilter.FilterLevel.STATISTICS, RowGroupFilter.FilterLevel.DICTIONARY),
filterLevels,
filter,
footer.getBlocks(),
reader);
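For context on the hunk above: after this change, dictionary-based row-group pruning only runs when ParquetInputFormat.DICTIONARY_FILTERING_ENABLED is set to true in the Hadoop configuration; by default only statistics-based pruning is applied. A minimal Scala sketch of opting back in from the Spark side, assuming a local session and a hypothetical /tmp/table path:

import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dictionary-filtering-demo").getOrCreate()
// Dictionary-level row-group pruning is opt-in after this change; enable it via the Hadoop conf.
spark.sparkContext.hadoopConfiguration
  .setBoolean(ParquetInputFormat.DICTIONARY_FILTERING_ENABLED, true)
// Later Parquet scans can then use dictionary pages to drop whole row groups.
spark.read.parquet("/tmp/table").where("b in (0, 2)").count()
spark.stop()
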
ParquetFileFormat.scala
@@ -394,6 +394,11 @@ class ParquetFileFormat
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
int96AsTimestamp)

// By default, disable record level filtering.
if (hadoopConf.get(ParquetInputFormat.RECORD_FILTERING_ENABLED) == null) {
hadoopConf.setBoolean(ParquetInputFormat.RECORD_FILTERING_ENABLED, false)
}

// Try to push down filters when filter push-down is enabled.
val pushed =
if (sparkSession.sessionState.conf.parquetFilterPushDown) {
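The block added above disables Parquet's record-level (row-by-row) filtering unless the user has set RECORD_FILTERING_ENABLED explicitly; pushed-down predicates then only prune row groups, and the Spark-side Filter above the scan still produces exact results. A minimal sketch, assuming an existing SparkSession named spark, of restoring the previous behaviour for a job:

import org.apache.parquet.hadoop.ParquetInputFormat

// An explicit setting wins over the new default, because the default is only applied
// when the key is absent from the Hadoop configuration.
spark.sparkContext.hadoopConfiguration
  .setBoolean(ParquetInputFormat.RECORD_FILTERING_ENABLED, true)
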
ParquetFilters.scala
@@ -30,6 +30,53 @@ import org.apache.spark.sql.types._
*/
private[parquet] object ParquetFilters {

case class SetInFilter[T <: Comparable[T]](valueSet: Set[T])
extends UserDefinedPredicate[T] with Serializable {

override def keep(value: T): Boolean = {
value != null && valueSet.contains(value)
}

// Drop when no value in the set is within the statistics range.
override def canDrop(statistics: Statistics[T]): Boolean = {
val statMax = statistics.getMax
val statMin = statistics.getMin
val statRange = com.google.common.collect.Range.closed(statMin, statMax)
!valueSet.exists(value => statRange.contains(value))
}

// Can only drop not(in(set)) when we know that every element in the block is in valueSet.
// From the statistics, we can only be assured of this when min == max.
override def inverseCanDrop(statistics: Statistics[T]): Boolean = {
val statMax = statistics.getMax
val statMin = statistics.getMin
statMin == statMax && valueSet.contains(statMin)
}
}

private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
case IntegerType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
case LongType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
case FloatType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
case DoubleType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
case BinaryType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
}

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -299,10 +346,7 @@ private[parquet] object ParquetFilters {
.map(LogicalInverseRewriter.rewrite)

case sources.In(name, values) if dataTypeOf.contains(name) =>
val eq = makeEq.lift(dataTypeOf(name))
values.flatMap { v =>
eq.map(_(name, v))
}.reduceLeftOption(FilterApi.or)
makeInSet.lift(dataTypeOf(name)).map(_(name, values.toSet))

case _ => None
}
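On the last hunk above: sources.In used to be expanded into one equality predicate per value and folded together with FilterApi.or, so the pushed predicate tree grew one level per element of the IN list; it is now translated into a single user-defined predicate built from SetInFilter, which stays flat no matter how large the list is. A rough sketch of the two shapes for a IN (1, 2, 3) on an integer column, illustrative only and assuming access to the SetInFilter defined above:

import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.intColumn

// Old shape: one Eq per value, folded left with or(); depth grows with the list size.
val oldStyle = FilterApi.or(
  FilterApi.or(
    FilterApi.eq(intColumn("a"), Int.box(1)),
    FilterApi.eq(intColumn("a"), Int.box(2))),
  FilterApi.eq(intColumn("a"), Int.box(3)))

// New shape: a single user-defined predicate wrapping the whole value set.
val newStyle = FilterApi.userDefined(intColumn("a"), SetInFilter(Set[java.lang.Integer](1, 2, 3)))
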
ParquetFilterSuite.scala
@@ -24,6 +24,7 @@ import java.time.{LocalDate, ZoneId}
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat}

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -111,7 +112,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
maybeFilter.exists(_.getClass === filterClass)
}
checker(stripSparkFilter(query), expected)
checker(query, expected)
}
}
}
@@ -557,11 +558,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
val df = spark.read.parquet(path).filter("a = 2")

// The result should be single row.
// When a filter is pushed to Parquet, Parquet can apply it to every row.
// So, we can check the number of rows returned from the Parquet
// to make sure our filter pushdown work.
assert(stripSparkFilter(df).count == 1)
assert(df.count == 1)
}
}
}
@@ -676,20 +673,35 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
val path = s"${dir.getCanonicalPath}/table1"
(1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
val df = spark.read.parquet(path).where("b in (0,2)")
assert(stripSparkFilter(df).count == 3)
assert(df.count == 3)
val df1 = spark.read.parquet(path).where("not (b in (1))")
assert(stripSparkFilter(df1).count == 3)
assert(df1.count == 3)
val df2 = spark.read.parquet(path).where("not (b in (1,3) or a <= 2)")
assert(stripSparkFilter(df2).count == 2)
assert(df2.count == 2)
val df3 = spark.read.parquet(path).where("not (b in (1,3) and a <= 2)")
assert(stripSparkFilter(df3).count == 4)
assert(df3.count == 4)
val df4 = spark.read.parquet(path).where("not (a <= 2)")
assert(stripSparkFilter(df4).count == 3)
assert(df4.count == 3)
}
}
}
}

test("Large In filters work with UDP") {
import testImplicits._
withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
ParquetInputFormat.DICTIONARY_FILTERING_ENABLED -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
(1 to 1000).toDF().write.parquet(path)
val df = spark.read.parquet(path)
val filter = (1 to 499).map(i => i.toString).mkString(",")
assert(df.where(s"value in (${filter})").count() == 499)
assert(df.where(s"value not in (${filter})").count() == 501)
}
}
}

test("Do not create Timestamp filters when interpreting from INT96") {
val baseMillis = System.currentTimeMillis()
def base(): Timestamp = new Timestamp(baseMillis)
