Skip to content

Commit 8fff1c0

Browse files
committed
[FLINK-37668][state] Make compaction filter related config take effect for ForStStateBackend
1 parent 7b5936b commit 8fff1c0

File tree

3 files changed

+41
-1
lines changed

3 files changed

+41
-1
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStConfigurableOptions.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,9 @@ public class ForStConfigurableOptions implements Serializable {
397397
BLOOM_FILTER_BLOCK_BASED_MODE,
398398
RESTORE_OVERLAP_FRACTION_THRESHOLD,
399399
USE_INGEST_DB_RESTORE_MODE,
400-
USE_DELETE_FILES_IN_RANGE_DURING_RESCALING
400+
USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
401+
COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES,
402+
COMPACT_FILTER_PERIODIC_COMPACTION_TIME
401403
};
402404

403405
private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java

+4
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,10 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(
569569
currentOptions.setMinWriteBufferNumberToMerge(
570570
internalGetOption(ForStConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
571571

572+
currentOptions.setPeriodicCompactionSeconds(
573+
internalGetOption(ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME)
574+
.getSeconds());
575+
572576
TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig();
573577

574578
BlockBasedTableConfig blockBasedTableConfig;

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.io.File;
6666
import java.io.IOException;
6767
import java.nio.file.Files;
68+
import java.time.Duration;
6869
import java.util.ArrayList;
6970
import java.util.Arrays;
7071
import java.util.Collection;
@@ -496,6 +497,10 @@ public void testConfigurableOptionsFromConfig() throws Exception {
496497
verifyIllegalArgument(ForStConfigurableOptions.COMPRESSION_PER_LEVEL, "SNAP");
497498
verifyIllegalArgument(ForStConfigurableOptions.USE_BLOOM_FILTER, "NO");
498499
verifyIllegalArgument(ForStConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE, "YES");
500+
verifyIllegalArgument(
501+
ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME, "-1s");
502+
verifyIllegalArgument(
503+
ForStConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES, "1.1");
499504
}
500505

501506
// verify legal configuration
@@ -520,6 +525,8 @@ public void testConfigurableOptionsFromConfig() throws Exception {
520525
configuration.setString(ForStConfigurableOptions.METADATA_BLOCK_SIZE.key(), "8 kb");
521526
configuration.setString(ForStConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 mb");
522527
configuration.setString(ForStConfigurableOptions.USE_BLOOM_FILTER.key(), "TRUE");
528+
configuration.setString(
529+
ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME.key(), "1h");
523530

524531
try (ForStResourceContainer optionsContainer =
525532
new ForStResourceContainer(
@@ -546,6 +553,7 @@ public void testConfigurableOptionsFromConfig() throws Exception {
546553
CompressionType.SNAPPY_COMPRESSION,
547554
CompressionType.LZ4_COMPRESSION),
548555
columnOptions.compressionPerLevel());
556+
assertEquals(3600, columnOptions.periodicCompactionSeconds());
549557

550558
BlockBasedTableConfig tableConfig =
551559
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
@@ -779,6 +787,32 @@ public void testSupportSavepoint() {
779787
assertTrue(forStStateBackend.supportsSavepointFormat(SavepointFormatType.NATIVE));
780788
}
781789

790+
@Test
791+
public void testConfigurePeriodicCompactionTime() throws Exception {
792+
ForStStateBackend forStStateBackend = new ForStStateBackend();
793+
Configuration configuration = new Configuration();
794+
configuration.setString(
795+
ForStConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME.key(), "1d");
796+
forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader());
797+
try (ForStResourceContainer resourceContainer =
798+
forStStateBackend.createOptionsAndResourceContainer(null)) {
799+
assertEquals(Duration.ofDays(1), resourceContainer.getPeriodicCompactionTime());
800+
}
801+
}
802+
803+
@Test
804+
public void testConfigureQueryTimeAfterNumEntries() throws Exception {
805+
ForStStateBackend forStStateBackend = new ForStStateBackend();
806+
Configuration configuration = new Configuration();
807+
configuration.setString(
808+
ForStConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES.key(), "100");
809+
forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader());
810+
try (ForStResourceContainer resourceContainer =
811+
forStStateBackend.createOptionsAndResourceContainer(null)) {
812+
assertEquals(100L, resourceContainer.getQueryTimeAfterNumEntries().longValue());
813+
}
814+
}
815+
782816
private void verifySetParameter(Runnable setter) {
783817
try {
784818
setter.run();

0 commit comments

Comments
 (0)