diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index a4064fec769..078b305da59 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -26,9 +26,7 @@ import org.apache.kylin.common.{KylinConfig, QueryContextFacade}
 import org.apache.kylin.engine.spark.metadata.cube.StructField
 import org.apache.kylin.query.runtime.plans.QueryToExecutionIDCache
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.utils.{QueryMetricUtils, ResourceDetectUtils}
-import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.utils.SparkTypeUtil
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
@@ -75,15 +73,9 @@ object SparkSqlClient {
     ss.sparkContext.setJobGroup(jobGroup,
       "Pushdown Query Id: " + QueryContextFacade.current().getQueryId, interruptOnCancel = true)
     try {
-      val temporarySchema = df.schema.fields.zipWithIndex.map {
-        case (_, index) => s"temporary_$index"
-      }
-      val tempDF = df.toDF(temporarySchema: _*)
-      val columns = tempDF.schema.map(tp => col(s"`${tp.name}`").cast(StringType))
-      val frame = tempDF.select(columns: _*)
-      val rowList = frame.collect().map(_.toSeq.map(_.asInstanceOf[String]).asJava).toSeq.asJava
+      val rowList = df.collect().map(_.toSeq.map(col => if (col == null) "" else col.toString).asJava).toSeq.asJava
       val fieldList = df.schema.map(field => SparkTypeUtil.convertSparkFieldToJavaField(field)).asJava
-      val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(frame.queryExecution.executedPlan)
+      val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
       QueryContextFacade.current().addAndGetScannedRows(scanRows.asScala.map(Long2long(_)).sum)
       QueryContextFacade.current().addAndGetScanFiles(scanFiles.asScala.map(Long2long(_)).sum)
       QueryContextFacade.current().addAndGetScannedBytes(scanBytes.asScala.map(Long2long(_)).sum)
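
To make the change concrete, below is a minimal, self-contained sketch of the collect path the patch switches to. The local SparkSession setup and the sample frame are illustrative assumptions, not part of the patch; only the `rowList` expression is taken verbatim from the diff above.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

object CollectAsStringsSketch {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration (assumption, not part of the patch).
    val ss = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import ss.implicits._

    // Hypothetical frame with mixed types and a null, to exercise the conversion.
    val df = Seq((1, Some("a")), (2, None)).toDF("id", "name")

    // The patched conversion: collect on the driver and stringify each cell,
    // mapping SQL NULL to the empty string instead of casting to StringType first.
    val rowList = df.collect()
      .map(_.toSeq.map(col => if (col == null) "" else col.toString).asJava)
      .toSeq.asJava

    // Prints "1|a" and "2|" (the null name becomes an empty string).
    rowList.asScala.foreach(row => println(row.asScala.mkString("|")))
    ss.stop()
  }
}
```

Two points follow directly from the diff itself: dropping the intermediate `select`/`cast(StringType)` means the scan metrics are now collected from `df.queryExecution.executedPlan` rather than from a derived frame whose plan carries the extra cast projection; and SQL NULLs, which the old `asInstanceOf[String]` path passed through as null, are now rendered as empty strings.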