Skip to content

Commit 7b5936b

Browse files
committed
[FLINK-37668][state] Make compaction filter related config take effect for RocksDBStateBackend
1 parent 14c848b commit 7b5936b

File tree

3 files changed

+45
-1
lines changed

3 files changed

+45
-1
lines changed

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBConfigurableOptions.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,9 @@ public class RocksDBConfigurableOptions implements Serializable {
380380
BLOOM_FILTER_BLOCK_BASED_MODE,
381381
RESTORE_OVERLAP_FRACTION_THRESHOLD,
382382
USE_INGEST_DB_RESTORE_MODE,
383-
INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE
383+
INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
384+
COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES,
385+
COMPACT_FILTER_PERIODIC_COMPACTION_TIME
384386
};
385387

386388
private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBResourceContainer.java

+5
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,11 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(
402402
currentOptions.setMinWriteBufferNumberToMerge(
403403
internalGetOption(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
404404

405+
currentOptions.setPeriodicCompactionSeconds(
406+
internalGetOption(
407+
RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME)
408+
.getSeconds());
409+
405410
TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig();
406411

407412
BlockBasedTableConfig blockBasedTableConfig;

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/state/rocksdb/RocksDBStateBackendConfigTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868

6969
import java.io.File;
7070
import java.nio.file.Files;
71+
import java.time.Duration;
7172
import java.util.ArrayList;
7273
import java.util.Arrays;
7374
import java.util.Collection;
@@ -624,6 +625,10 @@ public void testConfigurableOptionsFromConfig() throws Exception {
624625
verifyIllegalArgument(RocksDBConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE, "YES");
625626
verifyIllegalArgument(
626627
RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD, "2");
628+
verifyIllegalArgument(
629+
RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME, "-1s");
630+
verifyIllegalArgument(
631+
RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES, "1.1");
627632
}
628633

629634
// verify legal configuration
@@ -651,6 +656,8 @@ public void testConfigurableOptionsFromConfig() throws Exception {
651656
configuration.setString(RocksDBConfigurableOptions.USE_BLOOM_FILTER.key(), "TRUE");
652657
configuration.setString(
653658
RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.key(), "0.5");
659+
configuration.setString(
660+
RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME.key(), "1h");
654661

655662
try (RocksDBResourceContainer optionsContainer =
656663
new RocksDBResourceContainer(
@@ -677,6 +684,7 @@ public void testConfigurableOptionsFromConfig() throws Exception {
677684
CompressionType.SNAPPY_COMPRESSION,
678685
CompressionType.LZ4_COMPRESSION),
679686
columnOptions.compressionPerLevel());
687+
assertEquals(3600, columnOptions.periodicCompactionSeconds());
680688

681689
BlockBasedTableConfig tableConfig =
682690
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
@@ -948,6 +956,35 @@ public void testConfigureIncrementalRestoreInstanceBufferSize() {
948956
notDefault, rocksDBStateBackend.getIncrementalRestoreAsyncCompactAfterRescale());
949957
}
950958

959+
@Test
960+
public void testConfigurePeriodicCompactionTime() throws Exception {
961+
EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
962+
Configuration configuration = new Configuration();
963+
configuration.setString(
964+
RocksDBConfigurableOptions.COMPACT_FILTER_PERIODIC_COMPACTION_TIME.key(), "1d");
965+
rocksDBStateBackend =
966+
rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
967+
try (RocksDBResourceContainer resourceContainer =
968+
rocksDBStateBackend.createOptionsAndResourceContainer(null)) {
969+
assertEquals(Duration.ofDays(1), resourceContainer.getPeriodicCompactionTime());
970+
}
971+
}
972+
973+
@Test
974+
public void testConfigureQueryTimeAfterNumEntries() throws Exception {
975+
EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
976+
Configuration configuration = new Configuration();
977+
configuration.setString(
978+
RocksDBConfigurableOptions.COMPACT_FILTER_QUERY_TIME_AFTER_NUM_ENTRIES.key(),
979+
"100");
980+
rocksDBStateBackend =
981+
rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
982+
try (RocksDBResourceContainer resourceContainer =
983+
rocksDBStateBackend.createOptionsAndResourceContainer(null)) {
984+
assertEquals(100L, resourceContainer.getQueryTimeAfterNumEntries().longValue());
985+
}
986+
}
987+
951988
private void verifySetParameter(Runnable setter) {
952989
try {
953990
setter.run();

0 commit comments

Comments
 (0)