Skip to content

Commit 972e853

Browse files
committed
feat(AP-6075): Add sns publisher and publisher manager part
1 parent 56964a6 commit 972e853

14 files changed

+481
-10
lines changed

packages/core/lib/queues/AbstractPublisherManager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ export abstract class AbstractPublisherManager<
131131
this.targetToEventMap[eventTarget].push(supportedEvent)
132132
}
133133
}
134+
134135
private registerPublishers() {
135136
for (const eventTarget in this.targetToEventMap) {
136137
if (this.targetToPublisherMap[eventTarget]) {

packages/sns/docker-compose.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,15 @@ services:
1616
volumes:
1717
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
1818
- '/var/run/docker.sock:/var/run/docker.sock'
19+
20+
redis:
21+
image: redis:6.2.7-alpine
22+
command: redis-server --requirepass sOmE_sEcUrE_pAsS
23+
ports:
24+
- '6379:6379'
25+
volumes:
26+
- redis_data:/redis/data:cached
27+
restart: on-failure
28+
volumes:
29+
redis_data:
30+
driver: local

packages/sns/lib/sns/AbstractSnsPublisher.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,13 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
7575
() => calculateOutgoingMessageSize(updatedMessage),
7676
)
7777

78+
if (await this.isMessageDuplicated(parsedMessage)) {
79+
return
80+
}
81+
7882
await this.sendMessage(maybeOffloadedPayloadMessage, options)
7983
this.handleMessageProcessed(parsedMessage, 'published')
84+
await this.deduplicateMessage(parsedMessage)
8085
} catch (error) {
8186
const err = error as Error
8287
this.handleError(err)
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import {
2+
CommonMetadataFiller,
3+
type MessageDeduplicationKeyGenerator,
4+
} from '@message-queue-toolkit/core'
5+
import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store'
6+
import { type AwilixContainer, asValue } from 'awilix'
7+
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'
8+
import { TestEventDeduplicationKeyGenerator } from '../../test/publishers/TestEventDeduplicationKeyGenerator'
9+
import { cleanRedis } from '../../test/utils/cleanRedis'
10+
import type {
11+
Dependencies,
12+
TestEventPublishPayloadsType,
13+
TestEventsType,
14+
} from '../../test/utils/testContext'
15+
import { TestEvents, registerDependencies } from '../../test/utils/testContext'
16+
import { type CommonSnsPublisher, CommonSnsPublisherFactory } from './CommonSnsPublisherFactory'
17+
import { SnsPublisherManager } from './SnsPublisherManager'
18+
19+
const TEST_DEDUPLICATION_KEY_PREFIX = 'test_key_prefix'
20+
21+
describe('SnsPublisherManager', () => {
22+
let diContainer: AwilixContainer<Dependencies>
23+
let publisherManager: SnsPublisherManager<
24+
CommonSnsPublisher<TestEventPublishPayloadsType>,
25+
TestEventsType
26+
>
27+
let messageDeduplicationStore: RedisMessageDeduplicationStore
28+
let messageDeduplicationKeyGenerator: MessageDeduplicationKeyGenerator
29+
30+
beforeAll(async () => {
31+
diContainer = await registerDependencies(
32+
{
33+
publisherManager: asValue(() => undefined),
34+
},
35+
false,
36+
)
37+
messageDeduplicationStore = new RedisMessageDeduplicationStore(
38+
{
39+
redis: diContainer.cradle.redis,
40+
},
41+
{ keyPrefix: TEST_DEDUPLICATION_KEY_PREFIX },
42+
)
43+
messageDeduplicationKeyGenerator = new TestEventDeduplicationKeyGenerator()
44+
})
45+
46+
beforeEach(() => {
47+
publisherManager = new SnsPublisherManager(diContainer.cradle, {
48+
metadataFiller: new CommonMetadataFiller({
49+
serviceId: 'service',
50+
}),
51+
publisherFactory: new CommonSnsPublisherFactory(),
52+
newPublisherOptions: {
53+
handlerSpy: true,
54+
messageIdField: 'id',
55+
messageTypeField: 'type',
56+
creationConfig: {
57+
updateAttributesIfExists: true,
58+
},
59+
messageDeduplicationConfig: {
60+
deduplicationStore: messageDeduplicationStore,
61+
messageTypeToConfigMap: {
62+
'entity.created': {
63+
deduplicationKeyGenerator: messageDeduplicationKeyGenerator,
64+
deduplicationWindowSeconds: 10,
65+
},
66+
// 'entity.update' is not configured on purpose
67+
},
68+
},
69+
},
70+
})
71+
})
72+
73+
afterEach(async () => {
74+
await cleanRedis(diContainer.cradle.redis)
75+
})
76+
77+
afterAll(async () => {
78+
await diContainer.cradle.awilixManager.executeDispose()
79+
await diContainer.dispose()
80+
})
81+
82+
describe('publish', () => {
83+
it('publishes a message and writes deduplication key to store when message type is configured with deduplication', async () => {
84+
const message = {
85+
payload: {
86+
entityId: '1',
87+
newData: 'msg',
88+
},
89+
type: 'entity.created',
90+
} satisfies TestEventPublishPayloadsType
91+
92+
const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, message)
93+
94+
const spy = await publisherManager
95+
.handlerSpy(TestEvents.created.snsTopic)
96+
.waitForMessageWithId(publishedMessage.id)
97+
expect(spy.processingResult).toBe('published')
98+
99+
const deduplicationKey = messageDeduplicationStore.retrieveKey(
100+
messageDeduplicationKeyGenerator.generate(message),
101+
)
102+
expect(deduplicationKey).not.toBeNull()
103+
})
104+
105+
it('does not publish the same message if deduplication key already exists', async () => {
106+
const message = {
107+
payload: {
108+
entityId: '1',
109+
newData: 'msg',
110+
},
111+
type: 'entity.created',
112+
} satisfies TestEventPublishPayloadsType
113+
114+
// Message is published for the initial call
115+
const publishedMessageFirstCall = await publisherManager.publish(
116+
TestEvents.created.snsTopic,
117+
message,
118+
)
119+
120+
const spyFirstCall = await publisherManager
121+
.handlerSpy(TestEvents.created.snsTopic)
122+
.waitForMessageWithId(publishedMessageFirstCall.id)
123+
expect(spyFirstCall.processingResult).toBe('published')
124+
125+
// Clear the spy, so we can check for the subsequent call
126+
publisherManager.handlerSpy(TestEvents.created.snsTopic).clear()
127+
128+
// Message is not published for the subsequent call
129+
const publishedMessageSecondCall = await publisherManager.publish(
130+
TestEvents.created.snsTopic,
131+
message,
132+
)
133+
134+
const spySecondCall = publisherManager
135+
.handlerSpy(TestEvents.created.snsTopic)
136+
.checkForMessage({
137+
id: publishedMessageSecondCall.id,
138+
})
139+
expect(spySecondCall).toBeUndefined()
140+
})
141+
142+
it('works only for event types that are configured', async () => {
143+
const message1 = {
144+
payload: {
145+
entityId: '1',
146+
newData: 'msg',
147+
},
148+
type: 'entity.created',
149+
} satisfies TestEventPublishPayloadsType
150+
const message2 = {
151+
payload: {
152+
entityId: '1',
153+
updatedData: 'msg',
154+
},
155+
type: 'entity.updated',
156+
} satisfies TestEventPublishPayloadsType
157+
158+
// Message 1 is published for the initial call
159+
const publishedMessageFirstCall = await publisherManager.publish(
160+
TestEvents.created.snsTopic,
161+
message1,
162+
)
163+
164+
const spyFirstCall = await publisherManager
165+
.handlerSpy(TestEvents.created.snsTopic)
166+
.waitForMessageWithId(publishedMessageFirstCall.id)
167+
expect(spyFirstCall.processingResult).toBe('published')
168+
169+
// Clear the spy, so wew can check for the subsequent call
170+
publisherManager.handlerSpy(TestEvents.created.snsTopic).clear()
171+
172+
// Message 1 is not published for the subsequent call (deduplication works)
173+
const publishedMessageSecondCall = await publisherManager.publish(
174+
TestEvents.created.snsTopic,
175+
message1,
176+
)
177+
178+
const spySecondCall = publisherManager
179+
.handlerSpy(TestEvents.created.snsTopic)
180+
.checkForMessage({
181+
id: publishedMessageSecondCall.id,
182+
})
183+
expect(spySecondCall).toBeUndefined()
184+
185+
// Clear the spy, so we can check for the subsequent call
186+
publisherManager.handlerSpy(TestEvents.created.snsTopic).clear()
187+
188+
// Message 2 is published for the initial call
189+
const publishedMessageThirdCall = await publisherManager.publish(
190+
TestEvents.created.snsTopic,
191+
message2,
192+
)
193+
194+
const spyThirdCall = await publisherManager
195+
.handlerSpy(TestEvents.created.snsTopic)
196+
.waitForMessageWithId(publishedMessageThirdCall.id)
197+
expect(spyThirdCall.processingResult).toBe('published')
198+
199+
// Clear the spy, so we can check for the subsequent call
200+
publisherManager.handlerSpy(TestEvents.created.snsTopic).clear()
201+
202+
// Message 2 is published for the subsequent call (deduplication does not work)
203+
const publishedMessageFourthCall = await publisherManager.publish(
204+
TestEvents.created.snsTopic,
205+
message2,
206+
)
207+
208+
const spyFourthCall = await publisherManager
209+
.handlerSpy(TestEvents.created.snsTopic)
210+
.waitForMessageWithId(publishedMessageFourthCall.id)
211+
expect(spyFourthCall.processingResult).toBe('published')
212+
})
213+
})
214+
})

packages/sns/lib/sns/SnsPublisherManager.spec.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ describe('SnsPublisherManager', () => {
4141
// When
4242
const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, {
4343
payload: {
44+
entityId: '1',
4445
newData: 'msg',
4546
},
4647
type: 'entity.created',
@@ -78,6 +79,7 @@ describe('SnsPublisherManager', () => {
7879
await expect(
7980
publisherManager.publish(TestEvents.created.snsTopic, {
8081
payload: {
82+
entityId: '1',
8183
// @ts-expect-error This should be causing a compilation error
8284
updatedData: 'edwe',
8385
},
@@ -104,6 +106,7 @@ describe('SnsPublisherManager', () => {
104106
// @ts-expect-error Testing error scenario
105107
type: 'dummy.type',
106108
payload: {
109+
entityId: '1',
107110
newData: 'msg',
108111
},
109112
}),
@@ -157,6 +160,7 @@ describe('SnsPublisherManager', () => {
157160
id: messageId,
158161
type: 'entity.created',
159162
payload: {
163+
entityId: '1',
160164
newData: 'msg',
161165
},
162166
})

packages/sns/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@
4444
"@biomejs/biome": "1.9.4",
4545
"@kibertoad/biome-config": "^1.2.1",
4646
"@message-queue-toolkit/core": "*",
47+
"@message-queue-toolkit/redis-message-deduplication-store": "*",
4748
"@message-queue-toolkit/s3-payload-store": "*",
4849
"@message-queue-toolkit/sqs": "*",
4950
"@types/node": "^22.7.5",
5051
"@vitest/coverage-v8": "^2.1.5",
5152
"awilix": "^12.0.1",
5253
"awilix-manager": "^5.1.0",
5354
"del-cli": "^6.0.0",
55+
"ioredis": "^5.3.2",
5456
"typescript": "^5.7.2",
5557
"vitest": "^2.1.5"
5658
},
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import type { MessageDeduplicationKeyGenerator } from '@message-queue-toolkit/core'
2+
import type {
3+
PERMISSIONS_ADD_MESSAGE_TYPE,
4+
PERMISSIONS_REMOVE_MESSAGE_TYPE,
5+
} from '../consumers/userConsumerSchemas'
6+
7+
type PermissionMessagesType = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE
8+
9+
export class PermissionMessageDeduplicationKeyGenerator
10+
implements MessageDeduplicationKeyGenerator<PermissionMessagesType>
11+
{
12+
generate(message: PermissionMessagesType): string {
13+
return `${message.messageType}:${message.id}`
14+
}
15+
}

0 commit comments

Comments
 (0)