feat: Add experimental support for native Parquet writes #2812
Conversation
A good test would be to write with this feature enabled and then read it with and without Comet enabled.
Codecov Report

```
@@            Coverage Diff            @@
##              main    #2812     +/-  ##
============================================
+ Coverage    56.12%   59.16%   +3.03%
- Complexity     976     1477     +501
============================================
  Files          119      167      +48
  Lines        11743    15188    +3445
  Branches      2251     2523     +272
============================================
+ Hits          6591     8986    +2395
- Misses        4012     4917     +905
- Partials      1140     1285     +145
```
spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. |
| BroadcastExchangeExec | Yes | |
| BroadcastHashJoinExec | Yes | |
| DataWritingCommandExec | No | Experimental support for native Parquet writes. Disabled by default. |
Does this also mean Iceberg writes?
Should we change to `InsertIntoHadoopFsRelationCommand` here?
Makes sense. I updated this.
```rust
pub struct ParquetWriterExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Output file path
```
is it file or folder?
It is a folder. Files named `part-*.parquet` will be created within the folder.
```rust
// Strip file:// or file: prefix if present
let local_path = output_path
    .strip_prefix("file://")
```
What about `hdfs://`?
I added a fallback for now so that it falls back to Spark if the path does not start with `file:`.
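The fallback behavior being discussed could be sketched as follows. This is an illustrative helper, not the PR's actual code: it strips a `file://` or `file:` prefix, and returns `None` for any other scheme (such as `hdfs://`) so the caller can fall back to Spark's JVM write path.

```rust
/// Illustrative helper (names are hypothetical): resolve a local
/// filesystem path from the output location, or return None so the
/// caller can fall back to Spark's JVM write path.
fn local_path(output_path: &str) -> Option<&str> {
    output_path
        .strip_prefix("file://")
        .or_else(|| output_path.strip_prefix("file:"))
        .or_else(|| {
            // No scheme at all: treat the string as a plain local path.
            // Any other scheme (hdfs://, s3a://, ...) is not handled here.
            if output_path.contains("://") {
                None
            } else {
                Some(output_path)
            }
        })
}
```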
```rust
})?;

// Generate part file name for this partition
let part_file = format!("{}/part-{:05}.parquet", local_path, self.partition_id);
```
This doesn't seem right; the extension will be different depending on the codec:
- `.snappy.parquet`
- `.gz.parquet`
- etc.
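A codec-aware variant of the naming logic, loosely following what Spark's commit protocol does, might look like this sketch (the function and codec strings here are illustrative, not the PR's code):

```rust
/// Illustrative sketch of codec-aware part-file naming, loosely
/// following Spark's HadoopMapReduceCommitProtocol.getFilename.
fn part_file_name(partition_id: u32, codec: Option<&str>) -> String {
    match codec {
        // e.g. Some("snappy") -> "part-00000.snappy.parquet"
        Some(c) => format!("part-{:05}.{}.parquet", partition_id, c),
        // Uncompressed: "part-00000.parquet"
        None => format!("part-{:05}.parquet", partition_id),
    }
}
```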
This file name is best generated by `FileCommitProtocol` later, so hardcoding it on the native side for now makes sense to me.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L156-L163
Thanks @wForget, are you proposing to keep hardcoded names for the PR and replicate Spark's `getFilename` later?
Yes, this PR seems to be missing some work related to file commit. My proposed write process might look like this: create a staging dir -> native write files to staging dir -> file commit (move and merge staging files) -> add or update partitions.
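The commit step of the flow described above could be sketched with plain `std::fs` operations (all names here are illustrative; a real implementation would go through Spark's `FileCommitProtocol` on the JVM side):

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Illustrative sketch of the commit step: move every part file from
/// the staging directory into the final output directory, then remove
/// the (now empty) staging directory.
fn commit_task(staging: &Path, output: &Path) -> io::Result<()> {
    fs::create_dir_all(output)?;
    for entry in fs::read_dir(staging)? {
        let entry = entry?;
        // rename is atomic when staging and output share a filesystem
        fs::rename(entry.path(), output.join(entry.file_name()))?;
    }
    fs::remove_dir(staging)
}
```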
I filed #2827 for implementing the file commit protocol.
This PR adds a starting point for development. Once it is merged then other contributors can help add the missing features.
```rust
// Execute the write task and convert to a stream
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Ok(Box::pin(RecordBatchStreamAdapter::new(
```
What if the partition failed? What would happen with the folder?
🤷 this is all highly experimental so far
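One common way to keep the output folder clean when a partition fails, sketched here with hypothetical names rather than the PR's code, is to write each part through a hidden temporary file and only rename it into place on success:

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Illustrative sketch: write a part file via a temporary name so a
/// failed partition never leaves a half-written file in the folder.
fn write_part_atomically(dir: &Path, name: &str, data: &[u8]) -> io::Result<()> {
    let tmp = dir.join(format!(".{name}.tmp"));
    let result = File::create(&tmp).and_then(|mut f| f.write_all(data));
    match result {
        // Success: publish by renaming into the final name.
        Ok(()) => fs::rename(&tmp, dir.join(name)),
        // Failure: best-effort cleanup of the temporary file.
        Err(e) => {
            let _ = fs::remove_file(&tmp);
            Err(e)
        }
    }
}
```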
```proto
message ParquetWriter {
  string output_path = 1;
```
What if it is a partitioned writer, e.g. `df.write.partitionBy()`?
With this PR, we fall back to Spark for now for partitioned writes. There are checks in `getSupportLevel`.
comphead
left a comment
Thanks @andygrove, it is a really good start.
spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala
…WritingCommand.scala Co-authored-by: Zhen Wang <[email protected]>
wForget
left a comment
Thanks @andygrove, LGTM.
Which issue does this PR close?
Part of #1625
Rationale for this change
We would eventually like to support native writes to Parquet. This PR adds a starting point for further development.
This is the result of vibe coding with Claude.
The goal is to add the minimum possible implementation and test. There are plenty of things that are not implemented or tested yet.
Example of new native plan:
What changes are included in this PR?
- `ParquetWriterExec`
- `CometNativeWriteExec`
- `CometExecRule`

How are these changes tested?
New suite added.