Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class AcquireLockOptions {
private final Boolean acquireReleasedLocksConsistently;
private final Optional<SessionMonitor> sessionMonitor;
private final Boolean reentrant;
private final Optional<Long> clockSkewUpperBound;

/**
* Setting this flag to true will prevent the thread from being blocked (put to sleep) for the lease duration and
Expand Down Expand Up @@ -72,6 +73,7 @@ public static class AcquireLockOptionsBuilder {
private Boolean updateExistingLockRecord;
private Boolean acquireReleasedLocksConsistently;
private Boolean reentrant;
private Optional<Long> clockSkewUpperBound;

private long safeTimeWithoutHeartbeat;
private Optional<Runnable> sessionMonitorCallback;
Expand All @@ -90,6 +92,7 @@ public static class AcquireLockOptionsBuilder {
this.shouldSkipBlockingWait = false;
this.acquireReleasedLocksConsistently = false;
this.reentrant = false;
this.clockSkewUpperBound = Optional.empty();
}

/**
Expand Down Expand Up @@ -256,6 +259,22 @@ public AcquireLockOptionsBuilder withReentrant(final boolean reentrant) {
return this;
}

/**
* In combination with withShouldSkipBlockingWait(true) this allows a node to rely on a lastTouchedAt value on the DynamoDb
* entry to take over locks which have expired but have not been deleted or marked as released within DynamoDb (which can occur
* due to an ungraceful shutdown of the owning node).
*
* It's critically important that this error bound is accurate to the nodes that are relying on the lock client. If not,
* correctness problems can occur.
*
* @param clockSkewUpperBound the upper error bound in milliseconds of clock skew across the nodes running this client
* @return a reference to this builder for fluent method chaining
*/
public AcquireLockOptionsBuilder withClockSkewUpperBound(final Long clockSkewUpperBound) {
this.clockSkewUpperBound = Optional.ofNullable(clockSkewUpperBound);
return this;
}

/**
* <p>
* Registers a "SessionMonitor."
Expand Down Expand Up @@ -333,7 +352,7 @@ public AcquireLockOptions build() {
}
return new AcquireLockOptions(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease, this.acquireOnlyIfLockAlreadyExists,
this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit, this.additionalAttributes, sessionMonitor,
this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant);
this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.clockSkewUpperBound);
}

@Override
Expand All @@ -342,7 +361,8 @@ public String toString() {
+ this.replaceData + ", deleteLockOnRelease=" + this.deleteLockOnRelease + ", refreshPeriod=" + this.refreshPeriod + ", additionalTimeToWaitForLock="
+ this.additionalTimeToWaitForLock + ", timeUnit=" + this.timeUnit + ", additionalAttributes=" + this.additionalAttributes + ", safeTimeWithoutHeartbeat="
+ this.safeTimeWithoutHeartbeat + ", sessionMonitorCallback=" + this.sessionMonitorCallback + ", acquireReleasedLocksConsistently="
+ this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant+ ")";
+ this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant+ ", this.clockSkewUpperBound=" + this.clockSkewUpperBound
+ ")";
}
}

Expand All @@ -360,7 +380,8 @@ public static AcquireLockOptionsBuilder builder(final String partitionKey) {
private AcquireLockOptions(final String partitionKey, final Optional<String> sortKey, final Optional<ByteBuffer> data, final Boolean replaceData,
final Boolean deleteLockOnRelease, final Boolean acquireOnlyIfLockAlreadyExists, final Long refreshPeriod, final Long additionalTimeToWaitForLock,
final TimeUnit timeUnit, final Map<String, AttributeValue> additionalAttributes, final Optional<SessionMonitor> sessionMonitor,
final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant) {
final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant,
final Optional<Long> clockSkewUpperBound) {
this.partitionKey = partitionKey;
this.sortKey = sortKey;
this.data = data;
Expand All @@ -376,6 +397,7 @@ private AcquireLockOptions(final String partitionKey, final Optional<String> sor
this.shouldSkipBlockingWait = shouldSkipBlockingWait;
this.acquireReleasedLocksConsistently = acquireReleasedLocksConsistently;
this.reentrant = reentrant;
this.clockSkewUpperBound = clockSkewUpperBound;
}

String getPartitionKey() {
Expand Down Expand Up @@ -424,6 +446,8 @@ Boolean getReentrant() {
return this.reentrant;
}

Optional<Long> getClockSkewUpperBound() { return this.clockSkewUpperBound; }

Map<String, AttributeValue> getAdditionalAttributes() {
return this.additionalAttributes;
}
Expand Down Expand Up @@ -460,15 +484,16 @@ public boolean equals(final Object other) {
&& Objects.equals(this.updateExistingLockRecord, otherOptions.updateExistingLockRecord)
&& Objects.equals(this.shouldSkipBlockingWait, otherOptions.shouldSkipBlockingWait)
&& Objects.equals(this.acquireReleasedLocksConsistently, otherOptions.acquireReleasedLocksConsistently)
&& Objects.equals(this.reentrant, otherOptions.reentrant);
&& Objects.equals(this.reentrant, otherOptions.reentrant)
&& Objects.equals(this.clockSkewUpperBound, otherOptions.clockSkewUpperBound);
}

@Override
public int hashCode() {
return Objects.hash(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease,
this.acquireOnlyIfLockAlreadyExists, this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit,
this.additionalAttributes, this.sessionMonitor, this.updateExistingLockRecord,
this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant);
this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.clockSkewUpperBound);

}

Expand Down
Loading