Skip to content

Commit

Permalink
cdf prototype halfway
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Apr 6, 2023
1 parent bcac424 commit 13ccb48
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1797,4 +1797,19 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
}
verifyPreSignedUrl(actualFiles(0).url, 1030)
}

integrationTest("linzhou_cdf_prototype") {
  // Fetch the CDF changes for versions [0, 1] of the shared cdf-enabled table
  // as NDJSON and sanity-check the action stream while prototyping.
  val response = readNDJson(requestPath("/shares/share8/schemas/default/tables/cdf_table_cdf_enabled/changes?startingVersion=0&endingVersion=1"), Some("GET"), None, Some(0))
  val actions = response.split("\n")
  // By convention the first two NDJSON lines carry the protocol and metadata actions.
  val protocolLine = actions(0)
  val metadataLine = actions(1)
  assert(JsonUtils.fromJson[SingleAction](protocolLine) == Protocol(minReaderVersion = 1).wrap)
  Console.println(s"----[linzhou]----metadata:${JsonUtils.fromJson[SingleAction](metadataLine)}")
  // Remaining lines are the per-file change actions; the exact count assertion
  // is still disabled while the prototype is half-way.
  // assert(files.size == 5)
  actions.drop(2).foreach { f =>
    Console.println(s"----[linzhou]----cdf file:${f}")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,33 @@ private[sharing] class DeltaSharingDataSource
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)

// scalastyle:off println
Console.println(s"----[linzhou]----, path: $path")
Console.println(s"----[linzhou]----createRelation path: $path")
if (path.startsWith("delta-sharing-log")) {
Console.println(s"----[linzhou]---- 1")
Console.println(s"----[linzhou]----createRelation is using DeltaLog")
val dL = DeltaLog.forTable(sqlContext.sparkSession, path, parameters)

// Testing cdf read
import org.apache.spark.sql.delta.util.FileNames._
val startVersion = 1
val hadoopConf = dL.newDeltaHadoopConf()
val deltas = dL.store.listFrom(
deltaFile(dL.logPath, startVersion), hadoopConf).filter(isDeltaFile)
Console.println(s"----[linzhou]---- deltas:$deltas")
deltas.foreach { status =>
val p = status.getPath
val version = deltaVersion(p)
Console.println(s"----[linzhou]---- p:$p, version:$version")
val readContent = dL.store.read(status, hadoopConf)
readContent.foreach{ r =>
Console.println(s"----[linzhou]---- r:$r")
}
// (version, store.read(status, hadoopConf).map(Action.fromJson))
}


return dL.createRelation()
}
Console.println(s"----[linzhou]---- 2")
Console.println(s"----[linzhou]---- createRelation, is using RemoteDeltaLog")
val deltaLog = RemoteDeltaLog(path)
deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem {
{"metaData":{"id":"8bf14108-032f-4292-a93c-6fe30e73a42b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1677282362103}}
{"add":{"path":"delta-sharing:///share1.default.linzhou_test_table_two/f3c23ec1ae8aa5c9cd5b7641e801adfa/1030","partitionValues":{},"size":1030,"modificationTime":1677282366000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"1\",\"age\":1,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"1\",\"age\":1,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1677282366000000","MIN_INSERTION_TIME":"1677282366000000","MAX_INSERTION_TIME":"1677282366000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
""".stripMargin
// Canned Delta log content for version 1 of the synthetic cdf_table_cdf_enabled
// table: one commitInfo (an Append writing 3 files), a protocol action, the
// metaData (with delta.enableChangeDataFeed=true in its configuration), and
// three add actions whose paths are delta-sharing:/// URIs.
// NOTE(review): the triple-quoted string contains no '|' margin characters, so
// the trailing .stripMargin is a no-op — confirm whether margins were intended.
val cdfJson1 = """{"commitInfo":{"timestamp":1651272634441,"userId":"7953272455820895","userName":"[email protected]","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"3173513222201325"},"clusterId":"0819-204509-hill72","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"3900"},"engineInfo":"Databricks-Runtime/11.x-snapshot-scala2.12","txnId":"d66d1362-4920-4c0c-ae90-7392801dca42"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"16736144-3306-4577-807a-d3f899b77670","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1651272615011}}
{"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/d7ed708546dd70fdff9191b3e3d6448b/1030","partitionValues":{},"size":1030,"modificationTime":1651272634000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"1\",\"age\":1,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"1\",\"age\":1,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/60d0cf57f3e4367db154aa2c36152a1f/1030","partitionValues":{},"size":1030,"modificationTime":1651272635000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"2\",\"age\":2,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"2\",\"age\":2,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000001","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"delta-sharing:///share8.default.cdf_table_cdf_enabled/a6dc5694a4ebcc9a067b19c348526ad6/1030","partitionValues":{},"size":1030,"modificationTime":1651272634000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"name\":\"3\",\"age\":3,\"birthday\":\"2020-01-01\"},\"maxValues\":{\"name\":\"3\",\"age\":3,\"birthday\":\"2020-01-01\"},\"nullCount\":{\"name\":0,\"age\":0,\"birthday\":0}}","tags":{"INSERTION_TIME":"1651272634000002","OPTIMIZE_TARGET_SIZE":"268435456"}}}""".stripMargin
// scalastyle:on
override def open(f: Path, bufferSize: Int): FSDataInputStream = {
// scalastyle:off println
Expand All @@ -110,9 +116,18 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem {
"delta-sharing-log:/linzhou_test_table_two/_delta_log/00000000000000000006.json") {
Console.println(s"----[linzhou]----returning 6.json:${json1}")
return new FSDataInputStream(new SeekableByteArrayInputStream(json1.getBytes(), "6.json"))
} else if (f.toString ==
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.json") {
Console.println(s"----[linzhou]----returning cdf 1.json:${cdfJson1.length}")
return new FSDataInputStream(new SeekableByteArrayInputStream(
cdfJson1.getBytes(), "cdf_1.json"))
} else if (f.toString ==
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.crc") {
Console.println(s"----[linzhou]----throwing exception for 1.crc")
throw new UnsupportedOperationException("00001.crc")
}
Console.println(s"----[linzhou]----returning emptry for :${f.toString}")
new FSDataInputStream(new SeekableByteArrayInputStream("".getBytes(), "content"))
new FSDataInputStream(new SeekableByteArrayInputStream("".getBytes(), f.toString))
}

override def create(
Expand Down Expand Up @@ -144,13 +159,13 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem {

// Report existence only for the two synthetic _delta_log directories served by
// this in-memory FileSystem; every other path is treated as absent.
override def exists(f: Path): Boolean = {
  Console.println(s"----[linzhou]----exists:${f}")
  // NOTE(review): the pre-change line `return f.toString == ".../linzhou_test_table_two/_delta_log"`
  // was left in above the updated OR condition, which made the
  // cdf_table_cdf_enabled branch unreachable; collapsed to the single intended
  // expression (and dropped the non-idiomatic `return`).
  f.toString == "delta-sharing-log:/linzhou_test_table_two/_delta_log" ||
    f.toString == "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log"
}

override def listStatus(f: Path): Array[FileStatus] = {
Console.println(s"----[linzhou]----listStatus:${f}")
if (f.toString == "delta-sharing-log:/linzhou_test_table_two/_delta_log") {

val a = Array(
new FileStatus(0, false, 0, 1, 0, new Path(
"delta-sharing-log:/linzhou_test_table_two/_delta_log/00000000000000000000.json")),
Expand All @@ -169,6 +184,15 @@ private[sharing] class DeltaSharingLogFileSystem extends FileSystem {
)
Console.println(s"----[linzhou]----listing:${a}")
return a
} else if (f.toString == "delta-sharing-log:/cdf_table_cdf_enabled/_delta_log") {
val a = Array(
new FileStatus(0, false, 0, 1, 0, new Path(
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000000.json")),
new FileStatus(cdfJson1.length, false, 0, 1, 1651272635000L, new Path(
"delta-sharing-log:/cdf_table_cdf_enabled/_delta_log/00000000000000000001.json"))
)
Console.println(s"----[linzhou]----listing:${a}")
return a
}
throw new UnsupportedOperationException("listStatus")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ object DeltaSharingCDFReader {
params.profileProvider,
refresher
)
CachedTableManager.INSTANCE.register(
params.path.toString.split("#")(1),
getIdToUrl(addFiles, cdfFiles, removeFiles),
refs,
params.profileProvider,
refresher
)

dfs.reduce((df1, df2) => df1.unionAll(df2))
.select(requiredColumns.map(c => col(quoteIdentifier(c))): _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ class RemoteSnapshot(
)
CachedTableManager.INSTANCE
.register(
"share1.default.linzhou_test_table_two",
fileIndex.params.path.toString.split("#")(1),
idToUrl,
Seq(new WeakReference(fileIndex)),
fileIndex.params.profileProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,53 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar
val df = spark.read.format("delta").load("delta-sharing-log:///linzhou_test_table_two")
checkAnswer(df, expected)

checkAnswer(df, expected)

Console.println(s"----[linzhou]-------------[Test DeltaLog]------")
checkAnswer(spark.read.format("deltaSharing").load(
"delta-sharing-log:///linzhou_test_table_two"), expected)
}

integrationTest("linzhou_cdf_prototype") {
  // CDF prototype: read changes for version 1 of the shared cdf-enabled table,
  // first through the deltaSharing source, then through the delta source
  // backed by the synthetic delta-sharing-log FileSystem.
  val tablePath = testProfileFile.getCanonicalPath + "#share8.default.cdf_table_cdf_enabled"

  // Only the version-1 inserts are expected while the prototype covers a
  // single commit; the later update/delete rows stay disabled below.
  val expected = Seq(
    Row("1", 1, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert"),
    Row("2", 2, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert"),
    Row("3", 3, sqlDate("2020-01-01"), 1L, 1651272635000L, "insert")
    // Row("2", 2, sqlDate("2020-01-01"), 3L, 1651272660000L, "update_preimage"),
    // Row("2", 2, sqlDate("2020-02-02"), 3L, 1651272660000L, "update_postimage"),
    // Row("3", 3, sqlDate("2020-01-01"), 2L, 1651272655000L, "delete")
  )

  val sharingResult = spark.read
    .format("deltaSharing")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .option("endingVersion", 1)
    .load(tablePath)
  checkAnswer(sharingResult, expected)

  // Console.println(s"----[linzhou]-------------[Test DeltaLog CDF]------")
  // checkAnswer(spark.read.format("deltaSharing").load(
  //   "delta-sharing-log:///cdf_table_cdf_enabled"), expected)

  Console.println(s"----[linzhou]-------------[Test CDF]------")

  val deltaResult = spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .option("endingVersion", 1)
    .load("delta-sharing-log:///cdf_table_cdf_enabled")
  checkAnswer(deltaResult, expected)

  // Column projection / reordering check, still disabled while prototyping:
  // checkAnswer(
  //   result.select("_change_type", "birthday", "age"),
  //   Seq(
  //     Row("insert", sqlDate("2020-01-01"), 1),
  //     Row("insert", sqlDate("2020-01-01"), 2),
  //     Row("insert", sqlDate("2020-01-01"), 3),
  //     Row("update_preimage", sqlDate("2020-01-01"), 2),
  //     Row("update_postimage", sqlDate("2020-02-02"), 2),
  //     Row("delete", sqlDate("2020-01-01"), 3)
  //   )
  // )
}
}

0 comments on commit 13ccb48

Please sign in to comment.