-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Core: Interface based DataFile reader and writer API - PoC #12298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
907089c to
313c2d5
Compare
core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java
Outdated
Show resolved
Hide resolved
| public Key(FileFormat fileFormat, String dataType, String builderType) { | ||
| this.fileFormat = fileFormat; | ||
| this.dataType = dataType; | ||
| this.builderType = builderType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of defining the default one using a priority (int) based approach and let the one with the highest priority be the default one. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a concrete example for this: Comet vectorized parquet reader spark.sql.iceberg.parquet.reader-type
I think it is good if the reader/writer choice is a conscious decision, and not happening based on some behind the scenes algorithm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for simplicity. This code should not determine things like whether Comet is used. This should have a single purpose, which is to standardize how object models plug in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved the config to properties, and the builder method will create the different readers based on this config
| import org.apache.iceberg.io.FileAppender; | ||
|
|
||
| /** Builder API for creating {@link FileAppender}s. */ | ||
| public interface AppenderBuilder extends InternalData.WriteBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder that AppenderBuilder has a base interface but the other builders don't.
Guess it might help to have a common DataFileIoBuilder interface defining the common builder attributes (table, schema, properties, meta). It's a bit of an "adventure in Java generics", but doable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you take a look at the other PRs (#12164, #12069), you can see that first, I took that adventurous route, but the result was too many classes/interfaces and casts.
This PR is aiming for the minimal set of changes, and the InternalData.WriteBuilder is already introduced to Iceberg by #12060. We either need to widen that interface or inherit from it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also confused by this inheritance. We're extending but overriding everything and it's not clear to me what we really gain by going with this approach. It looks like it ends up as a completely different builder that produces the same build result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal with the PR was to show the minimal changes required to make the idea work.
We either create a different builder class for the InternalData.WriteBuilder and the DataFile.WriteBuilder, or we need to have inheritance of the interfaces.
Based on our discussion below we might end up using a different strategy, so revisit this comment later.
| return DataFileServiceRegistry.read( | ||
| task.file().format(), Record.class.getName(), input, fileProjection, partition) | ||
| .split(task.start(), task.length()) | ||
| .caseSensitive(caseSensitive) | ||
| .reuseContainers(reuseContainers) | ||
| .filter(task.residual()) | ||
| .build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like these simplifications!
c528a52 to
9975b4f
Compare
liurenjie1024
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pvary for this proposal, I left some comments.
|
|
||
| /** Enables reusing the containers returned by the reader. Decreases pressure on GC. */ | ||
| @Override | ||
| default ReaderBuilder reuseContainers() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems it should not be here? These are parquet reader specific.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is also used by Avro.
See:
| this.reuseContainers = reuseContainers; |
| * @param rowType of the native input data | ||
| * @return {@link DataWriterBuilder} for building the actual writer | ||
| */ | ||
| public static <S> DataWriterBuilder dataWriterBuilder( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite understand in what case need this? I think append would be enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will check this. We might be able to remove this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the current approach, the file format api implementation creates the appender, and the PR creates the writers for the different data/delete files
| * @return {@link AppenderBuilder} for building the actual writer | ||
| */ | ||
| public static <S, B extends EqualityDeleteWriterBuilder<B>> | ||
| EqualityDeleteWriterBuilder<B> equalityDeleteWriterBuilder( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think file format should consider eqaulity deletion/pos deletion here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current Avro positional delete writer behaves differently than Parquet/ORC positional delete writers.
In case of the positional delete files the schema provided to the Avro writer should omit the PATH and the POS fields, and only needs the actual table schema. The writer handles the PATH/POS fields by static code:
iceberg/core/src/main/java/org/apache/iceberg/avro/Avro.java
Lines 614 to 618 in b8fdd84
| public void write(PositionDelete<D> delete, Encoder out) throws IOException { | |
| PATH_WRITER.write(delete.path(), out); | |
| POS_WRITER.write(delete.pos(), out); | |
| rowWriter.write(delete.row(), out); | |
| } |
The Parquet/ORC positional delete writers behave in the same way. They expect the same input.
If we are ready for a more invasive change we can harmonize the writers.
I have aimed for a minimal changeset to allow easier acceptance for the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The appender doesn't need to know about these, but the file formats and the writer implementations need this
| * issues. | ||
| */ | ||
| private static final class Registry { | ||
| private static final Map<Key, ReaderService> READ_BUILDERS = Maps.newConcurrentMap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more like a convention problem, I think maybe we just need to store in FileFormatService in registry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes we don't have writers (arrow), or we have multiple readers vectorized/non-vectorized readers. Also Parquet has Comet reader. So I kept the writers and the readers separate
| /** Key used to identify readers and writers in the {@link DataFileServiceRegistry}. */ | ||
| public static class Key { | ||
| private final FileFormat fileFormat; | ||
| private final String dataType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this things like arrow, internal row?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah,
Currenly we have:
- Record - generic readers/writers
- ColumnarBatch (arrow) - arrow
- RowData - Flink
- InternalRow - Spark
- ColumnarBatch (spark) - Spark batch
|
I will start to collect the differences here between the different writer types (appender/dataWriter/equalityDeleteWriter/positionalDeleteWriter) for reference:
|
| import org.apache.iceberg.io.DataWriter; | ||
|
|
||
| /** Builder API for creating {@link DataWriter}s. */ | ||
| public interface DataWriterBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not put the builder interface into the DataWriter class and put it in the same package? It seems odd to me that we're introducing this new datafile package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The classes which has to be implemented by the file formats are kept in the io package, but moved the others to the data package
core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java
Outdated
Show resolved
Hide resolved
...urces/META-INF/services/org.apache.iceberg.io.datafile.DataFileServiceRegistry$WriterService
Outdated
Show resolved
Hide resolved
|
While I think the goal here is a good one, the implementation looks too complex to be workable in its current form. The primary issue that we currently have is adapting object models (like Iceber's internal - switch (format) {
- case AVRO:
- AvroIterable<ManifestEntry<F>> reader =
- Avro.read(file)
- .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
- .createResolvingReader(this::newReader)
- .reuseContainers()
- .build();
+ CloseableIterable<ManifestEntry<F>> reader =
+ InternalData.read(format, file)
+ .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
+ .reuseContainers()
+ .build();
- addCloseable(reader);
+ addCloseable(reader);
- return CloseableIterable.transform(reader, inheritableMetadata::apply);
+ return CloseableIterable.transform(reader, inheritableMetadata::apply);
-
- default:
- throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
- }This shows:
In this PR, there are a lot of other changes as well. I'm looking at one of the simpler Spark cases in the row reader. The builder is initialized from return DataFileServiceRegistry.readerBuilder(
format, InternalRow.class.getName(), file, projection, idToConstant)There are also new static classes in the file. Each creates a new service and each service creates the builder and object model: public static class AvroReaderService implements DataFileServiceRegistry.ReaderService {
@Override
public DataFileServiceRegistry.Key key() {
return new DataFileServiceRegistry.Key(FileFormat.AVRO, InternalRow.class.getName());
}
@Override
public ReaderBuilder builder(
InputFile inputFile,
Schema readSchema,
Map<Integer, ?> idToConstant,
DeleteFilter<?> deleteFilter) {
return Avro.read(inputFile)
.project(readSchema)
.createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant));
}The In addition, there are now a lot more abstractions:
I think that the next steps are to focus on making this a lot simpler, and there are some good ways to do that:
|
I'm happy that we agree with the goals. I created a PR to start the conversation. If there are willing reviewers we can introduce more invasive changes to archive a better API. I'm all for it!
I think we need to keep this direct transformations to prevent the performance loss which would be caused by multiple transformations between object model -> common model -> file format. We have a matrix of transformation which we need to encode somewhere:
The InternalData reader has one advantage over the data file readers/writers. The internal object model is static for these readers/writers. For the DataFile readers/writers we have multiple object models to handle.
If we allow adding new builders for the file formats we can remove a good chunk of the boilerplate code. Let me see how this would look like
We need to refactor the Avro positional delete write for this, or add a positionalWriterFunc. Also need to consider that the format specific configurations which are different for the appenders and the delete files (DELETE_PARQUET_ROW_GROUP_SIZE_BYTES vs. PARQUET_ROW_GROUP_SIZE_BYTES)
If we are ok with having a new Builder for the readers/writers, then we don't need the service. It was needed to keep the current APIs and the new APIs compatible.
Will do
Will see what could be arcived |
c488d32 to
71ec538
Compare
cb12c93 to
e6c6147
Compare
core/src/main/java/org/apache/iceberg/formats/ContentFileWriteBuilder.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/avro/AvroFormatModel.java
Outdated
Show resolved
Hide resolved
| * @param value config value | ||
| * @return this for method chaining | ||
| */ | ||
| ReadBuilder<D, S> set(String key, String value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like both set and setAll are not called. Are they really needed in this API?
If they are not used, then I'd prefer to add them later if and when they are necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are used on the Parquet.ReadBuilder. So they will be used once we migrate all of the use-cases.
Lines 151 to 162 in 0d4d3a5
| Parquet.read(Files.localInput(dataFile)) | |
| .project(SCHEMA) | |
| .readSupport(new ParquetReadSupport()) | |
| .set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json()) | |
| .set("spark.sql.parquet.binaryAsString", "false") | |
| .set("spark.sql.parquet.int96AsTimestamp", "false") | |
| .set("spark.sql.caseSensitive", "false") | |
| .set("spark.sql.parquet.fieldId.write.enabled", "false") | |
| .set("spark.sql.parquet.inferTimestampNTZ.enabled", "false") | |
| .set("spark.sql.legacy.parquet.nanosAsLong", "false") | |
| .callInit() | |
| .build()) { |
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
Outdated
Show resolved
Hide resolved
| new ParquetFormatModel<ColumnarBatch, StructType, DeleteFilter<InternalRow>>( | ||
| VectorizedSparkParquetReaders.CometColumnarBatch.class, | ||
| StructType.class, | ||
| VectorizedSparkParquetReaders::buildCometReader)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this correct? I think this should be calling VectorizedSparkParquetReaders.buildReader instead of buildCometReader. That is the one that checks the incoming config and chooses whether to use Comet or the regular Arrow reader.
I also think that the buildCometReader method that this references isn't needed because this is the only place where it is called and it ignores the config map passed in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I went to investigate where config was used and then argue that it should be removed. The replacement is to register a different type and I see that's actually what is happening here. This is correct because the registered output type is CometColumnarBatch.
So the real problem is that there is an extra and unnecessary argument passed to these methods, the config string map. Registering a different batch type is the right way to handle the registration, so that Spark can use the registered reader and make the decision about using Arrow vs Comet entirely independent of the file format models.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is a questionable decision. I didn't intend to put this commit on this branch. I just created it to test how it could work.
Currently (on main) the Comet/Arrow decision is done in the BaseBatchReader:
iceberg/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
Lines 96 to 105 in 0d4d3a5
| .createBatchedReaderFunc( | |
| fileSchema -> { | |
| if (parquetConf.readerType() == ParquetReaderType.COMET) { | |
| return VectorizedSparkParquetReaders.buildCometReader( | |
| requiredSchema, fileSchema, idToConstant, deleteFilter); | |
| } else { | |
| return VectorizedSparkParquetReaders.buildReader( | |
| requiredSchema, fileSchema, idToConstant, deleteFilter); | |
| } | |
| }) |
My original idea was to add another parameter to the FormatModelRegistry.readBuilder method, like readBuilder(returnType, readerType, inputFile), and based on the readerType it could chose the Arrow, or the Comet reader.
Since this was only used for Spark/Parquet, your suggestion was to hide it behind a config, and push this decision to the VectorizedSparkParquetReaders.buildReader. This is how I wanted to keep this PR.
I was playing around how can I change this, and use the currently proposed API to move this decision back to the caller. That is why I experimented with a Hacky solution to register the Comet vectorized reader to the File Format API. I did not like what I have seen. Mostly because I had to create a CometColumnarBatch for the sole reason to differentiate between the Arrow and the Comet reader. So I abandoned the idea but forgot to revert the commit 😢
|
|
||
| @Override | ||
| public PositionDeleteWriter<InternalRow> newPositionDeleteWriter( | ||
| EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only here to preserve the existing functionality that we are not moving into the new file interfaces, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This will be removed in 1.12.0.
| MessageType fileSchema, | ||
| Map<Integer, ?> idToConstant, | ||
| DeleteFilter<InternalRow> deleteFilter, | ||
| Map<String, String> config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I noted in SparkFormatModels, I think that this config is no longer needed becuase this buildReader method is not used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's talk about it at the other place: #12298 (comment)
| MessageType messageType, | ||
| Map<Integer, ?> constantValues, | ||
| F deleteFilter, | ||
| Map<String, String> config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be removed (see previous comments).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's talk about it at the other place: #12298 (comment)
| Schema schema, | ||
| MessageType messageType, | ||
| Map<Integer, ?> constantValues, | ||
| F deleteFilter, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that the delete filter should be part of this interface. I realize that it is currently part of how vectorized readers are built, but the row deletes should be independent of the read plan and reader. That also simplifies the types here because you don't need a specific filter type (which would probably need to be changed to DeleteFilter<D> anyway).
The delete filter extracts information from each row, so it does not require being part of the reader or vectorized reader. It can and should be applied by engines rather than being passed through this API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several uses for the DeleteFilter in vectorized reads:
- Handling the
_deletedmetadata column when needed - RowId mapping to delete the rows
- Maintaining the counter for the delete records for metrics
This is most probably a quite sizeable change.
I will check what I can do with it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created a PR to see how this could work: #14652
| } | ||
| } | ||
|
|
||
| private abstract static class ReadBuilderWrapper<D, S, F> implements ReadBuilder<D, S> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be simplified quite a bit:
- The batch reader function and the row reader function can be combined because the delete filter and config do not need to be passed through. That means the sub-classes of this builder are not needed
- Without subclasses, this does not need to expose methods for fetching its state. I also refactored this class and concrete children locally and removed the need to expose its state, but it's easier to just remove the subclasses.
- This doesn't need to track the Iceberg schema. Instead, this should register a binary reader function with Parquet so that Parquet is responsible for passing the Iceberg schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, shouldn't this account for the DF schema?
I know that it isn't used at the moment, but the engine type is a type param here and we have a use case where it will be needed later (requesting shredded variant reads) so I don't think this is complete without it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This depends on #12298 (comment) and #12298 (comment)
Otherwise the differences are bigger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of this became irrelevant after the DeleteFilter change (#14065)
These points might still be worth discussing:
This doesn't need to track the Iceberg schema. Instead, this should register a binary reader function with Parquet so that Parquet is responsible for passing the Iceberg schema.
Are you suggesting updating the underlying Parquet class to add a method like:
ReadBuilder.createBatchedReaderFunc(BiFunction<Schema, MessageType, VectorizedReader<?>> newReaderFunction)?
I’ve made this change, but it turned out to be a bit more involved to keep it consistent with Parquet.BinaryReaderFunction. Please, review!
Also, shouldn't this account for the DF schema?
I originally included it, but several reviewers noted that it’s not currently used. So, asked to remove it for now and add it back when needed. I’m open to either approach.
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
Show resolved
Hide resolved
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
Outdated
Show resolved
Hide resolved
| this(type, schemaType, readerFunction, null, writerFunction); | ||
| } | ||
|
|
||
| public ParquetFormatModel( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes sense to me to have a model that is for batch reading. But why not also have options for reading a single row format or writing a single row format?
For that matter, maybe reads and writes should be registered separately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The early decision was that we don't allow only readers/writers. The File Format needs to implement readers and writers both to be accepted as a supported File Format.
I see the vectorized readers as an exception.
c61b8ba to
5f91620
Compare
…quetFormatModel.ReadBuilderWrapper
5f91620 to
c3babfd
Compare
Here is what the PR does:
ReadBuilder- Builder for reading data from data filesAppenderBuilder- Builder for writing data to data filesObjectModel- Providing ReadBuilders, and AppenderBuilders for the specific data file format and object model pairAppenderBuilder- Builder for writing a fileDataWriterBuilder- Builder for generating a data filePositionDeleteWriterBuilder- Builder for generating a position delete fileEqualityDeleteWriterBuilder- Builder for generating an equality delete fileReadBuilderhere - the file format reader builder is reusedWriterBuilderclass which implements the interfaces above (AppenderBuilder/DataWriterBuilder/PositionDeleteWriterBuilder/EqualityDeleteWriterBuilder) based on a provided file format specificAppenderBuilderObjectModelRegistrywhich stores the availableObjectModels, and engines and users could request the readers (ReadBuilder) and writers (AppenderBuilder/DataWriterBuilder/PositionDeleteWriterBuilder/EqualityDeleteWriterBuilder) from.GenericObjectModels- for reading and writing Iceberg RecordsSparkObjectModels- for reading (vectorized and non-vectorized) and writing Spark InternalRow/ColumnarBatch objectsFlinkObjectModels- for reading and writing Flink RowData objects