diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java
index 82fb15a61575..a543309ef37a 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java
@@ -88,6 +88,7 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -255,6 +256,107 @@ public void readFeedDocumentsStartFromCustomDate() throws InterruptedException {
         }
     }
 
+    @Test(groups = { "query" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT)
+    public void verifyConsistentTimestamps() throws InterruptedException {
+        CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT);
+        CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);
+
+        try {
+            List<InternalObjectNode> createdDocuments = new ArrayList<>();
+            Map<String, ChangeFeedProcessorItem> receivedDocuments = new ConcurrentHashMap<>();
+            ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
+                .hostName(hostName)
+                .handleLatestVersionChanges((List<ChangeFeedProcessorItem> docs) -> {
+                    logger.info("START processing from thread {}", Thread.currentThread().getId());
+                    for (ChangeFeedProcessorItem item : docs) {
+                        processItem(item, receivedDocuments);
+                    }
+                    logger.info("END processing from thread {}", Thread.currentThread().getId());
+                })
+                .feedContainer(createdFeedCollection)
+                .leaseContainer(createdLeaseCollection)
+                .options(new ChangeFeedProcessorOptions()
+                    .setLeaseRenewInterval(Duration.ofSeconds(20))
+                    .setLeaseAcquireInterval(Duration.ofSeconds(10))
+                    .setLeaseExpirationInterval(Duration.ofSeconds(30))
+                    .setFeedPollDelay(Duration.ofSeconds(1))
+                    .setLeasePrefix("TEST")
+                    .setMaxItemCount(10)
+                    .setMinScaleCount(1)
+                    .setMaxScaleCount(3)
+                )
+                .buildChangeFeedProcessor();
+            AtomicReference<String> initialTimestamp = new AtomicReference<>();
+
+            startChangeFeedProcessor(changeFeedProcessor);
+
+            assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue();
+
+            safeStopChangeFeedProcessor(changeFeedProcessor);
+
+            createdLeaseCollection.queryItems("SELECT * FROM c", new CosmosQueryRequestOptions(), JsonNode.class)
+                .byPage()
+                .flatMap(feedResponse -> {
+                    for (JsonNode item : feedResponse.getResults()) {
+                        if (item.get("timestamp") != null) {
+                            initialTimestamp.set(item.get("timestamp").asText());
+                            logger.info("Found timestamp: {}", initialTimestamp);
+                        }
+                    }
+                    return Mono.empty();
+                }).blockLast();
+
+            startChangeFeedProcessor(changeFeedProcessor);
+
+            // create a gap between previously written documents
+            Thread.sleep(3000);
+
+            // process some documents after the CFP is started and stopped
+            setupReadFeedDocuments(createdDocuments, createdFeedCollection, FEED_COUNT);
+
+            // Wait for the feed processor to receive and process the documents.
+            waitToReceiveDocuments(receivedDocuments, 40 * CHANGE_FEED_PROCESSOR_TIMEOUT, FEED_COUNT);
+            safeStopChangeFeedProcessor(changeFeedProcessor);
+
+            logger.info("After processing documents");
+
+            AtomicReference<String> newTimestamp = new AtomicReference<>();
+
+            createdLeaseCollection.queryItems("SELECT * FROM c", new CosmosQueryRequestOptions(), JsonNode.class)
+                .byPage()
+                .flatMap(feedResponse -> {
+                    for (JsonNode item : feedResponse.getResults()) {
+                        if (item.get("timestamp") != null) {
+                            newTimestamp.set(item.get("timestamp").asText());
+                            logger.info("Found timestamp: {}", newTimestamp);
+                        }
+                    }
+                    return Mono.empty();
+                }).blockLast();
+
+            assertThat(newTimestamp.get()).doesNotContain("[UTC]");
+            assertThat(initialTimestamp.get()).doesNotContain("[UTC]");
+
+            for (InternalObjectNode item : createdDocuments) {
+                assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue();
+            }
+
+            // Wait for the feed processor to shutdown.
+            Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT);
+
+        } finally {
+            safeDeleteCollection(createdFeedCollection);
+            safeDeleteCollection(createdLeaseCollection);
+
+            // Allow some time for the collections to be deleted before exiting.
+            Thread.sleep(500);
+        }
+    }
+
     @Test(groups = {"multi-master"}, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT)
     public void readFeedDocumentsStartFromCustomDateForMultiWrite_test() throws InterruptedException {
         CosmosClientBuilder clientBuilder = getClientBuilder();
diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index 2af84251114a..11cd72aee6de 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -13,6 +13,7 @@
 * Changed to use `PartitionKeyRangeCache` to get partition key range during startup and split handling. - [46700](https://github.com/Azure/azure-sdk-for-java/pull/46700)
 * Changed to use lower casing http header names for gateway response. - [46736](https://github.com/Azure/azure-sdk-for-java/pull/46736)
 * Improved resilience around several completion events for an ssl handshake. - [PR 46734](https://github.com/Azure/azure-sdk-for-java/pull/46734)
+* Changed timestamp format to be consistent in leases for the change feed processor (CFP). - [PR 46784](https://github.com/Azure/azure-sdk-for-java/pull/46784)
 * Added `MetadataThrottlingRetryPolicy` for `PartitionKeyRange` `RequestRateTooLargeException` handling. - [PR 46823](https://github.com/Azure/azure-sdk-for-java/pull/46823)
 
 ### 4.74.0 (2025-09-05)
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ServiceItemLeaseV1.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ServiceItemLeaseV1.java
index 468cfafaab84..820fc6717781 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ServiceItemLeaseV1.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ServiceItemLeaseV1.java
@@ -54,7 +54,7 @@ public class ServiceItemLeaseV1 implements Lease {
 
     public ServiceItemLeaseV1() {
         ZonedDateTime currentTime = ZonedDateTime.now(ZoneId.of("UTC"));
-        this.timestamp = currentTime.toString();
+        this.timestamp = currentTime.toInstant().toString();
         this._ts = String.valueOf(currentTime.getSecond());
         this.properties = new HashMap<>();
         // By default, this is EPK_RANGE_BASED_LEASE version
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/ServiceItemLease.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/ServiceItemLease.java
index 2a693bcf2855..e0f4a0be1a18 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/ServiceItemLease.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/ServiceItemLease.java
@@ -46,7 +46,7 @@ public class ServiceItemLease implements Lease {
 
     public ServiceItemLease() {
         ZonedDateTime currentTime = ZonedDateTime.now(ZoneId.of("UTC"));
-        this.timestamp = currentTime.toString();
+        this.timestamp = currentTime.toInstant().toString();
         this._ts = String.valueOf(currentTime.getSecond());
         this.properties = new HashMap<>();
     }
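For context on the one-line change in both lease classes: `ZonedDateTime.now(ZoneId.of("UTC")).toString()` renders the zone id as a suffix (for example `2025-09-05T10:15:30.123Z[UTC]`), whereas `toInstant().toString()` renders a plain ISO-8601 instant (`2025-09-05T10:15:30.123Z`), which is what the new `verifyConsistentTimestamps` test checks by asserting that persisted lease `timestamp` values do not contain `[UTC]`. The snippet below is a minimal standalone sketch of that difference; it is not part of the patch, and the class name `LeaseTimestampFormatDemo` and the sample timestamps in the comments are illustrative only.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Illustrative only -- not part of the patch.
public class LeaseTimestampFormatDemo {
    public static void main(String[] args) {
        ZonedDateTime currentTime = ZonedDateTime.now(ZoneId.of("UTC"));

        // Previous lease timestamp format: ZonedDateTime.toString() appends the
        // zone id, e.g. "2025-09-05T10:15:30.123Z[UTC]".
        String zonedForm = currentTime.toString();

        // New lease timestamp format: Instant.toString() is a plain ISO-8601
        // instant, e.g. "2025-09-05T10:15:30.123Z".
        String instantForm = currentTime.toInstant().toString();

        System.out.println("ZonedDateTime.toString(): " + zonedForm);
        System.out.println("Instant.toString():       " + instantForm);

        // Mirrors the checks in verifyConsistentTimestamps(): the stored value
        // should not carry the "[UTC]" suffix and should round-trip as an Instant.
        System.out.println("contains [UTC]? " + instantForm.contains("[UTC]"));                               // false
        System.out.println("round-trips?    " + Instant.parse(instantForm).equals(currentTime.toInstant()));  // true
    }
}
```

Both renderings are ISO-8601 based, but only the instant form is accepted by `Instant.parse`; the zoned form throws a `DateTimeParseException`, which appears to be what the CHANGELOG entry means by making the lease timestamp format consistent.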