-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(AP-6075): Add publisher & consumer level message deduplication #236
base: main
Are you sure you want to change the base?
Conversation
3e77c00
to
e49d745
Compare
e49d745
to
68266a6
Compare
275b919
to
470cd49
Compare
470cd49
to
972e853
Compare
017adad
to
c770cd8
Compare
packages/core/lib/message-deduplication/messageDeduplicationTypes.ts
Outdated
Show resolved
Hide resolved
packages/core/lib/message-deduplication/messageDeduplicationTypes.ts
Outdated
Show resolved
Hide resolved
packages/core/lib/message-deduplication/messageDeduplicationTypes.ts
Outdated
Show resolved
Hide resolved
packages/redis-message-deduplication-store/lib/RedisMessageDeduplicationStore.ts
Outdated
Show resolved
Hide resolved
…NX when storing data
688bbcc
to
8aa1a12
Compare
8aa1a12
to
1344903
Compare
packages/redis-message-deduplication-store/lib/RedisMessageDeduplicationStore.ts
Outdated
Show resolved
Hide resolved
messageTypeToConfigMap: Record<string, TConfig> | ||
} | ||
|
||
export enum ConsumerMessageDeduplicationKeyStatus { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if message fails with "Error, do not retry"? (e. g. payload cannot be parsed or does not pass validation).
Are we / should we mark the message as PROCESSED
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've addressed 2., 3. and 4. scenarios from the below list:
- message validation fails and we remove the message from the queue
retryLater
and message should be retriedretryLater
and message should not be retried (due to exceeding the retries)- error during processing the message
In cases 2., 3. and 4. I'm releasing the lock before changing the message status to allow further processing. In the case of 1
, at this stage, the lock has not yet been acquired, so we don't need to do anything specific.
Hope it makes sense 🙌
…roperty of choice, enqueue message in case of timeout while waiting for lock
@@ -248,6 +274,11 @@ export abstract class AbstractQueueService< | |||
? // @ts-ignore | |||
message[this.messageTypeField] | |||
: undefined | |||
const messageDeduplicationId = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to reduce probability of errors, if (consumer)publisherDeduplication is enabled, and deduplicationId field is not present, I'd throw an error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kibertoad Even if publisher/consumer-level deduplication is enabled, it's still possible to skip it by omitting the deduplication ID. This is by design as our publishers support multiple message types, so if dedup is enabled on the publisher level, not all message types will most likely support it.
Previously, this was being configured per message type on the consumer/publisher/manager level, but I've adjusted it to pass it as message metadata as discussed there with Krzysztof and Eduards: #236 (comment)
Let me know if it makes sense.
Co-authored-by: Igor Savin <[email protected]>
Co-authored-by: Igor Savin <[email protected]>
This PR adds publisher-level and consumer-level message deduplication mechanisms.
Changelog:
redis-sempaphore
under the hood)