timestamp and date filter pushdown (#72)
pwoody authored Dec 1, 2016
1 parent ded7fee commit 784072a
Showing 9 changed files with 296 additions and 57 deletions.
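In practical terms (a hedged example; the path and column name below are made up, not from this commit), a timestamp or date comparison in a query like the following can now be converted into a Parquet filter predicate and checked against row-group metadata, instead of being evaluated only after the rows are read:

// Illustrative usage in a spark-shell session; "/data/events" and "event_time" are hypothetical.
import org.apache.spark.sql.functions.col

val events = spark.read.parquet("/data/events")
events
  .filter(col("event_time") > java.sql.Timestamp.valueOf("2016-11-01 00:00:00"))
  .count()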
@@ -360,6 +360,7 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException
private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
column.dataType() == DataTypes.TimestampType ||
DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@@ -302,7 +302,7 @@ class ParquetFileFormat
val splits = ParquetFileFormat.fileSplits.get(root,
new Callable[ParquetFileSplitter] {
override def call(): ParquetFileSplitter =
createParquetFileSplits(root, hadoopConf, schema)
createParquetFileSplits(root, hadoopConf, schema, sparkSession)
})
root -> splits.buildSplitter(filters)
}.toMap
@@ -320,9 +320,12 @@ class ParquetFileFormat
private def createParquetFileSplits(
root: Path,
hadoopConf: Configuration,
schema: StructType): ParquetFileSplitter = {
schema: StructType,
sparkSession: SparkSession): ParquetFileSplitter = {
getMetadataForPath(root, hadoopConf)
.map(meta => new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema))
.map { meta =>
new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema, sparkSession)
}
.getOrElse(ParquetDefaultFileSplitter)
}

@@ -382,13 +385,14 @@ class ParquetFileFormat
requiredSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)

val int96AsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
int96AsTimestamp)

// Try to push down filters when filter push-down is enabled.
val pushed =
@@ -397,12 +401,12 @@ class ParquetFileFormat
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(requiredSchema, _))
.flatMap(ParquetFilters.createFilter(requiredSchema, _, int96AsTimestamp))
.reduceOption(FilterApi.and)
} else {
None
}

log.info(s"Pushing converted filters: $pushed")
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

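The conversion keys off existing session flags rather than new configuration; a hedged sketch of the relevant settings in a spark-shell session (the keys are the standard names behind SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED and SQLConf.PARQUET_INT96_AS_TIMESTAMP):

// Sketch: timestamp predicates are only converted when filter pushdown is enabled and
// timestamps are not being read as INT96, since INT96 cells are 12-byte binaries rather
// than comparable 64-bit values.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.int96AsTimestamp", "false")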
@@ -31,6 +31,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils
@@ -53,10 +54,13 @@ object ParquetDefaultFileSplitter extends ParquetFileSplitter {
class ParquetMetadataFileSplitter(
val root: Path,
val blocks: Seq[BlockMetaData],
val schema: StructType)
val schema: StructType,
val session: SparkSession)
extends ParquetFileSplitter
with Logging {

private val int96AsTimestamp = session.sessionState.conf.isParquetINT96AsTimestamp

private val referencedFiles = blocks.map(bmd => new Path(root, bmd.getPath)).toSet

private val filterSets: Cache[Filter, RoaringBitmap] =
@@ -99,7 +103,7 @@ class ParquetMetadataFileSplitter(
filters: Seq[Filter],
blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
val predicates = filters.flatMap {
ParquetFilters.createFilter(schema, _)
ParquetFilters.createFilter(schema, _, int96AsTimestamp)
}
if (predicates.nonEmpty) {
// Asynchronously build bitmaps
@@ -121,7 +125,7 @@
.filter(filterSets.getIfPresent(_) == null)
.flatMap { filter =>
val bitmap = new RoaringBitmap
ParquetFilters.createFilter(schema, filter)
ParquetFilters.createFilter(schema, filter, int96AsTimestamp)
.map((filter, _, bitmap))
}
var i = 0
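For orientation, a rough sketch (not the commit's code) of the idea behind the Filter-to-RoaringBitmap cache above: the splitter remembers, per source Filter, a bitmap over the file's row-group indices so repeated scans with the same filter can reuse the statistics evaluation rather than re-checking every block's metadata. One plausible shape of that evaluation, with the statistics check stubbed out:

// Illustrative only; mightMatch stands in for a real check against BlockMetaData statistics.
import org.roaringbitmap.RoaringBitmap

def survivingRowGroups(numRowGroups: Int, mightMatch: Int => Boolean): RoaringBitmap = {
  val bitmap = new RoaringBitmap
  (0 until numRowGroups).foreach(i => if (mightMatch(i)) bitmap.add(i))
  bitmap
}

// Toy example: keep only even-numbered row groups.
val kept = survivingRowGroups(8, _ % 2 == 0)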
@@ -21,6 +21,7 @@ import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

@@ -49,6 +50,12 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case TimestampType =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
case DateType =>
(n: String, v: Any) => FilterApi.eq(
intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -70,6 +77,12 @@
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case TimestampType =>
(n: String, v: Any) => FilterApi.notEq(
longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
case DateType =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -88,6 +101,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case TimestampType =>
(n: String, v: Any) => FilterApi.lt(
longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
case DateType =>
(n: String, v: Any) => FilterApi.lt(
intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -106,6 +125,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case TimestampType =>
(n: String, v: Any) => FilterApi.ltEq(
longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
case DateType =>
(n: String, v: Any) => FilterApi.ltEq(
intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -124,6 +149,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case TimestampType =>
(n: String, v: Any) => FilterApi.gt(
longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
case DateType =>
(n: String, v: Any) => FilterApi.gt(
intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -142,6 +173,28 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case TimestampType =>
(n: String, v: Any) => FilterApi.gtEq(
longColumn(n), convertTimestamp(v.asInstanceOf[java.sql.Timestamp]))
case DateType =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n), convertDate(v.asInstanceOf[java.sql.Date]))
}

private def convertDate(d: java.sql.Date): java.lang.Integer = {
if (d != null) {
DateTimeUtils.fromJavaDate(d).asInstanceOf[java.lang.Integer]
} else {
null
}
}

private def convertTimestamp(t: java.sql.Timestamp): java.lang.Long = {
if (t != null) {
DateTimeUtils.fromJavaTimestamp(t).asInstanceOf[java.lang.Long]
} else {
null
}
}
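For reference, a hedged sketch of what these helpers feed into (the column names are made up): DateTimeUtils.fromJavaDate yields days since the Unix epoch, matching Parquet's INT32 DATE encoding, and DateTimeUtils.fromJavaTimestamp yields microseconds since the epoch, matching INT64 TIMESTAMP_MICROS, so the boxed results can be compared directly against Parquet column values and statistics.

// Illustrative only; not part of the commit.
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.{intColumn, longColumn}
import org.apache.spark.sql.catalyst.util.DateTimeUtils

val days: java.lang.Integer =
  DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2016-12-01")).asInstanceOf[java.lang.Integer]
val micros: java.lang.Long =
  DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2016-12-01 00:00:00"))
    .asInstanceOf[java.lang.Long]

val datePredicate = FilterApi.gtEq(intColumn("d"), days)   // compares INT32 day values
val tsPredicate = FilterApi.gt(longColumn("ts"), micros)   // compares INT64 microsecond values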

/**
@@ -153,23 +206,32 @@
* using such fields, otherwise Parquet library will throw exception (PARQUET-389).
* Here we filter out such fields.
*/
private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
}.map(f => f.name -> f.dataType).toMap
case _ => Map.empty[String, DataType]
}
private def getFieldMap(dataType: DataType, int96AsTimestamp: Boolean): Map[String, DataType] =
dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
fields.filterNot { f =>
val isTs = DataTypes.TimestampType.acceptsType(f.dataType)

val isOptionalField = f.metadata.contains(StructType.metadataKeyForOptionalField) &&
f.metadata.getBoolean(StructType.metadataKeyForOptionalField)

(isTs && int96AsTimestamp) || isOptionalField
}.map(f => f.name -> f.dataType).toMap
case _ => Map.empty[String, DataType]
}

/**
* Converts data sources filters to Parquet filter predicates.
*/
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = getFieldMap(schema)
def createFilter(
schema: StructType,
predicate: sources.Filter,
int96AsTimestamp: Boolean): Option[FilterPredicate] = {
val dataTypeOf = getFieldMap(schema, int96AsTimestamp)

// NOTE:
//
@@ -221,18 +283,20 @@ private[parquet] object ParquetFilters {
// Pushing one side of AND down is only safe to do at the top level.
// You can see ParquetRelation's initializeLocalJobFunc method as an example.
for {
lhsFilter <- createFilter(schema, lhs)
rhsFilter <- createFilter(schema, rhs)
lhsFilter <- createFilter(schema, lhs, int96AsTimestamp)
rhsFilter <- createFilter(schema, rhs, int96AsTimestamp)
} yield FilterApi.and(lhsFilter, rhsFilter)

case sources.Or(lhs, rhs) =>
for {
lhsFilter <- createFilter(schema, lhs)
rhsFilter <- createFilter(schema, rhs)
lhsFilter <- createFilter(schema, lhs, int96AsTimestamp)
rhsFilter <- createFilter(schema, rhs, int96AsTimestamp)
} yield FilterApi.or(lhsFilter, rhsFilter)

case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not).map(LogicalInverseRewriter.rewrite)
createFilter(schema, pred, int96AsTimestamp)
.map(FilterApi.not)
.map(LogicalInverseRewriter.rewrite)

case sources.In(name, values) if dataTypeOf.contains(name) =>
val eq = makeEq.lift(dataTypeOf(name))
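A hedged sketch of the new entry point's behaviour (the schema and filter are invented, and ParquetFilters is package-private, so this would only compile inside the parquet package): with int96AsTimestamp = true the timestamp field is excluded from the field map and no predicate is produced, leaving the filter to be evaluated by Spark; with the flag off, the filter converts to a long comparison on the TIMESTAMP_MICROS column.

// Illustrative only; not a test from the commit.
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

val schema = StructType(Seq(StructField("ts", TimestampType)))
val filter = sources.GreaterThan("ts", java.sql.Timestamp.valueOf("2016-12-01 00:00:00"))

ParquetFilters.createFilter(schema, filter, int96AsTimestamp = true)   // expected: None
ParquetFilters.createFilter(schema, filter, int96AsTimestamp = false)  // expected: Some(gt(ts, ...))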
@@ -252,6 +252,9 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)

case _: TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
new ParquetPrimitiveConverter(updater)

case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
new ParquetPrimitiveConverter(updater) {
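For contrast, a rough sketch (not the commit's code) of what the two timestamp read paths amount to; the INT96 decoding mirrors the 12-byte layout used by the writer code removed later in this commit:

// Illustrative only.
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// INT64 (TIMESTAMP_MICROS): the stored long is already microseconds since the epoch.
def readInt64Timestamp(value: Long): Long = value

// INT96: a little-endian long of nanos-of-day followed by an int Julian day, which must be decoded.
def readInt96Timestamp(bytes: Array[Byte]): Long = {
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buf.getLong
  val julianDay = buf.getInt
  DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
}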
@@ -368,14 +368,8 @@ private[parquet] class ParquetSchemaConverter(
// from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
// a timestamp into a `Long`. This design decision is subject to change though, for example,
// we may resort to microsecond precision in the future.
//
// For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
// currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
// hasn't implemented `TIMESTAMP_MICROS` yet.
//
// TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)

case BinaryType =>
Types.primitive(BINARY, repetition).named(field.name)
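The observable effect on written schemas, sketched with the parquet-mr schema API (assuming a parquet-mr version that defines OriginalType.TIMESTAMP_MICROS, which the converter change above already requires; the field name is made up): a nullable TimestampType field now maps to an annotated 64-bit column rather than an INT96 column.

// Illustrative only; mirrors the converter change above.
import org.apache.parquet.schema.{OriginalType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64
import org.apache.parquet.schema.Type.Repetition.OPTIONAL

val tsField = Types.primitive(INT64, OPTIONAL).as(OriginalType.TIMESTAMP_MICROS).named("ts")
println(tsField)  // roughly: optional int64 ts (TIMESTAMP_MICROS)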
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.nio.{ByteBuffer, ByteOrder}
import java.util

import scala.collection.JavaConverters.mapAsJavaMapConverter
@@ -32,7 +31,6 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -66,9 +64,6 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
// Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
private var writeLegacyParquetFormat: Boolean = _

// Reusable byte array used to write timestamps as Parquet INT96 values
private val timestampBuffer = new Array[Byte](12)

// Reusable byte array used to write decimal values
private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))

@@ -154,20 +149,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))

case TimestampType =>
(row: SpecializedGetters, ordinal: Int) => {
// TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
// Currently we only support timestamps stored as INT96, which is compatible with Hive
// and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS`
// defined in the parquet-format spec. But up until writing, the most recent parquet-mr
// version (1.8.1) hasn't implemented it yet.

// NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
// precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
val buf = ByteBuffer.wrap(timestampBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
}
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addLong(row.getLong(ordinal))

case BinaryType =>
(row: SpecializedGetters, ordinal: Int) =>
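A minimal sketch of why the writer can now emit the value unchanged (not the commit's code): Spark stores TimestampType internally as microseconds since the epoch, which is exactly the TIMESTAMP_MICROS physical value, so addLong can write it directly and the round trip through DateTimeUtils is lossless at microsecond precision.

// Illustrative round trip.
import org.apache.spark.sql.catalyst.util.DateTimeUtils

val ts = java.sql.Timestamp.valueOf("2016-12-01 00:00:00")
val micros = DateTimeUtils.fromJavaTimestamp(ts)  // the long held in the InternalRow
// recordConsumer.addLong(micros) writes this value as the TIMESTAMP_MICROS cell;
// reading it back recovers the original timestamp.
assert(DateTimeUtils.toJavaTimestamp(micros) == ts)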