Commit

Fix for ineligible filters, use compressed block size (#70)
pwoody authored Nov 28, 2016
1 parent 9c256a4 commit ded7fee
Showing 2 changed files with 35 additions and 8 deletions.
@@ -78,9 +78,9 @@ class ParquetMetadataFileSplitter(
       (applied, unapplied, filteredBlocks)
     }
 
-    val eligible = parquetFilter(unapplied, filteredBlocks).map { bmd =>
+    val eligible = applyParquetFilter(unapplied, filteredBlocks).map { bmd =>
       val blockPath = new Path(root, bmd.getPath)
-      new FileSplit(blockPath, bmd.getStartingPos, bmd.getTotalByteSize, Array.empty)
+      new FileSplit(blockPath, bmd.getStartingPos, bmd.getCompressedSize, Array.empty)
     }
 
     val statFilter: (FileStatus => Seq[FileSplit]) = { stat =>
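Note: the hunk above swaps BlockMetaData.getTotalByteSize (the sum of the row group's uncompressed column sizes) for getCompressedSize (the bytes the row group actually occupies in the file) when sizing each FileSplit. A minimal sketch of the distinction, assuming parquet-mr's BlockMetaData API; the helper below is illustrative only and not part of the commit:

import org.apache.parquet.hadoop.metadata.BlockMetaData

// Illustrative only: contrast the two sizes a Parquet row group reports.
// A FileSplit's length should describe on-disk bytes, so splits line up with
// the file layout and partition packing reflects what will actually be read.
def describeBlock(bmd: BlockMetaData): String = {
  val uncompressed = bmd.getTotalByteSize  // uncompressed bytes across column chunks
  val onDisk = bmd.getCompressedSize       // bytes the row group occupies on disk
  s"row group @ ${bmd.getStartingPos}: $onDisk B on disk, $uncompressed B uncompressed"
}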
@@ -95,19 +95,20 @@ class ParquetMetadataFileSplitter(
     statFilter
   }
 
-  private def parquetFilter(
+  private def applyParquetFilter(
       filters: Seq[Filter],
       blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
-    if (filters.nonEmpty) {
+    val predicates = filters.flatMap {
+      ParquetFilters.createFilter(schema, _)
+    }
+    if (predicates.nonEmpty) {
       // Asynchronously build bitmaps
       Future {
         buildFilterBitMaps(filters)
       }(ParquetMetadataFileSplitter.executionContext)
 
-      val predicate = filters.flatMap {
-        ParquetFilters.createFilter(schema, _)
-      }.reduce(FilterApi.and)
-      blocks.filter(bmd => !StatisticsFilter.canDrop(predicate, bmd.getColumns))
+      val predicate = predicates.reduce(FilterApi.and)
+      blocks.filterNot(bmd => StatisticsFilter.canDrop(predicate, bmd.getColumns))
     } else {
       blocks
     }
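Note: the second hunk is the fix for ineligible filters. ParquetFilters.createFilter returns an Option, so when none of the pushed-down filters can be converted to a Parquet predicate the flatMap yields an empty sequence, and the old reduce(FilterApi.and) would throw on it. A minimal sketch of the guarded control flow, assuming access to Spark's package-private ParquetFilters as the splitter has; the standalone method name is illustrative only:

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch of the fixed logic: only reduce when at least one filter converted.
// createFilter returns None for filters with no Parquet equivalent (e.g. the
// startsWith filter exercised by the new test), so predicates can be empty.
def combineConvertible(schema: StructType, filters: Seq[Filter]): Option[FilterPredicate] = {
  val predicates = filters.flatMap(ParquetFilters.createFilter(schema, _))
  if (predicates.nonEmpty) {
    Some(predicates.reduce(FilterApi.and))  // safe: the sequence is non-empty
  } else {
    None  // previously: reduce on an empty Seq threw UnsupportedOperationException
  }
}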
@@ -769,6 +769,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("Ensure file with multiple blocks splits properly with filters") {
+    withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL",
+      SQLConf.FILES_MAX_PARTITION_BYTES.key -> "1024",
+      ParquetOutputFormat.BLOCK_SIZE -> "1") {
+      withTempPath { path =>
+        spark.sparkContext.parallelize((1 to 1000).map(x => x.toString), 1)
+          .toDF("x").write.parquet(path.getCanonicalPath)
+        val df = spark.read.parquet(path.getCanonicalPath)
+        val column: Column = df.col("x").isNotNull
+        assert(df.filter(column).count == df.count)
+      }
+    }
+  }
+
+  test("Ensure unconvertable filters don't break splitting") {
+    withSQLConf(ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") {
+      withTempPath { path =>
+        spark.sparkContext.parallelize((1 to 1000).map(x => x.toString), 1)
+          .toDF("x").write.parquet(path.getCanonicalPath)
+        val df = spark.read.parquet(path.getCanonicalPath)
+        val column: Column = df.col("x").startsWith("1000")
+        assert(df.filter(column).count == 1)
+      }
+    }
+  }
 }
 
 class CountingFileSystem extends RawLocalFileSystem {
