Commit 1a34b14

drodevbar and kibertoad authored
feat(AP-6075): Add publisher & consumer level message deduplication (#236)
* feat(AP-6075): Add producer level dedup for SQS queue
* style: linting
* refactor: adjust naming slightly
* tests: tests coverage
* feat(AP-6075): per message type sqs deduplication
* tests: add missing test dependency
* feat(AP-6075): Add sns publisher and publisher manager part
* docs: add readme
* feat: add validation for message deduplication window
* chore: prefer ts-expect-error rather than ts-ignore
* chore(AP-6075): change MessageDeduplicationStore slightly, use redis NX when storing data
* docs: fix readme
* chore: bump up versions and adjust readme
* chore(AP-6075): synchronous check to see if dedup is enabled & minor CR adjustments
* feat(AP-6076): Add consumer-level message deduplication (#237)
* feat(AP-6076): consumer-level message deduplication
* docs: add docs for consumer-level dedup
* chore: bump up versions
* chore(AP-6076): add sync check if dedup enabled, add extra comments, renaming
* fix: fix checking dedup on consumer side + add sync checks
* chore: simplify consumer dedup logic
* chore: add "duplicate" MessageProcessingResult
* chore: adjust publishers to use duplicate status
* chore: adjust versions after squeezing consumer and producer dedup PRs into one
* refactor: reuse common fx to retrieve consumer & publusher dedup configs
* refactor: wasDeduplicationKeyStored
* docs: update readme
* docs: update readme
* feat: single redis message deduplication store
* docs: add jsdocs
* refactor: use publusher instead of producer for consistancy sake
* refactor: more generic types, utilize zod
* feat: allow passing deduplicationId instead of using key generator
* feat: add graceful error handling on dedup level
* tests: fix tests
* docs: add comment to keyPrefix
* docs: update readme & small var renaming
* docs: add explaining comments
* feat: use redis-semaphore for lock management, refactor
* feat: move dedup config to separate field passed as part of message property of choice, enqueue message in case of timeout while waiting for lock
* chore: decrease code coverage of schema package
* chore: adjust dedup default values
* Update README.md (Co-authored-by: Igor Savin <[email protected]>)
* Update README.md (Co-authored-by: Igor Savin <[email protected]>)
* chore: dev deps cleanup

Co-authored-by: Igor Savin <[email protected]>
1 parent 066ca3d · commit 1a34b14

53 files changed: +2246 −89 lines

README.md

Lines changed: 175 additions & 0 deletions
@@ -36,6 +36,11 @@ They implement the following public methods:

* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
* `messageDeduplicationConfig` - configuration for store-based message deduplication on the publisher level. For more details on setting this up, see [Publisher-level store-based message deduplication](#publisher-level-store-based-message-deduplication).
* `enablePublisherDeduplication` - enable store-based publisher-level deduplication. For more details on setting this up, see [Publisher-level store-based message deduplication](#publisher-level-store-based-message-deduplication).
* `messageDeduplicationIdField` - which field in the message contains the deduplication id (`deduplicationId` by default). This field needs to be defined as `z.string` in the schema. For more details on setting this up, see [Publisher-level store-based message deduplication](#publisher-level-store-based-message-deduplication).
* `messageDeduplicationOptionsField` - which field in the message contains the deduplication options (`deduplicationOptions` by default). This field needs to have the following structure:
  * `deduplicationWindowSeconds` - how many seconds the deduplication key should be kept in the store, i.e. for how long the message should be prevented from being published again. This field needs to be an integer greater than 0. The default is 40 seconds.
* `init()`, prepare publisher for use (e. g. establish all necessary connections);
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
@@ -101,6 +106,14 @@ Multi-schema consumers support multiple message types via handler configs. They

* `logMessages` - add logs for processed messages.
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
* `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers.
* `messageDeduplicationConfig` - configuration for store-based message deduplication on the consumer level. For more details on setting this up, see [Consumer-level store-based message deduplication](#consumer-level-store-based-message-deduplication).
* `enableConsumerDeduplication` - enable store-based consumer-level deduplication. For more details on setting this up, see [Consumer-level store-based message deduplication](#consumer-level-store-based-message-deduplication).
* `messageDeduplicationIdField` - which field in the message contains the deduplication id (`deduplicationId` by default). This field needs to be defined as `z.string` in the schema. For more details on setting this up, see [Consumer-level store-based message deduplication](#consumer-level-store-based-message-deduplication).
* `messageDeduplicationOptionsField` - which field in the message contains the deduplication options (`deduplicationOptions` by default). This field needs to have the following structure (see the sketch after this list for the defaults):
  * `deduplicationWindowSeconds` - how many seconds the deduplication key should be kept in the store, i.e. for how long the message should be prevented from being processed again after it has been successfully consumed. This field needs to be an integer greater than 0. The default is 40 seconds.
  * `lockTimeoutSeconds` - how many seconds the lock should be kept in the store, i.e. for how long the message is prevented from being processed by another consumer. As long as the consumer does not crash, the lock is continuously refreshed to prevent other consumers from processing the message. This field needs to be an integer greater than 0. The default is 20 seconds.
  * `acquireTimeoutSeconds` - how many seconds at most the consumer should wait for the lock to be acquired. If the lock is not acquired within this time, the message is re-queued. This field needs to be an integer greater than 0. The default is 20 seconds.
  * `refreshIntervalSeconds` - how often, in seconds, the lock should be refreshed. This field needs to be a number greater than 0 (fractional values are allowed). The default is 10 seconds.
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `start()`, which invokes `init()`.
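As referenced above, the options object read from `messageDeduplicationOptionsField` looks like this when populated with the documented defaults (an illustrative sketch; every value can be overridden per message):

```typescript
// Shape of the field referenced by `messageDeduplicationOptionsField`
// (default field name: `deduplicationOptions`). The values below are the
// documented defaults; all of them are optional on a per-message basis.
const deduplicationOptions = {
  deduplicationWindowSeconds: 40, // keep the deduplication key for 40 s after successful processing
  lockTimeoutSeconds: 20, // a crashed consumer's lock expires after 20 s
  acquireTimeoutSeconds: 20, // wait at most 20 s for the lock before re-queueing the message
  refreshIntervalSeconds: 10, // refresh the lock every 10 s while processing
}
```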
@@ -390,5 +403,167 @@ It needs to implement the following methods:

- `messageTimestamp` - the timestamp when the message was sent initially
- `messageProcessingStartTimestamp` - the timestamp when the processing of the message started
- `messageProcessingEndTimestamp` - the timestamp when the processing of the message finished
- `messageDeduplicationId` - the deduplication id of the message, if deduplication is enabled

See [@message-queue-toolkit/metrics](packages/metrics/README.md) for concrete implementations

## Store-based message deduplication

There are two types of store-based message deduplication: publisher-level and consumer-level.

### Publisher-level store-based message deduplication

Publisher-level store-based message deduplication is a mechanism that prevents the same message from being sent to the queue multiple times.
It is useful when you want to ensure that a message is published only once within a specified period of time, regardless of how many times it is sent.

The mechanism relies on:
1. a deduplication store, which is used to store deduplication keys for a certain period of time
2. a deduplication key, which uniquely identifies a message
3. a deduplication config, which contains details such as the period of time during which a deduplication key is kept in the store (see [Publishers](#publishers) for more details on the options)

Note that for some queuing systems, such as standard (non-FIFO) SQS, publisher-level deduplication is not sufficient to guarantee that a message is **processed** only once.
This is because standard SQS has an at-least-once delivery guarantee, which means that a message can be delivered more than once.
In such cases, publisher-level deduplication should be combined with consumer-level deduplication.

The keys stored in the deduplication store are prefixed with `publisher:` to avoid conflicts with consumer-level deduplication keys when both levels are enabled and the same deduplication store is used.

If you want to use the SQS FIFO deduplication feature, store-based deduplication won't handle it for you.
Instead, you should either enable content-based deduplication on the queue or pass `MessageDeduplicationId` within the message options when publishing a message.

#### Configuration
1. **Install a deduplication store implementation (Redis in this example)**:
```bash
npm install @message-queue-toolkit/redis-message-deduplication-store
```

2. **Configure your setup:**
```typescript
import { Redis } from 'ioredis'
import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store'

const redisClient = new Redis({
  // your redis configuration
})

// Create a new instance of RedisMessageDeduplicationStore
const messageDeduplicationStore = new RedisMessageDeduplicationStore({ redis: redisClient })

// Configure the message publisher
export class MyPublisher extends AbstractSqsPublisher<> {
  constructor(
    // dependencies and options
  ) {
    super(dependencies, {
      // rest of the configuration
      enablePublisherDeduplication: true,
      messageDeduplicationIdField: 'deduplicationId', // the field in the message that contains the unique deduplication id
      messageDeduplicationOptionsField: 'deduplicationOptions', // the field in the message that contains the deduplication options
      messageDeduplicationConfig: {
        deduplicationStore: messageDeduplicationStore, // an instance of the deduplication store
      },
    })
  }
}

// Create a publisher and publish a message
const publisher = new MyPublisher(dependencies, options)
await publisher.init()

// Publish a message and deduplicate it for the next 60 seconds
await publisher.publish({
  // rest of the message payload
  deduplicationId: 'unique-id',
  // the options below are optional; if they are missing or invalid, default values are used
  deduplicationOptions: {
    deduplicationWindowSeconds: 60,
  },
})
// any subsequent publish call with the same deduplicationId will be ignored for the next 60 seconds

// You can also publish messages without deduplication by simply omitting the deduplicationId field
await publisher.publish({
  // message payload
})
```
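The `deduplicationId` used above must be declared as a `z.string` in the message schema. A minimal sketch of such a schema (the event name and the remaining fields are hypothetical; the field can be made optional so that some messages are published without deduplication):

```typescript
import { z } from 'zod'

// Hypothetical message schema - only `deduplicationId` matters for deduplication.
export const ENTITY_CREATED_MESSAGE_SCHEMA = z.object({
  id: z.string(),
  messageType: z.literal('entity.created'),
  // declared as z.string(); optional so the same schema also works without deduplication
  deduplicationId: z.string().optional(),
  payload: z.object({
    entityId: z.string(),
  }),
})

export type EntityCreatedMessage = z.infer<typeof ENTITY_CREATED_MESSAGE_SCHEMA>
```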
### Consumer-level store-based message deduplication

Consumer-level store-based message deduplication is a mechanism that prevents the same message from being processed multiple times.
It is useful when you want to be sure that a message is processed only once within a specified period of time, regardless of how many times it is received.

The mechanism relies on:
1. a deduplication store, which is used to store deduplication keys for a certain period of time
2. a deduplication key, which uniquely identifies a message
3. a deduplication config (see [Consumers](#consumers) for more details on the options)

A consumer with store-based message deduplication enabled starts by checking whether the deduplication key exists in the store.
If it does, the message is considered a duplicate and is ignored.
Otherwise, the consumer acquires an exclusive lock, which guarantees that only this consumer can process the message.
In the case of the Redis-based deduplication store, `redis-semaphore` is used; it keeps the lock alive while the consumer is processing the message and releases it when processing is done.
Upon successful processing, the deduplication key is stored for the configured period of time to prevent the same message from being processed again.
If the lock is not acquired within the specified time, the message is re-queued.

The keys stored in the deduplication store are prefixed with `consumer:` to avoid conflicts with publisher-level deduplication keys when both levels are enabled and the same deduplication store is used.

If you want to use the SQS FIFO deduplication feature, store-based deduplication won't handle it for you.
Instead, you should either enable content-based deduplication on the queue or pass `MessageDeduplicationId` within the message options when publishing a message.
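Put together, the consumer-side flow corresponds roughly to the sketch below, expressed in terms of the `MessageDeduplicationStore` interface added in this commit (simplified and hypothetical: the exact key layout, error handling and the `duplicate` processing result are handled internally by the toolkit):

```typescript
import { AcquireLockTimeoutError } from '@message-queue-toolkit/core'
import type { AcquireLockOptions, MessageDeduplicationStore } from '@message-queue-toolkit/core'

// Simplified sketch of consumer-level deduplication wrapped around a message handler.
async function consumeWithDeduplication(
  store: MessageDeduplicationStore,
  deduplicationId: string,
  options: AcquireLockOptions & { deduplicationWindowSeconds: number },
  handler: () => Promise<void>,
): Promise<'consumed' | 'duplicate' | 'retryLater'> {
  const key = `consumer:${deduplicationId}` // assumed key layout

  // Already processed within the deduplication window -> treat as a duplicate
  if (await store.keyExists(key)) return 'duplicate'

  // Acquire an exclusive lock so no other consumer processes the same message concurrently
  const lockResult = await store.acquireLock(key, options)
  if (lockResult.error || !lockResult.result) {
    // Lock not acquired within acquireTimeoutSeconds -> re-queue the message
    if (lockResult.error instanceof AcquireLockTimeoutError) return 'retryLater'
    throw lockResult.error
  }
  const lock = lockResult.result

  try {
    // Another consumer may have finished while we were waiting for the lock
    if (await store.keyExists(key)) return 'duplicate'
    await handler()
    // Remember the message for deduplicationWindowSeconds
    await store.setIfNotExists(key, 'processed', options.deduplicationWindowSeconds)
    return 'consumed'
  } finally {
    await lock.release()
  }
}
```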
#### Configuration

1. **Install a deduplication store implementation (Redis in this example)**:
```bash
npm install @message-queue-toolkit/redis-message-deduplication-store
```

2. **Configure your setup:**
```typescript
import { Redis } from 'ioredis'
import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store'

const redisClient = new Redis({
  // your redis configuration
})

// Create a new instance of RedisMessageDeduplicationStore
const messageDeduplicationStore = new RedisMessageDeduplicationStore({ redis: redisClient })

export class MyConsumer extends AbstractSqsConsumer<> {
  constructor(
    // dependencies and options
  ) {
    super(dependencies, {
      enableConsumerDeduplication: true,
      messageDeduplicationIdField: 'deduplicationId', // the field in the message that contains the unique deduplication id
      messageDeduplicationOptionsField: 'deduplicationOptions', // the field in the message that contains the deduplication options
      messageDeduplicationConfig: {
        deduplicationStore: messageDeduplicationStore, // an instance of the deduplication store
      },
    })
  }
}

// Create a consumer and start listening for messages
const consumer = new MyConsumer(dependencies, options)
await consumer.init()

// Publish a message so that the consumer can process it
await publisher.publish({
  // rest of the message payload
  deduplicationId: 'unique-id',
  // the options below are optional; if they are missing or invalid, default values are used
  deduplicationOptions: {
    deduplicationWindowSeconds: 60,
    lockTimeoutSeconds: 10,
    acquireTimeoutSeconds: 10,
    refreshIntervalSeconds: 5,
  },
})

// You can also consume messages without deduplication by simply omitting the deduplicationId field when publishing
await publisher.publish({
  // message payload
})
```

docker-compose.yml

Lines changed: 11 additions & 0 deletions
@@ -25,6 +25,17 @@ services:
       - '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
       - '/var/run/docker.sock:/var/run/docker.sock'
 
+  redis:
+    image: redis:6.2.7-alpine
+    command: redis-server --requirepass sOmE_sEcUrE_pAsS
+    ports:
+      - '6379:6379'
+    volumes:
+      - redis_data:/redis/data:cached
+    restart: on-failure
+
 volumes:
   rabbit_data:
     driver: local
+  redis_data:
+    driver: local
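For local development against this Compose service, the Redis-backed deduplication store from the README above could be wired up roughly like this (a sketch; the port and password come from the Compose file, the host is assumed to be localhost):

```typescript
import { Redis } from 'ioredis'
import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store'

// Connection details match the `redis` service defined in docker-compose.yml
const redisClient = new Redis({
  host: 'localhost',
  port: 6379,
  password: 'sOmE_sEcUrE_pAsS',
})

// Passed to publishers/consumers via `messageDeduplicationConfig.deduplicationStore`
export const messageDeduplicationStore = new RedisMessageDeduplicationStore({ redis: redisClient })
```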

packages/amqp/lib/AbstractAmqpConsumer.ts

Lines changed: 1 addition & 1 deletion
@@ -180,7 +180,7 @@ export abstract class AbstractAmqpConsumer<
       // requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop
       if (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) {
         // TODO: Add retry delay + republish message updating internal properties
-        this.channel.nack(message, false, true)
+        this.channel.nack(message as Message, false, true)
         this.handleMessageProcessed({
           message: parsedMessage,
           processingResult: 'retryLater',

packages/amqp/package.json

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 {
   "name": "@message-queue-toolkit/amqp",
-  "version": "18.0.0",
+  "version": "18.1.0",
   "private": false,
   "license": "MIT",
   "description": "AMQP adapter for message-queue-toolkit",
@@ -29,7 +29,7 @@
     "zod": "^3.23.8"
   },
   "peerDependencies": {
-    "@message-queue-toolkit/core": ">=19.0.0",
+    "@message-queue-toolkit/core": ">=19.1.0",
     "@message-queue-toolkit/schemas": ">=2.0.0",
     "amqplib": "^0.10.3"
   },

packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts

Lines changed: 1 addition & 0 deletions
@@ -155,6 +155,7 @@ describe('AmqpPermissionConsumer', () => {
       {
         messageId: '1',
         messageType: 'add',
+        messageDeduplicationId: undefined,
         processingResult: 'consumed',
         queueName: AmqpPermissionConsumer.QUEUE_NAME,
         messageTimestamp: expect.any(Number),

packages/core/index.ts

Lines changed: 9 additions & 0 deletions
@@ -78,3 +78,12 @@ export {
   OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA,
   isOffloadedPayloadPointerPayload,
 } from './lib/payload-store/offloadedPayloadMessageSchemas'
+export {
+  type MessageDeduplicationStore,
+  type MessageDeduplicationConfig,
+  type ReleasableLock,
+  type AcquireLockOptions,
+  DeduplicationRequester,
+  AcquireLockTimeoutError,
+  noopReleasableLock,
+} from './lib/message-deduplication/messageDeduplicationTypes'
packages/core/lib/message-deduplication/messageDeduplicationTypes.ts

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
import type { Either } from '@lokalise/node-core'
import type { MessageDeduplicationOptions } from '@message-queue-toolkit/schemas'

export interface ReleasableLock {
  release(): Promise<void>
}

export class AcquireLockTimeoutError extends Error {}

export type AcquireLockOptions = Required<
  Pick<
    MessageDeduplicationOptions,
    'acquireTimeoutSeconds' | 'lockTimeoutSeconds' | 'refreshIntervalSeconds'
  >
>

export interface MessageDeduplicationStore {
  /**
   * Stores a deduplication key in case it does not already exist.
   * @param {string} key - deduplication key
   * @param {string} value - value to store
   * @param {number} ttlSeconds - time to live in seconds
   * @returns {Promise<boolean>} - true if the key was stored, false if it already existed
   */
  setIfNotExists(key: string, value: string, ttlSeconds: number): Promise<boolean>

  /**
   * Acquires a lock for a given key
   * @param {string} key - deduplication key
   * @param {object} options - options used when acquiring the lock
   * @returns {Promise<Either<AcquireLockTimeoutError | Error, ReleasableLock>>} - a promise that resolves to a ReleasableLock if the lock was acquired, an AcquireLockTimeoutError if the lock could not be acquired due to a timeout, or an Error if the lock could not be acquired for another reason
   */
  acquireLock(
    key: string,
    options: object,
  ): Promise<Either<AcquireLockTimeoutError | Error, ReleasableLock>>

  /**
   * Checks if a deduplication key exists in the store
   * @param {string} key - deduplication key
   * @returns {Promise<boolean>} - true if the key exists, false otherwise
   */
  keyExists(key: string): Promise<boolean>
}

export type MessageDeduplicationConfig = {
  /** The store to use for storage and retrieval of deduplication keys */
  deduplicationStore: MessageDeduplicationStore
}

export enum DeduplicationRequester {
  Consumer = 'consumer',
  Publisher = 'publisher',
}

export const DEFAULT_MESSAGE_DEDUPLICATION_OPTIONS: Required<MessageDeduplicationOptions> = {
  deduplicationWindowSeconds: 40,
  lockTimeoutSeconds: 20,
  acquireTimeoutSeconds: 20,
  refreshIntervalSeconds: 10,
}

export const noopReleasableLock: ReleasableLock = {
  release: async () => {},
}
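The exported types above are all that is needed to plug in a custom store. For illustration, a minimal in-memory store might look roughly like the sketch below (hypothetical, not part of the commit; it assumes `Either` from `@lokalise/node-core` is the usual `{ error } | { result }` union and it ignores the lock-timing options that the Redis store honours via `redis-semaphore`):

```typescript
import type { Either } from '@lokalise/node-core'
import type { MessageDeduplicationStore, ReleasableLock } from '@message-queue-toolkit/core'
import { AcquireLockTimeoutError } from '@message-queue-toolkit/core'

// Hypothetical in-memory store, e.g. for unit tests. Unlike the Redis store it does not
// honour lockTimeoutSeconds / acquireTimeoutSeconds / refreshIntervalSeconds - it simply
// fails immediately when a lock is already held.
export class InMemoryMessageDeduplicationStore implements MessageDeduplicationStore {
  private readonly entries = new Map<string, { value: string; expiresAt: number }>()
  private readonly locks = new Set<string>()

  async setIfNotExists(key: string, value: string, ttlSeconds: number): Promise<boolean> {
    if (await this.keyExists(key)) return false
    this.entries.set(key, { value, expiresAt: Date.now() + ttlSeconds * 1000 })
    return true
  }

  async keyExists(key: string): Promise<boolean> {
    const entry = this.entries.get(key)
    if (!entry) return false
    if (entry.expiresAt <= Date.now()) {
      this.entries.delete(key)
      return false
    }
    return true
  }

  async acquireLock(
    key: string,
    _options: object,
  ): Promise<Either<AcquireLockTimeoutError | Error, ReleasableLock>> {
    if (this.locks.has(key)) {
      return { error: new AcquireLockTimeoutError(`Lock for ${key} is already held`) }
    }
    this.locks.add(key)
    const release = async (): Promise<void> => {
      this.locks.delete(key)
    }
    return { result: { release } }
  }
}
```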

packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ export const OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA = z
     offloadedPayloadPointer: z.string().min(1),
     offloadedPayloadSize: z.number().int().positive(),
   })
-  // Pass-through allows to pass message ID, type and timestamp that are using dynamic keys.
+  // Pass-through allows to pass message ID, type, timestamp and message-deduplication-related fields that are using dynamic keys.
   .passthrough()
 
 export type OffloadedPayloadPointerPayload = z.infer<

packages/core/lib/queues/AbstractPublisherManager.ts

Lines changed: 1 addition & 0 deletions
@@ -131,6 +131,7 @@ export abstract class AbstractPublisherManager<
       this.targetToEventMap[eventTarget].push(supportedEvent)
     }
   }
+
   private registerPublishers() {
     for (const eventTarget in this.targetToEventMap) {
       if (this.targetToPublisherMap[eventTarget]) {
