From ec9f6657fe774a5995f976c90ded362ee47c2ebb Mon Sep 17 00:00:00 2001 From: kamil-di <149760741+kamil-di@users.noreply.github.com> Date: Fri, 19 Sep 2025 09:45:38 +0200 Subject: [PATCH] #46752: Fix missing offset assignment logic in checkpoint store --- .../CHANGELOG.md | 3 ++- .../checkpointstore/blob/BlobCheckpointStore.java | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md index 43714c5039d1..ee6360e48b19 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixed `updateCheckpoint` to properly handle checkpoints when `offsetString` is available but `offset` is null, preventing potential data loss during checkpoint updates. + ### Other Changes ## 1.21.2 (2025-10-27) @@ -617,4 +619,3 @@ store checkpoints and balance partition load among all instances of Event Proces - Initial offset provider for each partition is not implemented. - Interoperability with Event Processors of other language SDKs like Python is not supported. - diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java index 2e5818c031c9..e30fb9b7a3c1 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java @@ -243,9 +243,12 @@ private Mono updateOwnershipETag(Response response, Parti */ @Override public Mono updateCheckpoint(Checkpoint checkpoint) { - if (checkpoint == null || (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null)) { + if (checkpoint == null + || (checkpoint.getSequenceNumber() == null + && checkpoint.getOffset() == null + && CoreUtils.isNullOrEmpty(checkpoint.getOffsetString()))) { throw LOGGER.logExceptionAsWarning(Exceptions.propagate(new IllegalStateException( - "Both sequence number and offset cannot be null when updating a checkpoint"))); + "At least one of sequence number or offset information (offset or offsetString) must be provided when updating a checkpoint"))); } String partitionId = checkpoint.getPartitionId(); @@ -259,8 +262,14 @@ public Mono updateCheckpoint(Checkpoint checkpoint) { String sequenceNumber = checkpoint.getSequenceNumber() == null ? null : String.valueOf(checkpoint.getSequenceNumber()); + String offset; + if (CoreUtils.isNullOrEmpty(checkpoint.getOffsetString())) { + offset = Objects.toString(checkpoint.getOffset(), null); + } else { + offset = checkpoint.getOffsetString(); + } metadata.put(SEQUENCE_NUMBER, sequenceNumber); - metadata.put(OFFSET, checkpoint.getOffsetString()); + metadata.put(OFFSET, offset); BlobAsyncClient blobAsyncClient = blobClients.get(blobName); return blobAsyncClient.exists().flatMap(exists -> {