
feat(AP-6075): Add publisher & consumer level message deduplication #236

Open

wants to merge 37 commits into base: main

Changes from 35 commits (37 commits total)
e545a98
feat(AP-6075): Add producer level dedup for SQS queue
Drodevbar Jan 10, 2025
7858855
style: linting
Drodevbar Jan 10, 2025
68266a6
refactor: adjust naming slightly
Drodevbar Jan 10, 2025
5c7bd5e
tests: tests coverage
Drodevbar Jan 10, 2025
47d4e9b
feat(AP-6075): per message type sqs deduplication
Drodevbar Jan 13, 2025
56964a6
tests: add missing test dependency
Drodevbar Jan 13, 2025
972e853
feat(AP-6075): Add sns publisher and publisher manager part
Drodevbar Jan 13, 2025
c770cd8
docs: add readme
Drodevbar Jan 13, 2025
d0e433f
feat: add validation for message deduplication window
Drodevbar Jan 14, 2025
5693616
chore: prefer ts-expect-error rather than ts-ignore
Drodevbar Jan 14, 2025
446ba48
chore(AP-6075): change MessageDeduplicationStore slightly, use redis …
Drodevbar Jan 14, 2025
1344903
docs: fix readme
Drodevbar Jan 15, 2025
762f138
chore: bump up versions and adjust readme
Drodevbar Jan 16, 2025
876498c
chore(AP-6075): synchronous check to see if dedup is enabled & minor …
Drodevbar Jan 17, 2025
bbc159e
feat(AP-6076): Add consumer-level message deduplication (#237)
Drodevbar Jan 22, 2025
9757cbb
chore: adjust versions after squeezing consumer and producer dedup PR…
Drodevbar Jan 22, 2025
8a68ad3
refactor: reuse common fx to retrieve consumer & publusher dedup configs
Drodevbar Jan 22, 2025
ed5f470
refactor: wasDeduplicationKeyStored
Drodevbar Jan 22, 2025
19494ae
docs: update readme
Drodevbar Jan 22, 2025
391bc7b
docs: update readme
Drodevbar Jan 22, 2025
fae22c5
feat: single redis message deduplication store
Drodevbar Jan 22, 2025
1740b80
docs: add jsdocs
Drodevbar Jan 22, 2025
982e1e1
refactor: use publusher instead of producer for consistancy sake
Drodevbar Jan 24, 2025
745b7b1
refactor: more generic types, utilize zod
Drodevbar Jan 26, 2025
7a0da21
feat: allow passing deduplicationId instead of using key generator
Drodevbar Jan 27, 2025
7bd8c86
feat: add graceful error handling on dedup level
Drodevbar Jan 27, 2025
abd05d2
tests: fix tests
Drodevbar Jan 27, 2025
a67b456
Merge remote-tracking branch 'origin/main' into feat/AP-6075-producer…
Drodevbar Jan 28, 2025
a122e01
docs: add comment to keyPrefix
Drodevbar Jan 28, 2025
31f5b28
docs: update readme & small var renaming
Drodevbar Jan 28, 2025
49d81ee
docs: add explaining comments
Drodevbar Jan 28, 2025
cf729c7
feat: use redis-semaphore for lock management, refactor
Drodevbar Jan 29, 2025
14bfc7c
feat: move dedup config to separate field passed as part of message p…
Drodevbar Jan 29, 2025
a02b53f
chore: decrease code coverage of schema package
Drodevbar Jan 29, 2025
4a5dbed
chore: adjust dedup default values
Drodevbar Feb 3, 2025
d43e827
Update README.md
Drodevbar Feb 4, 2025
7aada4d
Update README.md
Drodevbar Feb 4, 2025
175 changes: 175 additions & 0 deletions README.md
@@ -36,6 +36,11 @@ They implement the following public methods:
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
* `messageDeduplicationConfig` - configuration for store-based message deduplication on the publisher level. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication).
* `enablePublisherDeduplication` - enable store-based publisher-level deduplication. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication).
* `messageDeduplicationIdField` - which field in the message contains the deduplication id (by default it is `deduplicationId`). This field needs to be defined as `z.string` in the schema. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication).
* `messageDeduplicationOptionsField` - which field in the message contains the deduplication options (by default it is `deduplicationOptions`). This field needs to have the following structure:
  * `deduplicationWindowSeconds` - how many seconds the deduplication key should be kept in the store, i.e. for how long a message is prevented from being published again. This field needs to be an integer greater than 0. The default is 40 seconds.
* `init()`, prepare publisher for use (e. g. establish all necessary connections);
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
@@ -101,6 +106,14 @@ Multi-schema consumers support multiple message types via handler configs. They
* `logMessages` - add logs for processed messages.
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
* `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers
* `messageDeduplicationConfig` - configuration for store-based message deduplication on the consumer level. For more details on setting this up, see [Consumer-level store-based-message deduplication](#consumer-level-store-based-message-deduplication).
* `enableConsumerDeduplication` - enable store-based consumer-level deduplication. For more details on setting this up, see [Consumer-level store-based-message deduplication](#consumer-level-store-based-message-deduplication).
* `messageDeduplicationIdField` - which field in the message contains the deduplication id (by default it is `deduplicationId`). This field needs to be defined as `z.string` in the schema. For more details on setting this up, see [Consumer-level store-based-message deduplication](#consumer-level-store-based-message-deduplication).
* `messageDeduplicationOptionsField` - which field in the message contains the deduplication options (by default it is `deduplicationOptions`). This field needs to have the following structure:
  * `deduplicationWindowSeconds` - how many seconds the deduplication key should be kept in the store, i.e. for how long a message is prevented from being processed again after it was successfully consumed. This field needs to be an integer greater than 0. The default is 40 seconds.
  * `lockTimeoutSeconds` - how many seconds the lock should be kept in the store, i.e. for how long a message is prevented from being processed by another consumer. As long as the consumer does not crash, the lock is continuously refreshed to prevent other consumers from processing the message. This field needs to be an integer greater than 0. The default is 20 seconds.
  * `acquireTimeoutSeconds` - how many seconds at most the consumer should wait for the lock to be acquired. If the lock is not acquired within this time, the message is re-queued. This field needs to be an integer greater than 0. The default is 20 seconds.
  * `refreshIntervalSeconds` - how often (in seconds) the lock should be refreshed. This field needs to be a number greater than 0 (fractional values are allowed). The default is 10 seconds.
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `start()`, which invokes `init()`.
@@ -390,5 +403,167 @@ It needs to implement the following methods:
- `messageTimestamp` - the timestamp when the message was sent initially
- `messageProcessingStartTimestamp` - the timestamp when the processing of the message started
- `messageProcessingEndTimestamp` - the timestamp when the processing of the message finished
- `messageDeduplicationId` - the deduplication id of the message, if deduplication is enabled

See [@message-queue-toolkit/metrics](packages/metrics/README.md) for concrete implementations

## Store-based message deduplication

There are 2 types of store-based message deduplication: publisher-level and consumer-level.

### Publisher-level store-based message deduplication

Publisher-level store-based message deduplication is a mechanism that prevents the same message from being sent to the queue multiple times.
It is useful when you want to ensure that a message is published only once in a specified period of time, regardless of how many times it is sent.

The mechanism relies on:
1. a deduplication store, which is used to store deduplication keys for a certain period of time
2. a deduplication key, which uniquely identifies a message
3. a deduplication config, which contains details like a period of time during which a deduplication key is stored in the store (see [Publishers](#publishers) for more details of the options)
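
Conceptually, the publisher-level check boils down to an atomic set-if-not-exists with a TTL. Below is a minimal sketch of the idea (illustrative, not the library internals), assuming the `MessageDeduplicationStore` interface exported by `@message-queue-toolkit/core` and a hypothetical `sendToQueue` transport call:

```typescript
import type { MessageDeduplicationStore } from '@message-queue-toolkit/core'

async function publishWithDeduplication(
  store: MessageDeduplicationStore,
  message: { deduplicationId: string },
  deduplicationWindowSeconds: number,
  sendToQueue: (message: unknown) => Promise<void>, // hypothetical transport call
): Promise<void> {
  // Keys are prefixed with `publisher:` so they don't clash with consumer-level keys
  const key = `publisher:${message.deduplicationId}`
  // setIfNotExists is atomic: only the first publish within the window stores the key
  const wasStored = await store.setIfNotExists(key, 'published', deduplicationWindowSeconds)
  if (!wasStored) {
    return // duplicate within the deduplication window - skip publishing
  }
  await sendToQueue(message)
}
```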

Note that with some queuing systems, such as standard SQS, publisher-level deduplication is not sufficient to guarantee that a message is **processed** only once.
This is because standard SQS has an at-least-once delivery guarantee, which means that a message can be delivered more than once.
In such cases, publisher-level deduplication should be combined with consumer-level deduplication.

The keys stored in the deduplication store are prefixed with `publisher:` to avoid conflicts with consumer-level deduplication keys in case both are enabled and the same deduplication store is used.

If you would like to use the SQS FIFO deduplication feature, store-based deduplication won't handle it for you.
Instead, you should either enable content-based deduplication on the queue or pass `MessageDeduplicationId` within the message options when publishing a message.
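
For reference, SQS-native FIFO deduplication with the raw AWS SDK looks like this (plain `@aws-sdk/client-sqs`, independent of this library; the queue URL and ids are placeholders):

```typescript
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs'

const sqs = new SQSClient({})
await sqs.send(
  new SendMessageCommand({
    QueueUrl: 'https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue.fifo', // placeholder
    MessageBody: JSON.stringify({ id: '1', messageType: 'add' }),
    MessageGroupId: 'my-group', // required for FIFO queues
    MessageDeduplicationId: 'unique-id', // SQS-native deduplication, separate from the store-based one
  }),
)
```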

#### Configuration

1. **Install a deduplication store implementation (Redis in example)**:
```bash
npm install @message-queue-toolkit/redis-message-deduplication-store
```

2. **Configure your setup:**
```typescript
import { Redis } from 'ioredis'
import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store'

const redisClient = new Redis({
// your redis configuration
})

// Create a new instance of RedisMessageDeduplicationStore
const messageDeduplicationStore = new RedisMessageDeduplicationStore({ redis: redisClient })

// Configure messages publisher
// Note: the type parameter below is illustrative - use your own message type
export class MyPublisher extends AbstractSqsPublisher<SupportedMessages> {
constructor(
// dependencies and options
) {
super(dependencies, {
// rest of the configuration
enablePublisherDeduplication: true,
messageDeduplicationIdField: 'deduplicationId', // provide a field name in the message that contains unique deduplication id
messageDeduplicationOptionsField: 'deduplicationOptions', // provide a field name in the message that contains deduplication options
messageDeduplicationConfig: {
deduplicationStore: messageDeduplicationStore, // provide an instance of deduplication store
},
})
}
}

// Create a publisher and publish a message
const publisher = new MyPublisher(dependencies, options)
await publisher.init()

// Publish a message and deduplicate it for the next 60 seconds
await publisher.publish({
// rest of the message payload
deduplicationId: 'unique-id',
// the options below are optional; if they are not provided or are invalid, default values will be used
deduplicationOptions: {
deduplicationWindowSeconds: 60,
},
})
// any subsequent call to publish with the same deduplicationId will be ignored for the next 60 seconds

// You can also publish messages without deduplication by simply omitting the deduplicationId field
await publisher.publish({
// message payload
})
```
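
The message schema must also define the deduplication fields so that validation passes. A sketch of what that might look like with the default field names (the schema name and other fields are hypothetical):

```typescript
import { z } from 'zod'

const MY_MESSAGE_SCHEMA = z.object({
  id: z.string(),
  messageType: z.literal('entity.created'),
  // matches the default messageDeduplicationIdField
  deduplicationId: z.string().optional(),
  // matches the default messageDeduplicationOptionsField
  deduplicationOptions: z
    .object({
      deduplicationWindowSeconds: z.number().int().positive().optional(),
    })
    .optional(),
})
```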

### Consumer-level store-based message deduplication

Consumer-level store-based message deduplication is a mechanism that prevents the same message from being processed multiple times.
It is useful when you want to ensure that a message is processed only once in a specified period of time, regardless of how many times it is received.

The mechanism relies on:
1. a deduplication store, which is used to store deduplication keys for a certain period of time
2. a deduplication key, which uniquely identifies a message
3. a deduplication config (see [Consumers](#consumers) for more details of the options)

When a message is received, a consumer with store-based message deduplication enabled starts by checking whether the deduplication key exists in the store.
If it does, the message is considered a duplicate and is ignored.
Otherwise, the consumer acquires an exclusive lock, which guarantees that only it can process the message.
In the case of the Redis-based deduplication store, `redis-semaphore` is used, which keeps the lock alive while the consumer is processing the message and releases it when processing is done.
Upon successful message processing, the deduplication key is stored in the store for a given period of time to prevent the same message from being processed again.
If the lock is not acquired within the specified time, the message is re-queued.
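
A minimal sketch of that flow (illustrative, not the library internals), assuming the `MessageDeduplicationStore` interface exported by `@message-queue-toolkit/core` and hypothetical `processMessage`/`requeue` callbacks:

```typescript
import type { MessageDeduplicationStore } from '@message-queue-toolkit/core'

async function consumeWithDeduplication(
  store: MessageDeduplicationStore,
  message: { deduplicationId: string },
  deduplicationWindowSeconds: number,
  processMessage: (message: unknown) => Promise<void>, // hypothetical handler
  requeue: () => Promise<void>, // hypothetical re-queue action
): Promise<void> {
  // Keys are prefixed with `consumer:` so they don't clash with publisher-level keys
  const key = `consumer:${message.deduplicationId}`

  // 1. Already processed within the window? Ignore the duplicate.
  if (await store.keyExists(key)) return

  // 2. Acquire an exclusive lock so no other consumer processes the message concurrently
  //    (the values below are the library defaults).
  const lock = await store.acquireLock(key, {
    lockTimeoutSeconds: 20,
    acquireTimeoutSeconds: 20,
    refreshIntervalSeconds: 10,
  })
  if (lock.error) {
    await requeue() // lock not acquired in time - let the message be retried later
    return
  }

  try {
    await processMessage(message)
    // 3. Mark the message as processed for the duration of the deduplication window.
    await store.setIfNotExists(key, 'processed', deduplicationWindowSeconds)
  } finally {
    await lock.result.release()
  }
}
```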

The keys stored in the deduplication store are prefixed with `consumer:` to avoid conflicts with publisher-level deduplication keys in case both are enabled and the same deduplication store is used.

If you would like to use the SQS FIFO deduplication feature, store-based deduplication won't handle it for you.
Instead, you should either enable content-based deduplication on the queue or pass `MessageDeduplicationId` within the message options when publishing a message.

#### Configuration

1. **Install a deduplication store implementation (Redis in example)**:
```bash
npm install @message-queue-toolkit/redis-message-deduplication-store
```

2. **Configure your setup:**
```typescript
import { Redis } from 'ioredis'
import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store'

const redisClient = new Redis({
// your redis configuration
})

// Create a new instance of RedisMessageDeduplicationStore
const messageDeduplicationStore = new RedisMessageDeduplicationStore({ redis: redisClient })

// Note: the type parameters below are illustrative - use your own message and context types
export class MyConsumer extends AbstractSqsConsumer<SupportedMessages, ExecutionContext> {
constructor(
// dependencies and options
) {
super(dependencies, {
enableConsumerDeduplication: true,
messageDeduplicationIdField: 'deduplicationId', // provide a field name in the message that contains unique deduplication id
messageDeduplicationOptionsField: 'deduplicationOptions', // provide a field name in the message that contains deduplication options
messageDeduplicationConfig: {
deduplicationStore: messageDeduplicationStore, // provide an instance of deduplication store
},
})
}
}

// Create a consumer and start listening for messages
const consumer = new MyConsumer(dependencies, options)
await consumer.init()

// Publish a message so that the consumer can process it
await publisher.publish({
// rest of the message payload
deduplicationId: 'unique-id',
// the options below are optional; if they are not provided or are invalid, default values will be used
deduplicationOptions: {
deduplicationWindowSeconds: 60,
lockTimeoutSeconds: 10,
acquireTimeoutSeconds: 10,
refreshIntervalSeconds: 5,
},
})

// You can also consume messages without deduplication by simply omitting the deduplicationId field when publishing
await publisher.publish({
// message payload
})
```
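
If your messages don't carry a natural unique id, one common approach (a sketch, not part of the library) is to derive the deduplication id from the message content:

```typescript
import { createHash } from 'node:crypto'

// Hypothetical helper: derive a stable deduplication id from the payload.
// Note: JSON.stringify is key-order-sensitive - use a stable serializer
// if the same logical payload may be built with varying key order.
function contentBasedDeduplicationId(payload: Record<string, unknown>): string {
  return createHash('sha256').update(JSON.stringify(payload)).digest('hex')
}

await publisher.publish({
  ...payload,
  deduplicationId: contentBasedDeduplicationId(payload),
})
```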
11 changes: 11 additions & 0 deletions docker-compose.yml
@@ -25,6 +25,17 @@ services:
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
- '/var/run/docker.sock:/var/run/docker.sock'

redis:
image: redis:6.2.7-alpine
command: redis-server --requirepass sOmE_sEcUrE_pAsS
ports:
- '6379:6379'
volumes:
- redis_data:/redis/data:cached
restart: on-failure

volumes:
rabbit_data:
driver: local
redis_data:
driver: local
2 changes: 1 addition & 1 deletion packages/amqp/lib/AbstractAmqpConsumer.ts
@@ -180,7 +180,7 @@ export abstract class AbstractAmqpConsumer<
// requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop
if (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) {
// TODO: Add retry delay + republish message updating internal properties
this.channel.nack(message, false, true)
this.channel.nack(message as Message, false, true)
this.handleMessageProcessed({
message: parsedMessage,
processingResult: 'retryLater',
4 changes: 2 additions & 2 deletions packages/amqp/package.json
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/amqp",
"version": "18.0.0",
"version": "18.1.0",
"private": false,
"license": "MIT",
"description": "AMQP adapter for message-queue-toolkit",
@@ -29,7 +29,7 @@
"zod": "^3.23.8"
},
"peerDependencies": {
"@message-queue-toolkit/core": ">=19.0.0",
"@message-queue-toolkit/core": ">=19.1.0",
"@message-queue-toolkit/schemas": ">=2.0.0",
"amqplib": "^0.10.3"
},
@@ -155,6 +155,7 @@ describe('AmqpPermissionConsumer', () => {
{
messageId: '1',
messageType: 'add',
messageDeduplicationId: undefined,
processingResult: 'consumed',
queueName: AmqpPermissionConsumer.QUEUE_NAME,
messageTimestamp: expect.any(Number),
9 changes: 9 additions & 0 deletions packages/core/index.ts
@@ -78,3 +78,12 @@
OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA,
isOffloadedPayloadPointerPayload,
} from './lib/payload-store/offloadedPayloadMessageSchemas'
export {
type MessageDeduplicationStore,
type MessageDeduplicationConfig,
type ReleasableLock,
type AcquireLockOptions,
DeduplicationRequester,
AcquireLockTimeoutError,
noopReleasableLock,
} from './lib/message-deduplication/messageDeduplicationTypes'
65 changes: 65 additions & 0 deletions packages/core/lib/message-deduplication/messageDeduplicationTypes.ts
@@ -0,0 +1,65 @@
import type { Either } from '@lokalise/node-core'
import type { MessageDeduplicationOptions } from '@message-queue-toolkit/schemas'

export interface ReleasableLock {
release(): Promise<void>
}

export class AcquireLockTimeoutError extends Error {}

export type AcquireLockOptions = Required<
Pick<
MessageDeduplicationOptions,
'acquireTimeoutSeconds' | 'lockTimeoutSeconds' | 'refreshIntervalSeconds'
>
>

export interface MessageDeduplicationStore {
/**
* Stores a deduplication key if it does not already exist.
* @param {string} key - deduplication key
* @param {string} value - value to store
* @param {number} ttlSeconds - time to live in seconds
* @returns {Promise<boolean>} - true if the key was stored, false if it already existed
*/
setIfNotExists(key: string, value: string, ttlSeconds: number): Promise<boolean>

/**
* Acquires a lock for a given key
* @param {string} key - deduplication key
* @param {object} options - options used when acquiring the lock
* @returns {Promise<Either<AcquireLockTimeoutError | Error, ReleasableLock>>} - a promise that resolves to a ReleasableLock if the lock was acquired, AcquireLockTimeoutError error if the lock could not be acquired due to timeout, or an Error if the lock could not be acquired for another reason
*/
acquireLock(
key: string,
options: object,
): Promise<Either<AcquireLockTimeoutError | Error, ReleasableLock>>

/**
* Checks if a deduplication key exists in the store
* @param {string} key - deduplication key
* @returns {Promise<boolean>} - true if the key exists, false otherwise
*/
keyExists(key: string): Promise<boolean>
}

export type MessageDeduplicationConfig = {
/** The store to use for storage and retrieval of deduplication keys */
deduplicationStore: MessageDeduplicationStore
}

export enum DeduplicationRequester {
Consumer = 'consumer',
Publisher = 'publisher',
}

export const DEFAULT_MESSAGE_DEDUPLICATION_OPTIONS: Required<MessageDeduplicationOptions> = {
deduplicationWindowSeconds: 40,
lockTimeoutSeconds: 20,
acquireTimeoutSeconds: 20,
refreshIntervalSeconds: 10,
}

export const noopReleasableLock: ReleasableLock = {
release: async () => {},
}
2 changes: 1 addition & 1 deletion packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts
@@ -9,7 +9,7 @@ export const OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA = z
offloadedPayloadPointer: z.string().min(1),
offloadedPayloadSize: z.number().int().positive(),
})
// Pass-through allows to pass message ID, type and timestamp that are using dynamic keys.
// Pass-through allows to pass message ID, type, timestamp and message-deduplication-related fields that are using dynamic keys.
.passthrough()

export type OffloadedPayloadPointerPayload = z.infer<
1 change: 1 addition & 0 deletions packages/core/lib/queues/AbstractPublisherManager.ts
@@ -131,6 +131,7 @@ export abstract class AbstractPublisherManager<
this.targetToEventMap[eventTarget].push(supportedEvent)
}
}

private registerPublishers() {
for (const eventTarget in this.targetToEventMap) {
if (this.targetToPublisherMap[eventTarget]) {