
[Kernel] Parquet writer TableClient APIs and default implementation #2626

Merged — 9 commits merged into delta-io:master from the parquetWriter branch on Feb 14, 2024

Conversation

@vkorukanti (Collaborator) commented on Feb 9, 2024

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Add the following API to ParquetHandler to support writing Parquet files.

    /**
     * Write the given data batches to Parquet files. Try to keep the size of each Parquet file
     * close to the given size. If the current file exceeds this size, close the current file and
     * start writing to a new file.
     * <p>
     *
     * @param directoryPath Path to the directory where the Parquet files should be written.
     * @param dataIter      Iterator of data batches to write.
     * @param maxFileSize   Target maximum size of the created Parquet file in bytes.
     * @param statsColumns  List of columns to collect statistics for. The statistics collection is
     *                      optional. If the implementation does not support statistics collection,
     *                      it is ok to return no statistics.
     * @return an iterator of {@link DataFileStatus} containing the status of the written files.
     * Each status contains the file path and the optionally collected statistics for the file.
     * It is the responsibility of the caller to close the iterator.
     *
     * @throws IOException if an I/O error occurs during the file writing. This may leave some files
     *                     already written in the directory. It is the responsibility of the caller
     *                     to clean up.
     * @since 3.2.0
     */
    CloseableIterator<DataFileStatus> writeParquetFiles(
            String directoryPath,
            CloseableIterator<FilteredColumnarBatch> dataIter,
            long maxFileSize,
            List<Column> statsColumns) throws IOException;

The default implementation of the above interface uses the parquet-mr library.
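
For illustration, here is a minimal sketch of how a connector might call this API. The import paths, the stats column choice, the target directory, and the file size are assumptions made for the example, not details from the PR:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import io.delta.kernel.client.ParquetHandler;
    import io.delta.kernel.data.FilteredColumnarBatch;
    import io.delta.kernel.expressions.Column;
    import io.delta.kernel.utils.CloseableIterator;
    import io.delta.kernel.utils.DataFileStatus;

    public class ParquetWriteExample {
        // `handler` and `dataIter` are assumed to come from the connector's
        // TableClient and its data pipeline, respectively.
        static void writeAndCollect(
                ParquetHandler handler,
                CloseableIterator<FilteredColumnarBatch> dataIter) throws IOException {
            // Optionally collect statistics for the `id` column.
            List<Column> statsColumns = Arrays.asList(new Column("id"));
            try (CloseableIterator<DataFileStatus> results = handler.writeParquetFiles(
                    "/tmp/delta-table/data",   // target directory (example value)
                    dataIter,
                    128L * 1024 * 1024,        // target max file size: 128 MB
                    statsColumns)) {
                while (results.hasNext()) {
                    DataFileStatus status = results.next();
                    // Each status carries the written file's path, size, and any stats.
                    System.out.println(status.getPath());
                }
            }
        }
    }

Closing the returned iterator in a try-with-resources block satisfies the caller's responsibility noted in the Javadoc.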

How was this patch tested?

Added support for all Delta types except timestamp_ntz. Tested writing different data types with variations of nesting levels, null/non-null values, and target file sizes.

Follow-up work

  • Support 2-level structures for array and primitive type data writing
  • Support INT64 format timestamp writing
  • Uniform support for adding field IDs to intermediate elements in MAP and LIST data types

@vkorukanti requested a review from tdas on February 9, 2024 19:05
* Extends {@link FileStatus} to include additional details such as column level statistics
* of the data file in the Delta Lake table.
*/
public class DataFileStatus extends FileStatus {
@vkorukanti (Collaborator, Author) commented:
Should we just change the FileStatus to include the DataFileStatistics? This class doesn't seem to be adding a lot of value.

Reviewer (Collaborator) replied:
Surely there are cases of FileStatus where statistics just are not useful or applicable? This class seems very useful to me. Please elaborate?
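
For context on this thread, here is a hypothetical sketch of the class shape under discussion. The constructor, the accessor name, and the assumption that FileStatus exposes a (path, size, modificationTime) constructor are all illustrative, not the PR's actual code:

    import java.util.Optional;

    // Hypothetical shape, for discussion only; the PR's actual code may differ.
    public class DataFileStatus extends FileStatus {
        private final Optional<DataFileStatistics> statistics;

        public DataFileStatus(
                String path,
                long size,
                long modificationTime,
                Optional<DataFileStatistics> statistics) {
            super(path, size, modificationTime); // assumes such a super constructor
            this.statistics = statistics;
        }

        /** Column-level statistics, present only if the writer collected them. */
        public Optional<DataFileStatistics> getStatistics() {
            return statistics;
        }
    }

Keeping the statistics in a subclass, as the reviewer suggests, leaves FileStatus lean for listing use cases where statistics are not applicable.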

@vkorukanti force-pushed the parquetWriter branch 3 times, most recently from 7d3e2c7 to a7d0ae7 on February 9, 2024 19:56
@vkorukanti force-pushed the parquetWriter branch 3 times, most recently from fbe6b97 to b6a75ff on February 12, 2024 18:08
Adds the interface to write a Parquet file and collect the file status and stats. Currently only
supports writing int type columns. Once the interfaces are approved, the rest of the column type
support will be added.
@vkorukanti changed the title from "[WIP][Kernel] Parquet writer TableClient APIs" to "[Kernel] Parquet writer TableClient APIs" on Feb 13, 2024
        } else if (precision <= ParquetSchemaUtils.DECIMAL_MAX_DIGITS_IN_LONG) {
            return new DecimalLongWriter(colName, fieldIndex, columnVector);
        }
        // TODO: Need to support legacy mode where all decimals are written as binary
Reviewer (Collaborator):
make an issue?

@vkorukanti (Collaborator, Author) replied:
Will create issues once this PR is landed. Without landing it, we don't know what we are referring to.
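
For context, Parquet's convention stores decimals of precision up to 9 digits in INT32, up to 18 digits in INT64, and larger precisions in fixed-length binary. Below is a sketch of how the dispatch around the snippet above plausibly looks; only DecimalLongWriter is visible in the excerpt, so the other writer and constant names are assumptions:

        if (precision <= ParquetSchemaUtils.DECIMAL_MAX_DIGITS_IN_INT) {
            // Precision <= 9 digits fits in the INT32 physical type.
            return new DecimalIntWriter(colName, fieldIndex, columnVector);
        } else if (precision <= ParquetSchemaUtils.DECIMAL_MAX_DIGITS_IN_LONG) {
            // Precision <= 18 digits fits in the INT64 physical type.
            return new DecimalLongWriter(colName, fieldIndex, columnVector);
        } else {
            // Larger precisions fall back to FIXED_LEN_BYTE_ARRAY.
            return new DecimalFixedBinaryWriter(colName, fieldIndex, columnVector);
        }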


    @Override
    void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
        recordConsumer.addInteger(columnVector.getByte(rowId));
Reviewer (Collaborator):
.addInteger .... .getByte ? should it be .getInteger?

@vkorukanti (Collaborator, Author) replied:
It is getByte because the vector stores byte values. Parquet has one physical type, int, for the byte, short, and int logical types. Internally it has an encoding mechanism to save space on disk.
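
To illustrate the point, here is a small standalone example using the parquet-mr schema builder (the column names are made up): logical BYTE and SHORT are annotations over the same INT32 physical type, which is why RecordConsumer only offers addInteger.

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class IntPhysicalTypeExample {
        public static void main(String[] args) {
            // BYTE is an 8-bit signed logical annotation over physical INT32.
            Type byteField = Types.optional(PrimitiveTypeName.INT32)
                    .as(LogicalTypeAnnotation.intType(8, true))
                    .named("byteCol");
            // SHORT is a 16-bit signed annotation over the same physical type.
            Type shortField = Types.optional(PrimitiveTypeName.INT32)
                    .as(LogicalTypeAnnotation.intType(16, true))
                    .named("shortCol");
            System.out.println(byteField);   // optional int32 byteCol (INTEGER(8,true))
            System.out.println(shortField);  // optional int32 shortCol (INTEGER(16,true))
        }
    }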


    @Override
    void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
        recordConsumer.addInteger(columnVector.getShort(rowId));
Reviewer (Collaborator):
is there an .addShort?

@vkorukanti (Collaborator, Author) replied:
Same as above.

        })
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
        .map(column -> {
Reviewer (Collaborator):
is this just indentation change? can we ignore this noise?

@vkorukanti (Collaborator, Author) replied:
It got changed as part of autoformatting in IntelliJ. Whitespace-only changes are easy to visualize in GitHub's diff view.

            return Optional.empty();
        }

        return metadataList.stream()
Reviewer (Collaborator):
How hard is it to not stream and reduce? Streams are known to not have great performance.

@vkorukanti (Collaborator, Author) commented on Feb 14, 2024:
This code is executed once per file; if it were per row, the cost would add up. Also, this happens mostly in tasks on the executors.


    private static boolean hasInvalidStatistics(Collection<ColumnChunkMetaData> metadataList) {
        // If any row group does not have stats collected, stats for the file will not be valid
        return metadataList.stream().anyMatch(metadata ->
Reviewer (Collaborator):
Same here. I wonder if we should avoid streams on the hot path. Can you confirm: is this on any sort of hot path? It seems like it could be happening multiple times for every file.

@vkorukanti (Collaborator, Author) commented on Feb 14, 2024:
This is not the hot path. Stats extraction is done once per file, and mostly in tasks on the executor.
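
To make the trade-off concrete, here is a plain-loop equivalent of the stream-based check, shown only to illustrate the reviewer's alternative. The predicate in the excerpt above is truncated, so the exact condition here is an assumption based on parquet-mr's Statistics API:

    import java.util.Collection;
    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

    private static boolean hasInvalidStatistics(Collection<ColumnChunkMetaData> metadataList) {
        for (ColumnChunkMetaData metadata : metadataList) {
            // If any column chunk lacks stats, file-level stats are not valid.
            Statistics<?> stats = metadata.getStatistics();
            if (stats == null || stats.isEmpty()) {
                return true;
            }
        }
        return false;
    }

Either way, the PR keeps the stream version: this runs once per written file, not per row, so the stream overhead is negligible.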

@vkorukanti changed the title from "[Kernel] Parquet writer TableClient APIs" to "[Kernel] Parquet writer TableClient APIs and default implementation" on Feb 14, 2024
@scottsand-db (Collaborator) left a comment:
LGTM!

@vkorukanti merged commit 4ecfa45 into delta-io:master on Feb 14, 2024
6 checks passed
@vkorukanti deleted the parquetWriter branch on May 9, 2024 02:42