From caf34ab3c9617c6872fdedab05cd676e70fc8d81 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Wed, 15 Jan 2025 08:22:37 -0800 Subject: [PATCH 1/4] refactor timestamp formatter and add test --- .../sql/execution/command/v2/DescribeTableSuite.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala index 9cd7f0d8aade6..e9b75bbb3fdb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala @@ -55,6 +55,15 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase Row("# col_name", "data_type", "comment"), Row("s.id", "int", null), Row("s.a", "bigint", null))) + + // example date format: Mon Nov 01 12:00:00 UTC 2021 + val timeRegex = raw"""^[A-Z][a-z]{2} [A-Z][a-z]{2} [ 0-9][0-9] + |[0-9]{2}:[0-9]{2}:[0-9]{2} [A-Z]{3,4} + |[0-9]{4}$""".stripMargin.r + + val createdTimeValue = descriptionDf.filter("col_name = 'Created Time'") + .collect().head.getString(1) + assert(timeRegex.matches(createdTimeValue)) } } From 05bd5e5cc8d2afbf84032a4816d96f3c21d2ac70 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Wed, 15 Jan 2025 09:11:02 -0800 Subject: [PATCH 2/4] lint --- .../sql/execution/command/v2/DescribeTableSuite.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala index e9b75bbb3fdb4..9cd7f0d8aade6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala @@ -55,15 +55,6 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase Row("# col_name", "data_type", "comment"), Row("s.id", "int", null), Row("s.a", "bigint", null))) - - // example date format: Mon Nov 01 12:00:00 UTC 2021 - val timeRegex = raw"""^[A-Z][a-z]{2} [A-Z][a-z]{2} [ 0-9][0-9] - |[0-9]{2}:[0-9]{2}:[0-9]{2} [A-Z]{3,4} - |[0-9]{4}$""".stripMargin.r - - val createdTimeValue = descriptionDf.filter("col_name = 'Created Time'") - .collect().head.getString(1) - assert(timeRegex.matches(createdTimeValue)) } } From 71a785ee26bc049715e4f8538b2a22df726fbb8d Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Wed, 29 Jan 2025 09:00:44 -0800 Subject: [PATCH 3/4] statistics support --- .../sql/catalyst/catalog/interface.scala | 86 +++++++++++++------ .../command/v2/DescribeTableSuite.scala | 38 ++++++++ 2 files changed, 98 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 6963e89cf0418..4e72008477c76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -28,7 +28,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} import org.apache.commons.lang3.StringUtils -import org.json4s.JsonAST.{JArray, JBool, JDouble, 
JInt, JLong, JNull, JObject, JString, JValue} +import org.json4s.JsonAST.{JArray, JBool, JDecimal, JDouble, JInt, JLong, JNull, JObject, JString, JValue} import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkException @@ -61,36 +61,60 @@ trait MetadataMapSupport { jsonToString(toJsonLinkedHashMap) } + private def jsonToStringReformat( + key: String, jValue: JValue, map: mutable.LinkedHashMap[String, String]): Unit = { + val reformattedValue: String = if (key == "Statistics") { + jValue match { + case JObject(fields) => + fields.flatMap { + case ("size_in_bytes", JDecimal(bytes)) => + Some(s"$bytes bytes") + case ("num_rows", JDecimal(rows)) => + Some(s"$rows rows") + case _ => None + }.mkString(", ") + case _ => jValue.values.toString + } + } else if (key == "Created Time" || key == "Last Access") { + jValue match { + case JLong(value) => new Date(value).toString + case _ => jValue.values.toString + } + } else { + jValue.values.toString + } + map.put(key, reformattedValue) + } + protected def jsonToString( jsonMap: mutable.LinkedHashMap[String, JValue]): mutable.LinkedHashMap[String, String] = { val map = new mutable.LinkedHashMap[String, String]() - val timestampKeys = Set("Created Time", "Last Access") + val reformatKeys = Set("Statistics", "Created Time", "Last Access") jsonMap.foreach { case (key, jValue) => - val stringValue = jValue match { - case JString(value) => value - case JArray(values) => - values.map(_.values) - .map { - case str: String => quoteIdentifier(str) - case other => other.toString + if (reformatKeys.contains(key)) { + jsonToStringReformat(key, jValue, map) + } else { + val stringValue = jValue match { + case JString(value) => value + case JArray(values) => + values.map(_.values) + .map { + case str: String => quoteIdentifier(str) + case other => other.toString + } + .mkString("[", ", ", "]") + case JObject(fields) => + fields.map { case (k, v) => + s"$k=${v.values.toString}" } - .mkString("[", ", ", "]") - case JObject(fields) => - fields.map { case (k, v) => - s"$k=${v.values.toString}" - } - .mkString("[", ", ", "]") - case JInt(value) => value.toString - case JDouble(value) => value.toString - case JLong(value) => - if (timestampKeys.contains(key)) { - new Date(value).toString - } else { - value.toString - } - case _ => jValue.values.toString + .mkString("[", ", ", "]") + case JInt(value) => value.toString + case JDouble(value) => value.toString + case JLong(value) => value.toString + case _ => jValue.values.toString + } + map.put(key, stringValue) } - map.put(key, stringValue) } map } @@ -642,7 +666,9 @@ case class CatalogTable( map += "View Query Output Columns" -> viewQueryOutputColumns } if (tableProperties != JNull) map += "Table Properties" -> tableProperties - if (stats.isDefined) map += "Statistics" -> JString(stats.get.simpleString) + stats.foreach { s => + map += "Statistics" -> JObject(s.jsonString.toList) + } map ++= storage.toJsonLinkedHashMap.map { case (k, v) => k -> v } if (tracksPartitionsInCatalog) map += "Partition Provider" -> JString("Catalog") if (partitionColumns != JNull) map += "Partition Columns" -> partitionColumns @@ -811,6 +837,14 @@ case class CatalogStatistics( val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else "" s"$sizeInBytes bytes$rowCountString" } + + def jsonString: Map[String, JValue] = { + val rowCountInt: BigInt = rowCount.getOrElse(0L) + Map( + "size_in_bytes" -> JDecimal(BigDecimal(sizeInBytes)), + "num_rows" -> JDecimal(BigDecimal(rowCountInt)) + ) + } } /** diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala index 9cd7f0d8aade6..18a5976730d97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala @@ -95,6 +95,44 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase } } + test("DESCRIBE TABLE EXTENDED AS JSON of a partitioned table") { + withNamespaceAndTable("ns", "table") { tbl => + spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" + + " PARTITIONED BY (id)" + + " TBLPROPERTIES ('bar'='baz')" + + " COMMENT 'this is a test table'" + + " LOCATION 'file:/tmp/testcat/table_name'") + val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl AS JSON") + System.out.print(descriptionDf.show(truncate = false)) +// assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( +// ("col_name", StringType), +// ("data_type", StringType), +// ("comment", StringType))) +// QueryTest.checkAnswer( +// descriptionDf, +// Seq( +// Row("id", "bigint", null), +// Row("data", "string", null), +// Row("# Partition Information", "", ""), +// Row("# col_name", "data_type", "comment"), +// Row("id", "bigint", null), +// Row("", "", ""), +// Row("# Metadata Columns", "", ""), +// Row("index", "int", "Metadata column used to conflict with a data column"), +// Row("_partition", "string", "Partition key used to store the row"), +// Row("", "", ""), +// Row("# Detailed Table Information", "", ""), +// Row("Name", tbl, ""), +// Row("Type", "MANAGED", ""), +// Row("Comment", "this is a test table", ""), +// Row("Location", "file:/tmp/testcat/table_name", ""), +// Row("Provider", "_", ""), +// Row(TableCatalog.PROP_OWNER.capitalize, Utils.getCurrentUserName(), ""), +// Row("Table Properties", "[bar=baz]", ""), +// Row("Statistics", "0 bytes, 0 rows", null))) + } + } + test("describe a non-existent column") { withNamespaceAndTable("ns", "tbl") { tbl => sql(s""" From cefbf776dfff0e73e050d34c9bbad6cc83ac8991 Mon Sep 17 00:00:00 2001 From: Amanda Liu Date: Wed, 29 Jan 2025 09:16:24 -0800 Subject: [PATCH 4/4] fix test --- .../command/v1/DescribeTableSuite.scala | 10 ++++- .../command/v2/DescribeTableSuite.scala | 38 ------------------- 2 files changed, 8 insertions(+), 40 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala index eef8e212435c9..8afd6013916c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/DescribeTableSuite.scala @@ -273,7 +273,7 @@ trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase } } - test("DESCRIBE AS JSON partition spec") { + test("DESCRIBE AS JSON partition spec and statistics") { withNamespaceAndTable("ns", "table") { t => val tableCreationStr = s""" @@ -289,6 +289,7 @@ trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase |""".stripMargin spark.sql(tableCreationStr) spark.sql(s"ALTER TABLE $t ADD PARTITION (region='USA', category='tech')") + spark.sql(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS") val descriptionDf = spark.sql(s"DESCRIBE FORMATTED $t PARTITION (region='USA', category='tech') AS JSON") @@ 
-324,7 +325,11 @@ trait DescribeTableSuiteBase extends command.DescribeTableSuiteBase }, partition_provider = Some("Catalog"), partition_columns = Some(List("region", "category")), - partition_values = Some(Map("region" -> "USA", "category" -> "tech")) + partition_values = Some(Map("region" -> "USA", "category" -> "tech")), + statistics = Some(Map( + "size_in_bytes" -> 0, + "num_rows" -> 0 + )) ) assert(parsedOutput.location.isDefined) @@ -726,6 +731,7 @@ case class DescribeTableJson( partition_provider: Option[String] = None, partition_columns: Option[List[String]] = Some(Nil), partition_values: Option[Map[String, String]] = None, + statistics: Option[Map[String, Any]] = None, view_text: Option[String] = None, view_original_text: Option[String] = None, view_schema_mode: Option[String] = None, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala index 18a5976730d97..9cd7f0d8aade6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala @@ -95,44 +95,6 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase } } - test("DESCRIBE TABLE EXTENDED AS JSON of a partitioned table") { - withNamespaceAndTable("ns", "table") { tbl => - spark.sql(s"CREATE TABLE $tbl (id bigint, data string) $defaultUsing" + - " PARTITIONED BY (id)" + - " TBLPROPERTIES ('bar'='baz')" + - " COMMENT 'this is a test table'" + - " LOCATION 'file:/tmp/testcat/table_name'") - val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED $tbl AS JSON") - System.out.print(descriptionDf.show(truncate = false)) -// assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( -// ("col_name", StringType), -// ("data_type", StringType), -// ("comment", StringType))) -// QueryTest.checkAnswer( -// descriptionDf, -// Seq( -// Row("id", "bigint", null), -// Row("data", "string", null), -// Row("# Partition Information", "", ""), -// Row("# col_name", "data_type", "comment"), -// Row("id", "bigint", null), -// Row("", "", ""), -// Row("# Metadata Columns", "", ""), -// Row("index", "int", "Metadata column used to conflict with a data column"), -// Row("_partition", "string", "Partition key used to store the row"), -// Row("", "", ""), -// Row("# Detailed Table Information", "", ""), -// Row("Name", tbl, ""), -// Row("Type", "MANAGED", ""), -// Row("Comment", "this is a test table", ""), -// Row("Location", "file:/tmp/testcat/table_name", ""), -// Row("Provider", "_", ""), -// Row(TableCatalog.PROP_OWNER.capitalize, Utils.getCurrentUserName(), ""), -// Row("Table Properties", "[bar=baz]", ""), -// Row("Statistics", "0 bytes, 0 rows", null))) - } - } - test("describe a non-existent column") { withNamespaceAndTable("ns", "tbl") { tbl => sql(s"""
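Note (not part of the patch series): after PATCH 3/4, CatalogTable.toJsonLinkedHashMap emits "Statistics" as a JObject built from CatalogStatistics.jsonString, and jsonToStringReformat folds it back into the legacy "<size> bytes, <rows> rows" string for the non-JSON DESCRIBE output. Below is a minimal standalone sketch of that rendering path, assuming only json4s on the classpath; StatisticsRenderSketch and renderStatistics are hypothetical names that mirror the patch's logic and are not a Spark API.

// Standalone sketch of the "Statistics" rendering introduced in PATCH 3/4.
// Assumes json4s is available; mirrors jsonToStringReformat's Statistics branch.
import org.json4s.JsonAST.{JDecimal, JObject, JValue}

object StatisticsRenderSketch {
  // Renders a Statistics JObject like {"size_in_bytes": 1024, "num_rows": 42}
  // into the legacy human-readable form "1024 bytes, 42 rows".
  def renderStatistics(jValue: JValue): String = jValue match {
    case JObject(fields) =>
      fields.flatMap {
        case ("size_in_bytes", JDecimal(bytes)) => Some(s"$bytes bytes")
        case ("num_rows", JDecimal(rows)) => Some(s"$rows rows")
        case _ => None
      }.mkString(", ")
    case other => other.values.toString
  }

  def main(args: Array[String]): Unit = {
    // Mirrors CatalogStatistics.jsonString from the patch: numeric values wrapped in JDecimal.
    val stats = JObject(List(
      "size_in_bytes" -> JDecimal(BigDecimal(1024)),
      "num_rows" -> JDecimal(BigDecimal(42))
    ))
    println(renderStatistics(stats)) // prints: 1024 bytes, 42 rows
  }
}

This keeps the JSON output structured (separate numeric fields a caller can consume directly) while the plain DESCRIBE path still shows the same one-line summary as before the refactor.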