From edda8f0d9ab79ae068a2d53ed60bf12c545a7390 Mon Sep 17 00:00:00 2001 From: David Murray Date: Sun, 5 Nov 2017 14:29:24 -0800 Subject: [PATCH 1/2] Adding support for sequence id tracking --- .../dynamodbv2/AcquireLockOptions.java | 12 +- .../dynamodbv2/AmazonDynamoDBLockClient.java | 55 ++++-- .../AmazonDynamoDBLockClientOptions.java | 42 ++++- .../services/dynamodbv2/LockItem.java | 27 ++- .../AmazonDynamoDBLockClientTest.java | 156 ++++++++++++++++-- .../dynamodbv2/BasicLockClientTests.java | 19 ++- .../dynamodbv2/InMemoryLockClientTester.java | 4 + .../services/dynamodbv2/LockItemTest.java | 7 + 8 files changed, 282 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java b/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java index cce7c05..92c8133 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java @@ -34,7 +34,7 @@ public class AcquireLockOptions { private final Optional sortKey; private final Optional data; private final Boolean replaceData; - private final Boolean deleteLockOnRelease; + private final Optional deleteLockOnRelease; private final Long refreshPeriod; private final Long additionalTimeToWaitForLock; private final TimeUnit timeUnit; @@ -53,7 +53,7 @@ public static class AcquireLockOptionsBuilder { private Optional sortKey; private Optional data; private Boolean replaceData; - private Boolean deleteLockOnRelease; + private Optional deleteLockOnRelease; private Long refreshPeriod; private Long additionalTimeToWaitForLock; private TimeUnit timeUnit; @@ -71,7 +71,7 @@ public static class AcquireLockOptionsBuilder { this.requestMetricCollector = Optional.empty(); this.data = Optional.empty(); this.replaceData = true; - this.deleteLockOnRelease = true; + this.deleteLockOnRelease = Optional.empty(); } /** @@ -116,7 +116,7 @@ public AcquireLockOptionsBuilder withReplaceData(final Boolean replaceData) { * @return a reference to this builder for fluent method chaining */ public AcquireLockOptionsBuilder withDeleteLockOnRelease(final Boolean deleteLockOnRelease) { - this.deleteLockOnRelease = deleteLockOnRelease; + this.deleteLockOnRelease = Optional.ofNullable(deleteLockOnRelease); return this; } @@ -274,7 +274,7 @@ public static AcquireLockOptionsBuilder builder(final String partitionKey) { } private AcquireLockOptions(final String partitionKey, final Optional sortKey, final Optional data, final Boolean replaceData, - final Boolean deleteLockOnRelease, final Long refreshPeriod, final Long additionalTimeToWaitForLock, final TimeUnit timeUnit, + final Optional deleteLockOnRelease, final Long refreshPeriod, final Long additionalTimeToWaitForLock, final TimeUnit timeUnit, final Map additionalAttributes, final Optional sessionMonitor, final Optional requestMetricCollector) { this.partitionKey = partitionKey; @@ -306,7 +306,7 @@ Boolean getReplaceData() { return this.replaceData; } - Boolean getDeleteLockOnRelease() { + Optional getDeleteLockOnRelease() { return this.deleteLockOnRelease; } diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java index ed1128e..17f670c 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java @@ -185,6 +185,7 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { private final long leaseDurationInMilliseconds; private final long heartbeatPeriodInMilliseconds; private final String ownerName; + private final boolean trackSequenceIds; private final ConcurrentHashMap locks; private final ConcurrentHashMap sessionMonitors; private final Optional backgroundThread; @@ -196,6 +197,7 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { protected static final String OWNER_NAME = "ownerName"; protected static final String LEASE_DURATION = "leaseDuration"; protected static final String RECORD_VERSION_NUMBER = "recordVersionNumber"; + protected static final String SEQUENCE_ID = "sequenceId"; protected static final String IS_RELEASED = "isReleased"; protected static final String IS_RELEASED_VALUE = "1"; protected static final AttributeValue IS_RELEASED_ATTRIBUTE_VALUE = new AttributeValue(IS_RELEASED_VALUE); @@ -227,6 +229,7 @@ public AmazonDynamoDBLockClient(final AmazonDynamoDBLockClientOptions amazonDyna this.locks = new ConcurrentHashMap<>(); this.sessionMonitors = new ConcurrentHashMap<>(); this.ownerName = amazonDynamoDBLockClientOptions.getOwnerName(); + this.trackSequenceIds = amazonDynamoDBLockClientOptions.getTrackSequenceIds(); this.leaseDurationInMilliseconds = amazonDynamoDBLockClientOptions.getTimeUnit().toMillis(amazonDynamoDBLockClientOptions.getLeaseDuration()); this.heartbeatPeriodInMilliseconds = amazonDynamoDBLockClientOptions.getTimeUnit().toMillis(amazonDynamoDBLockClientOptions.getHeartbeatPeriod()); this.partitionKeyName = amazonDynamoDBLockClientOptions.getPartitionKeyName(); @@ -358,7 +361,8 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran if (options.getAdditionalAttributes().containsKey(this.partitionKeyName) || options.getAdditionalAttributes().containsKey(OWNER_NAME) || options .getAdditionalAttributes().containsKey(LEASE_DURATION) || options.getAdditionalAttributes().containsKey(RECORD_VERSION_NUMBER) || options - .getAdditionalAttributes().containsKey(DATA) || this.sortKeyName.isPresent() && options.getAdditionalAttributes().containsKey(this.sortKeyName.get())) { + .getAdditionalAttributes().containsKey(DATA) || this.sortKeyName.isPresent() && options.getAdditionalAttributes().containsKey(this.sortKeyName.get()) + || this.trackSequenceIds && options.getAdditionalAttributes().containsKey(SEQUENCE_ID)) { throw new IllegalArgumentException(String .format("Additional attribute cannot be one of the following types: " + "%s, %s, %s, %s, %s", this.partitionKeyName, OWNER_NAME, LEASE_DURATION, RECORD_VERSION_NUMBER, DATA)); @@ -376,7 +380,16 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran refreshPeriodInMilliseconds = options.getTimeUnit().toMillis(options.getRefreshPeriod()); } - final boolean deleteLockOnRelease = options.getDeleteLockOnRelease(); + final boolean deleteLockOnRelease; + if (this.trackSequenceIds) { + if (options.getDeleteLockOnRelease().isPresent() && options.getDeleteLockOnRelease().get()) { + throw new IllegalArgumentException("Cannot set deleteLockOnRelease when sequence id tracking is enabled"); + } + deleteLockOnRelease = false; + } else { + deleteLockOnRelease = options.getDeleteLockOnRelease().orElse(true); + } + final boolean replaceData = options.getReplaceData(); final Optional sessionMonitor = options.getSessionMonitor(); @@ -412,6 +425,17 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran newLockData = options.getData(); // If there is no existing data, we write the input data to the lock. } + // If we're tracking sequence IDs, initialize or increment the current value. + Optional sequenceId = Optional.empty(); + if (trackSequenceIds) { + sequenceId = existingLock.flatMap(value -> value.getSequenceId()); + if (sequenceId.isPresent()) { + sequenceId = sequenceId.map(value -> value + 1); + } else { + sequenceId = Optional.of(1L); + } + } + final Map item = new HashMap<>(); item.putAll(options.getAdditionalAttributes()); item.put(this.partitionKeyName, new AttributeValue().withS(key)); @@ -419,13 +443,14 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran item.put(LEASE_DURATION, new AttributeValue().withS(String.valueOf(this.leaseDurationInMilliseconds))); final String recordVersionNumber = this.generateRecordVersionNumber(); item.put(RECORD_VERSION_NUMBER, new AttributeValue().withS(String.valueOf(recordVersionNumber))); + sequenceId.ifPresent(sid -> item.put(SEQUENCE_ID, new AttributeValue().withN(sid.toString()))); sortKeyName.ifPresent(sortKeyName -> item.put(sortKeyName, new AttributeValue().withS(sortKey.get()))); newLockData.ifPresent(byteBuffer -> item.put(DATA, new AttributeValue().withB(byteBuffer))); //if the existing lock does not exist or exists and is released if (!existingLock.isPresent() || existingLock.get().isReleased()) { return upsertAndMonitorNewOrReleasedLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, - newLockData, item, recordVersionNumber); + newLockData, item, recordVersionNumber, sequenceId); } // we know that we didnt enter the if block above because it returns at the end. // we also know that the existingLock.isPresent() is true @@ -447,7 +472,7 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran /* If the version numbers match, then we can acquire the lock, assuming it has already expired */ if (lockTryingToBeAcquired.isExpired()) { return upsertAndMonitorExpiredLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock, newLockData, item, - recordVersionNumber); + recordVersionNumber, sequenceId); } } else { /* @@ -484,7 +509,8 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran } private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, - Optional sessionMonitor, Optional existingLock, Optional newLockData, Map item, String recordVersionNumber) { + Optional sessionMonitor, Optional existingLock, Optional newLockData, Map item, String recordVersionNumber, + Optional sequenceId) { final String conditionalExpression; final Map expressionAttributeValues = new HashMap<>(); expressionAttributeValues.put(RVN_VALUE_EXPRESSION_VARIABLE, new AttributeValue(existingLock.get().getRecordVersionNumber())); @@ -503,7 +529,7 @@ private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String .withExpressionAttributeNames(expressionAttributeNames).withExpressionAttributeValues(expressionAttributeValues); logger.trace("Acquiring an existing lock whose revisionVersionNumber did not change for " + partitionKeyName + " partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); - return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest); + return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, sequenceId, putItemRequest); } /** @@ -520,7 +546,7 @@ private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String */ private LockItem upsertAndMonitorNewOrReleasedLock(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, Optional sessionMonitor, - Optional newLockData, Map item, String recordVersionNumber) { + Optional newLockData, Map item, String recordVersionNumber, Optional sequenceId) { final Map expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put(PK_PATH_EXPRESSION_VARIABLE, this.partitionKeyName); expressionAttributeNames.put(IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED); @@ -534,11 +560,12 @@ private LockItem upsertAndMonitorNewOrReleasedLock(AcquireLockOptions options, S * sooner than it actually will, so they start counting towards its expiration before the Put succeeds */ logger.trace("Acquiring a new lock or an existing yet released lock on " + partitionKeyName + "=" + key + ", " + this.sortKeyName + "=" + sortKey); - return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest); + return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, sequenceId, putItemRequest); } private LockItem putLockItemAndStartSessionMonitor(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, - Optional sessionMonitor, Optional newLockData, String recordVersionNumber, PutItemRequest putItemRequest) { + Optional sessionMonitor, Optional newLockData, String recordVersionNumber, Optional sequenceId, + PutItemRequest putItemRequest) { final long lastUpdatedTime = LockClientUtils.INSTANCE.millisecondTime(); if (options.getRequestMetricCollector().isPresent()) { putItemRequest.setRequestMetricCollector(options.getRequestMetricCollector().get()); @@ -547,7 +574,7 @@ private LockItem putLockItemAndStartSessionMonitor(AcquireLockOptions options, S final LockItem lockItem = new LockItem(this, key, sortKey, newLockData, deleteLockOnRelease, this.ownerName, this.leaseDurationInMilliseconds, lastUpdatedTime, - recordVersionNumber, false, sessionMonitor, options.getAdditionalAttributes()); + recordVersionNumber, sequenceId, false, sessionMonitor, options.getAdditionalAttributes()); this.locks.put(lockItem.getUniqueIdentifier(), lockItem); this.tryAddSessionMonitor(lockItem.getUniqueIdentifier(), lockItem); return lockItem; @@ -760,6 +787,10 @@ private LockItem createLockItem(final GetLockOptions options, final Map sequenceId = trackSequenceIds + ? Optional.ofNullable(item.remove(SEQUENCE_ID)).map(value -> Long.valueOf(value.getN())) + : Optional.empty(); + final boolean isReleased = item.containsKey(IS_RELEASED); item.remove(IS_RELEASED); item.remove(this.partitionKeyName); @@ -777,7 +808,9 @@ private LockItem createLockItem(final GetLockOptions options, final Map sortKeyName; private final String ownerName; + private final boolean trackSequenceIds; private final Long leaseDuration; private final Long heartbeatPeriod; private final TimeUnit timeUnit; @@ -58,6 +60,7 @@ public static class AmazonDynamoDBLockClientOptionsBuilder { private String partitionKeyName; private Optional sortKeyName; private String ownerName; + private boolean trackSequenceIds; private Long leaseDuration; private Long heartbeatPeriod; private TimeUnit timeUnit; @@ -88,6 +91,7 @@ private static Function namedThreadCreator() { this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; this.partitionKeyName = DEFAULT_PARTITION_KEY_NAME; + this.trackSequenceIds = DEFAULT_TRACK_SEQUENCE_IDS; this.leaseDuration = DEFAULT_LEASE_DURATION; this.heartbeatPeriod = DEFAULT_HEARTBEAT_PERIOD; this.timeUnit = DEFAULT_TIME_UNIT; @@ -128,6 +132,22 @@ public AmazonDynamoDBLockClientOptionsBuilder withOwnerName(final String ownerNa return this; } + /** + * @param trackSequenceIds True to track sequence IDs for locks acquired by this client. + * If sequence ID tracking is enabled, a monotonically-increasing + * sequence ID is associated with each lock and incremented whenever + * the lock is acquired. The sequence ID can be included in write + * requests issued while holding the lock, and used by downstream + * systems to detect and reject out-of-order writes from a system + * which used to hold the lock but no longer does when the write + * arrives. + * @return a reference to this builder for fluent method chaining + */ + public AmazonDynamoDBLockClientOptionsBuilder withSequenceIdTracking(final boolean trackSequenceIds) { + this.trackSequenceIds = trackSequenceIds; + return this; + } + /** * @param leaseDuration The length of time that the lease for the lock will be * granted for. If this is set to, for example, 30 seconds, @@ -185,15 +205,15 @@ public AmazonDynamoDBLockClientOptionsBuilder withCreateHeartbeatBackgroundThrea public AmazonDynamoDBLockClientOptions build() { Objects.requireNonNull(this.tableName, "Table Name must not be null"); Objects.requireNonNull(this.ownerName, "Owner Name must not be null"); - return new AmazonDynamoDBLockClientOptions(this.dynamoDBClient, this.tableName, this.partitionKeyName, this.sortKeyName, this.ownerName, this.leaseDuration, - this.heartbeatPeriod, this.timeUnit, this.createHeartbeatBackgroundThread, this.namedThreadCreator); + return new AmazonDynamoDBLockClientOptions(this.dynamoDBClient, this.tableName, this.partitionKeyName, this.sortKeyName, this.ownerName, this.trackSequenceIds, + this.leaseDuration, this.heartbeatPeriod, this.timeUnit, this.createHeartbeatBackgroundThread, this.namedThreadCreator); } @Override public String toString() { return "AmazonDynamoDBLockClientOptionsBuilder(dynamoDBClient=" + this.dynamoDBClient + ", tableName=" + this.tableName + ", partitionKeyName=" + this.partitionKeyName - + ", sortKeyName=" + this.sortKeyName + ", ownerName=" + this.ownerName + ", leaseDuration=" + this.leaseDuration + ", heartbeatPeriod=" + this.heartbeatPeriod - + ", timeUnit=" + this.timeUnit + ", createHeartbeatBackgroundThread=" + this.createHeartbeatBackgroundThread + ")"; + + ", sortKeyName=" + this.sortKeyName + ", ownerName=" + this.ownerName + ", trackSequenceIds=" + trackSequenceIds + ", leaseDuration=" + this.leaseDuration + + ", heartbeatPeriod=" + this.heartbeatPeriod + ", timeUnit=" + this.timeUnit + ", createHeartbeatBackgroundThread=" + this.createHeartbeatBackgroundThread + ")"; } } @@ -211,13 +231,14 @@ public static AmazonDynamoDBLockClientOptionsBuilder builder(final AmazonDynamoD } private AmazonDynamoDBLockClientOptions(final AmazonDynamoDB dynamoDBClient, final String tableName, final String partitionKeyName, final Optional sortKeyName, - final String ownerName, final Long leaseDuration, final Long heartbeatPeriod, final TimeUnit timeUnit, final Boolean createHeartbeatBackgroundThread, - final Function namedThreadCreator) { + final String ownerName, final boolean trackSequenceIds, final Long leaseDuration, final Long heartbeatPeriod, final TimeUnit timeUnit, + final Boolean createHeartbeatBackgroundThread, final Function namedThreadCreator) { this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; this.partitionKeyName = partitionKeyName; this.sortKeyName = sortKeyName; this.ownerName = ownerName; + this.trackSequenceIds = trackSequenceIds; this.leaseDuration = leaseDuration; this.heartbeatPeriod = heartbeatPeriod; this.timeUnit = timeUnit; @@ -261,6 +282,13 @@ String getOwnerName() { return this.ownerName; } + /** + * @return True to track a monotonically-increasing sequence ID for each lock. + */ + boolean getTrackSequenceIds() { + return trackSequenceIds; + } + /** * @return The length of time that the lease for the lock will be granted * for. If this is set to, for example, 30 seconds, then the lock @@ -300,4 +328,4 @@ Boolean getCreateHeartbeatBackgroundThread() { Function getNamedThreadCreator() { return this.namedThreadCreator; } -} \ No newline at end of file +} diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java b/src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java index f670510..5379bf4 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java @@ -45,6 +45,7 @@ public class LockItem implements Closeable { private final AtomicLong lookupTime; private final StringBuffer recordVersionNumber; + private final Optional sequenceId; private final AtomicLong leaseDuration; private final Map additionalAttributes; @@ -68,6 +69,9 @@ public class LockItem implements Closeable { * @param lastUpdatedTimeInMilliseconds How recently the lock was updated (in milliseconds) * @param recordVersionNumber The current record version number of the lock -- this is * globally unique and changes each time the lock is updated + * @param sequenceId The monotonically-increasing sequence ID of the lock -- + * this is incremented every time the lock ownership changes + * if sequence ID tracking is enabled * @param isReleased Whether the item in DynamoDB is marked as released, but still * exists in the table * @param sessionMonitor Optionally, the SessionMonitor object with which to associate @@ -76,12 +80,14 @@ public class LockItem implements Closeable { * the lock */ LockItem(final AmazonDynamoDBLockClient client, final String partitionKey, final Optional sortKey, final Optional data, final boolean deleteLockItemOnClose, - final String ownerName, final long leaseDuration, final long lastUpdatedTimeInMilliseconds, final String recordVersionNumber, final boolean isReleased, - final Optional sessionMonitor, final Map additionalAttributes) { + final String ownerName, final long leaseDuration, final long lastUpdatedTimeInMilliseconds, final String recordVersionNumber, final Optional sequenceId, + final boolean isReleased, final Optional sessionMonitor, final Map additionalAttributes) { + Objects.requireNonNull(partitionKey, "Cannot create a lock with a null key"); Objects.requireNonNull(ownerName, "Cannot create a lock with a null owner"); Objects.requireNonNull(sortKey, "Cannot create a lock with a null sortKey (use Optional.empty())"); Objects.requireNonNull(data, "Cannot create a lock with a null data (use Optional.empty())"); + this.client = client; this.partitionKey = partitionKey; this.sortKey = sortKey; @@ -92,6 +98,7 @@ public class LockItem implements Closeable { this.leaseDuration = new AtomicLong(leaseDuration); this.lookupTime = new AtomicLong(lastUpdatedTimeInMilliseconds); this.recordVersionNumber = new StringBuffer(recordVersionNumber); + this.sequenceId = sequenceId; this.isReleased = isReleased; this.sessionMonitor = sessionMonitor; this.additionalAttributes = additionalAttributes; @@ -162,6 +169,18 @@ public String getRecordVersionNumber() { return this.recordVersionNumber.toString(); } + /** + * Returns the current sequence id of the lock in DynamoDB. This allows downstream systems to detect out-of-order writes + * by a previous writer who has not yet realized that they have lost the lock. Only populated when sequence ID tracking + * has been enabled. + * + * @return the sequence id associated with the lock, if set + * @see AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder#withSequenceIdTracking(boolean) + */ + public Optional getSequenceId() { + return sequenceId; + } + /** * Returns the amount of time that the client has this lock for, which can be kept up to date by calling * {@code sendHeartbeat}. @@ -195,8 +214,8 @@ public void close() { @Override public String toString() { return String - .format("LockItem{Partition Key=%s, Sort Key=%s, Owner Name=%s, Lookup Time=%d, Lease Duration=%d, " + "Record Version Number=%s, Delete On Close=%s, Is Released=%s}", - this.partitionKey, this.sortKey, this.ownerName, this.lookupTime.get(), this.leaseDuration.get(), this.recordVersionNumber, this.deleteLockItemOnClose, + .format("LockItem{Partition Key=%s, Sort Key=%s, Owner Name=%s, Lookup Time=%d, Lease Duration=%d, " + "Record Version Number=%s, Sequence ID=%s, Delete On Close=%s, Is Released=%s}", + this.partitionKey, this.sortKey, this.ownerName, this.lookupTime.get(), this.leaseDuration.get(), this.recordVersionNumber, this.sequenceId, this.deleteLockItemOnClose, this.isReleased); } diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java index ecf4b93..a5b5d38 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java @@ -15,6 +15,7 @@ package com.amazonaws.services.dynamodbv2; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -35,8 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; -import javax.swing.text.html.Option; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -139,6 +138,134 @@ public void createLockTableInDynamoDB_whenMetricCollectorIsPresent() { verify(createTableOptions, times(2)).getRequestMetricCollector(); } + @Test + public void acquireLock_whenLockDoesNotExist() throws InterruptedException { + setOwnerNameToUuid(); + AmazonDynamoDBLockClient client = getLockClient(); + when(dynamodb.getItem(any())).thenReturn(new GetItemResult()); + LockItem item = client.acquireLock(AcquireLockOptions.builder("asdf").build()); + assertNotNull(item); + assertFalse(item.getSequenceId().isPresent()); + } + + @Test + public void acquireLock_whenLockDoesNotExist_andWithSequenceIdTracking() throws InterruptedException { + setOwnerNameToUuid(); + AmazonDynamoDBLockClient client = getLockClientWithSequenceIdTracking(); + when(dynamodb.getItem(any())).thenReturn(new GetItemResult()); + LockItem item = client.acquireLock(AcquireLockOptions.builder("asdf").build()); + assertNotNull(item); + assertEquals(1L, (long) item.getSequenceId().get()); + } + + @Test + public void acquireLock_whenLockIsReleased() throws InterruptedException { + UUID uuid = setOwnerNameToUuid(); + AmazonDynamoDBLockClient client = getLockClient(); + + when(dynamodb.getItem(any())) + .thenReturn(new GetItemResult().withItem(InternalUtils.toAttributeValues(new Item() + .withString("customer", "customer1") + .withString("ownerName", "foobar") + .withString("recordVersionNumber", uuid.toString()) + .withNumber("sequenceId", 123L) + .withString("leaseDuration", "1") + .withBoolean("isReleased", true) + ))); + + LockItem item = client.acquireLock(AcquireLockOptions.builder("customer1") + .withAdditionalTimeToWaitForLock(1000L) + .withTimeUnit(TimeUnit.MILLISECONDS) + .build()); + + assertNotNull(item); + assertFalse(item.getSequenceId().isPresent()); + } + + @Test + public void acquireLock_whenLockIsReleased_andWithSequenceIdTracking() throws InterruptedException { + UUID uuid = setOwnerNameToUuid(); + AmazonDynamoDBLockClient client = getLockClientWithSequenceIdTracking(); + + when(dynamodb.getItem(any())) + .thenReturn(new GetItemResult().withItem(InternalUtils.toAttributeValues(new Item() + .withString("customer", "customer1") + .withString("ownerName", "foobar") + .withString("recordVersionNumber", uuid.toString()) + .withNumber("sequenceId", 123L) + .withString("leaseDuration", "1") + .withBoolean("isReleased", true) + ))); + + LockItem item = client.acquireLock(AcquireLockOptions.builder("customer1") + .withAdditionalTimeToWaitForLock(1000L) + .withTimeUnit(TimeUnit.MILLISECONDS) + .build()); + + assertNotNull(item); + assertEquals(124L, (long) item.getSequenceId().get()); + } + + @Test + public void acquireLock_whenLockHasExpired() throws InterruptedException { + UUID uuid = setOwnerNameToUuid(); + AmazonDynamoDBLockClient client = getLockClient(); + + when(dynamodb.getItem(any())) + .thenReturn(new GetItemResult().withItem(InternalUtils.toAttributeValues(new Item() + .withString("customer", "customer1") + .withString("ownerName", "foobar") + .withString("recordVersionNumber", uuid.toString()) + .withNumber("sequenceId", 123L) + .withString("leaseDuration", "1") + ))) + .thenReturn(new GetItemResult().withItem(InternalUtils.toAttributeValues(new Item() + .withString("customer", "customer1") + .withString("ownerName", "foobar") + .withString("recordVersionNumber", uuid.toString()) + .withNumber("sequenceId", 123L) + .withString("leaseDuration", "1") + ))); + + LockItem item = client.acquireLock(AcquireLockOptions.builder("customer1") + .withAdditionalTimeToWaitForLock(1000L) + .withTimeUnit(TimeUnit.MILLISECONDS) + .build()); + + assertNotNull(item); + assertFalse(item.getSequenceId().isPresent()); + } + + @Test + public void acquireLock_whenLockHasExpired_andWithSequenceIdTracking() throws InterruptedException { + UUID uuid = setOwnerNameToUuid(); + AmazonDynamoDBLockClient client = getLockClientWithSequenceIdTracking(); + + when(dynamodb.getItem(any())) + .thenReturn(new GetItemResult().withItem(InternalUtils.toAttributeValues(new Item() + .withString("customer", "customer1") + .withString("ownerName", "foobar") + .withString("recordVersionNumber", uuid.toString()) + .withNumber("sequenceId", 123L) + .withString("leaseDuration", "1") + ))) + .thenReturn(new GetItemResult().withItem(InternalUtils.toAttributeValues(new Item() + .withString("customer", "customer1") + .withString("ownerName", "foobar") + .withString("recordVersionNumber", uuid.toString()) + .withNumber("sequenceId", 123L) + .withString("leaseDuration", "1") + ))); + + LockItem item = client.acquireLock(AcquireLockOptions.builder("customer1") + .withAdditionalTimeToWaitForLock(1000L) + .withTimeUnit(TimeUnit.MILLISECONDS) + .build()); + + assertNotNull(item); + assertEquals(124L, (long) item.getSequenceId().get()); + } + @Test(expected = LockNotGrantedException.class) public void acquireLock_whenLockAlreadyExists_throwLockNotGrantedException() throws InterruptedException { setOwnerNameToUuid(); @@ -191,7 +318,7 @@ public void sendHeartbeat_whenDeleteDataTrueAndDataNotNull_throwsIllegalArgument UUID uuid = setOwnerNameToUuid(); AmazonDynamoDBLockClient client = getLockClient(); LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), - false, uuid.toString(), 1L, 2L, "rvn", false, + false, uuid.toString(), 1L, 2L, "rvn", Optional.empty(), false, Optional.empty(), null); client.sendHeartbeat(SendHeartbeatOptions.builder(item).withDeleteData(true).withData(ByteBuffer.wrap("data".getBytes())).build()); } @@ -203,7 +330,7 @@ public void sendHeartbeat_whenExpired_throwsLockNotGrantedException() { long lastUpdatedTimeInMilliseconds = 2l; LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), 1L, lastUpdatedTimeInMilliseconds, - "rvn", false, Optional.empty(), null); + "rvn", Optional.empty(), false, Optional.empty(), null); client.sendHeartbeat(SendHeartbeatOptions.builder(item).withDeleteData(null).withData(ByteBuffer.wrap("data".getBytes())).build()); } @@ -214,7 +341,7 @@ public void sendHeartbeat_whenNotExpiredAndDifferentOwner_throwsLockNotGrantedEx long lastUpdatedTimeInMilliseconds = Long.MAX_VALUE; LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, "different owner", 1L, lastUpdatedTimeInMilliseconds, - "rvn", false, Optional.empty(), null); + "rvn", Optional.empty(), false, Optional.empty(), null); client.sendHeartbeat(SendHeartbeatOptions.builder(item).withDeleteData(null).withData(ByteBuffer.wrap("data".getBytes())).build()); } @@ -225,7 +352,7 @@ public void sendHeartbeat_whenNotExpired_andSameOwner_releasedTrue_throwsLockNot long lastUpdatedTimeInMilliseconds = Long.MAX_VALUE; LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), 1L, lastUpdatedTimeInMilliseconds, - "rvn", true, Optional.empty(), null); + "rvn", Optional.empty(), true, Optional.empty(), null); client.sendHeartbeat(SendHeartbeatOptions.builder(item).withDeleteData(null).withData(ByteBuffer.wrap("data".getBytes())).build()); } @@ -236,7 +363,7 @@ public void sendHeartbeat_whenNotExpired_andSameOwner_releasedFalse_setsRequestM long lastUpdatedTimeInMilliseconds = Long.MAX_VALUE; LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), 1L, lastUpdatedTimeInMilliseconds, - "rvn", false, Optional.empty(), null); + "rvn", Optional.empty(), false, Optional.empty(), null); client.sendHeartbeat(SendHeartbeatOptions.builder(item) .withDeleteData(null) .withData(ByteBuffer.wrap("data".getBytes())) @@ -245,16 +372,23 @@ public void sendHeartbeat_whenNotExpired_andSameOwner_releasedFalse_setsRequestM } private AmazonDynamoDBLockClient getLockClient() { - return spy(new AmazonDynamoDBLockClient( + return new AmazonDynamoDBLockClient( getLockClientBuilder(null) - .build())); + .build()); } private AmazonDynamoDBLockClient getLockClientWithSortKey() { - return spy(new AmazonDynamoDBLockClient( + return new AmazonDynamoDBLockClient( getLockClientBuilder(null) .withSortKeyName("sort") - .build())); + .build()); + } + + private AmazonDynamoDBLockClient getLockClientWithSequenceIdTracking() { + return new AmazonDynamoDBLockClient( + getLockClientBuilder(null) + .withSequenceIdTracking(true) + .build()); } private AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder getLockClientBuilder(Function threadFactoryFunction) { diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/BasicLockClientTests.java b/src/test/java/com/amazonaws/services/dynamodbv2/BasicLockClientTests.java index 537a70f..adb8979 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/BasicLockClientTests.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/BasicLockClientTests.java @@ -206,6 +206,23 @@ public void testAcquireLockLeaveOrReplaceDataFromAcquiredLock() throws IOExcepti item.close(); } + @Test + public void testAcquireLockWithSequenceIdTracking() throws Exception { + LockItem item = this.lockClientWithSequenceIdTracking.acquireLock(AcquireLockOptions.builder("testKeySid").build()); + assertEquals(1L, (long) item.getSequenceId().get()); + item.close(); + + item = this.lockClientWithSequenceIdTracking.acquireLock(AcquireLockOptions.builder("testKeySid").build()); + assertEquals(2L, (long) item.getSequenceId().get()); + + item.sendHeartBeat(); + item.sendHeartBeat(); + item.sendHeartBeat(); + + item = this.lockClientWithSequenceIdTracking.acquireLock(AcquireLockOptions.builder("testKeySid").build()); + assertEquals(3L, (long) item.getSequenceId().get()); + } + @Test public void testSendHeatbeatWithRangeKey() throws IOException, LockNotGrantedException, InterruptedException { @@ -610,7 +627,7 @@ private void testInvalidAttribute(final String invalidAttribute) throws LockNotG public void testLockItemToString() throws LockNotGrantedException, InterruptedException { final LockItem lockItem = this.lockClient.acquireLock(ACQUIRE_LOCK_OPTIONS_TEST_KEY_1); final Pattern p = Pattern.compile("LockItem\\{Partition Key=testKey1, Sort Key=Optional.empty, Owner Name=" + INTEGRATION_TESTER + ", Lookup Time=\\d+, Lease Duration=3000, " - + "Record Version Number=\\w+-\\w+-\\w+-\\w+-\\w+, Delete On Close=true, Is Released=false\\}"); + + "Record Version Number=\\w+-\\w+-\\w+-\\w+-\\w+, Sequence ID=Optional.empty, Delete On Close=true, Is Released=false\\}"); assertTrue(p.matcher(lockItem.toString()).matches()); } diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/InMemoryLockClientTester.java b/src/test/java/com/amazonaws/services/dynamodbv2/InMemoryLockClientTester.java index 7390702..9bf5e72 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/InMemoryLockClientTester.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/InMemoryLockClientTester.java @@ -69,6 +69,7 @@ public class InMemoryLockClientTester { protected AmazonDynamoDB dynamoDBMock; protected AmazonDynamoDBLockClient lockClient; protected AmazonDynamoDBLockClient lockClientWithHeartbeating; + protected AmazonDynamoDBLockClient lockClientWithSequenceIdTracking; protected AmazonDynamoDBLockClient shortLeaseLockClient; protected AmazonDynamoDBLockClient lockClientNoTable; protected AmazonDynamoDBLockClientOptions lockClient1Options; @@ -95,6 +96,9 @@ public void setUp() { this.lockClientWithHeartbeating = new AmazonDynamoDBLockClient( new AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder(this.dynamoDBMock, TABLE_NAME, INTEGRATION_TESTER).withLeaseDuration(3L).withHeartbeatPeriod(1L) .withTimeUnit(TimeUnit.SECONDS).withCreateHeartbeatBackgroundThread(true).build()); + this.lockClientWithSequenceIdTracking = new AmazonDynamoDBLockClient( + new AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder(this.dynamoDBMock, TABLE_NAME, INTEGRATION_TESTER).withLeaseDuration(3L).withHeartbeatPeriod(1L) + .withTimeUnit(TimeUnit.SECONDS).withCreateHeartbeatBackgroundThread(false).withSequenceIdTracking(true).build()); this.lockClientNoTable = new AmazonDynamoDBLockClient( new AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder(this.dynamoDBMock, "doesNotExist", INTEGRATION_TESTER).withLeaseDuration(3L).withHeartbeatPeriod(1L) .withTimeUnit(TimeUnit.SECONDS).withCreateHeartbeatBackgroundThread(false).build()); diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java index 7c34b03..d56121b 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java @@ -67,6 +67,7 @@ public void equals_differentPartitionKey_returnFalse() { 1L, //lease duration 1000, //last updated time in milliseconds "recordVersionNumber", + Optional.empty(), false, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor new HashMap<>()))); @@ -82,6 +83,7 @@ public void equals_differentOwner_returnFalse() { 1L, //lease duration 1000, //last updated time in milliseconds "recordVersionNumber", + Optional.empty(), false, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor new HashMap<>()))); @@ -102,6 +104,7 @@ public void isExpired_whenIsReleasedTrue_returnTrue() { 1L, //lease duration 1000, //last updated time in milliseconds "recordVersionNumber", + Optional.empty(), true, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor new HashMap<>()).isExpired()); @@ -122,6 +125,7 @@ public void ensure_whenIsReleasedTrue_throwsLockNotGrantedException() { 1L, //lease duration 1000, //last updated time in milliseconds "recordVersionNumber", + Optional.empty(), true, //released Optional.empty(), //session monitor new HashMap<>()).ensure(2L, TimeUnit.MILLISECONDS); @@ -137,6 +141,7 @@ public void millisecondsUntilDangerZoneEntered_whenIsReleasedTrue_throwsIllegalS 1L, //lease duration 1000, //last updated time in milliseconds "recordVersionNumber", + Optional.empty(), true, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor new HashMap<>()).millisecondsUntilDangerZoneEntered(); @@ -164,6 +169,7 @@ public void hasCallback_sessionMonitorNotPresent_throwSessionMonitorNotSetExcept 1L, //lease duration 1000, //last updated time in milliseconds "recordVersionNumber", + Optional.empty(), false, //released Optional.empty(), //session monitor new HashMap<>()).hasCallback(); @@ -178,6 +184,7 @@ static LockItem createLockItem(AmazonDynamoDBLockClient lockClient) { 1L, //lease duration 1000, //last updated time in milliseconds "recordVersionNumber", + Optional.empty(), false, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor new HashMap<>()); //additional attributes From 262714128883a61e95f44e53aca3230c995b3039 Mon Sep 17 00:00:00 2001 From: David Murray Date: Sun, 5 Nov 2017 15:40:25 -0800 Subject: [PATCH 2/2] Adding README section on fencing --- README.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/README.md b/README.md index d9a65da..7cb12c5 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,34 @@ You can read the data in the lock without acquiring it, and find out who owns th LockItem lock = lockClient.getLock("Moe"); ``` +### Fencing +By default the lock client relies on lock holders to ensure that they have finished all of their work before their +lease expires. This can be [hard to guarantee](https://fpj.me/2016/02/10/note-on-fencing-and-distributed-locks/) +when you account for things like thread schedulers, garbage collectors, and network latency. To protect against +corruption in the event that a request from an "old" lock holder arrives after its lease has expired and the lock +has been acquired by a different system, the lock client can optionally associate a unique, monotonically-increasing +**sequence id** with each lease: + +```java +AmazonDynamoDBLockClient client = new AmazonDynamoDBLockClient( + AmazonDynamoDBLockClientOptions.builder(dynamoDB, "lockTable") + .withSequenceIdTracking(true) + .withLeaseDuration(10L) + .withTimeUnit(TimeUnit.SECONDS) + .build()); + +try (LockItem lock = client.acquireLock(AcquireLockOptions.builder("GloriousLeader").build())) { + // Simulate a particularly nasty garbage collection pause. + Thread.sleep(30 * 1000); + + // Send a message to all of our followers; they can ignore it if they've already heard from a + // leader with a later sequence id. + for (String follower : getFollowers()) { + sendMessage(follower, "Hello from leader " + lock.getSequenceId() + "!"); + } +} +``` + ## How we handle clock skew The lock client never stores absolute times in DynamoDB -- only the relative "lease duration" time is stored in DynamoDB. The way locks are expired is that a call to acquireLock reads in the current lock, checks the