diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java b/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java index 852ce92..d537bcf 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java @@ -45,6 +45,7 @@ public class AcquireLockOptions { private final Boolean acquireReleasedLocksConsistently; private final Optional sessionMonitor; private final Boolean reentrant; + private final Optional clockSkewUpperBound; /** * Setting this flag to true will prevent the thread from being blocked (put to sleep) for the lease duration and @@ -72,6 +73,7 @@ public static class AcquireLockOptionsBuilder { private Boolean updateExistingLockRecord; private Boolean acquireReleasedLocksConsistently; private Boolean reentrant; + private Optional clockSkewUpperBound; private long safeTimeWithoutHeartbeat; private Optional sessionMonitorCallback; @@ -90,6 +92,7 @@ public static class AcquireLockOptionsBuilder { this.shouldSkipBlockingWait = false; this.acquireReleasedLocksConsistently = false; this.reentrant = false; + this.clockSkewUpperBound = Optional.empty(); } /** @@ -256,6 +259,22 @@ public AcquireLockOptionsBuilder withReentrant(final boolean reentrant) { return this; } + /** + * In combination with withShouldSkipBlockingWait(true) this allows a node to rely on a lastTouchedAt value on the DynamoDb + * entry to take over locks which have expired but have not been deleted or marked as released within DynamoDb (which can occur + * due to an ungraceful shutdown of the owning node). + * + * It's critically important that this error bound is accurate to the nodes that are relying on the lock client. If not, + * correctness problems can occur. + * + * @param clockSkewUpperBound the upper error bound in milliseconds of clock skew across the nodes running this client + * @return a reference to this builder for fluent method chaining + */ + public AcquireLockOptionsBuilder withClockSkewUpperBound(final Long clockSkewUpperBound) { + this.clockSkewUpperBound = Optional.ofNullable(clockSkewUpperBound); + return this; + } + /** *

* Registers a "SessionMonitor." @@ -333,7 +352,7 @@ public AcquireLockOptions build() { } return new AcquireLockOptions(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease, this.acquireOnlyIfLockAlreadyExists, this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit, this.additionalAttributes, sessionMonitor, - this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant); + this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.clockSkewUpperBound); } @Override @@ -342,7 +361,8 @@ public String toString() { + this.replaceData + ", deleteLockOnRelease=" + this.deleteLockOnRelease + ", refreshPeriod=" + this.refreshPeriod + ", additionalTimeToWaitForLock=" + this.additionalTimeToWaitForLock + ", timeUnit=" + this.timeUnit + ", additionalAttributes=" + this.additionalAttributes + ", safeTimeWithoutHeartbeat=" + this.safeTimeWithoutHeartbeat + ", sessionMonitorCallback=" + this.sessionMonitorCallback + ", acquireReleasedLocksConsistently=" - + this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant+ ")"; + + this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant+ ", this.clockSkewUpperBound=" + this.clockSkewUpperBound + + ")"; } } @@ -360,7 +380,8 @@ public static AcquireLockOptionsBuilder builder(final String partitionKey) { private AcquireLockOptions(final String partitionKey, final Optional sortKey, final Optional data, final Boolean replaceData, final Boolean deleteLockOnRelease, final Boolean acquireOnlyIfLockAlreadyExists, final Long refreshPeriod, final Long additionalTimeToWaitForLock, final TimeUnit timeUnit, final Map additionalAttributes, final Optional sessionMonitor, - final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant) { + final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant, + final Optional clockSkewUpperBound) { this.partitionKey = partitionKey; this.sortKey = sortKey; this.data = data; @@ -376,6 +397,7 @@ private AcquireLockOptions(final String partitionKey, final Optional sor this.shouldSkipBlockingWait = shouldSkipBlockingWait; this.acquireReleasedLocksConsistently = acquireReleasedLocksConsistently; this.reentrant = reentrant; + this.clockSkewUpperBound = clockSkewUpperBound; } String getPartitionKey() { @@ -424,6 +446,8 @@ Boolean getReentrant() { return this.reentrant; } + Optional getClockSkewUpperBound() { return this.clockSkewUpperBound; } + Map getAdditionalAttributes() { return this.additionalAttributes; } @@ -460,7 +484,8 @@ public boolean equals(final Object other) { && Objects.equals(this.updateExistingLockRecord, otherOptions.updateExistingLockRecord) && Objects.equals(this.shouldSkipBlockingWait, otherOptions.shouldSkipBlockingWait) && Objects.equals(this.acquireReleasedLocksConsistently, otherOptions.acquireReleasedLocksConsistently) - && Objects.equals(this.reentrant, otherOptions.reentrant); + && Objects.equals(this.reentrant, otherOptions.reentrant) + && Objects.equals(this.clockSkewUpperBound, otherOptions.clockSkewUpperBound); } @Override @@ -468,7 +493,7 @@ public int hashCode() { return Objects.hash(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease, this.acquireOnlyIfLockAlreadyExists, this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit, this.additionalAttributes, this.sessionMonitor, this.updateExistingLockRecord, - this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant); + this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.clockSkewUpperBound); } diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java index da30ee4..1f7aa6d 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -154,6 +155,8 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { protected static final String DATA_VALUE_EXPRESSION_VARIABLE = ":d"; protected static final String IS_RELEASED_PATH_EXPRESSION_VARIABLE = "#ir"; protected static final String IS_RELEASED_VALUE_EXPRESSION_VARIABLE = ":ir"; + protected static final String LAST_TOUCHED_AT_PATH_EXPRESSION_VARIABLE = "#lta"; + protected static final String LAST_TOUCHED_AT_VALUE_EXPRESSION_VARIABLE = ":lta"; //attribute_not_exists(#pk) protected static final String ACQUIRE_LOCK_THAT_DOESNT_EXIST_PK_CONDITION = String.format( @@ -212,11 +215,14 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { protected static final String UPDATE_LEASE_DURATION_AND_RVN = String.format( "SET %s = %s, %s = %s", LEASE_DURATION_PATH_VALUE_EXPRESSION_VARIABLE, LEASE_DURATION_VALUE_EXPRESSION_VARIABLE, RVN_PATH_EXPRESSION_VARIABLE, NEW_RVN_VALUE_EXPRESSION_VARIABLE); - protected static final String UPDATE_LEASE_DURATION_AND_RVN_AND_REMOVE_DATA = String.format("%s REMOVE %s", UPDATE_LEASE_DURATION_AND_RVN, DATA_PATH_EXPRESSION_VARIABLE); - protected static final String UPDATE_LEASE_DURATION_AND_RVN_AND_DATA = String.format("%s, %s = %s", - UPDATE_LEASE_DURATION_AND_RVN, DATA_PATH_EXPRESSION_VARIABLE, DATA_VALUE_EXPRESSION_VARIABLE); protected static final String REMOVE_IS_RELEASED_UPDATE_EXPRESSION = String.format(" REMOVE %s ", IS_RELEASED_PATH_EXPRESSION_VARIABLE); protected static final String QUERY_PK_EXPRESSION = String.format("%s = %s", PK_PATH_EXPRESSION_VARIABLE, PK_VALUE_EXPRESSION_VARIABLE); + protected static final String + ALSO_UPDATE_TOUCHED_AT = String.format(", %s = %s", LAST_TOUCHED_AT_PATH_EXPRESSION_VARIABLE, LAST_TOUCHED_AT_VALUE_EXPRESSION_VARIABLE); + protected static final String + ALSO_UPDATE_DATA = String.format(", %s = %s", DATA_PATH_EXPRESSION_VARIABLE, DATA_VALUE_EXPRESSION_VARIABLE); + protected static final String + ALSO_DELETE_DATA = String.format(" REMOVE %s", DATA_PATH_EXPRESSION_VARIABLE); static { availableStatuses = new HashSet<>(); @@ -248,6 +254,7 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { protected static final AttributeValue IS_RELEASED_ATTRIBUTE_VALUE = AttributeValue.builder().s(IS_RELEASED_VALUE).build(); protected static volatile AtomicInteger lockClientId = new AtomicInteger(0); protected static final Boolean IS_RELEASED_INDICATOR = true; + protected static final String LAST_TOUCHED_AT = "lastTouchedAt"; /* * Used as a default buffer for how long extra to wait when querying DynamoDB for a lock in acquireLock (can be overriden by * specifying a timeout when calling acquireLock) @@ -420,10 +427,11 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran if (options.getAdditionalAttributes().containsKey(this.partitionKeyName) || options.getAdditionalAttributes().containsKey(OWNER_NAME) || options .getAdditionalAttributes().containsKey(LEASE_DURATION) || options.getAdditionalAttributes().containsKey(RECORD_VERSION_NUMBER) || options - .getAdditionalAttributes().containsKey(DATA) || this.sortKeyName.isPresent() && options.getAdditionalAttributes().containsKey(this.sortKeyName.get())) { + .getAdditionalAttributes().containsKey(DATA) || this.sortKeyName.isPresent() && options + .getAdditionalAttributes().containsKey(this.sortKeyName.get()) || options.getAdditionalAttributes().containsKey(LAST_TOUCHED_AT)) { throw new IllegalArgumentException(String - .format("Additional attribute cannot be one of the following types: " + "%s, %s, %s, %s, %s", this.partitionKeyName, OWNER_NAME, LEASE_DURATION, - RECORD_VERSION_NUMBER, DATA)); + .format("Additional attribute cannot be one of the following types: " + "%s, %s, %s, %s, %s, %s", this.partitionKeyName, OWNER_NAME, LEASE_DURATION, + RECORD_VERSION_NUMBER, DATA, LAST_TOUCHED_AT)); } long millisecondsToWait = DEFAULT_BUFFER_MS; @@ -469,14 +477,6 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran throw new LockNotGrantedException("Lock does not exist."); } - if (options.shouldSkipBlockingWait() && existingLock.isPresent() && !existingLock.get().isExpired()) { - /* - * The lock is being held by some one and is still not expired. And the caller explicitly said not to perform a blocking wait; - * We will throw back a lock not grant exception, so that the caller can retry if needed. - */ - throw new LockCurrentlyUnavailableException("The lock being requested is being held by another client."); - } - Optional newLockData = Optional.empty(); if (replaceData) { newLockData = options.getData(); @@ -497,14 +497,48 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran item.put(RECORD_VERSION_NUMBER, AttributeValue.builder().s(String.valueOf(recordVersionNumber)).build()); sortKeyName.ifPresent(sortKeyName -> item.put(sortKeyName, AttributeValue.builder().s(sortKey.get()).build())); newLockData.ifPresent(byteBuffer -> item.put(DATA, AttributeValue.builder().b(SdkBytes.fromByteBuffer(byteBuffer)).build())); + Optional lastTouchedAt = Optional.empty(); + if (options.getClockSkewUpperBound().isPresent()) { + lastTouchedAt = Optional.of(new AtomicLong(System.currentTimeMillis())); + } + lastTouchedAt.ifPresent(value -> item.put(LAST_TOUCHED_AT, AttributeValue.builder().s(String.valueOf(value.get())).build())); + + if (options.shouldSkipBlockingWait() && existingLock.isPresent() && !existingLock.get().isExpired()) { + LockItem lockItem = existingLock.get(); + if (options.getClockSkewUpperBound().isPresent()) { + // If we have provided an upper clock skew error bound then we rely on lastTouchedAt to know + // whether the lock is expired without having to block. + Long clockSkewUpperBound = options.getClockSkewUpperBound().get(); + if (lockItem.getLastTouchedAt().isPresent()) { + // If the lock hasn't been touched since the lease duration plus error bound then it's ours! + if (lockItem.isExpired(clockSkewUpperBound)) { + synchronized (lockItem) { + return upsertAndMonitorExpiredLock(options, key, + sortKey, deleteLockOnRelease, sessionMonitor, + existingLock, newLockData, item, + recordVersionNumber, lastTouchedAt); + } + } + } + } + + /* + * The lock is being held by someone and is still not expired. And the caller explicitly said not to perform a blocking wait; + * We will throw back a lock not grant exception, so that the caller can retry if needed. + * + * Note that if we don't provide an upper clock skew error bound then we will NEED to wait to acquire a lock which is + * expired but failed to be released/deleted due to ungraceful shutdown of the owning node. + */ + throw new LockCurrentlyUnavailableException("The lock being requested is being held by another client."); + } //if the existing lock does not exist or exists and is released if (!existingLock.isPresent() && !options.getAcquireOnlyIfLockAlreadyExists()) { return upsertAndMonitorNewLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, - item, recordVersionNumber); + item, recordVersionNumber, lastTouchedAt); } else if (existingLock.isPresent() && existingLock.get().isReleased()) { return upsertAndMonitorReleasedLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock, - newLockData, item, recordVersionNumber); + newLockData, item, recordVersionNumber, lastTouchedAt); } // we know that we didnt enter the if block above because it returns at the end. // we also know that the existingLock.isPresent() is true @@ -526,7 +560,7 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran /* If the version numbers match, then we can acquire the lock, assuming it has already expired */ if (lockTryingToBeAcquired.isExpired()) { return upsertAndMonitorExpiredLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock, newLockData, item, - recordVersionNumber); + recordVersionNumber, lastTouchedAt); } } else { /* @@ -582,7 +616,8 @@ public boolean hasLock(final String key, final Optional sortKey) { } private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, - Optional sessionMonitor, Optional existingLock, Optional newLockData, Map item, String recordVersionNumber) { + Optional sessionMonitor, Optional existingLock, Optional newLockData, Map item, String recordVersionNumber, + Optional lastTouchedAt) { final String conditionalExpression; final Map expressionAttributeValues = new HashMap<>(); final boolean updateExistingLockRecord = options.getUpdateExistingLockRecord(); @@ -609,19 +644,20 @@ private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String .updateExpression(updateExpression).expressionAttributeNames(expressionAttributeNames) .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); logger.trace("Acquiring an existing lock whose revisionVersionNumber did not change for " + partitionKeyName + " partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); - return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest); + return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest, lastTouchedAt); } else { final PutItemRequest putItemRequest = PutItemRequest.builder().item(item).tableName(tableName).conditionExpression(conditionalExpression) .expressionAttributeNames(expressionAttributeNames).expressionAttributeValues(expressionAttributeValues).build(); logger.trace("Acquiring an existing lock whose revisionVersionNumber did not change for " + partitionKeyName + " partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); - return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest); + return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest, lastTouchedAt); } } private LockItem upsertAndMonitorReleasedLock(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, Optional sessionMonitor, Optional existingLock, Optional - newLockData, Map item, String recordVersionNumber) { + newLockData, Map item, String recordVersionNumber, + Optional lastTouchedAt) { final String conditionalExpression; final boolean updateExistingLockRecord = options.getUpdateExistingLockRecord(); @@ -667,26 +703,27 @@ private LockItem upsertAndMonitorReleasedLock(AcquireLockOptions options, String .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); logger.trace("Acquiring an existing released whose revisionVersionNumber did not change for " + partitionKeyName + " " + "partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); - return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest); + return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest, lastTouchedAt); } else { final PutItemRequest putItemRequest = PutItemRequest.builder().item(item).tableName(tableName).conditionExpression(conditionalExpression) .expressionAttributeNames(expressionAttributeNames).expressionAttributeValues(expressionAttributeValues).build(); logger.trace("Acquiring an existing released lock whose revisionVersionNumber did not change for " + partitionKeyName + " " + "partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey); - return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest); + return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest, lastTouchedAt); } } private LockItem updateItemAndStartSessionMonitor(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, - Optional sessionMonitor, Optional newLockData, String recordVersionNumber, UpdateItemRequest updateItemRequest) { + Optional sessionMonitor, Optional newLockData, String recordVersionNumber, + UpdateItemRequest updateItemRequest, Optional lastTouchedAt) { final long lastUpdatedTime = LockClientUtils.INSTANCE.millisecondTime(); this.dynamoDB.updateItem(updateItemRequest); final LockItem lockItem = new LockItem(this, key, sortKey, newLockData, deleteLockOnRelease, this.ownerName, this.leaseDurationInMilliseconds, lastUpdatedTime, - recordVersionNumber, !IS_RELEASED_INDICATOR, sessionMonitor, options.getAdditionalAttributes()); + recordVersionNumber, !IS_RELEASED_INDICATOR, sessionMonitor, options.getAdditionalAttributes(), lastTouchedAt); this.locks.put(lockItem.getUniqueIdentifier(), lockItem); this.tryAddSessionMonitor(lockItem.getUniqueIdentifier(), lockItem); return lockItem; @@ -694,19 +731,22 @@ private LockItem updateItemAndStartSessionMonitor(AcquireLockOptions options, St /** * This method puts a new lock item in the lock table and returns an optionally monitored LockItem object - * @param options a wrapper of RequestMetricCollector and an "additional attributes" map - * @param key the partition key of the lock to write - * @param sortKey the optional sort key of the lock to write + * + * @param options a wrapper of RequestMetricCollector and an "additional attributes" map + * @param key the partition key of the lock to write + * @param sortKey the optional sort key of the lock to write * @param deleteLockOnRelease whether or not to delete the lock when releasing it - * @param sessionMonitor the optional session monitor to start for this lock - * @param newLockData the new lock data - * @param item the lock item to write to the lock table + * @param sessionMonitor the optional session monitor to start for this lock + * @param newLockData the new lock data + * @param item the lock item to write to the lock table * @param recordVersionNumber the rvn to condition the PutItem call on. + * @param lastTouchedAt the last wall clock time in ms that this was updated * @return a new monitored LockItem */ private LockItem upsertAndMonitorNewLock(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, Optional sessionMonitor, - Optional newLockData, Map item, String recordVersionNumber) { + Optional newLockData, Map item, String recordVersionNumber, + Optional lastTouchedAt) { final Map expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put(PK_PATH_EXPRESSION_VARIABLE, this.partitionKeyName); @@ -733,7 +773,7 @@ private LockItem upsertAndMonitorNewLock(AcquireLockOptions options, String key, .updateExpression(updateExpression).expressionAttributeNames(expressionAttributeNames) .expressionAttributeValues(expressionAttributeValues).conditionExpression(conditionalExpression).build(); logger.trace("Acquiring a new lock on " + partitionKeyName + "=" + key + ", " + this.sortKeyName + "=" + sortKey); - return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest); + return updateItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, updateItemRequest, lastTouchedAt); } else { final PutItemRequest putItemRequest = PutItemRequest.builder().item(item).tableName(tableName) .conditionExpression(conditionalExpression) @@ -743,25 +783,26 @@ private LockItem upsertAndMonitorNewLock(AcquireLockOptions options, String key, * sooner than it actually will, so they start counting towards its expiration before the Put succeeds */ logger.trace("Acquiring a new lock on " + partitionKeyName + "=" + key + ", " + this.sortKeyName + "=" + sortKey); - return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest); + return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest, lastTouchedAt); } } private LockItem putLockItemAndStartSessionMonitor(AcquireLockOptions options, String key, Optional sortKey, boolean deleteLockOnRelease, - Optional sessionMonitor, Optional newLockData, String recordVersionNumber, PutItemRequest putItemRequest) { + Optional sessionMonitor, Optional newLockData, String recordVersionNumber, PutItemRequest putItemRequest, + Optional lastTouchedAt) { final long lastUpdatedTime = LockClientUtils.INSTANCE.millisecondTime(); this.dynamoDB.putItem(putItemRequest); final LockItem lockItem = new LockItem(this, key, sortKey, newLockData, deleteLockOnRelease, this.ownerName, this.leaseDurationInMilliseconds, lastUpdatedTime, - recordVersionNumber, false, sessionMonitor, options.getAdditionalAttributes()); + recordVersionNumber, false, sessionMonitor, options.getAdditionalAttributes(), lastTouchedAt); this.locks.put(lockItem.getUniqueIdentifier(), lockItem); this.tryAddSessionMonitor(lockItem.getUniqueIdentifier(), lockItem); return lockItem; } /** - * Builds an updateExpression for all fields in item map and updates the correspoding expression attribute name and + * Builds an updateExpression for all fields in item map and updates the corresponding expression attribute name and * value maps. * @param item Map of Name and AttributeValue to update or create * @param expressionAttributeNames @@ -1011,6 +1052,10 @@ private LockItem createLockItem(final GetLockOptions options, final Map lastTouchedAtMs = Optional.ofNullable(item.get(LAST_TOUCHED_AT)).map(value -> { + item.remove(LAST_TOUCHED_AT); + return new AtomicLong(Long.parseLong(value.s())); + }); /* * The person retrieving the lock in DynamoDB should err on the side of @@ -1025,7 +1070,7 @@ private LockItem createLockItem(final GetLockOptions options, final Map additionalAttributes; + private final Optional lastTouchedAt; private final Optional sessionMonitor; @@ -63,7 +64,8 @@ public class LockItem implements Closeable { * @param deleteLockItemOnClose Whether or not to delete the lock item when releasing it * @param ownerName The owner associated with the lock * @param leaseDuration How long the lease for the lock is (in milliseconds) - * @param lastUpdatedTimeInMilliseconds How recently the lock was updated (in milliseconds) + * @param lookupTime How recently the lock was updated (in milliseconds). Not the + * wall clock time. This is backed by nanoTime(). * @param recordVersionNumber The current record version number of the lock -- this is * globally unique and changes each time the lock is updated * @param isReleased Whether the item in DynamoDB is marked as released, but still @@ -72,14 +74,18 @@ public class LockItem implements Closeable { * the lock * @param additionalAttributes Additional attributes that can optionally be stored alongside * the lock + * @param lastTouchedAt The wall clock time (in milliseconds since epoch) that this lease + * was last touched at via heartbeat or acquire. Used when + * clockSkewUpperBound is set. */ LockItem(final AmazonDynamoDBLockClient client, final String partitionKey, final Optional sortKey, final Optional data, final boolean deleteLockItemOnClose, - final String ownerName, final long leaseDuration, final long lastUpdatedTimeInMilliseconds, final String recordVersionNumber, final boolean isReleased, - final Optional sessionMonitor, final Map additionalAttributes) { + final String ownerName, final long leaseDuration, final long lookupTime, final String recordVersionNumber, final boolean isReleased, + final Optional sessionMonitor, final Map additionalAttributes, final Optional lastTouchedAt) { Objects.requireNonNull(partitionKey, "Cannot create a lock with a null key"); Objects.requireNonNull(ownerName, "Cannot create a lock with a null owner"); Objects.requireNonNull(sortKey, "Cannot create a lock with a null sortKey (use Optional.empty())"); Objects.requireNonNull(data, "Cannot create a lock with a null data (use Optional.empty())"); + Objects.requireNonNull(lastTouchedAt, "Cannot create a lock with a null lastTouchedAt (use Optional.empty())"); this.client = client; this.partitionKey = partitionKey; this.sortKey = sortKey; @@ -88,11 +94,12 @@ public class LockItem implements Closeable { this.deleteLockItemOnClose = deleteLockItemOnClose; this.leaseDuration = new AtomicLong(leaseDuration); - this.lookupTime = new AtomicLong(lastUpdatedTimeInMilliseconds); + this.lookupTime = new AtomicLong(lookupTime); this.recordVersionNumber = new StringBuffer(recordVersionNumber); this.isReleased = isReleased; this.sessionMonitor = sessionMonitor; this.additionalAttributes = additionalAttributes; + this.lastTouchedAt = lastTouchedAt; } /** @@ -130,6 +137,14 @@ public Map getAdditionalAttributes() { return this.additionalAttributes; } + /** + * Returns the last touched at time since heartbeat or initial lock acquisition. + * + * @return The last touched at milliseconds since epoch + */ + public Optional getLastTouchedAt() { + return this.lastTouchedAt; + } /** * Returns the name of the owner that owns this lock. @@ -197,9 +212,9 @@ public String toString() { .orElse(""); return String .format("LockItem{Partition Key=%s, Sort Key=%s, Owner Name=%s, Lookup Time=%d, Lease Duration=%d, " - + "Record Version Number=%s, Delete On Close=%s, Data=%s, Is Released=%s}", + + "Record Version Number=%s, Delete On Close=%s, Data=%s, Is Released=%s, Last touched at=%s}", this.partitionKey, this.sortKey, this.ownerName, this.lookupTime.get(), this.leaseDuration.get(), this.recordVersionNumber, this.deleteLockItemOnClose, - dataString, this.isReleased); + dataString, this.isReleased, this.lastTouchedAt); } /** @@ -236,6 +251,27 @@ public boolean isExpired() { return LockClientUtils.INSTANCE.millisecondTime() - this.lookupTime.get() > this.leaseDuration.get(); } + /** + * Returns whether the lock is expired, based on if the lock hasn't been touched since the lease duration plus + * cloud skew error threshold. + * + * @param clockSkewUpperBound the amount of time to add to the lease duration to account for clock skew precision errors + * @return True if the lock is expired, false otherwise + */ + public boolean isExpired(Long clockSkewUpperBound) { + if (this.isReleased) { + return true; + } + + if (this.lastTouchedAt.isPresent()) { + long currentTime = System.currentTimeMillis(); + // If the lock hasn't been touched since the lease duration plus error bound then it's ours! + return currentTime > this.lastTouchedAt.get().get() + this.leaseDuration.get() + clockSkewUpperBound; + } + + return false; + } + /** * Returns whether or not the lock was marked as released when loaded from DynamoDB. Does not consider expiration time. * @@ -298,6 +334,14 @@ void updateRecordVersionNumber(final String recordVersionNumber, final long last this.leaseDuration.set(leaseDurationToEnsureInMilliseconds); } + /* + * Updates the last touched at field of the lock. This method is package private -- it should only be called by the lock + * client. + */ + void updateLastTouchedAt(long lastTouchedAt) { + this.lastTouchedAt.ifPresent(value -> value.set(lastTouchedAt)); + } + /* * Updates the data of the lock. This method is package private -- it should only be called by the lock * client. diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java index 0480d56..5efc853 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientTest.java @@ -454,7 +454,7 @@ public void sendHeartbeat_whenDeleteDataTrueAndDataNotNull_throwsIllegalArgument AmazonDynamoDBLockClient client = getLockClient(); LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), 1L, 2L, "rvn", false, - Optional.empty(), null); + Optional.empty(), null, Optional.empty()); client.sendHeartbeat(SendHeartbeatOptions.builder(item).withDeleteData(true).withData(ByteBuffer.wrap("data".getBytes())).build()); } @@ -465,7 +465,7 @@ public void sendHeartbeat_whenExpired_throwsLockNotGrantedException() { long lastUpdatedTimeInMilliseconds = 2l; LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), 1L, lastUpdatedTimeInMilliseconds, - "rvn", false, Optional.empty(), null); + "rvn", false, Optional.empty(), null, Optional.empty()); client.sendHeartbeat(SendHeartbeatOptions.builder(item).withDeleteData(null).withData(ByteBuffer.wrap("data".getBytes())).build()); } @@ -476,7 +476,7 @@ public void sendHeartbeat_whenNotExpiredAndDifferentOwner_throwsLockNotGrantedEx long lastUpdatedTimeInMilliseconds = Long.MAX_VALUE; LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, "different owner", 1L, lastUpdatedTimeInMilliseconds, - "rvn", false, Optional.empty(), null); + "rvn", false, Optional.empty(), null, Optional.empty()); client.sendHeartbeat(SendHeartbeatOptions.builder(item).withDeleteData(null).withData(ByteBuffer.wrap("data".getBytes())).build()); } @@ -487,7 +487,7 @@ public void sendHeartbeat_whenNotExpired_andSameOwner_releasedTrue_throwsLockNot long lastUpdatedTimeInMilliseconds = Long.MAX_VALUE; LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), 1L, lastUpdatedTimeInMilliseconds, - "rvn", true, Optional.empty(), null); + "rvn", true, Optional.empty(), null, Optional.empty()); client.sendHeartbeat(SendHeartbeatOptions.builder(item).withDeleteData(null).withData(ByteBuffer.wrap("data".getBytes())).build()); } @@ -498,7 +498,7 @@ public void sendHeartbeat_whenNotExpired_andSameOwner_releasedFalse_setsRequestM long lastUpdatedTimeInMilliseconds = Long.MAX_VALUE; LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), 1L, lastUpdatedTimeInMilliseconds, - "rvn", false, Optional.empty(), null); + "rvn", false, Optional.empty(), null, Optional.empty()); client.sendHeartbeat(SendHeartbeatOptions.builder(item) .withDeleteData(null) .withData(ByteBuffer.wrap("data".getBytes())) @@ -513,7 +513,7 @@ public void sendHeartbeat_whenNotExpired_andSameOwner_releasedFalse_deleteDataFa String partitionKey = "partition_key"; LockItem item = new LockItem(client, partitionKey, Optional.empty(), Optional.of(ByteBuffer.wrap("data1".getBytes())), false, uuid.toString(), 1L, lastUpdatedTimeInMilliseconds, - "rvn", false, Optional.empty(), null); + "rvn", false, Optional.empty(), null, Optional.empty()); assertTrue(item.getData().isPresent()); ByteBuffer updated = ByteBuffer.wrap("data2".getBytes()); client.sendHeartbeat(SendHeartbeatOptions.builder(item) @@ -536,7 +536,7 @@ public void sendHeartbeat_whenServiceUnavailable_andHoldLockOnServiceUnavailable long lastUpdatedTimeInMilliseconds = LockClientUtils.INSTANCE.millisecondTime(); LockItem item = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), 10000L, lastUpdatedTimeInMilliseconds, - "rvn", false, Optional.empty(), null); + "rvn", false, Optional.empty(), null, Optional.empty()); AwsServiceException amazonServiceException = null; try { @@ -564,7 +564,7 @@ public void sendHeartbeat_whenServiceUnavailable_andHoldLockOnServiceUnavailable long lastUpdatedTimeInMilliseconds = LockClientUtils.INSTANCE.millisecondTime(); LockItem lockItem = new LockItem(client, "a", Optional.empty(), Optional.of(ByteBuffer.wrap("data".getBytes())), false, uuid.toString(), leaseDuration, lastUpdatedTimeInMilliseconds, - recordVersionNumber, false, Optional.empty(), null); + recordVersionNumber, false, Optional.empty(), null, Optional.empty()); // Setting up a spy mock to inspect the method on lockItem object created above LockItem lockItemSpy = PowerMockito.spy(lockItem); diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/BasicLockClientTests.java b/src/test/java/com/amazonaws/services/dynamodbv2/BasicLockClientTests.java index d485bcf..0957ec4 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/BasicLockClientTests.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/BasicLockClientTests.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.dynamodbv2; +import com.amazonaws.services.dynamodbv2.model.LockCurrentlyUnavailableException; import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException; import com.amazonaws.services.dynamodbv2.model.LockTableDoesNotExistException; import com.amazonaws.services.dynamodbv2.model.SessionMonitorNotSetException; @@ -45,6 +46,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; @@ -1486,7 +1488,7 @@ private void testInvalidAttribute(final String invalidAttribute) throws LockNotG public void testLockItemToString() throws LockNotGrantedException, InterruptedException { final LockItem lockItem = this.lockClient.acquireLock(ACQUIRE_LOCK_OPTIONS_TEST_KEY_1); final Pattern p = Pattern.compile("LockItem\\{Partition Key=testKey1, Sort Key=Optional.empty, Owner Name=" + INTEGRATION_TESTER + ", Lookup Time=\\d+, Lease Duration=3000, " - + "Record Version Number=\\w+-\\w+-\\w+-\\w+-\\w+, Delete On Close=true, Data=" + TEST_DATA + ", Is Released=false\\}"); + + "Record Version Number=\\w+-\\w+-\\w+-\\w+-\\w+, Delete On Close=true, Data=" + TEST_DATA + ", Is Released=false, Last touched at=Optional\\.empty\\}"); assertTrue(p.matcher(lockItem.toString()).matches()); } @@ -1689,6 +1691,96 @@ public void testSetRequestLevelMetricCollector() throws InterruptedException, IO final PutItemRequest putRequest = putRequestCaptor.getValue(); } + @Test + public void testUpperClockSkewErrorBoundWithNoHeartbeats() throws InterruptedException { + final long leaseDuration = 1_000; + final long clockSkewErrorBound = 500L; + final String partition = "super_key_LOCK"; + + AcquireLockOptions lockOptions = AcquireLockOptions + .builder(partition) + .withAcquireReleasedLocksConsistently(true) + .withShouldSkipBlockingWait(true) + .withClockSkewUpperBound(clockSkewErrorBound) + .build(); + + final AmazonDynamoDBLockClient lockClientOne = new AmazonDynamoDBLockClient( + new AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder(this.dynamoDBMock, TABLE_NAME, INTEGRATION_TESTER_2) + .withLeaseDuration(leaseDuration) + .withTimeUnit(TimeUnit.MILLISECONDS) + .withCreateHeartbeatBackgroundThread(false) + .build()); + + final AmazonDynamoDBLockClient lockClientTwo = new AmazonDynamoDBLockClient( + new AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder(this.dynamoDBMock, TABLE_NAME, INTEGRATION_TESTER_2) + .withLeaseDuration(leaseDuration) + .withTimeUnit(TimeUnit.MILLISECONDS) + .withCreateHeartbeatBackgroundThread(false) + .build()); + + // Acquire lock successfully. Note the lack of heartbeats. + LockItem lockItem = lockClientOne.acquireLock(lockOptions); + assertNotNull(lockItem); + + // Fail to acquire as we have only waited half the upper error bound. + Thread.sleep(leaseDuration + clockSkewErrorBound / 2); + assertThrows(LockCurrentlyUnavailableException.class, () -> lockClientTwo.acquireLock(lockOptions)); + + // Success since we've waited 1.5 * the error bound now! + Thread.sleep(clockSkewErrorBound); + lockItem = lockClientTwo.acquireLock(lockOptions); + assertNotNull(lockItem); + } + + @Test + public void testUpperClockSkewErrorBoundWithHeartbeats() throws InterruptedException, NoSuchFieldException, IllegalAccessException { + final long leaseDuration = 1_000; + final long clockSkewErrorBound = 500L; + final String partition = "super_key_LOCK"; + + AcquireLockOptions lockOptions = AcquireLockOptions + .builder(partition) + .withAcquireReleasedLocksConsistently(true) + .withShouldSkipBlockingWait(true) + .withClockSkewUpperBound(clockSkewErrorBound) + .build(); + + final AmazonDynamoDBLockClient lockClientOne = new AmazonDynamoDBLockClient( + new AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder(this.dynamoDBMock, TABLE_NAME, INTEGRATION_TESTER_2) + .withLeaseDuration(leaseDuration) + .withTimeUnit(TimeUnit.MILLISECONDS) + .withCreateHeartbeatBackgroundThread(false) + .build()); + + final AmazonDynamoDBLockClient lockClientTwo = new AmazonDynamoDBLockClient( + new AmazonDynamoDBLockClientOptions.AmazonDynamoDBLockClientOptionsBuilder(this.dynamoDBMock, TABLE_NAME, INTEGRATION_TESTER_2) + .withLeaseDuration(leaseDuration) + .withTimeUnit(TimeUnit.MILLISECONDS) + .withCreateHeartbeatBackgroundThread(false) + .build()); + + // Acquire lock successfully. Note the lack of an implicit heartbeat background thread! + // We send them explicitly below. + LockItem lockItem = lockClientOne.acquireLock(lockOptions); + assertNotNull(lockItem); + + // Sleep a total of 2 lease durations. + Thread.sleep(leaseDuration / 2); + lockItem.sendHeartBeat(); + Thread.sleep(leaseDuration / 2); + lockItem.sendHeartBeat(); + Thread.sleep(leaseDuration / 2); + lockItem.sendHeartBeat(); + Thread.sleep(leaseDuration / 2); + // We need to wait another lease duration / 2 + clock skew error bound for another client + // to acquire the lock! + assertThrows(LockCurrentlyUnavailableException.class, () -> lockClientTwo.acquireLock(lockOptions)); + + Thread.sleep(leaseDuration); + lockItem = lockClientTwo.acquireLock(lockOptions); + assertNotNull(lockItem); + } + private LockItem getShortLeaseLock() throws InterruptedException { return this.shortLeaseLockClient.acquireLock(ACQUIRE_LOCK_OPTIONS_TEST_KEY_1); } diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java index ca1dc7a..1983c66 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java @@ -70,7 +70,7 @@ public void equals_differentPartitionKey_returnFalse() { "recordVersionNumber", false, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor - new HashMap<>()))); + new HashMap<>(), Optional.empty()))); } @Test @@ -85,7 +85,7 @@ public void equals_differentOwner_returnFalse() { "recordVersionNumber", false, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor - new HashMap<>()))); + new HashMap<>(), Optional.empty()))); } @Test @@ -105,7 +105,7 @@ public void isExpired_whenIsReleasedTrue_returnTrue() { "recordVersionNumber", true, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor - new HashMap<>()).isExpired()); + new HashMap<>(), Optional.empty()).isExpired()); } @Test @@ -125,7 +125,7 @@ public void ensure_whenIsReleasedTrue_throwsLockNotGrantedException() { "recordVersionNumber", true, //released Optional.empty(), //session monitor - new HashMap<>()).ensure(2L, TimeUnit.MILLISECONDS); + new HashMap<>(), Optional.empty()).ensure(2L, TimeUnit.MILLISECONDS); } @Test(expected = IllegalStateException.class) @@ -140,7 +140,7 @@ public void millisecondsUntilDangerZoneEntered_whenIsReleasedTrue_throwsIllegalS "recordVersionNumber", true, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor - new HashMap<>()).millisecondsUntilDangerZoneEntered(); + new HashMap<>(), Optional.empty()).millisecondsUntilDangerZoneEntered(); } @Test @@ -167,7 +167,7 @@ public void hasCallback_sessionMonitorNotPresent_throwSessionMonitorNotSetExcept "recordVersionNumber", false, //released Optional.empty(), //session monitor - new HashMap<>()).hasCallback(); + new HashMap<>(), Optional.empty()).hasCallback(); } @Test @@ -192,6 +192,6 @@ static LockItem createLockItem(AmazonDynamoDBLockClient lockClient) { "recordVersionNumber", false, //released Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor - new HashMap<>()); //additional attributes + new HashMap<>(), Optional.empty()); //additional attributes } }