Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ You can read the data in the lock without acquiring it, and find out who owns th
LockItem lock = lockClient.getLock("Moe");
```

### Fencing
By default the lock client relies on lock holders to ensure that they have finished all of their work before their
lease expires. This can be [hard to guarantee](https://fpj.me/2016/02/10/note-on-fencing-and-distributed-locks/)
when you account for things like thread schedulers, garbage collectors, and network latency. To protect against
corruption in the event that a request from an "old" lock holder arrives after its lease has expired and the lock
has been acquired by a different system, the lock client can optionally associate a unique, monotonically-increasing
**sequence id** with each lease:

```java
AmazonDynamoDBLockClient client = new AmazonDynamoDBLockClient(
AmazonDynamoDBLockClientOptions.builder(dynamoDB, "lockTable")
.withSequenceIdTracking(true)
.withLeaseDuration(10L)
.withTimeUnit(TimeUnit.SECONDS)
.build());

try (LockItem lock = client.acquireLock(AcquireLockOptions.builder("GloriousLeader").build())) {
// Simulate a particularly nasty garbage collection pause.
Thread.sleep(30 * 1000);

// Send a message to all of our followers; they can ignore it if they've already heard from a
// leader with a later sequence id.
for (String follower : getFollowers()) {
sendMessage(follower, "Hello from leader " + lock.getSequenceId() + "!");
}
}
```

## How we handle clock skew
The lock client never stores absolute times in DynamoDB -- only the relative "lease duration" time is stored
in DynamoDB. The way locks are expired is that a call to acquireLock reads in the current lock, checks the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class AcquireLockOptions {
private final Optional<String> sortKey;
private final Optional<ByteBuffer> data;
private final Boolean replaceData;
private final Boolean deleteLockOnRelease;
private final Optional<Boolean> deleteLockOnRelease;
private final Long refreshPeriod;
private final Long additionalTimeToWaitForLock;
private final TimeUnit timeUnit;
Expand All @@ -53,7 +53,7 @@ public static class AcquireLockOptionsBuilder {
private Optional<String> sortKey;
private Optional<ByteBuffer> data;
private Boolean replaceData;
private Boolean deleteLockOnRelease;
private Optional<Boolean> deleteLockOnRelease;
private Long refreshPeriod;
private Long additionalTimeToWaitForLock;
private TimeUnit timeUnit;
Expand All @@ -71,7 +71,7 @@ public static class AcquireLockOptionsBuilder {
this.requestMetricCollector = Optional.empty();
this.data = Optional.empty();
this.replaceData = true;
this.deleteLockOnRelease = true;
this.deleteLockOnRelease = Optional.empty();
}

/**
Expand Down Expand Up @@ -116,7 +116,7 @@ public AcquireLockOptionsBuilder withReplaceData(final Boolean replaceData) {
* @return a reference to this builder for fluent method chaining
*/
public AcquireLockOptionsBuilder withDeleteLockOnRelease(final Boolean deleteLockOnRelease) {
this.deleteLockOnRelease = deleteLockOnRelease;
this.deleteLockOnRelease = Optional.ofNullable(deleteLockOnRelease);
return this;
}

Expand Down Expand Up @@ -274,7 +274,7 @@ public static AcquireLockOptionsBuilder builder(final String partitionKey) {
}

private AcquireLockOptions(final String partitionKey, final Optional<String> sortKey, final Optional<ByteBuffer> data, final Boolean replaceData,
final Boolean deleteLockOnRelease, final Long refreshPeriod, final Long additionalTimeToWaitForLock, final TimeUnit timeUnit,
final Optional<Boolean> deleteLockOnRelease, final Long refreshPeriod, final Long additionalTimeToWaitForLock, final TimeUnit timeUnit,
final Map<String, AttributeValue> additionalAttributes, final Optional<SessionMonitor> sessionMonitor,
final Optional<RequestMetricCollector> requestMetricCollector) {
this.partitionKey = partitionKey;
Expand Down Expand Up @@ -306,7 +306,7 @@ Boolean getReplaceData() {
return this.replaceData;
}

Boolean getDeleteLockOnRelease() {
Optional<Boolean> getDeleteLockOnRelease() {
return this.deleteLockOnRelease;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable {
private final long leaseDurationInMilliseconds;
private final long heartbeatPeriodInMilliseconds;
private final String ownerName;
private final boolean trackSequenceIds;
private final ConcurrentHashMap<String, LockItem> locks;
private final ConcurrentHashMap<String, Thread> sessionMonitors;
private final Optional<Thread> backgroundThread;
Expand All @@ -196,6 +197,7 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable {
protected static final String OWNER_NAME = "ownerName";
protected static final String LEASE_DURATION = "leaseDuration";
protected static final String RECORD_VERSION_NUMBER = "recordVersionNumber";
protected static final String SEQUENCE_ID = "sequenceId";
protected static final String IS_RELEASED = "isReleased";
protected static final String IS_RELEASED_VALUE = "1";
protected static final AttributeValue IS_RELEASED_ATTRIBUTE_VALUE = new AttributeValue(IS_RELEASED_VALUE);
Expand Down Expand Up @@ -227,6 +229,7 @@ public AmazonDynamoDBLockClient(final AmazonDynamoDBLockClientOptions amazonDyna
this.locks = new ConcurrentHashMap<>();
this.sessionMonitors = new ConcurrentHashMap<>();
this.ownerName = amazonDynamoDBLockClientOptions.getOwnerName();
this.trackSequenceIds = amazonDynamoDBLockClientOptions.getTrackSequenceIds();
this.leaseDurationInMilliseconds = amazonDynamoDBLockClientOptions.getTimeUnit().toMillis(amazonDynamoDBLockClientOptions.getLeaseDuration());
this.heartbeatPeriodInMilliseconds = amazonDynamoDBLockClientOptions.getTimeUnit().toMillis(amazonDynamoDBLockClientOptions.getHeartbeatPeriod());
this.partitionKeyName = amazonDynamoDBLockClientOptions.getPartitionKeyName();
Expand Down Expand Up @@ -358,7 +361,8 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran

if (options.getAdditionalAttributes().containsKey(this.partitionKeyName) || options.getAdditionalAttributes().containsKey(OWNER_NAME) || options
.getAdditionalAttributes().containsKey(LEASE_DURATION) || options.getAdditionalAttributes().containsKey(RECORD_VERSION_NUMBER) || options
.getAdditionalAttributes().containsKey(DATA) || this.sortKeyName.isPresent() && options.getAdditionalAttributes().containsKey(this.sortKeyName.get())) {
.getAdditionalAttributes().containsKey(DATA) || this.sortKeyName.isPresent() && options.getAdditionalAttributes().containsKey(this.sortKeyName.get())
|| this.trackSequenceIds && options.getAdditionalAttributes().containsKey(SEQUENCE_ID)) {
throw new IllegalArgumentException(String
.format("Additional attribute cannot be one of the following types: " + "%s, %s, %s, %s, %s", this.partitionKeyName, OWNER_NAME, LEASE_DURATION,
RECORD_VERSION_NUMBER, DATA));
Expand All @@ -376,7 +380,16 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran
refreshPeriodInMilliseconds = options.getTimeUnit().toMillis(options.getRefreshPeriod());
}

final boolean deleteLockOnRelease = options.getDeleteLockOnRelease();
final boolean deleteLockOnRelease;
if (this.trackSequenceIds) {
if (options.getDeleteLockOnRelease().isPresent() && options.getDeleteLockOnRelease().get()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The functional police is here 👮 😄

if (options.getDeleteLockOnRelease().orElse(false)) {

throw new IllegalArgumentException("Cannot set deleteLockOnRelease when sequence id tracking is enabled");
}
deleteLockOnRelease = false;
} else {
deleteLockOnRelease = options.getDeleteLockOnRelease().orElse(true);
}

final boolean replaceData = options.getReplaceData();

final Optional<SessionMonitor> sessionMonitor = options.getSessionMonitor();
Expand Down Expand Up @@ -412,20 +425,32 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran
newLockData = options.getData(); // If there is no existing data, we write the input data to the lock.
}

// If we're tracking sequence IDs, initialize or increment the current value.
Optional<Long> sequenceId = Optional.empty();
if (trackSequenceIds) {
sequenceId = existingLock.flatMap(value -> value.getSequenceId());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

sequenceId = Optional.of(existingLock.flatMap(value -> value.getSequenceId()).orElse(0L) + 1)

Motivation:

  1. make it clear that you fallback to 0L when there was no lock / seq. id
  2. the step (+1) is also clearly defined

if (sequenceId.isPresent()) {
sequenceId = sequenceId.map(value -> value + 1);
} else {
sequenceId = Optional.of(1L);
}
}

final Map<String, AttributeValue> item = new HashMap<>();
item.putAll(options.getAdditionalAttributes());
item.put(this.partitionKeyName, new AttributeValue().withS(key));
item.put(OWNER_NAME, new AttributeValue().withS(this.ownerName));
item.put(LEASE_DURATION, new AttributeValue().withS(String.valueOf(this.leaseDurationInMilliseconds)));
final String recordVersionNumber = this.generateRecordVersionNumber();
item.put(RECORD_VERSION_NUMBER, new AttributeValue().withS(String.valueOf(recordVersionNumber)));
sequenceId.ifPresent(sid -> item.put(SEQUENCE_ID, new AttributeValue().withN(sid.toString())));
sortKeyName.ifPresent(sortKeyName -> item.put(sortKeyName, new AttributeValue().withS(sortKey.get())));
newLockData.ifPresent(byteBuffer -> item.put(DATA, new AttributeValue().withB(byteBuffer)));

//if the existing lock does not exist or exists and is released
if (!existingLock.isPresent() || existingLock.get().isReleased()) {
return upsertAndMonitorNewOrReleasedLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor,
newLockData, item, recordVersionNumber);
newLockData, item, recordVersionNumber, sequenceId);
}
// we know that we didnt enter the if block above because it returns at the end.
// we also know that the existingLock.isPresent() is true
Expand All @@ -447,7 +472,7 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran
/* If the version numbers match, then we can acquire the lock, assuming it has already expired */
if (lockTryingToBeAcquired.isExpired()) {
return upsertAndMonitorExpiredLock(options, key, sortKey, deleteLockOnRelease, sessionMonitor, existingLock, newLockData, item,
recordVersionNumber);
recordVersionNumber, sequenceId);
}
} else {
/*
Expand Down Expand Up @@ -484,7 +509,8 @@ public LockItem acquireLock(final AcquireLockOptions options) throws LockNotGran
}

private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String key, Optional<String> sortKey, boolean deleteLockOnRelease,
Optional<SessionMonitor> sessionMonitor, Optional<LockItem> existingLock, Optional<ByteBuffer> newLockData, Map<String, AttributeValue> item, String recordVersionNumber) {
Optional<SessionMonitor> sessionMonitor, Optional<LockItem> existingLock, Optional<ByteBuffer> newLockData, Map<String, AttributeValue> item, String recordVersionNumber,
Optional<Long> sequenceId) {
final String conditionalExpression;
final Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
expressionAttributeValues.put(RVN_VALUE_EXPRESSION_VARIABLE, new AttributeValue(existingLock.get().getRecordVersionNumber()));
Expand All @@ -503,7 +529,7 @@ private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String
.withExpressionAttributeNames(expressionAttributeNames).withExpressionAttributeValues(expressionAttributeValues);

logger.trace("Acquiring an existing lock whose revisionVersionNumber did not change for " + partitionKeyName + " partitionKeyName=" + key + ", " + this.sortKeyName + "=" + sortKey);
return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest);
return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, sequenceId, putItemRequest);
}

/**
Expand All @@ -520,7 +546,7 @@ private LockItem upsertAndMonitorExpiredLock(AcquireLockOptions options, String
*/
private LockItem upsertAndMonitorNewOrReleasedLock(AcquireLockOptions options, String key, Optional<String> sortKey,
boolean deleteLockOnRelease, Optional<SessionMonitor> sessionMonitor,
Optional<ByteBuffer> newLockData, Map<String, AttributeValue> item, String recordVersionNumber) {
Optional<ByteBuffer> newLockData, Map<String, AttributeValue> item, String recordVersionNumber, Optional<Long> sequenceId) {
final Map<String, String> expressionAttributeNames = new HashMap<>();
expressionAttributeNames.put(PK_PATH_EXPRESSION_VARIABLE, this.partitionKeyName);
expressionAttributeNames.put(IS_RELEASED_PATH_EXPRESSION_VARIABLE, IS_RELEASED);
Expand All @@ -534,11 +560,12 @@ private LockItem upsertAndMonitorNewOrReleasedLock(AcquireLockOptions options, S
* sooner than it actually will, so they start counting towards its expiration before the Put succeeds
*/
logger.trace("Acquiring a new lock or an existing yet released lock on " + partitionKeyName + "=" + key + ", " + this.sortKeyName + "=" + sortKey);
return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, putItemRequest);
return putLockItemAndStartSessionMonitor(options, key, sortKey, deleteLockOnRelease, sessionMonitor, newLockData, recordVersionNumber, sequenceId, putItemRequest);
}

private LockItem putLockItemAndStartSessionMonitor(AcquireLockOptions options, String key, Optional<String> sortKey, boolean deleteLockOnRelease,
Optional<SessionMonitor> sessionMonitor, Optional<ByteBuffer> newLockData, String recordVersionNumber, PutItemRequest putItemRequest) {
Optional<SessionMonitor> sessionMonitor, Optional<ByteBuffer> newLockData, String recordVersionNumber, Optional<Long> sequenceId,
PutItemRequest putItemRequest) {
final long lastUpdatedTime = LockClientUtils.INSTANCE.millisecondTime();
if (options.getRequestMetricCollector().isPresent()) {
putItemRequest.setRequestMetricCollector(options.getRequestMetricCollector().get());
Expand All @@ -547,7 +574,7 @@ private LockItem putLockItemAndStartSessionMonitor(AcquireLockOptions options, S

final LockItem lockItem =
new LockItem(this, key, sortKey, newLockData, deleteLockOnRelease, this.ownerName, this.leaseDurationInMilliseconds, lastUpdatedTime,
recordVersionNumber, false, sessionMonitor, options.getAdditionalAttributes());
recordVersionNumber, sequenceId, false, sessionMonitor, options.getAdditionalAttributes());
this.locks.put(lockItem.getUniqueIdentifier(), lockItem);
this.tryAddSessionMonitor(lockItem.getUniqueIdentifier(), lockItem);
return lockItem;
Expand Down Expand Up @@ -760,6 +787,10 @@ private LockItem createLockItem(final GetLockOptions options, final Map<String,
final AttributeValue leaseDuration = item.remove(LEASE_DURATION);
final AttributeValue recordVersionNumber = item.remove(RECORD_VERSION_NUMBER);

final Optional<Long> sequenceId = trackSequenceIds
? Optional.ofNullable(item.remove(SEQUENCE_ID)).map(value -> Long.valueOf(value.getN()))
: Optional.empty();

final boolean isReleased = item.containsKey(IS_RELEASED);
item.remove(IS_RELEASED);
item.remove(this.partitionKeyName);
Expand All @@ -777,7 +808,9 @@ private LockItem createLockItem(final GetLockOptions options, final Map<String,
options.isDeleteLockOnRelease(),
ownerName.getS(),
Long.parseLong(leaseDuration.getS()), lookupTime,
recordVersionNumber.getS(), isReleased, Optional.empty(), item);
recordVersionNumber.getS(),
sequenceId,
isReleased, Optional.empty(), item);
return lockItem;
}

Expand Down
Loading