Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixed `updateCheckpoint` to properly handle checkpoints when `offsetString` is available but `offset` is null, preventing potential data loss during checkpoint updates.

### Other Changes

## 1.21.2 (2025-10-27)
Expand Down Expand Up @@ -617,4 +619,3 @@ store checkpoints and balance partition load among all instances of Event Proces

- Initial offset provider for each partition is not implemented.
- Interoperability with Event Processors of other language SDKs like Python is not supported.

Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,12 @@ private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, Parti
*/
@Override
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
if (checkpoint == null || (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null)) {
if (checkpoint == null
|| (checkpoint.getSequenceNumber() == null
&& checkpoint.getOffset() == null
&& CoreUtils.isNullOrEmpty(checkpoint.getOffsetString()))) {
throw LOGGER.logExceptionAsWarning(Exceptions.propagate(new IllegalStateException(
"Both sequence number and offset cannot be null when updating a checkpoint")));
"At least one of sequence number or offset information (offset or offsetString) must be provided when updating a checkpoint")));
}

String partitionId = checkpoint.getPartitionId();
Expand All @@ -259,8 +262,14 @@ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
String sequenceNumber
= checkpoint.getSequenceNumber() == null ? null : String.valueOf(checkpoint.getSequenceNumber());

String offset;
if (CoreUtils.isNullOrEmpty(checkpoint.getOffsetString())) {
offset = Objects.toString(checkpoint.getOffset(), null);
} else {
offset = checkpoint.getOffsetString();
}
metadata.put(SEQUENCE_NUMBER, sequenceNumber);
metadata.put(OFFSET, checkpoint.getOffsetString());
metadata.put(OFFSET, offset);
BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

return blobAsyncClient.exists().flatMap(exists -> {
Expand Down