Commit a81000b

add test
1 parent daa72f5 commit a81000b

2 files changed: 97 additions & 0 deletions


spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java

Lines changed: 94 additions & 0 deletions
@@ -23,6 +23,7 @@
 import static org.apache.iceberg.PlanningMode.LOCAL;
 import static org.apache.iceberg.data.FileHelpers.encrypt;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -73,6 +74,7 @@
 import org.apache.spark.sql.sources.Not;
 import org.apache.spark.sql.sources.StringStartsWith;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.assertj.core.api.AbstractObjectAssert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -267,6 +269,98 @@ public void testUnpartitionedTimestampFilter() {
             "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
   }
 
+  @TestTemplate
+  public void limitPushedDownToSparkScan() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    AbstractObjectAssert<?, ?> scanAssert = assertThat(builder.build()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+    // verify changelog scan
+    assertThat(builder.buildChangelogScan())
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify MoR scan
+    scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+  }
+
+  @TestTemplate
+  public void limitPushedDownToSparkScanForMetadataTable() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+    // load the snapshots metadata table
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path") + "#snapshots"), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    assertThat(builder.build())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify MoR scan
+    assertThat(builder.buildMergeOnReadScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+  }
+
   @TestTemplate
   public void testBucketPartitionedIDFilters() {
     Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID);
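
The new tests exercise Spark's DataSource V2 limit pushdown: during planning Spark calls SupportsPushDownLimit.pushLimit(int) on the scan builder before build() is invoked, and the tests check that the pushed limit reaches the scan context as minRowsRequested. As a rough illustration of that contract only, here is a minimal, hypothetical builder (the LimitAwareScanBuilder name is made up; Iceberg's real SparkScanBuilder does far more, including filter and projection pushdown and the changelog/CoW/MoR scans verified above):

import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
import org.apache.spark.sql.types.StructType;

// Hypothetical sketch of the SupportsPushDownLimit contract, not Iceberg's implementation.
class LimitAwareScanBuilder implements ScanBuilder, SupportsPushDownLimit {
  private final StructType schema;
  private int limit = -1; // -1 means no limit was pushed

  LimitAwareScanBuilder(StructType schema) {
    this.schema = schema;
  }

  @Override
  public boolean pushLimit(int limit) {
    this.limit = limit; // remember the limit so the scan can stop once enough rows are planned
    return true;        // true tells Spark the source accepted the limit
  }

  // isPartiallyPushed() keeps its default of true, so Spark still applies its own LIMIT
  // on top of whatever this source returns.

  @Override
  public Scan build() {
    int pushedLimit = limit;
    return new Scan() {
      @Override
      public StructType readSchema() {
        return schema;
      }

      @Override
      public String description() {
        return pushedLimit < 0 ? "scan" : "scan(limit=" + pushedLimit + ")";
      }
    };
  }
}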

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java

Lines changed: 3 additions & 0 deletions
@@ -155,6 +155,9 @@ public void selectWithLimit() {
     Object[] first = row(1L, "a", 1.0F);
     Object[] second = row(2L, "b", 2.0F);
     Object[] third = row(3L, "c", Float.NaN);
+
+    // verify that LIMIT is properly applied in case SupportsPushDownLimit.isPartiallyPushed() is
+    // ever overridden in SparkScanBuilder
     assertThat(sql("SELECT * FROM %s LIMIT 1", tableName)).containsExactly(first);
     assertThat(sql("SELECT * FROM %s LIMIT 2", tableName)).containsExactly(first, second);
     assertThat(sql("SELECT * FROM %s LIMIT 3", tableName)).containsExactly(first, second, third);
