From 5828d798f10097d8a7314e0ab979c1a5e565bc0d Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Wed, 5 Apr 2023 23:02:09 -0700 Subject: [PATCH] cdf prototype succeeded --- .../server/DeltaSharingServiceSuite.scala | 2 +- .../spark/DeltaSharingLogFileSystem.scala | 20 +++++++- .../sharing/spark/DeltaSharingSuite.scala | 49 ++++++++++++++----- 3 files changed, 56 insertions(+), 15 deletions(-) diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index 3839f7c36..6652c64a8 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -1799,7 +1799,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { } integrationTest("linzhou_cdf_prototype") { - val response = readNDJson(requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=0&endingVersion=1"), Some("GET"), None, Some(0)) + val response = readNDJson(requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=0&endingVersion=2"), Some("GET"), None, Some(0)) val lines = response.split("\n") val protocol = lines(0) val metadata = lines(1) diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala index 624c7c56d..d96381e76 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala @@ -101,6 +101,10 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem { {"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/d7ed708546dd70fdff9191b3e3d6448b/1030","partitionValues":{},"size":1030,"modificationTime":1651272634000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"1\",\"age\":1,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"1\",\"age\":1,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} {"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/60d0cf57f3e4367db154aa2c36152a1f/1030","partitionValues":{},"size":1030,"modificationTime":1651272635000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"2\",\"age\":2,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"2\",\"age\":2,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000001","OPTIMIZE_TARGET_SIZE":"268435456"}}} {"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/a6dc5694a4ebcc9a067b19c348526ad6/1030","partitionValues":{},"size":1030,"modificationTime":1651272634000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"3\",\"age\":3,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"3\",\"age\":3,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000002","OPTIMIZE_TARGET_SIZE":"268435456"}}}""".stripMargin + val cdfJson2 = """{"cdc":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/6521ba910108d4b54d27beaa9fc2373f/1301","partitionValues":{},"size":1301,"dataChange":false}} +{"commitInfo":{"timestamp":1651272654866,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.delta.`s3://AKIA2JMHUIXTTBXLCSB2:o4bZ+L6Oo8XT5j0oGoGzG4Jr6waNeXpHufPbtGrM@delta-exchange-test/delta-exchange-test/cdf_table_cdf_enabled`.age = 3)\"]"},"notebook":{"notebookId":"3173513222201325"},"clusterId":"0819-204509-hill72","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"1","numAddedChangeFiles":"1","executionTimeMs":"1536","numDeletedRows":"1","scanTimeMs":"916","numAddedFiles":"0","rewriteTimeMs":"618"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"336afaea-72f4-46a4-9c69-809a90b3bdbd"}}""".stripMargin + val cdfJson3 = """{"cdc":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/2508998dce55bd726369e53761c4bc3f/1416","partitionValues":{},"size":1416,"dataChange":false}} +{"commitInfo":{"timestamp":1651272659127,"userId":"7953272455820895","userName":"lin.zhou@databricks.com","operation":"UPDATE","operationParameters":{"predicate":"(age#15119 = 2)"},"notebook":{"notebookId":"3173513222201325"},"clusterId":"0819-204509-hill72","readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"0","numAddedChangeFiles":"1","executionTimeMs":"1222","scanTimeMs":"99","numAddedFiles":"1","numUpdatedRows":"1","rewriteTimeMs":"1119"},"engineInfo":"Databricks-Runtime/11.x-snapshot-aarch64-scala2.12","txnId":"4c39d77a-4105-4df9-aef8-2353411dd622"}}""".stripMargin // scalastyle:on override def open(f: Path, bufferSize: Int): FSDataInputStream = { // scalastyle:off println @@ -121,6 +125,16 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem { Console.println(s"----[linzhou]----returning cdf 1.json:${cdfJson1.length}") return new FSDataInputStream(new SeekableByteArrayInputStream( cdfJson1.getBytes(), "cdf_1.json")) + } else if (f.toString == + "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000002.json") { + Console.println(s"----[linzhou]----returning cdf 2.json:${cdfJson2.length}") + return new FSDataInputStream(new SeekableByteArrayInputStream( + cdfJson2.getBytes(), "cdf_2.json")) + } else if (f.toString == + "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000003.json") { + Console.println(s"----[linzhou]----returning cdf 3.json:${cdfJson3.length}") + return new FSDataInputStream(new SeekableByteArrayInputStream( + cdfJson3.getBytes(), "cdf_3.json")) } else if (f.toString == "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.crc") { Console.println(s"----[linzhou]----throwing exception for 1.crc") @@ -189,7 +203,11 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem { new FileStatus(0, false, 0, 1, 0, new Path( "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000000.json")), new FileStatus(cdfJson1.length, false, 0, 1, 1651272635000L, new Path( - "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.json")) + "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.json")), + new FileStatus(cdfJson2.length, false, 0, 1, 1651272655000L, new Path( + "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000002.json")), + new FileStatus(cdfJson3.length, false, 0, 1, 1651272660000L, new Path( + "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000003.json")) ) Console.println(s"----[linzhou]----listing:${a}") return a diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala index 27d65f641..3d91cad35 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.functions.col import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import io.delta.sharing.spark.TestUtils._ @@ -93,6 +93,7 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar StructField("eventTime", TimestampType), StructField("date", DateType), StructField("type", StringType).withComment("this is a comment"))) + assert(spark.read.format("deltaSharing").load(tablePath).schema == expectedSchema) withTable("delta_sharing_test") { sql(s"CREATE TABLE delta_sharing_test USING deltaSharing LOCATION '$tablePath'") @@ -559,29 +560,51 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar val expected = Seq( Row("1", 1, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert"), Row("2", 2, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert"), - Row("3", 3, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert") -// Row("2", 2, sqlDate("2020-01-01"), 3L, 1651272660000L, "update_preimage"), -// Row("2", 2, sqlDate("2020-02-02"), 3L, 1651272660000L, "update_postimage"), -// Row("3", 3, sqlDate("2020-01-01"), 2L, 1651272655000L, "delete") + Row("3", 3, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert"), + Row("2", 2, sqlDate("2020-01-01"), 3L, 1651272660000L, "update_preimage"), + Row("2", 2, sqlDate("2020-02-02"), 3L, 1651272660000L, "update_postimage"), + Row("3", 3, sqlDate("2020-01-01"), 2L, 1651272655000L, "delete") ) val result = spark.read.format("deltaSharing") .option("readChangeFeed", "true") .option("startingVersion", 1) - .option("endingVersion", 1).load(tablePath) + .option("endingVersion", 3).load(tablePath) checkAnswer(result, expected) // Console.println(s"----[linzhou]-------------[Test DeltaLog CDF]------") // checkAnswer(spark.read.format("deltaSharing").load( // "delta-sharing-log:///cdf_table_cdf_enabled"), expected) - Console.println(s"----[linzhou]-------------[Test CDF]------") + if (true) { + Console.println(s"----[linzhou]-------------[Test CDF]------") - val df = spark.read.format("delta") - .option("readChangeFeed", "true") - .option("startingVersion", 1) - .option("endingVersion", 1) - .load("delta-sharing-log:///cdf_table_cdf_enabled") - checkAnswer(df, expected) + val df = spark.read.format("delta") + .option("readChangeFeed", "true") + .option("startingVersion", 1) + .option("endingVersion", 3) + .load("delta-sharing-log:///cdf_table_cdf_enabled") + + val expectedRows = Seq( + Row("1", 1, sqlDate("2020-01-01"), "insert", 1L, sqlTimestamp("2022-04-29 15:50:35.0")), + Row("2", 2, sqlDate("2020-01-01"), "insert", 1L, sqlTimestamp("2022-04-29 15:50:35.0")), + Row("3", 3, sqlDate("2020-01-01"), "insert", 1L, sqlTimestamp("2022-04-29 15:50:35.0")), + Row("3", 3, sqlDate("2020-01-01"), "delete", 2L, sqlTimestamp("2022-04-29 15:50:55.0")), + Row("2", 2, sqlDate("2020-01-01"), "update_preimage", 3L, sqlTimestamp("2022-04-29 15:51:00.0")), + Row("2", 2, sqlDate("2020-02-02"), "update_postimage", 3L, sqlTimestamp("2022-04-29 15:51:00.0")) + ) + val expectedSchema = StructType(Array( + StructField("name", StringType), + StructField("age", IntegerType), + StructField("birthday", DateType), + StructField("_change_type", StringType), + StructField("_commit_version", LongType), + StructField("_commit_timestamp", TimestampType) + )) + import scala.collection.JavaConversions._ + var expectedDF = spark.createDataFrame(expectedRows,expectedSchema) + + checkAnswer(expectedDF, df) + } // // should work when selecting some columns in a different order // checkAnswer( // result.select("_change_type", "birthday", "age"),