Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(AP-6075): Add publisher & consumer level message deduplication #236

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e545a98
feat(AP-6075): Add producer level dedup for SQS queue
Drodevbar Jan 10, 2025
7858855
style: linting
Drodevbar Jan 10, 2025
68266a6
refactor: adjust naming slightly
Drodevbar Jan 10, 2025
5c7bd5e
tests: tests coverage
Drodevbar Jan 10, 2025
47d4e9b
feat(AP-6075): per message type sqs deduplication
Drodevbar Jan 13, 2025
56964a6
tests: add missing test dependency
Drodevbar Jan 13, 2025
972e853
feat(AP-6075): Add sns publisher and publisher manager part
Drodevbar Jan 13, 2025
c770cd8
docs: add readme
Drodevbar Jan 13, 2025
d0e433f
feat: add validation for message deduplication window
Drodevbar Jan 14, 2025
5693616
chore: prefer ts-expect-error rather than ts-ignore
Drodevbar Jan 14, 2025
446ba48
chore(AP-6075): change MessageDeduplicationStore slightly, use redis …
Drodevbar Jan 14, 2025
1344903
docs: fix readme
Drodevbar Jan 15, 2025
762f138
chore: bump up versions and adjust readme
Drodevbar Jan 16, 2025
876498c
chore(AP-6075): synchronous check to see if dedup is enabled & minor …
Drodevbar Jan 17, 2025
bbc159e
feat(AP-6076): Add consumer-level message deduplication (#237)
Drodevbar Jan 22, 2025
9757cbb
chore: adjust versions after squeezing consumer and producer dedup PR…
Drodevbar Jan 22, 2025
8a68ad3
refactor: reuse common fx to retrieve consumer & publusher dedup configs
Drodevbar Jan 22, 2025
ed5f470
refactor: wasDeduplicationKeyStored
Drodevbar Jan 22, 2025
19494ae
docs: update readme
Drodevbar Jan 22, 2025
391bc7b
docs: update readme
Drodevbar Jan 22, 2025
fae22c5
feat: single redis message deduplication store
Drodevbar Jan 22, 2025
1740b80
docs: add jsdocs
Drodevbar Jan 22, 2025
982e1e1
refactor: use publusher instead of producer for consistancy sake
Drodevbar Jan 24, 2025
745b7b1
refactor: more generic types, utilize zod
Drodevbar Jan 26, 2025
7a0da21
feat: allow passing deduplicationId instead of using key generator
Drodevbar Jan 27, 2025
7bd8c86
feat: add graceful error handling on dedup level
Drodevbar Jan 27, 2025
abd05d2
tests: fix tests
Drodevbar Jan 27, 2025
a67b456
Merge remote-tracking branch 'origin/main' into feat/AP-6075-producer…
Drodevbar Jan 28, 2025
a122e01
docs: add comment to keyPrefix
Drodevbar Jan 28, 2025
31f5b28
docs: update readme & small var renaming
Drodevbar Jan 28, 2025
49d81ee
docs: add explaining comments
Drodevbar Jan 28, 2025
cf729c7
feat: use redis-semaphore for lock management, refactor
Drodevbar Jan 29, 2025
14bfc7c
feat: move dedup config to separate field passed as part of message p…
Drodevbar Jan 29, 2025
a02b53f
chore: decrease code coverage of schema package
Drodevbar Jan 29, 2025
4a5dbed
chore: adjust dedup default values
Drodevbar Feb 3, 2025
d43e827
Update README.md
Drodevbar Feb 4, 2025
7aada4d
Update README.md
Drodevbar Feb 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,71 @@ It needs to implement the following methods:
- `messageProcessingMilliseconds` - message processing time in milliseconds

See [@message-queue-toolkit/metrics](packages/metrics/README.md) for concrete implementations

## Producer-level deduplication

Producer-level message deduplication is a mechanism that prevents the same message from being sent to the queue multiple times.
It is useful when you want to ensure that a message is published only once, regardless of how many times it is sent.

Note that in case of some queuing systems, such as standard SQS, producer-level deduplication is not sufficient to guarantee that a message is **processed** only once.
This is because standard SQS has an at-least-once delivery guarantee, which means that a message can be delivered more than once.
In such cases, producer-level deduplication should be combined with consumer-level one.

### Configuration

1. **Install a deduplication store implementation (Redis in example)**:
```bash
npm install @message-queue-toolkit/redis-deduplication-store
```

2. **Configure your setup:**
```typescript
import { Redis } from 'ioredis'
import { RedisDeduplicationStore } from '@message-queue-toolkit/redis-deduplication-store'
import { MessageDeduplicationKeyGenerator } from '@message-queue-toolkit/core'

const redisClient = new Redis({
// your redis configuration
})

// Create a new instance of RedisMessageDeduplicationStore
messageDeduplicationStore = new RedisMessageDeduplicationStore(
{ redis: diContainer.cradle.redis },
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
{ keyPrefix: 'optional-key-prefix' }, // used to prefix deduplication keys
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't redis client by itself support keyprefix?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is the idea here that we may want to have different prefixes on the same connection?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that we could use the same Redis connection as part of a service for both "regular" operations (like tmp storage management for different types of needs) and deduplication. In such a case, I think we need to handle key prefixing on dthe edup store. Feel free to disagree.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a user-defined key for this? wonder if there could be a default prefix per use-case, with a possibility for user to override

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep in mind that prefix is optional, I can add a default one in case user doesn't specify it (but in such case maybe they don't need prefixes at all? 🤔)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's OK to have prefix, but then let's explain what it's supposed to be and when different values need to be passed, so that user is not confused

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the description to RedisMessageDeduplicationStoreConfig

export type RedisMessageDeduplicationStoreConfig = {
  /** Optional prefix for the keys in the store, useful for namespacing and differentiating between deduplication store and other data */
  keyPrefix?: string
}

)

// Producer-level deduplication allows you to provide custom strategies of deduplication key generation for each message type
// In this example we'll provide just one strategy for one message type - 'dummy'
class DummyMessageDeduplicationKeyGenerator implements MessageDeduplicationKeyGenerator<DummyEvent> {
generateKey(message: DummyEvent): string {
return message.id
}
}

const dummyMessageDeduplicationKeyGenerator = new DummyMessageDeduplicationKeyGenerator()

export class MyPublisher extends AbstractSqsPublisher<> {
constructor(
// dependencies and options
) {
super(dependencies, {
// rest of the configuration
messageDeduplicationConfig: {
deduplicationStore: messageDeduplicationStore,
messageTypeToConfigMap: {
dummy: {
deduplicationWindowSeconds: 10,
deduplicationKeyGenerator: dummyMessageDeduplicationKeyGenerator,
},
// In case there are other event types available, you can provide their deduplication strategies here
// If strategy for certain message type is not provided, deduplication will not be performed for this message type
},
},
})
}
}
```




11 changes: 11 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ services:
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
- '/var/run/docker.sock:/var/run/docker.sock'

redis:
image: redis:6.2.7-alpine
command: redis-server --requirepass sOmE_sEcUrE_pAsS
ports:
- '6379:6379'
volumes:
- redis_data:/redis/data:cached
restart: on-failure

volumes:
rabbit_data:
driver: local
redis_data:
driver: local
6 changes: 6 additions & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,9 @@ export {
OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA,
isOffloadedPayloadPointerPayload,
} from './lib/payload-store/offloadedPayloadMessageSchemas'
export type {
MessageDeduplicationConfig,
MessageDeduplicationMessageTypeConfig,
MessageDeduplicationStore,
MessageDeduplicationKeyGenerator,
} from './lib/message-deduplication/messageDeduplicationTypes'
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { z } from 'zod'

export const MESSAGE_DEDUPLICATION_MESSAGE_TYPE_SCHEMA = z.object({
deduplicationWindowSeconds: z.number().int().gt(0),
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
export interface MessageDeduplicationKeyGenerator<Message extends object = object> {
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
generate(message: Message): string
}

export interface MessageDeduplicationStore {
storeKey(key: string, value: string, ttlSeconds: number): Promise<void>
retrieveKey(key: string): Promise<string | null>
}

export type MessageDeduplicationMessageTypeConfig = {
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
/** How many seconds to keep the deduplication key in the store for a particular message type */
deduplicationWindowSeconds: number
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved

/** The generator to use for generating deduplication keys for a particular message type */
deduplicationKeyGenerator: MessageDeduplicationKeyGenerator
}

export type MessageDeduplicationConfig = {
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
/** The store to use for storage and retrieval of deduplication keys */
deduplicationStore: MessageDeduplicationStore

/** The configuration for deduplication for each message type */
messageTypeToConfigMap: Record<string, MessageDeduplicationMessageTypeConfig>
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions packages/core/lib/queues/AbstractPublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ export abstract class AbstractPublisherManager<
this.targetToEventMap[eventTarget].push(supportedEvent)
}
}

private registerPublishers() {
for (const eventTarget in this.targetToEventMap) {
if (this.targetToPublisherMap[eventTarget]) {
Expand Down
80 changes: 78 additions & 2 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import type { ZodSchema, ZodType } from 'zod'

import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors'
import { jsonStreamStringifySerializer } from '../payload-store/JsonStreamStringifySerializer'
import { OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA } from '../payload-store/offloadedPayloadMessageSchemas'
import type { OffloadedPayloadPointerPayload } from '../payload-store/offloadedPayloadMessageSchemas'
import { OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA } from '../payload-store/offloadedPayloadMessageSchemas'
import type { PayloadStoreConfig } from '../payload-store/payloadStoreTypes'
import { isDestroyable } from '../payload-store/payloadStoreTypes'
import type { MessageProcessingResult } from '../types/MessageQueueTypes'
Expand All @@ -23,6 +23,8 @@ import { isRetryDateExceeded } from '../utils/dateUtils'
import { streamWithKnownSizeToString } from '../utils/streamUtils'
import { toDatePreprocessor } from '../utils/toDateProcessor'

import { MESSAGE_DEDUPLICATION_MESSAGE_TYPE_SCHEMA } from '../message-deduplication/messageDeduplicationSchemas'
import type { MessageDeduplicationConfig } from '../message-deduplication/messageDeduplicationTypes'
import type {
BarrierCallback,
BarrierResult,
Expand Down Expand Up @@ -84,8 +86,9 @@ export abstract class AbstractQueueService<
protected readonly deletionConfig?: DeletionConfig
protected readonly payloadStoreConfig?: Omit<PayloadStoreConfig, 'serializer'> &
Required<Pick<PayloadStoreConfig, 'serializer'>>
protected readonly _handlerSpy?: HandlerSpy<MessagePayloadSchemas>
protected readonly messageDeduplicationConfig?: MessageDeduplicationConfig
protected readonly messageMetricsManager?: MessageMetricsManager<MessagePayloadSchemas>
protected readonly _handlerSpy?: HandlerSpy<MessagePayloadSchemas>

protected isInitted: boolean

Expand Down Expand Up @@ -118,6 +121,9 @@ export abstract class AbstractQueueService<
...options.payloadStoreConfig,
}
: undefined
this.messageDeduplicationConfig = this.getValidateMessageDeduplicationConfig(
options.messageDeduplicationConfig,
)

this.logMessages = options.logMessages ?? false
this._handlerSpy = resolveHandlerSpy<MessagePayloadSchemas>(options)
Expand Down Expand Up @@ -518,4 +524,74 @@ export abstract class AbstractQueueService<
return { error: new Error('Failed to parse serialized offloaded payload', { cause: e }) }
}
}

/** Checks for an existence of deduplication key in deduplication store */
protected async isMessageDuplicated(message: MessagePayloadSchemas): Promise<boolean> {
if (!this.messageDeduplicationConfig) {
return false
}

// @ts-ignore
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
const messageType = message[this.messageTypeField] as string

if (!this.messageDeduplicationConfig.messageTypeToConfigMap[messageType]) {
return false
}

const deduplicationKey =
this.messageDeduplicationConfig.messageTypeToConfigMap[
messageType
].deduplicationKeyGenerator.generate(message)
const deduplicationValue =
await this.messageDeduplicationConfig.deduplicationStore.retrieveKey(deduplicationKey)

return deduplicationValue !== null
}

/** Stores deduplication key in deduplication store */
protected async deduplicateMessage(message: MessagePayloadSchemas): Promise<void> {
if (!this.messageDeduplicationConfig) {
return
}

// @ts-ignore
const messageType = message[this.messageTypeField] as string

if (!this.messageDeduplicationConfig.messageTypeToConfigMap[messageType]) {
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
return
}

const deduplicationKey =
this.messageDeduplicationConfig.messageTypeToConfigMap[
messageType
].deduplicationKeyGenerator.generate(message)

await this.messageDeduplicationConfig.deduplicationStore.storeKey(
deduplicationKey,
new Date().toISOString(),
this.messageDeduplicationConfig.messageTypeToConfigMap[messageType]
.deduplicationWindowSeconds,
)
}

private getValidateMessageDeduplicationConfig(
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
messageDeduplicationConfig?: MessageDeduplicationConfig,
): MessageDeduplicationConfig | undefined {
if (!messageDeduplicationConfig) {
return undefined
}

for (const messageConfig of Object.values(messageDeduplicationConfig.messageTypeToConfigMap)) {
const messageTypeToConfigMapParseResult =
MESSAGE_DEDUPLICATION_MESSAGE_TYPE_SCHEMA.safeParse(messageConfig)

if (messageTypeToConfigMapParseResult.error) {
throw new Error(
`Invalid message deduplication config provided: ${messageTypeToConfigMapParseResult.error.toString()}`,
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
)
}
}

return messageDeduplicationConfig
}
}
2 changes: 2 additions & 0 deletions packages/core/lib/types/queueOptionsTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { PayloadStoreConfig } from '../payload-store/payloadStoreTypes'
import type { MessageHandlerConfig } from '../queues/HandlerContainer'
import type { HandlerSpy, HandlerSpyParams } from '../queues/HandlerSpy'

import type { MessageDeduplicationConfig } from '../message-deduplication/messageDeduplicationTypes'
import type { MessageProcessingResult, TransactionObservabilityManager } from './MessageQueueTypes'

export type QueueDependencies = {
Expand Down Expand Up @@ -60,6 +61,7 @@ export type CommonQueueOptions = {
logMessages?: boolean
deletionConfig?: DeletionConfig
payloadStoreConfig?: PayloadStoreConfig
messageDeduplicationConfig?: MessageDeduplicationConfig
}

export type CommonCreationConfigType = {
Expand Down
13 changes: 13 additions & 0 deletions packages/redis-message-deduplication-store/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
services:
redis:
image: redis:6.2.7-alpine
command: redis-server --requirepass sOmE_sEcUrE_pAsS
ports:
- '6379:6379'
volumes:
- redis_data:/redis/data:cached
restart: on-failure

volumes:
redis_data:
driver: local
1 change: 1 addition & 0 deletions packages/redis-message-deduplication-store/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './lib/RedisMessageDeduplicationStore'
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { MessageDeduplicationStore } from '@message-queue-toolkit/core'
import type { Redis } from 'ioredis'

export type RedisMessageDeduplicationStoreDependencies = {
redis: Redis
}

export type RedisMessageDeduplicationStoreConfig = {
keyPrefix?: string
}

export class RedisMessageDeduplicationStore implements MessageDeduplicationStore {
private readonly redis: Redis
private readonly config: RedisMessageDeduplicationStoreConfig

constructor(
dependencies: RedisMessageDeduplicationStoreDependencies,
config: RedisMessageDeduplicationStoreConfig,
) {
this.redis = dependencies.redis
this.config = config
}

async storeKey(key: string, value: string, ttlSeconds: number): Promise<void> {
const keyWithPrefix = this.getKeyWithOptionalPrefix(key)

await this.redis.set(keyWithPrefix, value, 'EX', ttlSeconds)
Drodevbar marked this conversation as resolved.
Show resolved Hide resolved
}

retrieveKey(key: string): Promise<string | null> {
const keyWithPrefix = this.getKeyWithOptionalPrefix(key)

return this.redis.get(keyWithPrefix)
}

private getKeyWithOptionalPrefix(key: string): string {
return this.config?.keyPrefix?.length ? `${this.config.keyPrefix}:${key}` : key
}
}
47 changes: 47 additions & 0 deletions packages/redis-message-deduplication-store/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"name": "@message-queue-toolkit/redis-message-deduplication-store",
"version": "1.0.0",
"private": false,
"license": "MIT",
"description": "Redis-based message deduplication store for message-queue-toolkit",
"maintainers": [
{
"name": "Igor Savin",
"email": "[email protected]"
}
],
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "del-cli dist && tsc",
"build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json",
"test": "vitest",
"test:coverage": "npm test -- --coverage",
"test:ci": "npm run docker:start:dev && npm run test:coverage && npm run docker:stop:dev",
"lint": "biome check . && tsc --project tsconfig.json --noEmit",
"lint:fix": "biome check --write .",
"docker:start:dev": "docker compose up -d",
"docker:stop:dev": "docker compose down",
"prepublishOnly": "npm run build:release"
},
"dependencies": {},
"peerDependencies": {
"ioredis": "^5.3.2"
},
"devDependencies": {
"@biomejs/biome": "1.9.4",
"@kibertoad/biome-config": "^1.2.1",
"@types/node": "^22.0.0",
"@vitest/coverage-v8": "^2.0.4",
"del-cli": "^6.0.0",
"typescript": "^5.5.3",
"vitest": "^2.0.4"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
"type": "git",
"url": "git://github.com/kibertoad/message-queue-toolkit.git"
},
"keywords": ["message", "queue", "queues", "utils", "redis", "ioredis", "deduplication", "store"],
"files": ["README.md", "LICENSE", "dist/*"]
}
Loading
Loading