@@ -88,6 +88,7 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

@@ -255,6 +256,107 @@ public void readFeedDocumentsStartFromCustomDate() throws InterruptedException {
}
}

@Test(groups = { "query" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT)
public void verifyConsistentTimestamps() throws InterruptedException {
CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT);
CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);

try {
List<InternalObjectNode> createdDocuments = new ArrayList<>();
Map<String, JsonNode> receivedDocuments = new ConcurrentHashMap<>();
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
.handleLatestVersionChanges((List<ChangeFeedProcessorItem> docs) -> {
logger.info("START processing from thread {}", Thread.currentThread().getId());
for (ChangeFeedProcessorItem item : docs) {
processItem(item, receivedDocuments);
}
logger.info("END processing from thread {}", Thread.currentThread().getId());
})
.feedContainer(createdFeedCollection)
.leaseContainer(createdLeaseCollection)
.options(new ChangeFeedProcessorOptions()
.setLeaseRenewInterval(Duration.ofSeconds(20))
.setLeaseAcquireInterval(Duration.ofSeconds(10))
.setLeaseExpirationInterval(Duration.ofSeconds(30))
.setFeedPollDelay(Duration.ofSeconds(1))
.setLeasePrefix("TEST")
.setMaxItemCount(10)
.setMinScaleCount(1)
.setMaxScaleCount(3)
)
.buildChangeFeedProcessor();
AtomicReference<String> initialTimestamp = new AtomicReference<>();

startChangeFeedProcessor(changeFeedProcessor);

assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue();

safeStopChangeFeedProcessor(changeFeedProcessor);

createdLeaseCollection.queryItems("SELECT * FROM c", new CosmosQueryRequestOptions(), JsonNode.class)
.byPage()
.flatMap(feedResponse -> {
for (JsonNode item : feedResponse.getResults()) {
if (item.get("timestamp") != null) {
initialTimestamp.set(item.get("timestamp").asText());
logger.info("Found timestamp: %s", initialTimestamp);
}
}
return Mono.empty();
}).blockLast();

startChangeFeedProcessor(changeFeedProcessor);

            // create a time gap after the previously written documents
Thread.sleep(3000);

            // write some documents after the CFP has been stopped and restarted
setupReadFeedDocuments(createdDocuments, createdFeedCollection, FEED_COUNT);

// Wait for the feed processor to receive and process the documents.
waitToReceiveDocuments(receivedDocuments, 40 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT);
safeStopChangeFeedProcessor(changeFeedProcessor);

logger.info("After processing documents");

AtomicReference<String> newTimestamp = new AtomicReference<>();

createdLeaseCollection.queryItems("SELECT * FROM c", new CosmosQueryRequestOptions(), JsonNode.class)
.byPage()
.flatMap(feedResponse -> {
for (JsonNode item : feedResponse.getResults()) {
if (item.get("timestamp") != null) {
newTimestamp.set(item.get("timestamp").asText());
logger.info("Found timestamp: %s", newTimestamp);
}
}
return Mono.empty();
}).blockLast();

assertThat(newTimestamp.get()).doesNotContain("[UTC]");
assertThat(initialTimestamp.get()).doesNotContain("[UTC]");

for (InternalObjectNode item : createdDocuments) {
assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue();
}

            // Wait for the feed processor to shut down.
Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT);

} finally {
safeDeleteCollection(createdFeedCollection);
safeDeleteCollection(createdLeaseCollection);

// Allow some time for the collections to be deleted before exiting.
Thread.sleep(500);
}
}

@Test(groups = {"multi-master"}, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT)
public void readFeedDocumentsStartFromCustomDateForMultiWrite_test() throws InterruptedException {
CosmosClientBuilder clientBuilder = getClientBuilder();
sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -13,6 +13,7 @@
* Changed to use `PartitionKeyRangeCache` to get partition key range during startup and split handling. - [46700](https://github.com/Azure/azure-sdk-for-java/pull/46700)
* Changed to use lower casing http header names for gateway response. - [46736](https://github.com/Azure/azure-sdk-for-java/pull/46736)
* Improved resilience around several completion events for an ssl handshake. - [PR 46734](https://github.com/Azure/azure-sdk-for-java/pull/46734)
* Changed the `timestamp` format in leases to be consistent for the Change Feed Processor (CFP). - [PR 46784](https://github.com/Azure/azure-sdk-for-java/pull/46784)
* Added `MetadataThrottlingRetryPolicy` for `PartitionKeyRange` `RequestRateTooLargeException` handling. - [PR 46823](https://github.com/Azure/azure-sdk-for-java/pull/46823)

### 4.74.0 (2025-09-05)
@@ -54,7 +54,7 @@ public class ServiceItemLeaseV1 implements Lease {

public ServiceItemLeaseV1() {
ZonedDateTime currentTime = ZonedDateTime.now(ZoneId.of("UTC"));
-        this.timestamp = currentTime.toString();
+        this.timestamp = currentTime.toInstant().toString();
this._ts = String.valueOf(currentTime.getSecond());
this.properties = new HashMap<>();
// By default, this is EPK_RANGE_BASED_LEASE version
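For context on the one-line change above: `ZonedDateTime.toString()` includes the zone ID, so a lease written with the old code carries a trailing `[UTC]`, while `toInstant().toString()` produces a plain ISO-8601 instant; this is what the new test's `doesNotContain("[UTC]")` assertions verify. A minimal, self-contained sketch of the difference using only `java.time` (the class name below is illustrative, not part of the SDK):

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class LeaseTimestampFormatSketch {
    public static void main(String[] args) {
        ZonedDateTime currentTime = ZonedDateTime.now(ZoneId.of("UTC"));

        // Old behavior: ZonedDateTime.toString() carries the zone ID suffix,
        // e.g. 2025-09-05T12:34:56.789Z[UTC]
        String oldFormat = currentTime.toString();

        // New behavior: Instant.toString() is a plain ISO-8601 instant,
        // e.g. 2025-09-05T12:34:56.789Z
        String newFormat = currentTime.toInstant().toString();

        System.out.println("old: " + oldFormat + " (contains [UTC]: " + oldFormat.contains("[UTC]") + ")");
        System.out.println("new: " + newFormat + " (contains [UTC]: " + newFormat.contains("[UTC]") + ")");
    }
}
```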
@@ -46,7 +46,7 @@ public class ServiceItemLease implements Lease {

public ServiceItemLease() {
ZonedDateTime currentTime = ZonedDateTime.now(ZoneId.of("UTC"));
-        this.timestamp = currentTime.toString();
+        this.timestamp = currentTime.toInstant().toString();
this._ts = String.valueOf(currentTime.getSecond());
this.properties = new HashMap<>();
}
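A likely practical consequence of the new format, sketched under the assumption that a reader of the lease document wants to turn the `timestamp` back into a point in time: an ISO-8601 instant round-trips through `Instant.parse(...)`, whereas the old zone-suffixed string is rejected with a `DateTimeParseException` and would instead need `ZonedDateTime.parse(...)`. The class below is illustrative only and not part of the SDK:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;

public class LeaseTimestampParseSketch {
    public static void main(String[] args) {
        ZonedDateTime currentTime = ZonedDateTime.now(ZoneId.of("UTC"));
        String oldFormat = currentTime.toString();              // ...Z[UTC]
        String newFormat = currentTime.toInstant().toString();  // ...Z

        // The new format round-trips cleanly as an Instant.
        Instant parsed = Instant.parse(newFormat);
        System.out.println("parsed new format: " + parsed);

        // The old format does not: Instant.parse rejects the [UTC] suffix.
        try {
            Instant.parse(oldFormat);
        } catch (DateTimeParseException e) {
            System.out.println("old format rejected by Instant.parse: " + e.getMessage());
        }
    }
}
```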