diff --git a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/wal/AerospikeBasicWalCompleter.java b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/wal/AerospikeBasicWalCompleter.java index 1d5a90d..8341223 100644 --- a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/wal/AerospikeBasicWalCompleter.java +++ b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/basic/wal/AerospikeBasicWalCompleter.java @@ -17,7 +17,7 @@ public class AerospikeBasicWalCompleter { public static WriteAheadLogCompleter, AerospikeLock, Value> basicCompleter( BatchOperations, AerospikeLock, Value> batchOperations, - Duration staleBatchesThreshold){ + Duration staleBatchesThreshold, int batchSize){ WriteAheadLogManager, Value> writeAheadLogManager = batchOperations.getWriteAheadLogManager(); AerospikeWriteAheadLogManager aerospikeWriteAheadLogManager = (AerospikeWriteAheadLogManager)writeAheadLogManager; @@ -25,6 +25,7 @@ public static WriteAheadLogCompleter, AerospikeLock, return new WriteAheadLogCompleter<>( batchOperations, staleBatchesThreshold, + batchSize, new AerospikeExclusiveLocker( aerospikeWriteAheadLogManager.getClient(), aerospikeWriteAheadLogManager.getWalNamespace(), diff --git a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/wal/AerospikeWriteAheadLogManager.java b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/wal/AerospikeWriteAheadLogManager.java index cb3939b..d0394f5 100644 --- a/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/wal/AerospikeWriteAheadLogManager.java +++ b/aerospike-batch-updater/src/main/java/nosql/batch/update/aerospike/wal/AerospikeWriteAheadLogManager.java @@ -16,6 +16,7 @@ import nosql.batch.update.BatchUpdate; import nosql.batch.update.aerospike.lock.AerospikeBatchLocks; import nosql.batch.update.wal.WalRecord; +import nosql.batch.update.wal.WalTimeRange; import nosql.batch.update.wal.WriteAheadLogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +28,6 @@ import java.util.Collections; import java.util.List; import java.util.UUID; -import java.util.concurrent.ExecutorService; public class AerospikeWriteAheadLogManager, UPDATES, EV> implements WriteAheadLogManager { @@ -101,10 +101,22 @@ public boolean deleteBatch(Value batchId) { } @Override - public List> getStaleBatches(Duration staleThreshold) { + public List getTimeRanges(Duration staleThreshold, int batchSize) { Statement statement = staleBatchesStatement(staleThreshold, walNamespace, walSetName, clock); RecordSet recordSet = client.query(null, statement); + List timestamps = new ArrayList<>(); + recordSet.iterator().forEachRemaining(keyRecord -> timestamps.add(keyRecord.record.getLong(TIMESTAMP_BIN_NAME))); + Collections.sort(timestamps); + + return getTimeRangesForTimestamps(timestamps, batchSize); + } + + @Override + public List> getStaleBatchesForRange(WalTimeRange timeRange) { + Statement statement = staleBatchesStatement(walNamespace, walSetName, timeRange.getFromTimestamp(), timeRange.getToTimestamp()); + RecordSet recordSet = client.query(null, statement); + List> staleTransactions = new ArrayList<>(); recordSet.iterator().forEachRemaining(keyRecord -> { Record record = keyRecord.record; @@ -127,6 +139,37 @@ public static Statement staleBatchesStatement(Duration staleThreshold, String wa return statement; } + public static Statement staleBatchesStatement(String walNamespace, String walSetName, long begin, long end) { + Statement statement = new Statement(); + statement.setNamespace(walNamespace); + statement.setSetName(walSetName); + statement.setFilter(Filter.range(TIMESTAMP_BIN_NAME, begin, end)); + return statement; + } + + public static List getTimeRangesForTimestamps(List timestamps, int batchSize) { + List walTimeRanges = new ArrayList<>(); + + int fromIdx = 0; + int size = timestamps.size(); + int toIdx = Math.min(batchSize, size) - 1; + + while (fromIdx < size) { + long fromTimestamp = timestamps.get(fromIdx); + long toTimestamp = timestamps.get(toIdx); + walTimeRanges.add(new WalTimeRange(fromTimestamp, toTimestamp)); + + fromIdx = toIdx; + while (fromIdx < size && timestamps.get(fromIdx) == toTimestamp) { + fromIdx++; + } + + toIdx = Math.min(fromIdx + batchSize, size) - 1; + } + + return walTimeRanges; + } + static byte[] getBytesFromUUID(UUID uuid) { ByteBuffer bb = ByteBuffer.wrap(new byte[16]); bb.putLong(uuid.getMostSignificantBits()); diff --git a/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/basic/BasicBatchRetentionTest.java b/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/basic/BasicBatchRetentionTest.java index 3c0b459..49b64e6 100644 --- a/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/basic/BasicBatchRetentionTest.java +++ b/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/basic/BasicBatchRetentionTest.java @@ -46,10 +46,11 @@ public class BasicBatchRetentionTest extends BatchRetentionTest { = new BatchUpdater<>(operations); public static final Duration STALE_BATCHES_THRESHOLD = Duration.ofSeconds(1); + public static final int BATCH_SIZE = 100; static WriteAheadLogCompleter, AerospikeLock, Value> walCompleter = new WriteAheadLogCompleter<>( - operations, STALE_BATCHES_THRESHOLD, + operations, STALE_BATCHES_THRESHOLD, BATCH_SIZE, new BasicRecoveryTest.DummyExclusiveLocker(), Executors.newScheduledThreadPool(1)); @@ -72,7 +73,7 @@ protected void checkForConsistency() { assertThat(getValue(key1, client)).isEqualTo(getValue(key2, client)); await().timeout(ONE_SECOND).untilAsserted(() -> - assertThat(operations.getWriteAheadLogManager().getStaleBatches(STALE_BATCHES_THRESHOLD)).isEmpty()); + assertThat(operations.getWriteAheadLogManager().getTimeRanges(STALE_BATCHES_THRESHOLD, BATCH_SIZE)).isEmpty()); } @Override diff --git a/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/basic/BasicRecoveryTest.java b/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/basic/BasicRecoveryTest.java index 9241aee..86ee184 100644 --- a/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/basic/BasicRecoveryTest.java +++ b/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/basic/BasicRecoveryTest.java @@ -48,10 +48,11 @@ public class BasicRecoveryTest extends RecoveryTest { = new BatchUpdater<>(operations); public static final Duration STALE_BATCHES_THRESHOLD = Duration.ofSeconds(1); + public static final int BATCH_SIZE = 100; static WriteAheadLogCompleter, AerospikeLock, Value> walCompleter = new WriteAheadLogCompleter<>( - operations, STALE_BATCHES_THRESHOLD, + operations, STALE_BATCHES_THRESHOLD, BATCH_SIZE, new DummyExclusiveLocker(), Executors.newScheduledThreadPool(1)); @@ -78,7 +79,7 @@ protected void checkForConsistency() { assertThat(getValue(key1, client)).isEqualTo(getValue(key2, client)); await().timeout(ONE_SECOND).untilAsserted(() -> - assertThat(operations.getWriteAheadLogManager().getStaleBatches(STALE_BATCHES_THRESHOLD)).isEmpty()); + assertThat(operations.getWriteAheadLogManager().getTimeRanges(STALE_BATCHES_THRESHOLD, BATCH_SIZE)).isEmpty()); } @Override diff --git a/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/wal/AerospikeWriteAheadLogManagerTest.java b/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/wal/AerospikeWriteAheadLogManagerTest.java index 3f148e2..0deb7cf 100644 --- a/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/wal/AerospikeWriteAheadLogManagerTest.java +++ b/aerospike-batch-updater/src/test/java/nosql/batch/update/aerospike/wal/AerospikeWriteAheadLogManagerTest.java @@ -11,6 +11,7 @@ import org.testcontainers.containers.GenericContainer; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -32,6 +33,7 @@ public class AerospikeWriteAheadLogManagerTest extends WriteAheadLogManagerTest< clock.setTime(1000); } static final Duration staleThreshold = Duration.ofMillis(100); + static final int batchSize = 100; static String walSetName = String.valueOf(AerospikeWriteAheadLogManagerTest.class.hashCode()); @@ -80,9 +82,11 @@ protected void switchClockAhead() { @Override protected List getStaleBatches() { - return writeAheadLogManager.getStaleBatches(staleThreshold).stream() - .map(record -> record.batchId) - .collect(Collectors.toList()); + return writeAheadLogManager.getTimeRanges(staleThreshold, batchSize).stream() + .map(writeAheadLogManager::getStaleBatchesForRange) + .flatMap(Collection::stream) + .map(record -> record.batchId) + .collect(Collectors.toList()); } } diff --git a/aerospike-reactor-batch-updater/src/main/java/nosql/batch/update/reactor/aerospike/basic/wal/AerospikeBasicWalCompleter.java b/aerospike-reactor-batch-updater/src/main/java/nosql/batch/update/reactor/aerospike/basic/wal/AerospikeBasicWalCompleter.java index d464949..9f9f17b 100644 --- a/aerospike-reactor-batch-updater/src/main/java/nosql/batch/update/reactor/aerospike/basic/wal/AerospikeBasicWalCompleter.java +++ b/aerospike-reactor-batch-updater/src/main/java/nosql/batch/update/reactor/aerospike/basic/wal/AerospikeBasicWalCompleter.java @@ -17,7 +17,7 @@ public class AerospikeBasicWalCompleter { public static ReactorWriteAheadLogCompleter, AerospikeLock, Value> basicCompleter( ReactorBatchOperations, AerospikeLock, Value> batchOperations, - Duration staleBatchesThreshold){ + Duration staleBatchesThreshold, int batchSize){ ReactorWriteAheadLogManager, Value> writeAheadLogManager = batchOperations.getWriteAheadLogManager(); AerospikeReactorWriteAheadLogManager aerospikeReactorWriteAheadLogManager = (AerospikeReactorWriteAheadLogManager)writeAheadLogManager; @@ -25,6 +25,7 @@ public static ReactorWriteAheadLogCompleter, Aerospi return new ReactorWriteAheadLogCompleter<>( batchOperations, staleBatchesThreshold, + batchSize, new AerospikeExclusiveLocker( aerospikeReactorWriteAheadLogManager.getClient(), aerospikeReactorWriteAheadLogManager.getWalNamespace(), diff --git a/aerospike-reactor-batch-updater/src/main/java/nosql/batch/update/reactor/aerospike/wal/AerospikeReactorWriteAheadLogManager.java b/aerospike-reactor-batch-updater/src/main/java/nosql/batch/update/reactor/aerospike/wal/AerospikeReactorWriteAheadLogManager.java index 4b40f0e..8f551d6 100644 --- a/aerospike-reactor-batch-updater/src/main/java/nosql/batch/update/reactor/aerospike/wal/AerospikeReactorWriteAheadLogManager.java +++ b/aerospike-reactor-batch-updater/src/main/java/nosql/batch/update/reactor/aerospike/wal/AerospikeReactorWriteAheadLogManager.java @@ -9,7 +9,6 @@ import com.aerospike.client.Value; import com.aerospike.client.policy.RecordExistsAction; import com.aerospike.client.policy.WritePolicy; -import com.aerospike.client.query.Filter; import com.aerospike.client.query.IndexType; import com.aerospike.client.query.RecordSet; import com.aerospike.client.query.Statement; @@ -19,6 +18,7 @@ import nosql.batch.update.aerospike.wal.AerospikeBatchUpdateSerde; import nosql.batch.update.reactor.wal.ReactorWriteAheadLogManager; import nosql.batch.update.wal.WalRecord; +import nosql.batch.update.wal.WalTimeRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -32,6 +32,7 @@ import java.util.UUID; import static nosql.batch.update.aerospike.wal.AerospikeWriteAheadLogManager.generateBatchId; +import static nosql.batch.update.aerospike.wal.AerospikeWriteAheadLogManager.getTimeRangesForTimestamps; import static nosql.batch.update.aerospike.wal.AerospikeWriteAheadLogManager.staleBatchesStatement; public class AerospikeReactorWriteAheadLogManager, UPDATES, EV> @@ -107,10 +108,22 @@ public Mono deleteBatch(Value batchId) { } @Override - public List> getStaleBatches(Duration staleThreshold) { + public List getTimeRanges(Duration staleThreshold, int batchSize) { Statement statement = staleBatchesStatement(staleThreshold, walNamespace, walSetName, clock); RecordSet recordSet = client.query(null, statement); + List timestamps = new ArrayList<>(); + recordSet.iterator().forEachRemaining(keyRecord -> timestamps.add(keyRecord.record.getLong(TIMESTAMP_BIN_NAME))); + Collections.sort(timestamps); + + return getTimeRangesForTimestamps(timestamps, batchSize); + } + + @Override + public List> getStaleBatchesForRange(WalTimeRange timeRange) { + Statement statement = staleBatchesStatement(walNamespace, walSetName, timeRange.getFromTimestamp(), timeRange.getToTimestamp()); + RecordSet recordSet = client.query(null, statement); + List> staleTransactions = new ArrayList<>(); recordSet.iterator().forEachRemaining(keyRecord -> { Record record = keyRecord.record; diff --git a/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/basic/BasicBatchRetentionTest.java b/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/basic/BasicBatchRetentionTest.java index 38327d7..6a5e993 100644 --- a/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/basic/BasicBatchRetentionTest.java +++ b/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/basic/BasicBatchRetentionTest.java @@ -51,10 +51,11 @@ public class BasicBatchRetentionTest extends BatchRetentionTest { = new ReactorBatchUpdater<>(operations); public static final Duration STALE_BATCHES_THRESHOLD = Duration.ofSeconds(1); + public static final int BATCH_SIZE = 100; static ReactorWriteAheadLogCompleter, AerospikeLock, Value> walCompleter = new ReactorWriteAheadLogCompleter<>( - operations, STALE_BATCHES_THRESHOLD, + operations, STALE_BATCHES_THRESHOLD, BATCH_SIZE, new BasicRecoveryTest.DummyExclusiveLocker(), Executors.newScheduledThreadPool(1)); @@ -77,7 +78,7 @@ protected void checkForConsistency() { assertThat(getValue(key1, client)).isEqualTo(getValue(key2, client)); await().timeout(ONE_SECOND).untilAsserted(() -> - assertThat(operations.getWriteAheadLogManager().getStaleBatches(STALE_BATCHES_THRESHOLD)).isEmpty()); + assertThat(operations.getWriteAheadLogManager().getTimeRanges(STALE_BATCHES_THRESHOLD, BATCH_SIZE)).isEmpty()); } @Override diff --git a/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/basic/BasicRecoveryTest.java b/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/basic/BasicRecoveryTest.java index 7bb8430..c62b968 100644 --- a/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/basic/BasicRecoveryTest.java +++ b/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/basic/BasicRecoveryTest.java @@ -51,10 +51,11 @@ public class BasicRecoveryTest extends RecoveryTest { = new ReactorBatchUpdater<>(operations); public static final Duration STALE_BATCHES_THRESHOLD = Duration.ofSeconds(1); + public static final int BATCH_SIZE = 100; static ReactorWriteAheadLogCompleter, AerospikeLock, Value> walCompleter = new ReactorWriteAheadLogCompleter<>( - operations, STALE_BATCHES_THRESHOLD, + operations, STALE_BATCHES_THRESHOLD, BATCH_SIZE, new DummyExclusiveLocker(), Executors.newScheduledThreadPool(1)); @@ -81,7 +82,7 @@ protected void checkForConsistency() { assertThat(getValue(key1, client)).isEqualTo(getValue(key2, client)); await().timeout(ONE_SECOND).untilAsserted(() -> - assertThat(operations.getWriteAheadLogManager().getStaleBatches(STALE_BATCHES_THRESHOLD)).isEmpty()); + assertThat(operations.getWriteAheadLogManager().getTimeRanges(STALE_BATCHES_THRESHOLD, BATCH_SIZE)).isEmpty()); } @Override diff --git a/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/wal/AerospikeReactorWriteAheadLogManagerTest.java b/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/wal/AerospikeReactorWriteAheadLogManagerTest.java index 191aea4..bfc4b5b 100644 --- a/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/wal/AerospikeReactorWriteAheadLogManagerTest.java +++ b/aerospike-reactor-batch-updater/src/test/java/nosql/batch/update/reactor/aerospike/wal/AerospikeReactorWriteAheadLogManagerTest.java @@ -14,6 +14,7 @@ import org.testcontainers.containers.GenericContainer; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -36,6 +37,7 @@ public class AerospikeReactorWriteAheadLogManagerTest extends WriteAheadLogManag clock.setTime(1000); } static final Duration staleThreshold = Duration.ofMillis(100); + static final int batchSize = 100; static String walSetName = String.valueOf(AerospikeReactorWriteAheadLogManagerTest.class.hashCode()); @@ -84,7 +86,9 @@ protected void switchClockAhead() { @Override protected List getStaleBatches() { - return writeAheadLogManager.getStaleBatches(staleThreshold).stream() + return writeAheadLogManager.getTimeRanges(staleThreshold, batchSize).stream() + .map(writeAheadLogManager::getStaleBatchesForRange) + .flatMap(Collection::stream) .map(record -> record.batchId) .collect(Collectors.toList()); } diff --git a/batch-updater/src/main/java/nosql/batch/update/wal/AbstractWriteAheadLogCompleter.java b/batch-updater/src/main/java/nosql/batch/update/wal/AbstractWriteAheadLogCompleter.java index 59f32dc..df97781 100644 --- a/batch-updater/src/main/java/nosql/batch/update/wal/AbstractWriteAheadLogCompleter.java +++ b/batch-updater/src/main/java/nosql/batch/update/wal/AbstractWriteAheadLogCompleter.java @@ -20,6 +20,7 @@ abstract public class AbstractWriteAheadLogCompleter { private static final Logger logger = LoggerFactory.getLogger(AbstractWriteAheadLogCompleter.class); private final Duration staleBatchesThreshold; + private final int batchSize; private final ExclusiveLocker exclusiveLocker; private final ScheduledExecutorService scheduledExecutorService; @@ -29,12 +30,16 @@ abstract public class AbstractWriteAheadLogCompleter { /** * @param staleBatchesThreshold + * @param batchSize * @param exclusiveLocker * @param scheduledExecutorService */ public AbstractWriteAheadLogCompleter(Duration staleBatchesThreshold, - ExclusiveLocker exclusiveLocker, ScheduledExecutorService scheduledExecutorService){ + int batchSize, + ExclusiveLocker exclusiveLocker, + ScheduledExecutorService scheduledExecutorService){ this.staleBatchesThreshold = staleBatchesThreshold; + this.batchSize = batchSize; this.exclusiveLocker = exclusiveLocker; this.scheduledExecutorService = scheduledExecutorService; } @@ -86,41 +91,46 @@ public CompletionStatistic completeHangedTransactions() { int ignoredBatchesCount = 0; int errorBatchesCount = 0; try { - if(exclusiveLocker.acquire()){ - List> staleBatches = getStaleBatches(staleBatchesThreshold); - staleBatchesCount += staleBatches.size(); - logger.info("Got {} stale transactions", staleBatches.size()); - for(WalRecord batch : staleBatches){ - if(suspended.get()){ - logger.info("WAL completion was suspended"); - break; - } - if(Thread.currentThread().isInterrupted()){ - logger.info("WAL completion was interrupted"); - break; - } - - if(exclusiveLocker.acquire()) { - logger.info("Trying to complete batch batchId=[{}], timestamp=[{}]", - batch.batchId, batch.timestamp); - try { - processAndDeleteTransactions(batch); - completeBatchesCount++; - logger.info("Successfully complete batch batchId=[{}]", batch.batchId); + if(exclusiveLocker.acquire()) { + List timeRanges = getTimeRanges(staleBatchesThreshold, batchSize); + logger.info("Got {} chunks of stale transactions. Max chunk size {}", timeRanges.size(), batchSize); + + for (WalTimeRange timeRange : timeRanges) { + List> staleBatches = getStaleBatchesForRange(timeRange); + staleBatchesCount += staleBatches.size(); + logger.info("Processing {} stale transactions", staleBatches.size()); + for(WalRecord batch : staleBatches){ + if(suspended.get()){ + logger.info("WAL completion was suspended"); + break; } - //this is expected behaviour that may have place in case of hanged transaction was not completed: - //not able to acquire all locks (didn't match expected value - // (may have place if initial transaction was interrupted on release stage and released values were modified)) - catch (LockingException be) { - logger.info("Failed to complete batch batchId=[{}] as it's already completed", batch.batchId, be); - releaseLocksAndDeleteWalTransactionOnError(batch); - ignoredBatchesCount ++; - logger.info("released locks for batch batchId=[{}]", batch.batchId, be); + if(Thread.currentThread().isInterrupted()){ + logger.info("WAL completion was interrupted"); + break; } - //even in case of error need to move to the next one - catch (Exception e) { - errorBatchesCount ++; - logger.error("!!! Failed to complete batch batchId=[{}], need to be investigated", batch.batchId, e); + + if(exclusiveLocker.acquire()) { + logger.info("Trying to complete batch batchId=[{}], timestamp=[{}]", + batch.batchId, batch.timestamp); + try { + processAndDeleteTransactions(batch); + completeBatchesCount++; + logger.info("Successfully complete batch batchId=[{}]", batch.batchId); + } + //this is expected behaviour that may have place in case of hanged transaction was not completed: + //not able to acquire all locks (didn't match expected value + // (may have place if initial transaction was interrupted on release stage and released values were modified)) + catch (LockingException be) { + logger.info("Failed to complete batch batchId=[{}] as it's already completed", batch.batchId, be); + releaseLocksAndDeleteWalTransactionOnError(batch); + ignoredBatchesCount ++; + logger.info("released locks for batch batchId=[{}]", batch.batchId, be); + } + //even in case of error need to move to the next one + catch (Exception e) { + errorBatchesCount ++; + logger.error("!!! Failed to complete batch batchId=[{}], need to be investigated", batch.batchId, e); + } } } } @@ -137,6 +147,8 @@ public CompletionStatistic completeHangedTransactions() { abstract protected void processAndDeleteTransactions(WalRecord batch); - abstract protected List> getStaleBatches(Duration staleBatchesThreshold); + abstract protected List getTimeRanges(Duration staleBatchesThreshold, int batchSize); + + abstract protected List> getStaleBatchesForRange(WalTimeRange timeRange); } diff --git a/batch-updater/src/main/java/nosql/batch/update/wal/WalTimeRange.java b/batch-updater/src/main/java/nosql/batch/update/wal/WalTimeRange.java new file mode 100644 index 0000000..4116684 --- /dev/null +++ b/batch-updater/src/main/java/nosql/batch/update/wal/WalTimeRange.java @@ -0,0 +1,19 @@ +package nosql.batch.update.wal; + +public final class WalTimeRange { + public final long fromTimestamp; + public final long toTimestamp; + + public WalTimeRange(long fromTimestamp, long toTimestamp) { + this.fromTimestamp = fromTimestamp; + this.toTimestamp = toTimestamp; + } + + public long getFromTimestamp() { + return fromTimestamp; + } + + public long getToTimestamp() { + return toTimestamp; + } +} diff --git a/batch-updater/src/main/java/nosql/batch/update/wal/WriteAheadLogCompleter.java b/batch-updater/src/main/java/nosql/batch/update/wal/WriteAheadLogCompleter.java index 8703196..a4f8421 100644 --- a/batch-updater/src/main/java/nosql/batch/update/wal/WriteAheadLogCompleter.java +++ b/batch-updater/src/main/java/nosql/batch/update/wal/WriteAheadLogCompleter.java @@ -24,8 +24,9 @@ public class WriteAheadLogCompleter */ public WriteAheadLogCompleter(BatchOperations batchOperations, Duration staleBatchesThreshold, + int batchSize, ExclusiveLocker exclusiveLocker, ScheduledExecutorService scheduledExecutorService){ - super(staleBatchesThreshold, exclusiveLocker, scheduledExecutorService); + super(staleBatchesThreshold, batchSize, exclusiveLocker, scheduledExecutorService); this.writeAheadLogManager = batchOperations.getWriteAheadLogManager(); this.batchOperations = batchOperations; } @@ -43,8 +44,13 @@ protected void processAndDeleteTransactions(WalRecord } @Override - protected List> getStaleBatches(Duration staleBatchesThreshold) { - return writeAheadLogManager.getStaleBatches(staleBatchesThreshold); + protected List getTimeRanges(Duration staleBatchesThreshold, int batchSize) { + return writeAheadLogManager.getTimeRanges(staleBatchesThreshold, batchSize); + } + + @Override + protected List> getStaleBatchesForRange(WalTimeRange timeRange) { + return writeAheadLogManager.getStaleBatchesForRange(timeRange); } } diff --git a/batch-updater/src/main/java/nosql/batch/update/wal/WriteAheadLogManager.java b/batch-updater/src/main/java/nosql/batch/update/wal/WriteAheadLogManager.java index 2d5b728..82342f5 100644 --- a/batch-updater/src/main/java/nosql/batch/update/wal/WriteAheadLogManager.java +++ b/batch-updater/src/main/java/nosql/batch/update/wal/WriteAheadLogManager.java @@ -12,6 +12,8 @@ public interface WriteAheadLogManager { boolean deleteBatch(BATCH_ID batchId); - List> getStaleBatches(Duration staleThreshold); + List getTimeRanges(Duration staleThreshold, int batchSize); + + List> getStaleBatchesForRange(WalTimeRange timeRange); } diff --git a/batch-updater/src/test/java/nosql/batch/update/wal/FailingWriteAheadLogManager.java b/batch-updater/src/test/java/nosql/batch/update/wal/FailingWriteAheadLogManager.java index f45d2fd..ec9e484 100644 --- a/batch-updater/src/test/java/nosql/batch/update/wal/FailingWriteAheadLogManager.java +++ b/batch-updater/src/test/java/nosql/batch/update/wal/FailingWriteAheadLogManager.java @@ -46,7 +46,12 @@ public boolean deleteBatch(BATCH_ID batchId) { } @Override - public List> getStaleBatches(Duration staleThreshold) { - return writeAheadLogManager.getStaleBatches(staleThreshold); + public List getTimeRanges(Duration staleThreshold, int batchSize) { + return writeAheadLogManager.getTimeRanges(staleThreshold, batchSize); + } + + @Override + public List> getStaleBatchesForRange(WalTimeRange timeRange) { + return writeAheadLogManager.getStaleBatchesForRange(timeRange); } } diff --git a/batch-updater/src/test/java/nosql/batch/update/wal/HangingWriteAheadLogManager.java b/batch-updater/src/test/java/nosql/batch/update/wal/HangingWriteAheadLogManager.java index e969c7e..4fd86df 100644 --- a/batch-updater/src/test/java/nosql/batch/update/wal/HangingWriteAheadLogManager.java +++ b/batch-updater/src/test/java/nosql/batch/update/wal/HangingWriteAheadLogManager.java @@ -39,7 +39,12 @@ public boolean deleteBatch(BATCH_ID batchId) { } @Override - public List> getStaleBatches(Duration staleThreshold) { - return writeAheadLogManager.getStaleBatches(staleThreshold); + public List getTimeRanges(Duration staleThreshold, int batchSize) { + return writeAheadLogManager.getTimeRanges(staleThreshold, batchSize); + } + + @Override + public List> getStaleBatchesForRange(WalTimeRange timeRange) { + return writeAheadLogManager.getStaleBatchesForRange(timeRange); } } diff --git a/reactor-batch-updater/src/main/java/nosql/batch/update/reactor/wal/ReactorWriteAheadLogCompleter.java b/reactor-batch-updater/src/main/java/nosql/batch/update/reactor/wal/ReactorWriteAheadLogCompleter.java index ebd57a0..686df93 100644 --- a/reactor-batch-updater/src/main/java/nosql/batch/update/reactor/wal/ReactorWriteAheadLogCompleter.java +++ b/reactor-batch-updater/src/main/java/nosql/batch/update/reactor/wal/ReactorWriteAheadLogCompleter.java @@ -5,6 +5,7 @@ import nosql.batch.update.wal.AbstractWriteAheadLogCompleter; import nosql.batch.update.wal.ExclusiveLocker; import nosql.batch.update.wal.WalRecord; +import nosql.batch.update.wal.WalTimeRange; import java.time.Duration; import java.util.List; @@ -27,8 +28,9 @@ public class ReactorWriteAheadLogCompleter batchOperations, Duration staleBatchesThreshold, + int batchSize, ExclusiveLocker exclusiveLocker, ScheduledExecutorService scheduledExecutorService){ - super(staleBatchesThreshold, exclusiveLocker, scheduledExecutorService); + super(staleBatchesThreshold, batchSize, exclusiveLocker, scheduledExecutorService); this.writeAheadLogManager = batchOperations.getWriteAheadLogManager(); this.batchOperations = batchOperations; } @@ -46,8 +48,13 @@ protected void processAndDeleteTransactions(WalRecord } @Override - protected List> getStaleBatches(Duration staleBatchesThreshold) { - return writeAheadLogManager.getStaleBatches(staleBatchesThreshold); + protected List getTimeRanges(Duration staleBatchesThreshold, int batchSize) { + return writeAheadLogManager.getTimeRanges(staleBatchesThreshold, batchSize); + } + + @Override + protected List> getStaleBatchesForRange(WalTimeRange timeRange) { + return writeAheadLogManager.getStaleBatchesForRange(timeRange); } } diff --git a/reactor-batch-updater/src/main/java/nosql/batch/update/reactor/wal/ReactorWriteAheadLogManager.java b/reactor-batch-updater/src/main/java/nosql/batch/update/reactor/wal/ReactorWriteAheadLogManager.java index 3d429ab..610b8b2 100644 --- a/reactor-batch-updater/src/main/java/nosql/batch/update/reactor/wal/ReactorWriteAheadLogManager.java +++ b/reactor-batch-updater/src/main/java/nosql/batch/update/reactor/wal/ReactorWriteAheadLogManager.java @@ -3,6 +3,7 @@ import nosql.batch.update.BatchUpdate; import nosql.batch.update.wal.WalRecord; +import nosql.batch.update.wal.WalTimeRange; import reactor.core.publisher.Mono; import java.time.Duration; @@ -14,6 +15,8 @@ public interface ReactorWriteAheadLogManager { Mono deleteBatch(BATCH_ID batchId); - List> getStaleBatches(Duration staleThreshold); + List getTimeRanges(Duration staleThreshold, int batchSize); + + List> getStaleBatchesForRange(WalTimeRange timeRange); } diff --git a/reactor-batch-updater/src/test/java/nosql/batch/update/reactor/wal/ReactorFailingWriteAheadLogManager.java b/reactor-batch-updater/src/test/java/nosql/batch/update/reactor/wal/ReactorFailingWriteAheadLogManager.java index 45b9751..6891b6b 100644 --- a/reactor-batch-updater/src/test/java/nosql/batch/update/reactor/wal/ReactorFailingWriteAheadLogManager.java +++ b/reactor-batch-updater/src/test/java/nosql/batch/update/reactor/wal/ReactorFailingWriteAheadLogManager.java @@ -2,6 +2,7 @@ import nosql.batch.update.BatchUpdate; import nosql.batch.update.wal.WalRecord; +import nosql.batch.update.wal.WalTimeRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -49,7 +50,13 @@ public Mono deleteBatch(BATCH_ID batchId) { } @Override - public List> getStaleBatches(Duration staleThreshold) { - return writeAheadLogManager.getStaleBatches(staleThreshold); + public List getTimeRanges(Duration staleThreshold, int batchSize) { + return writeAheadLogManager.getTimeRanges(staleThreshold, batchSize); } + + @Override + public List> getStaleBatchesForRange(WalTimeRange timeRange) { + return writeAheadLogManager.getStaleBatchesForRange(timeRange); + } + } diff --git a/reactor-batch-updater/src/test/java/nosql/batch/update/reactor/wal/ReactorHangingWriteAheadLogManager.java b/reactor-batch-updater/src/test/java/nosql/batch/update/reactor/wal/ReactorHangingWriteAheadLogManager.java index 16431a8..e83f789 100644 --- a/reactor-batch-updater/src/test/java/nosql/batch/update/reactor/wal/ReactorHangingWriteAheadLogManager.java +++ b/reactor-batch-updater/src/test/java/nosql/batch/update/reactor/wal/ReactorHangingWriteAheadLogManager.java @@ -2,6 +2,7 @@ import nosql.batch.update.BatchUpdate; import nosql.batch.update.wal.WalRecord; +import nosql.batch.update.wal.WalTimeRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -40,7 +41,13 @@ public Mono deleteBatch(BATCH_ID batchId) { } @Override - public List> getStaleBatches(Duration staleThreshold) { - return writeAheadLogManager.getStaleBatches(staleThreshold); + public List getTimeRanges(Duration staleThreshold, int batchSize) { + return writeAheadLogManager.getTimeRanges(staleThreshold, batchSize); } + + @Override + public List> getStaleBatchesForRange(WalTimeRange timeRange) { + return writeAheadLogManager.getStaleBatchesForRange(timeRange); + } + }