diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala new file mode 100644 index 000000000000..7ae80efc6201 --- /dev/null +++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.extension + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase} +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat + +import org.apache.spark.Partition +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.read.streaming.SparkDataStream +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.collection.BitSet + +class GlutenCustomerExtensionSuite extends SharedSparkSession { + // These configs only take effect on ClickHouse backend. + private val ExtendedColumnarTransformRulesKey = + "spark.gluten.sql.columnar.extended.columnar.transform.rules" + private val ExtendedColumnarPostRulesKey = + "spark.gluten.sql.columnar.extended.columnar.post.rules" + + override def sparkConf: SparkConf = { + super.sparkConf + .setAppName("Gluten-UT") + .set("spark.driver.memory", "1G") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.adaptive.enabled", "false") + .set("spark.ui.enabled", "false") + .set(GlutenConfig.GLUTEN_UI_ENABLED.key, "false") + .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(GlutenConfig.NATIVE_VALIDATION_ENABLED.key, "false") + .set( + ExtendedColumnarTransformRulesKey, + "org.apache.spark.sql" + + ".extension.CustomerColumnarPreRules") + .set(ExtendedColumnarPostRulesKey, "") + } + + test("customer column rules") { + withSQLConf((GlutenConfig.GLUTEN_ENABLED.key, "false")) { + sql("create table my_parquet(id int) using parquet") + sql("insert into my_parquet values (1)") + sql("insert into my_parquet values (2)") + } + withSQLConf((GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, "false")) { + val df = sql("select * from my_parquet") + val testFileSourceScanExecTransformer = df.queryExecution.executedPlan.collect { + case f: TestFileSourceScanExecTransformer => f + } + assert(testFileSourceScanExecTransformer.nonEmpty) + assert(testFileSourceScanExecTransformer.head.nodeNamePrefix.equals("TestFile")) + } + } +} + +/** Test for customer column rules */ +case class TestFileSourceScanExecTransformer( + @transient override val relation: HadoopFsRelation, + @transient stream: Option[SparkDataStream], + override val output: Seq[Attribute], + override val requiredSchema: StructType, + override val partitionFilters: Seq[Expression], + override val optionalBucketSet: Option[BitSet], + override val optionalNumCoalescedBuckets: Option[Int], + override val dataFilters: Seq[Expression], + override val tableIdentifier: Option[TableIdentifier], + override val disableBucketedScan: Boolean = false, + override val pushDownFilters: Option[Seq[Expression]] = None) + extends FileSourceScanExecTransformerBase( + relation, + stream, + output, + requiredSchema, + partitionFilters, + optionalBucketSet, + optionalNumCoalescedBuckets, + dataFilters, + tableIdentifier, + disableBucketedScan) { + + override def getPartitions: Seq[Partition] = + BackendsApiManager.getTransformerApiInstance + .genPartitionSeq( + relation, + requiredSchema, + selectedPartitions, + output, + bucketedScan, + optionalBucketSet, + optionalNumCoalescedBuckets, + disableBucketedScan) + + override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = + getPartitions.map((_, fileFormat)) + + override val nodeNamePrefix: String = "TestFile" + + override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer = + copy(pushDownFilters = Some(filters)) +} + +case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPlan] { + + override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { + case fileSourceScan: FileSourceScanExec => + val transformer = new TestFileSourceScanExecTransformer( + fileSourceScan.relation, + SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan), + fileSourceScan.output, + fileSourceScan.requiredSchema, + fileSourceScan.partitionFilters, + fileSourceScan.optionalBucketSet, + fileSourceScan.optionalNumCoalescedBuckets, + fileSourceScan.dataFilters, + fileSourceScan.tableIdentifier, + fileSourceScan.disableBucketedScan + ) + if (transformer.doValidate().ok()) { + transformer + } else { + plan + } + } +} diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 2ba48f9e546c..03c2d0fa258d 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite} import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} -import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite} +import org.apache.spark.sql.extension.GlutenSessionExtensionSuite import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite import org.apache.spark.sql.sources._ @@ -1819,7 +1819,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("full outer join with unique keys using ShuffledHashJoin (whole-stage-codegen on)") .exclude("full outer join with unique keys using SortMergeJoin (whole-stage-codegen off)") .exclude("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)") - enableSuite[GlutenCustomerExtensionSuite] enableSuite[GlutenSessionExtensionSuite] enableSuite[GlutenFallbackSuite] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index e4cce67a15b8..62634fc7dab0 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2Strategy import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} import org.apache.spark.sql.execution.python._ -import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources._ @@ -567,7 +567,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("test with low buffer spill threshold") enableSuite[GlutenTakeOrderedAndProjectSuite] enableSuite[GlutenSessionExtensionSuite] - enableSuite[TestFileSourceScanExecTransformer] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. .exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala deleted file mode 100644 index 5de35daaedf9..000000000000 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.sql.shims.SparkShimLoader - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} - -case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPlan] { - - override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { - case fileSourceScan: FileSourceScanExec => - val transformer = new TestFileSourceScanExecTransformer( - fileSourceScan.relation, - SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan), - fileSourceScan.output, - fileSourceScan.requiredSchema, - fileSourceScan.partitionFilters, - fileSourceScan.optionalBucketSet, - fileSourceScan.optionalNumCoalescedBuckets, - fileSourceScan.dataFilters, - fileSourceScan.tableIdentifier, - fileSourceScan.disableBucketedScan - ) - if (transformer.doValidate().ok()) { - transformer - } else { - plan - } - } -} diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala deleted file mode 100644 index 4bd104f04c9f..000000000000 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenSQLTestsTrait - -class GlutenCustomerExtensionSuite extends GlutenSQLTestsTrait { - // These configs only take effect on ClickHouse backend. - private val ExtendedColumnarTransformRulesKey = - "spark.gluten.sql.columnar.extended.columnar.transform.rules" - private val ExtendedColumnarPostRulesKey = - "spark.gluten.sql.columnar.extended.columnar.post.rules" - - override def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.adaptive.enabled", "false") - .set( - ExtendedColumnarTransformRulesKey, - "org.apache.spark.sql" + - ".extension.CustomerColumnarPreRules") - .set(ExtendedColumnarPostRulesKey, "") - } - - testGluten("test customer column rules") { - withSQLConf((GlutenConfig.GLUTEN_ENABLED.key, "false")) { - sql("create table my_parquet(id int) using parquet") - sql("insert into my_parquet values (1)") - sql("insert into my_parquet values (2)") - } - withSQLConf((GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, "false")) { - val df = sql("select * from my_parquet") - val testFileSourceScanExecTransformer = df.queryExecution.executedPlan.collect { - case f: TestFileSourceScanExecTransformer => f - } - assert(!testFileSourceScanExecTransformer.isEmpty) - assert(testFileSourceScanExecTransformer(0).nodeNamePrefix.equals("TestFile")) - } - } -} diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala deleted file mode 100644 index c2a775bd7f4d..000000000000 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase} -import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat - -import org.apache.spark.Partition -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.streaming.SparkDataStream -import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.BitSet - -/** Test for customer column rules */ -case class TestFileSourceScanExecTransformer( - @transient override val relation: HadoopFsRelation, - @transient stream: Option[SparkDataStream], - override val output: Seq[Attribute], - override val requiredSchema: StructType, - override val partitionFilters: Seq[Expression], - override val optionalBucketSet: Option[BitSet], - override val optionalNumCoalescedBuckets: Option[Int], - override val dataFilters: Seq[Expression], - override val tableIdentifier: Option[TableIdentifier], - override val disableBucketedScan: Boolean = false, - override val pushDownFilters: Option[Seq[Expression]] = None) - extends FileSourceScanExecTransformerBase( - relation, - stream, - output, - requiredSchema, - partitionFilters, - optionalBucketSet, - optionalNumCoalescedBuckets, - dataFilters, - tableIdentifier, - disableBucketedScan) { - - override def getPartitions: Seq[Partition] = - BackendsApiManager.getTransformerApiInstance - .genPartitionSeq( - relation, - requiredSchema, - selectedPartitions, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) - - override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = - getPartitions.map((_, fileFormat)) - - override val nodeNamePrefix: String = "TestFile" - - override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer = - copy(pushDownFilters = Some(filters)) -} diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index a2f6499e27b6..6c5263a420ff 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite} import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} -import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite} +import org.apache.spark.sql.extension.GlutenSessionExtensionSuite import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite import org.apache.spark.sql.sources._ @@ -1682,7 +1682,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("full outer join with unique keys using ShuffledHashJoin (whole-stage-codegen on)") .exclude("full outer join with unique keys using SortMergeJoin (whole-stage-codegen off)") .exclude("full outer join with unique keys using SortMergeJoin (whole-stage-codegen on)") - enableSuite[GlutenCustomerExtensionSuite] enableSuite[GlutenSessionExtensionSuite] enableSuite[GlutenFallbackSuite] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 3f10a194ffb1..d457638e33b2 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2Strategy import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} import org.apache.spark.sql.execution.python._ -import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite} @@ -587,7 +587,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("test with low buffer spill threshold") enableSuite[GlutenTakeOrderedAndProjectSuite] enableSuite[GlutenSessionExtensionSuite] - enableSuite[TestFileSourceScanExecTransformer] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. .exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala deleted file mode 100644 index 5de35daaedf9..000000000000 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.sql.shims.SparkShimLoader - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} - -case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPlan] { - - override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { - case fileSourceScan: FileSourceScanExec => - val transformer = new TestFileSourceScanExecTransformer( - fileSourceScan.relation, - SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan), - fileSourceScan.output, - fileSourceScan.requiredSchema, - fileSourceScan.partitionFilters, - fileSourceScan.optionalBucketSet, - fileSourceScan.optionalNumCoalescedBuckets, - fileSourceScan.dataFilters, - fileSourceScan.tableIdentifier, - fileSourceScan.disableBucketedScan - ) - if (transformer.doValidate().ok()) { - transformer - } else { - plan - } - } -} diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala deleted file mode 100644 index 4bd104f04c9f..000000000000 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenSQLTestsTrait - -class GlutenCustomerExtensionSuite extends GlutenSQLTestsTrait { - // These configs only take effect on ClickHouse backend. - private val ExtendedColumnarTransformRulesKey = - "spark.gluten.sql.columnar.extended.columnar.transform.rules" - private val ExtendedColumnarPostRulesKey = - "spark.gluten.sql.columnar.extended.columnar.post.rules" - - override def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.adaptive.enabled", "false") - .set( - ExtendedColumnarTransformRulesKey, - "org.apache.spark.sql" + - ".extension.CustomerColumnarPreRules") - .set(ExtendedColumnarPostRulesKey, "") - } - - testGluten("test customer column rules") { - withSQLConf((GlutenConfig.GLUTEN_ENABLED.key, "false")) { - sql("create table my_parquet(id int) using parquet") - sql("insert into my_parquet values (1)") - sql("insert into my_parquet values (2)") - } - withSQLConf((GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, "false")) { - val df = sql("select * from my_parquet") - val testFileSourceScanExecTransformer = df.queryExecution.executedPlan.collect { - case f: TestFileSourceScanExecTransformer => f - } - assert(!testFileSourceScanExecTransformer.isEmpty) - assert(testFileSourceScanExecTransformer(0).nodeNamePrefix.equals("TestFile")) - } - } -} diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala deleted file mode 100644 index c2a775bd7f4d..000000000000 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase} -import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat - -import org.apache.spark.Partition -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.streaming.SparkDataStream -import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.BitSet - -/** Test for customer column rules */ -case class TestFileSourceScanExecTransformer( - @transient override val relation: HadoopFsRelation, - @transient stream: Option[SparkDataStream], - override val output: Seq[Attribute], - override val requiredSchema: StructType, - override val partitionFilters: Seq[Expression], - override val optionalBucketSet: Option[BitSet], - override val optionalNumCoalescedBuckets: Option[Int], - override val dataFilters: Seq[Expression], - override val tableIdentifier: Option[TableIdentifier], - override val disableBucketedScan: Boolean = false, - override val pushDownFilters: Option[Seq[Expression]] = None) - extends FileSourceScanExecTransformerBase( - relation, - stream, - output, - requiredSchema, - partitionFilters, - optionalBucketSet, - optionalNumCoalescedBuckets, - dataFilters, - tableIdentifier, - disableBucketedScan) { - - override def getPartitions: Seq[Partition] = - BackendsApiManager.getTransformerApiInstance - .genPartitionSeq( - relation, - requiredSchema, - selectedPartitions, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) - - override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = - getPartitions.map((_, fileFormat)) - - override val nodeNamePrefix: String = "TestFile" - - override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer = - copy(pushDownFilters = Some(filters)) -} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 30ee897ab42c..4da1c6537609 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite} import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins._ -import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite import org.apache.spark.sql.sources._ @@ -442,7 +442,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("CREATE TABLE USING AS SELECT based on the file without write permission") .exclude("create a table, drop it and create another one with the same name") enableSuite[GlutenCsvFunctionsSuite] - enableSuite[GlutenCustomerExtensionSuite] enableSuite[GlutenDDLSourceLoadSuite] enableSuite[GlutenDSV2CharVarcharTestSuite] // Excluded. The Gluten tests for char/varchar validation were rewritten for Velox. diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 365f5198f5a7..ed9f9ccfd968 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2Strategy import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python._ -import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.gluten.GlutenFallbackSuite import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources._ @@ -534,7 +534,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("test with low buffer spill threshold") enableSuite[GlutenTakeOrderedAndProjectSuite] enableSuite[GlutenSessionExtensionSuite] - enableSuite[TestFileSourceScanExecTransformer] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. .exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala deleted file mode 100644 index 5de35daaedf9..000000000000 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.sql.shims.SparkShimLoader - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} - -case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPlan] { - - override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { - case fileSourceScan: FileSourceScanExec => - val transformer = new TestFileSourceScanExecTransformer( - fileSourceScan.relation, - SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan), - fileSourceScan.output, - fileSourceScan.requiredSchema, - fileSourceScan.partitionFilters, - fileSourceScan.optionalBucketSet, - fileSourceScan.optionalNumCoalescedBuckets, - fileSourceScan.dataFilters, - fileSourceScan.tableIdentifier, - fileSourceScan.disableBucketedScan - ) - if (transformer.doValidate().ok()) { - transformer - } else { - plan - } - } -} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala deleted file mode 100644 index cb37cec8958a..000000000000 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenSQLTestsTrait - -class GlutenCustomerExtensionSuite extends GlutenSQLTestsTrait { - // These configs only take effect on ClickHouse backend. - private val ExtendedColumnarTransformRulesKey = - "spark.gluten.sql.columnar.extended.columnar.transform.rules" - private val ExtendedColumnarPostRulesKey = - "spark.gluten.sql.columnar.extended.columnar.post.rules" - - override def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.adaptive.enabled", "false") - .set( - ExtendedColumnarTransformRulesKey, - "org.apache.spark.sql" + - ".extension.CustomerColumnarPreRules") - .set(ExtendedColumnarPostRulesKey, "") - } - - testGluten("test customer column rules") { - withSQLConf((GlutenConfig.GLUTEN_ENABLED.key, "false")) { - sql("create table my_parquet(id int) using parquet") - sql("insert into my_parquet values (1)") - sql("insert into my_parquet values (2)") - } - withSQLConf((GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, "false")) { - val df = sql("select * from my_parquet") - val testFileSourceScanExecTransformer = df.queryExecution.executedPlan.collect { - case f: TestFileSourceScanExecTransformer => f - } - assert(testFileSourceScanExecTransformer.nonEmpty) - assert(testFileSourceScanExecTransformer.head.nodeNamePrefix.equals("TestFile")) - } - } -} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala deleted file mode 100644 index c2a775bd7f4d..000000000000 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase} -import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat - -import org.apache.spark.Partition -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.streaming.SparkDataStream -import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.BitSet - -/** Test for customer column rules */ -case class TestFileSourceScanExecTransformer( - @transient override val relation: HadoopFsRelation, - @transient stream: Option[SparkDataStream], - override val output: Seq[Attribute], - override val requiredSchema: StructType, - override val partitionFilters: Seq[Expression], - override val optionalBucketSet: Option[BitSet], - override val optionalNumCoalescedBuckets: Option[Int], - override val dataFilters: Seq[Expression], - override val tableIdentifier: Option[TableIdentifier], - override val disableBucketedScan: Boolean = false, - override val pushDownFilters: Option[Seq[Expression]] = None) - extends FileSourceScanExecTransformerBase( - relation, - stream, - output, - requiredSchema, - partitionFilters, - optionalBucketSet, - optionalNumCoalescedBuckets, - dataFilters, - tableIdentifier, - disableBucketedScan) { - - override def getPartitions: Seq[Partition] = - BackendsApiManager.getTransformerApiInstance - .genPartitionSeq( - relation, - requiredSchema, - selectedPartitions, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) - - override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = - getPartitions.map((_, fileFormat)) - - override val nodeNamePrefix: String = "TestFile" - - override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer = - copy(pushDownFilters = Some(filters)) -} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 5a04389186b2..c5ac28bc7daf 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite} import org.apache.spark.sql.execution.exchange.{GlutenEnsureRequirementsSuite, GlutenValidateRequirementsSuite} import org.apache.spark.sql.execution.joins._ -import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite} import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite import org.apache.spark.sql.sources._ @@ -441,7 +441,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("CREATE TABLE USING AS SELECT based on the file without write permission") .exclude("create a table, drop it and create another one with the same name") enableSuite[GlutenCsvFunctionsSuite] - enableSuite[GlutenCustomerExtensionSuite] enableSuite[GlutenDDLSourceLoadSuite] enableSuite[GlutenDSV2CharVarcharTestSuite] enableSuite[GlutenDSV2SQLInsertTestSuite] diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 498483cb659a..3c557524ee95 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.exchange.{GlutenEnsureRequirementsSuite, G import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQLMetricsSuite} import org.apache.spark.sql.execution.python._ -import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite} import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources._ @@ -740,7 +740,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("test with low buffer spill threshold") enableSuite[GlutenTakeOrderedAndProjectSuite] enableSuite[GlutenSessionExtensionSuite] - enableSuite[TestFileSourceScanExecTransformer] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. .exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala deleted file mode 100644 index 5de35daaedf9..000000000000 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.sql.shims.SparkShimLoader - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} - -case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPlan] { - - override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { - case fileSourceScan: FileSourceScanExec => - val transformer = new TestFileSourceScanExecTransformer( - fileSourceScan.relation, - SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan), - fileSourceScan.output, - fileSourceScan.requiredSchema, - fileSourceScan.partitionFilters, - fileSourceScan.optionalBucketSet, - fileSourceScan.optionalNumCoalescedBuckets, - fileSourceScan.dataFilters, - fileSourceScan.tableIdentifier, - fileSourceScan.disableBucketedScan - ) - if (transformer.doValidate().ok()) { - transformer - } else { - plan - } - } -} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala deleted file mode 100644 index cb37cec8958a..000000000000 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenSQLTestsTrait - -class GlutenCustomerExtensionSuite extends GlutenSQLTestsTrait { - // These configs only take effect on ClickHouse backend. - private val ExtendedColumnarTransformRulesKey = - "spark.gluten.sql.columnar.extended.columnar.transform.rules" - private val ExtendedColumnarPostRulesKey = - "spark.gluten.sql.columnar.extended.columnar.post.rules" - - override def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.adaptive.enabled", "false") - .set( - ExtendedColumnarTransformRulesKey, - "org.apache.spark.sql" + - ".extension.CustomerColumnarPreRules") - .set(ExtendedColumnarPostRulesKey, "") - } - - testGluten("test customer column rules") { - withSQLConf((GlutenConfig.GLUTEN_ENABLED.key, "false")) { - sql("create table my_parquet(id int) using parquet") - sql("insert into my_parquet values (1)") - sql("insert into my_parquet values (2)") - } - withSQLConf((GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, "false")) { - val df = sql("select * from my_parquet") - val testFileSourceScanExecTransformer = df.queryExecution.executedPlan.collect { - case f: TestFileSourceScanExecTransformer => f - } - assert(testFileSourceScanExecTransformer.nonEmpty) - assert(testFileSourceScanExecTransformer.head.nodeNamePrefix.equals("TestFile")) - } - } -} diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala deleted file mode 100644 index 18c5709bf51a..000000000000 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase} -import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat - -import org.apache.spark.Partition -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.streaming.SparkDataStream -import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.BitSet - -/** Test for customer column rules */ -case class TestFileSourceScanExecTransformer( - @transient override val relation: HadoopFsRelation, - @transient val stream: Option[SparkDataStream], - override val output: Seq[Attribute], - override val requiredSchema: StructType, - override val partitionFilters: Seq[Expression], - override val optionalBucketSet: Option[BitSet], - override val optionalNumCoalescedBuckets: Option[Int], - override val dataFilters: Seq[Expression], - override val tableIdentifier: Option[TableIdentifier], - override val disableBucketedScan: Boolean = false, - override val pushDownFilters: Option[Seq[Expression]] = None) - extends FileSourceScanExecTransformerBase( - relation, - stream, - output, - requiredSchema, - partitionFilters, - optionalBucketSet, - optionalNumCoalescedBuckets, - dataFilters, - tableIdentifier, - disableBucketedScan) { - - override def getPartitions: Seq[Partition] = - BackendsApiManager.getTransformerApiInstance.genPartitionSeq( - relation, - requiredSchema, - getPartitionArray, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) - - override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = - getPartitions.map((_, fileFormat)) - - override val nodeNamePrefix: String = "TestFile" - - override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer = - copy(pushDownFilters = Some(filters)) -} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 5a04389186b2..c5ac28bc7daf 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite} import org.apache.spark.sql.execution.exchange.{GlutenEnsureRequirementsSuite, GlutenValidateRequirementsSuite} import org.apache.spark.sql.execution.joins._ -import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite} import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite import org.apache.spark.sql.sources._ @@ -441,7 +441,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("CREATE TABLE USING AS SELECT based on the file without write permission") .exclude("create a table, drop it and create another one with the same name") enableSuite[GlutenCsvFunctionsSuite] - enableSuite[GlutenCustomerExtensionSuite] enableSuite[GlutenDDLSourceLoadSuite] enableSuite[GlutenDSV2CharVarcharTestSuite] enableSuite[GlutenDSV2SQLInsertTestSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index c010f1b026ee..7cf5a28e0194 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.exchange.{GlutenEnsureRequirementsSuite, G import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.metric.{GlutenCustomMetricsSuite, GlutenSQLMetricsSuite} import org.apache.spark.sql.execution.python._ -import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} +import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite} import org.apache.spark.sql.gluten.{GlutenFallbackStrategiesSuite, GlutenFallbackSuite} import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite import org.apache.spark.sql.sources._ @@ -725,7 +725,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("test with low buffer spill threshold") enableSuite[GlutenTakeOrderedAndProjectSuite] enableSuite[GlutenSessionExtensionSuite] - enableSuite[TestFileSourceScanExecTransformer] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. .exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala deleted file mode 100644 index 5de35daaedf9..000000000000 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.sql.shims.SparkShimLoader - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} - -case class CustomerColumnarPreRules(session: SparkSession) extends Rule[SparkPlan] { - - override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { - case fileSourceScan: FileSourceScanExec => - val transformer = new TestFileSourceScanExecTransformer( - fileSourceScan.relation, - SparkShimLoader.getSparkShims.getFileSourceScanStream(fileSourceScan), - fileSourceScan.output, - fileSourceScan.requiredSchema, - fileSourceScan.partitionFilters, - fileSourceScan.optionalBucketSet, - fileSourceScan.optionalNumCoalescedBuckets, - fileSourceScan.dataFilters, - fileSourceScan.tableIdentifier, - fileSourceScan.disableBucketedScan - ) - if (transformer.doValidate().ok()) { - transformer - } else { - plan - } - } -} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala deleted file mode 100644 index cb37cec8958a..000000000000 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/GlutenCustomerExtensionSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.SparkConf -import org.apache.spark.sql.GlutenSQLTestsTrait - -class GlutenCustomerExtensionSuite extends GlutenSQLTestsTrait { - // These configs only take effect on ClickHouse backend. - private val ExtendedColumnarTransformRulesKey = - "spark.gluten.sql.columnar.extended.columnar.transform.rules" - private val ExtendedColumnarPostRulesKey = - "spark.gluten.sql.columnar.extended.columnar.post.rules" - - override def sparkConf: SparkConf = { - super.sparkConf - .set("spark.sql.adaptive.enabled", "false") - .set( - ExtendedColumnarTransformRulesKey, - "org.apache.spark.sql" + - ".extension.CustomerColumnarPreRules") - .set(ExtendedColumnarPostRulesKey, "") - } - - testGluten("test customer column rules") { - withSQLConf((GlutenConfig.GLUTEN_ENABLED.key, "false")) { - sql("create table my_parquet(id int) using parquet") - sql("insert into my_parquet values (1)") - sql("insert into my_parquet values (2)") - } - withSQLConf((GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key, "false")) { - val df = sql("select * from my_parquet") - val testFileSourceScanExecTransformer = df.queryExecution.executedPlan.collect { - case f: TestFileSourceScanExecTransformer => f - } - assert(testFileSourceScanExecTransformer.nonEmpty) - assert(testFileSourceScanExecTransformer.head.nodeNamePrefix.equals("TestFile")) - } - } -} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala deleted file mode 100644 index 18c5709bf51a..000000000000 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.extension - -import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase} -import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat - -import org.apache.spark.Partition -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.connector.read.streaming.SparkDataStream -import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.BitSet - -/** Test for customer column rules */ -case class TestFileSourceScanExecTransformer( - @transient override val relation: HadoopFsRelation, - @transient val stream: Option[SparkDataStream], - override val output: Seq[Attribute], - override val requiredSchema: StructType, - override val partitionFilters: Seq[Expression], - override val optionalBucketSet: Option[BitSet], - override val optionalNumCoalescedBuckets: Option[Int], - override val dataFilters: Seq[Expression], - override val tableIdentifier: Option[TableIdentifier], - override val disableBucketedScan: Boolean = false, - override val pushDownFilters: Option[Seq[Expression]] = None) - extends FileSourceScanExecTransformerBase( - relation, - stream, - output, - requiredSchema, - partitionFilters, - optionalBucketSet, - optionalNumCoalescedBuckets, - dataFilters, - tableIdentifier, - disableBucketedScan) { - - override def getPartitions: Seq[Partition] = - BackendsApiManager.getTransformerApiInstance.genPartitionSeq( - relation, - requiredSchema, - getPartitionArray, - output, - bucketedScan, - optionalBucketSet, - optionalNumCoalescedBuckets, - disableBucketedScan) - - override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = - getPartitions.map((_, fileFormat)) - - override val nodeNamePrefix: String = "TestFile" - - override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer = - copy(pushDownFilters = Some(filters)) -}