
Commit 1ec3563

feat: Add experimental support for native Parquet writes (#2812)
1 parent 6828dc4 commit 1ec3563

13 files changed (+827 -22 lines)


.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
@@ -118,6 +118,7 @@ jobs:
     org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
 - name: "parquet"
   value: |
+    org.apache.comet.parquet.CometParquetWriterSuite
     org.apache.comet.parquet.ParquetReadV1Suite
     org.apache.comet.parquet.ParquetReadV2Suite
     org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
@@ -83,6 +83,7 @@ jobs:
     org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
 - name: "parquet"
   value: |
+    org.apache.comet.parquet.CometParquetWriterSuite
     org.apache.comet.parquet.ParquetReadV1Suite
     org.apache.comet.parquet.ParquetReadV2Suite
     org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 0 deletions
@@ -100,6 +100,17 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(true)

+  val COMET_NATIVE_PARQUET_WRITE_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.write.enabled")
+      .category(CATEGORY_TESTING)
+      .doc(
+        "Whether to enable native Parquet write through Comet. When enabled, " +
+          "Comet will intercept Parquet write operations and execute them natively. This " +
+          "feature is highly experimental and only partially implemented. It should not " +
+          "be used in production.")
+      .booleanConf
+      .createWithDefault(false)
+
   val SCAN_NATIVE_COMET = "native_comet"
   val SCAN_NATIVE_DATAFUSION = "native_datafusion"
   val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
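
The new entry is off by default and categorized under testing. A minimal usage sketch follows; only the config key `spark.comet.parquet.write.enabled` comes from this commit, while the session setup, sample data, and output path are illustrative assumptions.

// Minimal sketch: opting in to the experimental native Parquet write.
// Only the config key comes from this commit; the session, data, and
// output path below are illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("comet-native-parquet-write-sketch")
  .getOrCreate()

// Disabled by default; highly experimental and not intended for production.
spark.conf.set("spark.comet.parquet.write.enabled", "true")

// With the flag enabled, a Parquet write may be intercepted by Comet and
// executed natively; unsupported cases fall back to regular Spark execution.
spark.range(0, 1000)
  .write
  .mode("overwrite")
  .parquet("/tmp/comet_native_write_example") // hypothetical output path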

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
@@ -142,6 +142,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. It can be overridden by the environment variable `ENABLE_COMET_ONHEAP`. | false |
 | `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
 | `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
+| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write through Comet. When enabled, Comet will intercept Parquet write operations and execute them natively. This feature is highly experimental and only partially implemented. It should not be used in production. | false |
 | `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false |
 | `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan |
 | `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |

docs/source/user-guide/latest/operators.md

Lines changed: 21 additions & 20 deletions
@@ -22,25 +22,26 @@
 The following Spark operators are currently replaced with native versions. Query stages that contain any operators
 not supported by Comet will fall back to regular Spark execution.

-| Operator | Spark-Compatible? | Compatibility Notes |
-| ----------------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------ |
-| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. |
-| BroadcastExchangeExec | Yes | |
-| BroadcastHashJoinExec | Yes | |
-| ExpandExec | Yes | |
-| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
-| FilterExec | Yes | |
-| GlobalLimitExec | Yes | |
-| HashAggregateExec | Yes | |
-| LocalLimitExec | Yes | |
-| LocalTableScanExec | No | Experimental and disabled by default. |
-| ObjectHashAggregateExec | Yes | Supports a limited number of aggregates, such as `bloom_filter_agg`. |
-| ProjectExec | Yes | |
-| ShuffleExchangeExec | Yes | |
-| ShuffledHashJoinExec | Yes | |
-| SortExec | Yes | |
-| SortMergeJoinExec | Yes | |
-| UnionExec | Yes | |
-| WindowExec | No | Disabled by default due to known correctness issues. |
+| Operator | Spark-Compatible? | Compatibility Notes |
+| --------------------------------- | ----------------- | ------------------------------------------------------------------------------------------------------------------ |
+| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. |
+| BroadcastExchangeExec | Yes | |
+| BroadcastHashJoinExec | Yes | |
+| ExpandExec | Yes | |
+| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
+| FilterExec | Yes | |
+| GlobalLimitExec | Yes | |
+| HashAggregateExec | Yes | |
+| InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. |
+| LocalLimitExec | Yes | |
+| LocalTableScanExec | No | Experimental and disabled by default. |
+| ObjectHashAggregateExec | Yes | Supports a limited number of aggregates, such as `bloom_filter_agg`. |
+| ProjectExec | Yes | |
+| ShuffleExchangeExec | Yes | |
+| ShuffledHashJoinExec | Yes | |
+| SortExec | Yes | |
+| SortMergeJoinExec | Yes | |
+| UnionExec | Yes | |
+| WindowExec | No | Disabled by default due to known correctness issues. |

 [Comet Compatibility Guide]: compatibility.md
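
The new row covers Spark's file-source insert command. As a rough illustration (the planning behavior described is general Spark behavior rather than something introduced here, and the table name, schema, and values are made up), an INSERT into a Parquet data source table is what Spark plans as `InsertIntoHadoopFsRelationCommand`, which is the write Comet can take over when the experimental flag is enabled:

// Sketch only: table name, schema, and values are illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Experimental flag added by this commit; off by default.
spark.conf.set("spark.comet.parquet.write.enabled", "true")

// An INSERT into a Parquet data source table is planned as
// InsertIntoHadoopFsRelationCommand. With the flag enabled, Comet may
// execute the write natively; otherwise it falls back to regular Spark.
spark.sql("CREATE TABLE IF NOT EXISTS sales (id BIGINT, amount DOUBLE) USING parquet")
spark.sql("INSERT INTO sales VALUES (1, 9.99), (2, 19.99)")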

native/core/src/execution/operators/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -29,6 +29,8 @@ mod copy;
 mod expand;
 pub use expand::ExpandExec;
 mod iceberg_scan;
+mod parquet_writer;
+pub use parquet_writer::ParquetWriterExec;
 mod scan;

 /// Error returned during executing operators.
