diff --git a/packages/core/lib/queues/AbstractPublisherManager.ts b/packages/core/lib/queues/AbstractPublisherManager.ts index 87699b7d..88e300b3 100644 --- a/packages/core/lib/queues/AbstractPublisherManager.ts +++ b/packages/core/lib/queues/AbstractPublisherManager.ts @@ -131,6 +131,7 @@ export abstract class AbstractPublisherManager< this.targetToEventMap[eventTarget].push(supportedEvent) } } + private registerPublishers() { for (const eventTarget in this.targetToEventMap) { if (this.targetToPublisherMap[eventTarget]) { diff --git a/packages/sns/docker-compose.yml b/packages/sns/docker-compose.yml index c9d9b523..ec11c728 100644 --- a/packages/sns/docker-compose.yml +++ b/packages/sns/docker-compose.yml @@ -16,3 +16,15 @@ services: volumes: - '${TMPDIR:-/tmp}/localstack:/var/log/localstack' - '/var/run/docker.sock:/var/run/docker.sock' + + redis: + image: redis:6.2.7-alpine + command: redis-server --requirepass sOmE_sEcUrE_pAsS + ports: + - '6379:6379' + volumes: + - redis_data:/redis/data:cached + restart: on-failure +volumes: + redis_data: + driver: local diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index d2d0d5e4..c874c465 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -75,8 +75,13 @@ export abstract class AbstractSnsPublisher () => calculateOutgoingMessageSize(updatedMessage), ) + if (await this.isMessageDuplicated(parsedMessage)) { + return + } + await this.sendMessage(maybeOffloadedPayloadMessage, options) this.handleMessageProcessed(parsedMessage, 'published') + await this.deduplicateMessage(parsedMessage) } catch (error) { const err = error as Error this.handleError(err) diff --git a/packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts b/packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts new file mode 100644 index 00000000..4102a769 --- /dev/null +++ b/packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts @@ -0,0 +1,214 @@ +import { + CommonMetadataFiller, + type MessageDeduplicationKeyGenerator, +} from '@message-queue-toolkit/core' +import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store' +import { type AwilixContainer, asValue } from 'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { TestEventDeduplicationKeyGenerator } from '../../test/publishers/TestEventDeduplicationKeyGenerator' +import { cleanRedis } from '../../test/utils/cleanRedis' +import type { + Dependencies, + TestEventPublishPayloadsType, + TestEventsType, +} from '../../test/utils/testContext' +import { TestEvents, registerDependencies } from '../../test/utils/testContext' +import { type CommonSnsPublisher, CommonSnsPublisherFactory } from './CommonSnsPublisherFactory' +import { SnsPublisherManager } from './SnsPublisherManager' + +const TEST_DEDUPLICATION_KEY_PREFIX = 'test_key_prefix' + +describe('SnsPublisherManager', () => { + let diContainer: AwilixContainer + let publisherManager: SnsPublisherManager< + CommonSnsPublisher, + TestEventsType + > + let messageDeduplicationStore: RedisMessageDeduplicationStore + let messageDeduplicationKeyGenerator: MessageDeduplicationKeyGenerator + + beforeAll(async () => { + diContainer = await registerDependencies( + { + publisherManager: asValue(() => undefined), + }, + false, + ) + messageDeduplicationStore = new RedisMessageDeduplicationStore( + { + redis: diContainer.cradle.redis, + }, + { keyPrefix: TEST_DEDUPLICATION_KEY_PREFIX }, + ) + messageDeduplicationKeyGenerator = new TestEventDeduplicationKeyGenerator() + }) + + beforeEach(() => { + publisherManager = new SnsPublisherManager(diContainer.cradle, { + metadataFiller: new CommonMetadataFiller({ + serviceId: 'service', + }), + publisherFactory: new CommonSnsPublisherFactory(), + newPublisherOptions: { + handlerSpy: true, + messageIdField: 'id', + messageTypeField: 'type', + creationConfig: { + updateAttributesIfExists: true, + }, + messageDeduplicationConfig: { + deduplicationStore: messageDeduplicationStore, + messageTypeToConfigMap: { + 'entity.created': { + deduplicationKeyGenerator: messageDeduplicationKeyGenerator, + deduplicationWindowSeconds: 10, + }, + // 'entity.update' is not configured on purpose + }, + }, + }, + }) + }) + + afterEach(async () => { + await cleanRedis(diContainer.cradle.redis) + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + describe('publish', () => { + it('publishes a message and writes deduplication key to store when message type is configured with deduplication', async () => { + const message = { + payload: { + entityId: '1', + newData: 'msg', + }, + type: 'entity.created', + } satisfies TestEventPublishPayloadsType + + const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, message) + + const spy = await publisherManager + .handlerSpy(TestEvents.created.snsTopic) + .waitForMessageWithId(publishedMessage.id) + expect(spy.processingResult).toBe('published') + + const deduplicationKey = messageDeduplicationStore.retrieveKey( + messageDeduplicationKeyGenerator.generate(message), + ) + expect(deduplicationKey).not.toBeNull() + }) + + it('does not publish the same message if deduplication key already exists', async () => { + const message = { + payload: { + entityId: '1', + newData: 'msg', + }, + type: 'entity.created', + } satisfies TestEventPublishPayloadsType + + // Message is published for the initial call + const publishedMessageFirstCall = await publisherManager.publish( + TestEvents.created.snsTopic, + message, + ) + + const spyFirstCall = await publisherManager + .handlerSpy(TestEvents.created.snsTopic) + .waitForMessageWithId(publishedMessageFirstCall.id) + expect(spyFirstCall.processingResult).toBe('published') + + // Clear the spy, so we can check for the subsequent call + publisherManager.handlerSpy(TestEvents.created.snsTopic).clear() + + // Message is not published for the subsequent call + const publishedMessageSecondCall = await publisherManager.publish( + TestEvents.created.snsTopic, + message, + ) + + const spySecondCall = publisherManager + .handlerSpy(TestEvents.created.snsTopic) + .checkForMessage({ + id: publishedMessageSecondCall.id, + }) + expect(spySecondCall).toBeUndefined() + }) + + it('works only for event types that are configured', async () => { + const message1 = { + payload: { + entityId: '1', + newData: 'msg', + }, + type: 'entity.created', + } satisfies TestEventPublishPayloadsType + const message2 = { + payload: { + entityId: '1', + updatedData: 'msg', + }, + type: 'entity.updated', + } satisfies TestEventPublishPayloadsType + + // Message 1 is published for the initial call + const publishedMessageFirstCall = await publisherManager.publish( + TestEvents.created.snsTopic, + message1, + ) + + const spyFirstCall = await publisherManager + .handlerSpy(TestEvents.created.snsTopic) + .waitForMessageWithId(publishedMessageFirstCall.id) + expect(spyFirstCall.processingResult).toBe('published') + + // Clear the spy, so wew can check for the subsequent call + publisherManager.handlerSpy(TestEvents.created.snsTopic).clear() + + // Message 1 is not published for the subsequent call (deduplication works) + const publishedMessageSecondCall = await publisherManager.publish( + TestEvents.created.snsTopic, + message1, + ) + + const spySecondCall = publisherManager + .handlerSpy(TestEvents.created.snsTopic) + .checkForMessage({ + id: publishedMessageSecondCall.id, + }) + expect(spySecondCall).toBeUndefined() + + // Clear the spy, so we can check for the subsequent call + publisherManager.handlerSpy(TestEvents.created.snsTopic).clear() + + // Message 2 is published for the initial call + const publishedMessageThirdCall = await publisherManager.publish( + TestEvents.created.snsTopic, + message2, + ) + + const spyThirdCall = await publisherManager + .handlerSpy(TestEvents.created.snsTopic) + .waitForMessageWithId(publishedMessageThirdCall.id) + expect(spyThirdCall.processingResult).toBe('published') + + // Clear the spy, so we can check for the subsequent call + publisherManager.handlerSpy(TestEvents.created.snsTopic).clear() + + // Message 2 is published for the subsequent call (deduplication does not work) + const publishedMessageFourthCall = await publisherManager.publish( + TestEvents.created.snsTopic, + message2, + ) + + const spyFourthCall = await publisherManager + .handlerSpy(TestEvents.created.snsTopic) + .waitForMessageWithId(publishedMessageFourthCall.id) + expect(spyFourthCall.processingResult).toBe('published') + }) + }) +}) diff --git a/packages/sns/lib/sns/SnsPublisherManager.spec.ts b/packages/sns/lib/sns/SnsPublisherManager.spec.ts index 2008cbca..dbd1cbe6 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.spec.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.spec.ts @@ -41,6 +41,7 @@ describe('SnsPublisherManager', () => { // When const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, { payload: { + entityId: '1', newData: 'msg', }, type: 'entity.created', @@ -78,6 +79,7 @@ describe('SnsPublisherManager', () => { await expect( publisherManager.publish(TestEvents.created.snsTopic, { payload: { + entityId: '1', // @ts-expect-error This should be causing a compilation error updatedData: 'edwe', }, @@ -104,6 +106,7 @@ describe('SnsPublisherManager', () => { // @ts-expect-error Testing error scenario type: 'dummy.type', payload: { + entityId: '1', newData: 'msg', }, }), @@ -157,6 +160,7 @@ describe('SnsPublisherManager', () => { id: messageId, type: 'entity.created', payload: { + entityId: '1', newData: 'msg', }, }) diff --git a/packages/sns/package.json b/packages/sns/package.json index d54a3c31..0bac3850 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -44,6 +44,7 @@ "@biomejs/biome": "1.9.4", "@kibertoad/biome-config": "^1.2.1", "@message-queue-toolkit/core": "*", + "@message-queue-toolkit/redis-message-deduplication-store": "*", "@message-queue-toolkit/s3-payload-store": "*", "@message-queue-toolkit/sqs": "*", "@types/node": "^22.7.5", @@ -51,6 +52,7 @@ "awilix": "^12.0.1", "awilix-manager": "^5.1.0", "del-cli": "^6.0.0", + "ioredis": "^5.3.2", "typescript": "^5.7.2", "vitest": "^2.1.5" }, diff --git a/packages/sns/test/publishers/PermissionMessageDeduplicationKeyGenerator.ts b/packages/sns/test/publishers/PermissionMessageDeduplicationKeyGenerator.ts new file mode 100644 index 00000000..b333c244 --- /dev/null +++ b/packages/sns/test/publishers/PermissionMessageDeduplicationKeyGenerator.ts @@ -0,0 +1,15 @@ +import type { MessageDeduplicationKeyGenerator } from '@message-queue-toolkit/core' +import type { + PERMISSIONS_ADD_MESSAGE_TYPE, + PERMISSIONS_REMOVE_MESSAGE_TYPE, +} from '../consumers/userConsumerSchemas' + +type PermissionMessagesType = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE + +export class PermissionMessageDeduplicationKeyGenerator + implements MessageDeduplicationKeyGenerator +{ + generate(message: PermissionMessagesType): string { + return `${message.messageType}:${message.id}` + } +} diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.messageDeduplication.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.messageDeduplication.spec.ts new file mode 100644 index 00000000..9301b18c --- /dev/null +++ b/packages/sns/test/publishers/SnsPermissionPublisher.messageDeduplication.spec.ts @@ -0,0 +1,151 @@ +import type { MessageDeduplicationKeyGenerator } from '@message-queue-toolkit/core' +import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store' +import { type AwilixContainer, asValue } from 'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { cleanRedis } from '../../test/utils/cleanRedis' +import type { Dependencies } from '../../test/utils/testContext' +import { registerDependencies } from '../../test/utils/testContext' +import type { + PERMISSIONS_ADD_MESSAGE_TYPE, + PERMISSIONS_REMOVE_MESSAGE_TYPE, +} from '../consumers/userConsumerSchemas' +import { PermissionMessageDeduplicationKeyGenerator } from './PermissionMessageDeduplicationKeyGenerator' +import { SnsPermissionPublisher } from './SnsPermissionPublisher' + +const TEST_DEDUPLICATION_KEY_PREFIX = 'test_key_prefix' + +describe('SnsPermissionPublisher', () => { + describe('publish', () => { + let diContainer: AwilixContainer + let publisher: SnsPermissionPublisher + let messageDeduplicationStore: RedisMessageDeduplicationStore + let messageDeduplicationKeyGenerator: MessageDeduplicationKeyGenerator + + beforeAll(async () => { + diContainer = await registerDependencies( + { + permissionPublisher: asValue(() => undefined), + }, + false, + ) + messageDeduplicationStore = new RedisMessageDeduplicationStore( + { + redis: diContainer.cradle.redis, + }, + { keyPrefix: TEST_DEDUPLICATION_KEY_PREFIX }, + ) + messageDeduplicationKeyGenerator = new PermissionMessageDeduplicationKeyGenerator() + }) + + beforeEach(() => { + publisher = new SnsPermissionPublisher(diContainer.cradle, { + messageDeduplicationConfig: { + deduplicationStore: messageDeduplicationStore, + messageTypeToConfigMap: { + add: { + deduplicationKeyGenerator: messageDeduplicationKeyGenerator, + deduplicationWindowSeconds: 10, + }, + // 'remove' is not configured on purpose + }, + }, + }) + }) + + afterEach(async () => { + await cleanRedis(diContainer.cradle.redis) + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('publishes a message and writes deduplication key to store when message type is configured with deduplication', async () => { + const message = { + id: '1', + messageType: 'add', + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE + + await publisher.publish(message) + + const spy = await publisher.handlerSpy.waitForMessageWithId(message.id) + expect(spy.processingResult).toBe('published') + + const deduplicationKey = messageDeduplicationStore.retrieveKey( + messageDeduplicationKeyGenerator.generate(message), + ) + expect(deduplicationKey).not.toBeNull() + }) + + it('does not publish the same message if deduplication key already exists', async () => { + const message = { + id: '1', + messageType: 'add', + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE + + // Message is published for the initial call + await publisher.publish(message) + + const spyFirstCall = await publisher.handlerSpy.waitForMessageWithId(message.id) + expect(spyFirstCall.processingResult).toBe('published') + + // Clear the spy, so we can check for the subsequent call + publisher.handlerSpy.clear() + + // Message is not published for the subsequent call + await publisher.publish(message) + + const spySecondCall = publisher.handlerSpy.checkForMessage({ + id: message.id, + }) + expect(spySecondCall).toBeUndefined() + }) + + it('works only for event types that are configured', async () => { + const message1 = { + id: '1', + messageType: 'add', + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE + const message2 = { + id: '1', + messageType: 'remove', + } satisfies PERMISSIONS_REMOVE_MESSAGE_TYPE + + // Message 1 is published for the initial call + await publisher.publish(message1) + + const spyFirstCall = await publisher.handlerSpy.waitForMessageWithId(message1.id) + expect(spyFirstCall.processingResult).toBe('published') + + // Clear the spy, so wew can check for the subsequent call + publisher.handlerSpy.clear() + + // Message 1 is not published for the subsequent call (deduplication works) + await publisher.publish(message1) + + const spySecondCall = publisher.handlerSpy.checkForMessage({ + id: message1.id, + }) + expect(spySecondCall).toBeUndefined() + + // Clear the spy, so we can check for the subsequent call + publisher.handlerSpy.clear() + + // Message 2 is published for the initial call + await publisher.publish(message2) + + const spyThirdCall = await publisher.handlerSpy.waitForMessageWithId(message2.id) + expect(spyThirdCall.processingResult).toBe('published') + + // Clear the spy, so we can check for the subsequent call + publisher.handlerSpy.clear() + + // Message 2 is published for the subsequent call (deduplication does not work) + await publisher.publish(message2) + + const spyFourthCall = await publisher.handlerSpy.waitForMessageWithId(message2.id) + expect(spyFourthCall.processingResult).toBe('published') + }) + }) +}) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.ts b/packages/sns/test/publishers/SnsPermissionPublisher.ts index a75f0f7b..80c2ba6c 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.ts @@ -16,7 +16,10 @@ export class SnsPermissionPublisher extends AbstractSnsPublisher constructor( dependencies: SNSDependencies, - options?: Pick, + options?: Pick< + SNSOptions, + 'creationConfig' | 'locatorConfig' | 'payloadStoreConfig' | 'messageDeduplicationConfig' + >, ) { super(dependencies, { ...(options?.locatorConfig @@ -33,6 +36,7 @@ export class SnsPermissionPublisher extends AbstractSnsPublisher messageSchemas: [PERMISSIONS_ADD_MESSAGE_SCHEMA, PERMISSIONS_REMOVE_MESSAGE_SCHEMA], handlerSpy: true, messageTypeField: 'messageType', + messageDeduplicationConfig: options?.messageDeduplicationConfig, }) } diff --git a/packages/sns/test/publishers/TestEventDeduplicationKeyGenerator.ts b/packages/sns/test/publishers/TestEventDeduplicationKeyGenerator.ts new file mode 100644 index 00000000..467834ad --- /dev/null +++ b/packages/sns/test/publishers/TestEventDeduplicationKeyGenerator.ts @@ -0,0 +1,10 @@ +import type { MessageDeduplicationKeyGenerator } from '@message-queue-toolkit/core' +import type { TestEventPublishPayloadsType } from '../utils/testContext' + +export class TestEventDeduplicationKeyGenerator + implements MessageDeduplicationKeyGenerator +{ + generate(message: TestEventPublishPayloadsType): string { + return `${message.type}:${message.payload.entityId}` + } +} diff --git a/packages/sns/test/utils/cleanRedis.ts b/packages/sns/test/utils/cleanRedis.ts new file mode 100644 index 00000000..5d22c748 --- /dev/null +++ b/packages/sns/test/utils/cleanRedis.ts @@ -0,0 +1,5 @@ +import type { Redis } from 'ioredis' + +export async function cleanRedis(redis: Redis) { + await redis.flushall('SYNC') +} diff --git a/packages/sns/test/utils/testContext.ts b/packages/sns/test/utils/testContext.ts index 6d506386..a09a40ee 100644 --- a/packages/sns/test/utils/testContext.ts +++ b/packages/sns/test/utils/testContext.ts @@ -25,7 +25,9 @@ import { SnsSqsPermissionConsumer } from '../consumers/SnsSqsPermissionConsumer' import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' import { STSClient } from '@aws-sdk/client-sts' +import { Redis } from 'ioredis' import { CreateLocateConfigMixPublisher } from '../publishers/CreateLocateConfigMixPublisher' +import { TEST_REDIS_CONFIG } from './testRedisConfig' import { TEST_AWS_CONFIG } from './testSnsConfig' export const SINGLETON_CONFIG = { lifetime: Lifetime.SINGLETON } @@ -40,6 +42,7 @@ export const TestEvents = { ...enrichMessageSchemaWithBase( 'entity.created', z.object({ + entityId: z.string(), newData: z.string(), }), ), @@ -51,6 +54,7 @@ export const TestEvents = { ...enrichMessageSchemaWithBase( 'entity.updated', z.object({ + entityId: z.string(), updatedData: z.string(), }), ), @@ -165,6 +169,37 @@ export async function registerDependencies( }, ), + redis: asFunction( + () => { + const redisConfig = TEST_REDIS_CONFIG + + return new Redis({ + host: redisConfig.host, + db: redisConfig.db, + port: redisConfig.port, + username: redisConfig.username, + password: redisConfig.password, + connectTimeout: redisConfig.connectTimeout, + commandTimeout: redisConfig.commandTimeout, + tls: redisConfig.useTls ? {} : undefined, + maxRetriesPerRequest: null, + lazyConnect: true, // connect handled by asyncInit + }) + }, + { + asyncInitPriority: 0, // starting at the very beginning + asyncInit: 'connect', + dispose: (redis) => { + return new Promise((resolve) => { + void redis.quit((_err, result) => { + return resolve(result) + }) + }) + }, + lifetime: Lifetime.SINGLETON, + }, + ), + // vendor-specific dependencies transactionObservabilityManager: asFunction(() => { return undefined @@ -198,6 +233,7 @@ export interface Dependencies { stsClient: STSClient s3: S3 awilixManager: AwilixManager + redis: Redis // vendor-specific dependencies transactionObservabilityManager: TransactionObservabilityManager diff --git a/packages/sns/test/utils/testRedisConfig.ts b/packages/sns/test/utils/testRedisConfig.ts new file mode 100644 index 00000000..ff915ab1 --- /dev/null +++ b/packages/sns/test/utils/testRedisConfig.ts @@ -0,0 +1,12 @@ +import type { RedisConfig } from '@lokalise/node-core' + +export const TEST_REDIS_CONFIG: RedisConfig = { + host: 'localhost', + db: 0, + port: 6379, + username: undefined, + password: 'sOmE_sEcUrE_pAsS', + connectTimeout: undefined, + commandTimeout: undefined, + useTls: false, +} diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.messageDeduplication.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.messageDeduplication.spec.ts index 8d04dbe8..262dc6d8 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.messageDeduplication.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.messageDeduplication.spec.ts @@ -73,7 +73,7 @@ describe('SqsPermissionPublisher', () => { await publisher.publish(message) - const spy = await publisher.handlerSpy.waitForMessageWithId('1', 'published') + const spy = await publisher.handlerSpy.waitForMessageWithId('1') expect(spy.message).toEqual(message) expect(spy.processingResult).toBe('published') @@ -93,7 +93,7 @@ describe('SqsPermissionPublisher', () => { // Message is published for the initial call await publisher.publish(message) - const spyFirstCall = await publisher.handlerSpy.waitForMessageWithId('1', 'published') + const spyFirstCall = await publisher.handlerSpy.waitForMessageWithId('1') expect(spyFirstCall.message).toEqual(message) expect(spyFirstCall.processingResult).toBe('published') @@ -125,7 +125,7 @@ describe('SqsPermissionPublisher', () => { // Message 1 is published await publisher.publish(message1) - const spyFirstCall = await publisher.handlerSpy.waitForMessageWithId('id', 'published') + const spyFirstCall = await publisher.handlerSpy.waitForMessageWithId('id') expect(spyFirstCall.message).toEqual(message1) expect(spyFirstCall.processingResult).toBe('published') @@ -135,7 +135,7 @@ describe('SqsPermissionPublisher', () => { // Message 2 is published await publisher.publish(message2) - const spySecondCall = await publisher.handlerSpy.waitForMessageWithId('id', 'published') + const spySecondCall = await publisher.handlerSpy.waitForMessageWithId('id') expect(spySecondCall.message).toEqual(message2) expect(spySecondCall.processingResult).toBe('published') }) @@ -164,10 +164,10 @@ describe('SqsPermissionPublisher', () => { timestamp: new Date().toISOString(), } satisfies PERMISSIONS_REMOVE_MESSAGE_TYPE - // Message 1 is published + // Message 1 is published for the initial call await customPublisher.publish(message1) - const spyFirstCall = await customPublisher.handlerSpy.waitForMessageWithId('id', 'published') + const spyFirstCall = await customPublisher.handlerSpy.waitForMessageWithId('id') expect(spyFirstCall.message).toEqual(message1) expect(spyFirstCall.processingResult).toBe('published') @@ -185,10 +185,10 @@ describe('SqsPermissionPublisher', () => { // Clear the spy, so we can check for the subsequent call customPublisher.handlerSpy.clear() - // Message 2 is published + // Message 2 is published for the initial call await customPublisher.publish(message2) - const spyThirdCall = await customPublisher.handlerSpy.waitForMessageWithId('id', 'published') + const spyThirdCall = await customPublisher.handlerSpy.waitForMessageWithId('id') expect(spyThirdCall.message).toEqual(message2) expect(spyThirdCall.processingResult).toBe('published') @@ -198,7 +198,7 @@ describe('SqsPermissionPublisher', () => { // Message 2 is published for the subsequent call (deduplication does not work) await customPublisher.publish(message2) - const spyFourthCall = await customPublisher.handlerSpy.waitForMessageWithId('id', 'published') + const spyFourthCall = await customPublisher.handlerSpy.waitForMessageWithId('id') expect(spyFourthCall.message).toEqual(message2) expect(spyFourthCall.processingResult).toBe('published')