PARQUET-1580: Page-level CRC checksum verification for DataPageV1 (apache#647)

* Page-level checksums for DataPageV1

* Got rid of redundant constant

* Use more direct way of obtaining defaults

* Revised implementation, updated tests, addressed review comments

* Revert auto whitespace trimming

* Variable rename for consistency

* Revert whitespace changes

* Revert more whitespace changes

* Addressed code review comments

* Enable writing out checksums by default

* Added benchmarks

* Addressed review comments

* Addressed test failures

* Added run script for checksum benchmarks

* Addressed code review comments
bbraams authored and Fokko committed Jul 24, 2019
1 parent 14958d4 commit fcc5d1a
Showing 23 changed files with 1,416 additions and 45 deletions.
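
The feature surfaces through the writer and reader builders used in this diff. Below is a minimal round-trip sketch; the class name, file path, and schema are illustrative, while withPageWriteChecksumEnabled and usePageChecksumVerification are the builder hooks these benchmarks exercise.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ChecksumRoundTrip {
  public static void main(String[] args) throws Exception {
    Path file = new Path("target/checksummed.parquet"); // illustrative path
    MessageType schema = MessageTypeParser.parseMessageType(
        "message m { required int64 long_field; }");

    // Write a file with page-level CRC checksums enabled (the new default).
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
        .withType(schema)
        .withPageWriteChecksumEnabled(true)
        .build()) {
      writer.write(new SimpleGroupFactory(schema).newGroup().append("long_field", 1L));
    }

    // Read it back with verification on; a corrupted page is detected at
    // decode time instead of surfacing later as silently bad data.
    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
        .usePageChecksumVerification(true)
        .build()) {
      for (Group g = reader.read(); g != null; g = reader.read()) {
        System.out.println(g.getLong("long_field", 0));
      }
    }
  }
}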
28 changes: 28 additions & 0 deletions parquet-benchmarks/run_checksums.sh
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

#!/usr/bin/env bash

SCRIPT_PATH=$( cd "$(dirname "$0")" ; pwd -P )

echo "Page level CRC checksum benchmarks"
echo "Running write benchmarks"
java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumWriteBenchmarks -bm ss "$@"
echo "Running read benchmarks"
java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumReadBenchmarks -bm ss "$@"
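
The script assumes parquet-benchmarks.jar has already been built (for instance with mvn package in the parquet-benchmarks module, an assumption about the build setup); any extra arguments are forwarded to JMH via "$@".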
parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
@@ -37,4 +37,26 @@ public class BenchmarkFiles {
  // public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO");
  public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY");
  public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP");

  // Page checksum files
  public static final Path file_100K_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-UNCOMPRESSED");
  public static final Path file_100K_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-UNCOMPRESSED");
  public static final Path file_1M_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-UNCOMPRESSED");
  public static final Path file_1M_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-UNCOMPRESSED");
  public static final Path file_10M_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-UNCOMPRESSED");
  public static final Path file_10M_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-UNCOMPRESSED");

  public static final Path file_100K_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-GZIP");
  public static final Path file_100K_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-GZIP");
  public static final Path file_1M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-GZIP");
  public static final Path file_1M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-GZIP");
  public static final Path file_10M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-GZIP");
  public static final Path file_10M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-GZIP");

  public static final Path file_100K_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-SNAPPY");
  public static final Path file_100K_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-SNAPPY");
  public static final Path file_1M_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-SNAPPY");
  public static final Path file_1M_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-SNAPPY");
  public static final Path file_10M_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-SNAPPY");
  public static final Path file_10M_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-SNAPPY");
}
127 changes: 127 additions & 0 deletions parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.benchmarks;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import static java.util.UUID.randomUUID;
import static org.apache.parquet.benchmarks.BenchmarkConstants.*;
import static org.apache.parquet.benchmarks.BenchmarkFiles.*;

import java.io.IOException;
import java.util.Random;

import static org.apache.parquet.benchmarks.BenchmarkUtils.deleteIfExists;
import static org.apache.parquet.benchmarks.BenchmarkUtils.exists;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;

public class PageChecksumDataGenerator {

  private final MessageType SCHEMA = MessageTypeParser.parseMessageType(
      "message m {" +
      "  required int64 long_field;" +
      "  required binary binary_field;" +
      "  required group group {" +
      "    repeated int32 int_field;" +
      "  }" +
      "}");

  public void generateData(Path outFile, int nRows, boolean writeChecksums,
                           CompressionCodecName compression) throws IOException {
    if (exists(configuration, outFile)) {
      System.out.println("File already exists " + outFile);
      return;
    }

    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(outFile)
        .withConf(configuration)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withCompressionCodec(compression)
        .withDictionaryEncoding(true)
        .withType(SCHEMA)
        .withPageWriteChecksumEnabled(writeChecksums)
        .build()) {
      GroupFactory groupFactory = new SimpleGroupFactory(SCHEMA);
      Random rand = new Random(42);
      for (int i = 0; i < nRows; i++) {
        Group group = groupFactory.newGroup();
        group
            .append("long_field", (long) i)
            .append("binary_field", randomUUID().toString())
            .addGroup("group")
            // Keep the value space small (modulo 100) so dictionary encoding is used
            .append("int_field", rand.nextInt() % 100)
            .append("int_field", rand.nextInt() % 100)
            .append("int_field", rand.nextInt() % 100)
            .append("int_field", rand.nextInt() % 100);
        writer.write(group);
      }
    }
  }

  public void generateAll() {
    try {
      // No need to generate the non-checksum versions, as the files generated here are only
      // used in the read benchmarks
      generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED);
      generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP);
      generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY);
      generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED);
      generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP);
      generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY);
      generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED);
      generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP);
      generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public void cleanup() {
    deleteIfExists(configuration, file_100K_NOCHECKSUMS_UNCOMPRESSED);
    deleteIfExists(configuration, file_100K_CHECKSUMS_UNCOMPRESSED);
    deleteIfExists(configuration, file_100K_NOCHECKSUMS_GZIP);
    deleteIfExists(configuration, file_100K_CHECKSUMS_GZIP);
    deleteIfExists(configuration, file_100K_NOCHECKSUMS_SNAPPY);
    deleteIfExists(configuration, file_100K_CHECKSUMS_SNAPPY);
    deleteIfExists(configuration, file_1M_NOCHECKSUMS_UNCOMPRESSED);
    deleteIfExists(configuration, file_1M_CHECKSUMS_UNCOMPRESSED);
    deleteIfExists(configuration, file_1M_NOCHECKSUMS_GZIP);
    deleteIfExists(configuration, file_1M_CHECKSUMS_GZIP);
    deleteIfExists(configuration, file_1M_NOCHECKSUMS_SNAPPY);
    deleteIfExists(configuration, file_1M_CHECKSUMS_SNAPPY);
    deleteIfExists(configuration, file_10M_NOCHECKSUMS_UNCOMPRESSED);
    deleteIfExists(configuration, file_10M_CHECKSUMS_UNCOMPRESSED);
    deleteIfExists(configuration, file_10M_NOCHECKSUMS_GZIP);
    deleteIfExists(configuration, file_10M_CHECKSUMS_GZIP);
    deleteIfExists(configuration, file_10M_NOCHECKSUMS_SNAPPY);
    deleteIfExists(configuration, file_10M_CHECKSUMS_SNAPPY);
  }
}
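
For context on what these benchmarks measure: per this change, the writer computes a CRC32 over the compressed bytes of each data page and stores it in the page header's crc field, and a verifying reader recomputes it before decoding. A minimal sketch of that comparison (method and parameter names here are illustrative, not the internal API):

import java.util.zip.CRC32;

class PageCrcCheck {
  // Returns true if the page bytes match the CRC recorded in the page header.
  static boolean crcMatches(byte[] compressedPageBytes, int expectedCrc) {
    CRC32 crc = new CRC32();
    crc.update(compressedPageBytes, 0, compressedPageBytes.length);
    // CRC32.getValue() yields the unsigned 32-bit value in a long; the Thrift
    // page header stores the checksum as a signed 32-bit int, hence the cast.
    return (int) crc.getValue() == expectedCrc;
  }
}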
179 changes: 179 additions & 0 deletions parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.benchmarks;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;

import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_K;
import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_MILLION;
import static org.apache.parquet.benchmarks.BenchmarkFiles.configuration;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_UNCOMPRESSED;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_SNAPPY;

import java.io.IOException;

@State(Scope.Thread)
public class PageChecksumReadBenchmarks {

  private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator();

  @Setup(Level.Trial)
  public void setup() {
    pageChecksumDataGenerator.generateAll();
  }

  // Must be @TearDown, not a second @Setup: running cleanup() at setup time
  // would delete the very files the read benchmarks depend on.
  @TearDown(Level.Trial)
  public void cleanup() {
    pageChecksumDataGenerator.cleanup();
  }

  private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole)
      throws IOException {
    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
        .withConf(configuration)
        .usePageChecksumVerification(verifyChecksums)
        .build()) {
      for (int i = 0; i < nRows; i++) {
        Group group = reader.read();
        blackhole.consume(group.getLong("long_field", 0));
        blackhole.consume(group.getBinary("binary_field", 0));
        Group subgroup = group.getGroup("group", 0);
        blackhole.consume(subgroup.getInteger("int_field", 0));
        blackhole.consume(subgroup.getInteger("int_field", 1));
        blackhole.consume(subgroup.getInteger("int_field", 2));
        blackhole.consume(subgroup.getInteger("int_field", 3));
      }
    }
  }

  // 100k rows, uncompressed, GZIP, Snappy

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read100KRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read100KRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read100KRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read100KRowsGzipWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read100KRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole);
  }

  // 1M rows, uncompressed, GZIP, Snappy

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read1MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read1MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read1MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read1MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read1MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole);
  }

  // 10M rows, uncompressed, GZIP, Snappy

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read10MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read10MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read10MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read10MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read10MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
    readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole);
  }

  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
  public void read10MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
    readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole);
  }

}
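
For jobs configured through Hadoop properties rather than the builders, the same two switches should be settable on the Configuration. A sketch, assuming the property names introduced by this change (parquet.page.write-checksum.enabled and parquet.page.verify-checksum.enabled; if they differ in your version, the builder methods shown above remain the reliable entry points):

import org.apache.hadoop.conf.Configuration;

class ChecksumConf {
  static Configuration withChecksums() {
    Configuration conf = new Configuration();
    // Property names assumed from this change; verify against your parquet-hadoop version.
    conf.setBoolean("parquet.page.write-checksum.enabled", true);  // writer side
    conf.setBoolean("parquet.page.verify-checksum.enabled", true); // reader side
    return conf;
  }
}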