
Commit 5fcdbbe

[spark] Introduce paimon_incremental_between_timestamp and paimon_incremental_to_auto_tag tvf (apache#4855)
1 parent f2252d7 commit 5fcdbbe

8 files changed: +193 −22

docs/content/spark/sql-query.md

+8 −2

@@ -77,11 +77,17 @@ You can also force specifying `'incremental-between-scan-mode'`.
 
 Paimon supports using Spark SQL to run incremental queries, implemented via Spark Table Valued Functions.
 
-you can use `paimon_incremental_query` in query to extract the incremental data:
-
 ```sql
 -- read the incremental data between snapshot id 12 and snapshot id 20.
 SELECT * FROM paimon_incremental_query('tableName', 12, 20);
+
+-- read the incremental data between ts 1692169000000 and ts 1692169900000.
+SELECT * FROM paimon_incremental_between_timestamp('tableName', '1692169000000', '1692169900000');
+
+-- read the incremental data to tag '2024-12-04'.
+-- Paimon will find an earlier tag and return changes between them.
+-- If the tag doesn't exist or the earlier tag doesn't exist, return empty.
+SELECT * FROM paimon_incremental_to_auto_tag('tableName', '2024-12-04');
 ```
 
 In Batch SQL, the `DELETE` records are not allowed to be returned, so records of `-D` will be dropped.
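For reference, the same incremental reads can be expressed through the DataFrame API using the option keys this commit documents in `CoreOptions`. A minimal sketch, assuming an active `SparkSession` in `spark` and a Paimon table `tableName` resolvable in the current catalog (the timestamps and tag name are placeholder values):

```scala
// Incremental read between two epoch-millisecond timestamps
// (start exclusive, end inclusive), mirroring the TVF above.
val betweenTs = spark.read
  .format("paimon")
  .option("incremental-between-timestamp", "1692169000000,1692169900000")
  .table("tableName")

// Incremental read up to an auto-created tag, mirroring
// paimon_incremental_to_auto_tag.
val toAutoTag = spark.read
  .format("paimon")
  .option("incremental-to-auto-tag", "2024-12-04")
  .table("tableName")
```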

docs/layouts/shortcodes/generated/core_configuration.html

+4 −4

@@ -396,25 +396,25 @@
 <td><h5>incremental-between</h5></td>
 <td style="word-wrap: break-word;">(none)</td>
 <td>String</td>
-<td>Read incremental changes between start snapshot (exclusive) and end snapshot, for example, '5,10' means changes between snapshot 5 and snapshot 10.</td>
+<td>Read incremental changes between start snapshot (exclusive) and end snapshot (inclusive), for example, '5,10' means changes between snapshot 5 and snapshot 10.</td>
 </tr>
 <tr>
 <td><h5>incremental-between-scan-mode</h5></td>
 <td style="word-wrap: break-word;">auto</td>
 <td><p>Enum</p></td>
-<td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot. <br /><br />Possible values:<ul><li>"auto": Scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan newly changed files between snapshots.</li><li>"changelog": Scan changelog files between snapshots.</li></ul></td>
+<td>Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot (inclusive). <br /><br />Possible values:<ul><li>"auto": Scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan newly changed files between snapshots.</li><li>"changelog": Scan changelog files between snapshots.</li></ul></td>
 </tr>
 <tr>
 <td><h5>incremental-between-timestamp</h5></td>
 <td style="word-wrap: break-word;">(none)</td>
 <td>String</td>
-<td>Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2.</td>
+<td>Read incremental changes between start timestamp (exclusive) and end timestamp (inclusive), for example, 't1,t2' means changes between timestamp t1 and timestamp t2.</td>
 </tr>
 <tr>
 <td><h5>incremental-to-auto-tag</h5></td>
 <td style="word-wrap: break-word;">(none)</td>
 <td>String</td>
-<td>Used to specify the auto-created tag to reading incremental changes.</td>
+<td>Used to specify the end tag (inclusive), and Paimon will find an earlier tag and return changes between them. If the tag doesn't exist or the earlier tag doesn't exist, return empty. </td>
 </tr>
 <tr>
 <td><h5>local-merge-buffer-size</h5></td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

+5 −4

@@ -1089,30 +1089,31 @@ public class CoreOptions implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Read incremental changes between start snapshot (exclusive) and end snapshot, "
+                            "Read incremental changes between start snapshot (exclusive) and end snapshot (inclusive), "
                                     + "for example, '5,10' means changes between snapshot 5 and snapshot 10.");
 
     public static final ConfigOption<IncrementalBetweenScanMode> INCREMENTAL_BETWEEN_SCAN_MODE =
             key("incremental-between-scan-mode")
                     .enumType(IncrementalBetweenScanMode.class)
                     .defaultValue(IncrementalBetweenScanMode.AUTO)
                     .withDescription(
-                            "Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot. ");
+                            "Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot (inclusive). ");
 
     public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
             key("incremental-between-timestamp")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Read incremental changes between start timestamp (exclusive) and end timestamp, "
+                            "Read incremental changes between start timestamp (exclusive) and end timestamp (inclusive), "
                                     + "for example, 't1,t2' means changes between timestamp t1 and timestamp t2.");
 
     public static final ConfigOption<String> INCREMENTAL_TO_AUTO_TAG =
             key("incremental-to-auto-tag")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Used to specify the auto-created tag to reading incremental changes.");
+                            "Used to specify the end tag (inclusive), and Paimon will find an earlier tag and return changes between them. "
+                                    + "If the tag doesn't exist or the earlier tag doesn't exist, return empty. ");
 
     public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE =
             key("end-input.check-partition-expire")

paimon-docs/README.md

+1 −1

@@ -28,7 +28,7 @@ The `@ConfigGroups` annotation can be used to generate multiple files from a sin
 
 To integrate an `*Options` class from another package, add another module-package argument pair to `ConfigOptionsDocGenerator#LOCATIONS`.
 
-The files can be generated by running `mvn clean install -DskipTests` and `mvn package -Pgenerate-docs -pl paimon-docs -nsu -DskipTests`, and can be integrated into the documentation using `{{ include generated/<file-name> >}}`.
+The files can be generated by running `mvn package -Pgenerate-docs -pl paimon-docs -nsu -DskipTests -am`, and can be integrated into the documentation using `{{ include generated/<file-name> >}}`.
 
 **NOTE:** You need to make sure that the changed jar has been installed in the local maven repository.

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala

+55 −10

@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.catalyst.plans.logical
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
@@ -35,15 +36,22 @@ import scala.collection.JavaConverters._
 object PaimonTableValuedFunctions {
 
   val INCREMENTAL_QUERY = "paimon_incremental_query"
+  val INCREMENTAL_BETWEEN_TIMESTAMP = "paimon_incremental_between_timestamp"
+  val INCREMENTAL_TO_AUTO_TAG = "paimon_incremental_to_auto_tag"
 
-  val supportedFnNames: Seq[String] = Seq(INCREMENTAL_QUERY)
+  val supportedFnNames: Seq[String] =
+    Seq(INCREMENTAL_QUERY, INCREMENTAL_BETWEEN_TIMESTAMP, INCREMENTAL_TO_AUTO_TAG)
 
   private type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
 
   def getTableValueFunctionInjection(fnName: String): TableFunctionDescription = {
     val (info, builder) = fnName match {
       case INCREMENTAL_QUERY =>
         FunctionRegistryBase.build[IncrementalQuery](fnName, since = None)
+      case INCREMENTAL_BETWEEN_TIMESTAMP =>
+        FunctionRegistryBase.build[IncrementalBetweenTimestamp](fnName, since = None)
+      case INCREMENTAL_TO_AUTO_TAG =>
+        FunctionRegistryBase.build[IncrementalToAutoTag](fnName, since = None)
       case _ =>
         throw new Exception(s"Function $fnName isn't a supported table valued function.")
     }
@@ -58,12 +66,23 @@
 
     val sessionState = spark.sessionState
     val catalogManager = sessionState.catalogManager
-    val sparkCatalog = catalogManager.currentCatalog.asInstanceOf[TableCatalog]
-    val tableId = sessionState.sqlParser.parseTableIdentifier(args.head.eval().toString)
-    val namespace = tableId.database.map(Array(_)).getOrElse(catalogManager.currentNamespace)
-    val ident = Identifier.of(namespace, tableId.table)
+
+    val identifier = args.head.eval().toString
+    val (catalogName, dbName, tableName) = {
+      sessionState.sqlParser.parseMultipartIdentifier(identifier) match {
+        case Seq(table) =>
+          (catalogManager.currentCatalog.name(), catalogManager.currentNamespace.head, table)
+        case Seq(db, table) => (catalogManager.currentCatalog.name(), db, table)
+        case Seq(catalog, db, table) => (catalog, db, table)
+        case _ => throw new RuntimeException(s"Invalid table identifier: $identifier")
+      }
+    }
+
+    val sparkCatalog = catalogManager.catalog(catalogName).asInstanceOf[TableCatalog]
+    val ident: Identifier = Identifier.of(Array(dbName), tableName)
     val sparkTable = sparkCatalog.loadTable(ident)
     val options = tvf.parseArgs(args.tail)
+
     DataSourceV2Relation.create(
       sparkTable,
       Some(sparkCatalog),
@@ -87,20 +106,46 @@ abstract class PaimonTableValueFunction(val fnName: String) extends LeafNode {
   val args: Seq[Expression]
 
   def parseArgs(args: Seq[Expression]): Map[String, String]
-
 }
 
-/** Plan for the "paimon_incremental_query" function */
+/** Plan for the [[INCREMENTAL_QUERY]] function */
 case class IncrementalQuery(override val args: Seq[Expression])
-  extends PaimonTableValueFunction(PaimonTableValuedFunctions.INCREMENTAL_QUERY) {
+  extends PaimonTableValueFunction(INCREMENTAL_QUERY) {
 
   override def parseArgs(args: Seq[Expression]): Map[String, String] = {
     assert(
-      args.size >= 1 && args.size <= 2,
-      "paimon_incremental_query needs two parameters: startSnapshotId, and endSnapshotId.")
+      args.size == 2,
+      s"$INCREMENTAL_QUERY needs two parameters: startSnapshotId, and endSnapshotId.")
 
     val start = args.head.eval().toString
     val end = args.last.eval().toString
     Map(CoreOptions.INCREMENTAL_BETWEEN.key -> s"$start,$end")
   }
 }
+
+/** Plan for the [[INCREMENTAL_BETWEEN_TIMESTAMP]] function */
+case class IncrementalBetweenTimestamp(override val args: Seq[Expression])
+  extends PaimonTableValueFunction(INCREMENTAL_BETWEEN_TIMESTAMP) {
+
+  override def parseArgs(args: Seq[Expression]): Map[String, String] = {
+    assert(
+      args.size == 2,
+      s"$INCREMENTAL_BETWEEN_TIMESTAMP needs two parameters: startTimestamp, and endTimestamp.")
+
+    val start = args.head.eval().toString
+    val end = args.last.eval().toString
+    Map(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key -> s"$start,$end")
+  }
+}
+
+/** Plan for the [[INCREMENTAL_TO_AUTO_TAG]] function */
+case class IncrementalToAutoTag(override val args: Seq[Expression])
+  extends PaimonTableValueFunction(INCREMENTAL_TO_AUTO_TAG) {
+
+  override def parseArgs(args: Seq[Expression]): Map[String, String] = {
+    assert(args.size == 1, s"$INCREMENTAL_TO_AUTO_TAG needs one parameter: endTagName.")
+
+    val endTagName = args.head.eval().toString
+    Map(CoreOptions.INCREMENTAL_TO_AUTO_TAG.key -> endTagName)
+  }
+}
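The new multipart-identifier handling lets the TVFs accept one-, two-, and three-part table names instead of only tables in the current catalog. A standalone sketch of the same resolution pattern, with hypothetical `currentCatalog`/`currentDb` parameters standing in for the state Spark's `CatalogManager` provides:

```scala
// Resolve a parsed multipart identifier to (catalog, database, table),
// falling back to the current catalog/database for shorter names.
def resolve(
    parts: Seq[String],
    currentCatalog: String,
    currentDb: String): (String, String, String) = parts match {
  case Seq(table) => (currentCatalog, currentDb, table)
  case Seq(db, table) => (currentCatalog, db, table)
  case Seq(catalog, db, table) => (catalog, db, table)
  case _ =>
    throw new IllegalArgumentException(s"Invalid table identifier: ${parts.mkString(".")}")
}

// resolve(Seq("test_tvf_db", "t"), "paimon", "default")
//   == ("paimon", "test_tvf_db", "t")
```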

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala

+5

@@ -19,6 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.hive.TestHiveMetastore
+import org.apache.paimon.table.FileStoreTable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
@@ -78,6 +79,10 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
     spark.sql(s"USE $sparkCatalogName")
     spark.sql(s"USE $hiveDbName")
   }
+
+  override def loadTable(tableName: String): FileStoreTable = {
+    loadTable(hiveDbName, tableName)
+  }
 }
 
 object PaimonHiveTestBase {

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala

+3 −1

@@ -38,7 +38,7 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import java.io.File
-import java.util.TimeZone
+import java.util.{TimeZone, UUID}
 
 import scala.util.Random
 
@@ -48,6 +48,8 @@ class PaimonSparkTestBase
     with WithTableOptions
     with SparkVersionSupport {
 
+  protected lazy val commitUser: String = UUID.randomUUID.toString
+
   protected lazy val fileIO: FileIO = LocalFileIO.create
 
   protected lazy val tempDBDir: File = Utils.createTempDir

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala

+112

@@ -18,10 +18,15 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.data.{BinaryString, GenericRow, Timestamp}
+import org.apache.paimon.manifest.ManifestCommittable
 import org.apache.paimon.spark.PaimonHiveTestBase
 
 import org.apache.spark.sql.{DataFrame, Row}
 
+import java.time.LocalDateTime
+import java.util.Collections
+
 class TableValuedFunctionsTest extends PaimonHiveTestBase {
 
   withPk.foreach {
@@ -91,10 +96,117 @@
     }
   }
 
+  test("Table Valued Functions: paimon_incremental_between_timestamp") {
+    Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        sql(s"USE $catalogName")
+        val dbName = "test_tvf_db"
+        withDatabase(dbName) {
+          sql(s"CREATE DATABASE $dbName")
+          withTable("t") {
+            sql(s"USE $dbName")
+            sql("CREATE TABLE t (id INT) USING paimon")
+
+            sql("INSERT INTO t VALUES 1")
+            Thread.sleep(100)
+            val t1 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 2")
+            Thread.sleep(100)
+            val t2 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 3")
+            sql("INSERT INTO t VALUES 4")
+            Thread.sleep(100)
+            val t3 = System.currentTimeMillis()
+            sql("INSERT INTO t VALUES 5")
+
+            checkAnswer(
+              sql(
+                s"SELECT * FROM paimon_incremental_between_timestamp('t', '$t1', '$t2') ORDER BY id"),
+              Seq(Row(2)))
+            checkAnswer(
+              sql(
+                s"SELECT * FROM paimon_incremental_between_timestamp('$dbName.t', '$t2', '$t3') ORDER BY id"),
+              Seq(Row(3), Row(4)))
+            checkAnswer(
+              sql(
+                s"SELECT * FROM paimon_incremental_between_timestamp('$catalogName.$dbName.t', '$t1', '$t3') ORDER BY id"),
+              Seq(Row(2), Row(3), Row(4)))
+          }
+        }
+    }
+  }
+
+  test("Table Valued Functions: paimon_incremental_to_auto_tag") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (a INT, b STRING) USING paimon
+            |TBLPROPERTIES ('primary-key' = 'a', 'bucket' = '1', 'tag.automatic-creation'='watermark', 'tag.creation-period'='daily')
+            |""".stripMargin)
+
+      val table = loadTable("t")
+      val write = table.newWrite(commitUser)
+      val commit = table.newCommit(commitUser).ignoreEmptyCommit(false)
+
+      write.write(GenericRow.of(1, BinaryString.fromString("a")))
+      var commitMessages = write.prepareCommit(false, 0)
+      commit.commit(
+        new ManifestCommittable(
+          0,
+          utcMills("2024-12-02T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      write.write(GenericRow.of(2, BinaryString.fromString("b")))
+      commitMessages = write.prepareCommit(false, 1)
+      commit.commit(
+        new ManifestCommittable(
+          1,
+          utcMills("2024-12-03T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      write.write(GenericRow.of(3, BinaryString.fromString("c")))
+      commitMessages = write.prepareCommit(false, 2)
+      commit.commit(
+        new ManifestCommittable(
+          2,
+          utcMills("2024-12-05T10:00:00"),
+          Collections.emptyMap[Integer, java.lang.Long],
+          commitMessages))
+
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-01') ORDER BY a"),
+        Seq())
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-02') ORDER BY a"),
+        Seq(Row(2, "b")))
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-03') ORDER BY a"),
+        Seq())
+      checkAnswer(
+        sql(s"SELECT * FROM paimon_incremental_to_auto_tag('t', '2024-12-04') ORDER BY a"),
+        Seq(Row(3, "c")))
+    }
+  }
+
   private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = {
     spark.read
       .format("paimon")
       .option("incremental-between", s"$start,$end")
       .table(tableIdent)
   }
+
+  private def utcMills(timestamp: String) =
+    Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond
+
+  object GenericRow {
+    def of(values: Any*): GenericRow = {
+      val row = new GenericRow(values.length)
+      values.zipWithIndex.foreach {
+        case (value, index) =>
+          row.setField(index, value)
+      }
+      row
+    }
+  }
 }
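A note on the tag test's expectations: with `'tag.automatic-creation'='watermark'` and `'tag.creation-period'='daily'`, each commit's watermark appears to create a tag named for the completed previous day, so the three commits above should yield tags `2024-12-01`, `2024-12-02`, and `2024-12-04`. A minimal sketch of the lookup semantics the doc change describes (find the nearest earlier tag, else return empty); this illustrates the documented contract, not Paimon's implementation:

```scala
// Given existing auto-created tag names, return the (earlierTag, endTag)
// range to read, or None when the end tag or an earlier tag is missing.
def incrementalRange(tags: Seq[String], endTag: String): Option[(String, String)] = {
  val sorted = tags.sorted // date-formatted tag names sort chronologically
  if (!sorted.contains(endTag)) None // end tag doesn't exist -> empty result
  else sorted.takeWhile(_ < endTag).lastOption.map(earlier => (earlier, endTag))
}

val tags = Seq("2024-12-01", "2024-12-02", "2024-12-04")
assert(incrementalRange(tags, "2024-12-01").isEmpty) // no earlier tag
assert(incrementalRange(tags, "2024-12-03").isEmpty) // tag doesn't exist
assert(incrementalRange(tags, "2024-12-04").contains(("2024-12-02", "2024-12-04")))
```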
