Skip to content

Commit

Permalink
feat(AP-6075): Add sns publisher and publisher manager part
Browse files Browse the repository at this point in the history
  • Loading branch information
Drodevbar committed Jan 13, 2025
1 parent 56964a6 commit 470cd49
Show file tree
Hide file tree
Showing 14 changed files with 482 additions and 10 deletions.
2 changes: 2 additions & 0 deletions packages/core/lib/queues/AbstractPublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export abstract class AbstractPublisherManager<
>
private readonly publisherDependencies: DependenciesType


protected constructor({
publisherFactory,
newPublisherOptions,
Expand Down Expand Up @@ -131,6 +132,7 @@ export abstract class AbstractPublisherManager<
this.targetToEventMap[eventTarget].push(supportedEvent)
}
}

private registerPublishers() {
for (const eventTarget in this.targetToEventMap) {
if (this.targetToPublisherMap[eventTarget]) {
Expand Down
12 changes: 12 additions & 0 deletions packages/sns/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,15 @@ services:
volumes:
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
- '/var/run/docker.sock:/var/run/docker.sock'

redis:
image: redis:6.2.7-alpine
command: redis-server --requirepass sOmE_sEcUrE_pAsS
ports:
- '6379:6379'
volumes:
- redis_data:/redis/data:cached
restart: on-failure
volumes:
redis_data:
driver: local
5 changes: 5 additions & 0 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
() => calculateOutgoingMessageSize(updatedMessage),
)

if (await this.isMessageDuplicated(parsedMessage)) {
return
}

await this.sendMessage(maybeOffloadedPayloadMessage, options)
this.handleMessageProcessed(parsedMessage, 'published')
await this.deduplicateMessage(parsedMessage)
} catch (error) {
const err = error as Error
this.handleError(err)
Expand Down
214 changes: 214 additions & 0 deletions packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import {
CommonMetadataFiller,
type MessageDeduplicationKeyGenerator,
} from '@message-queue-toolkit/core'
import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store'
import { type AwilixContainer, asValue } from 'awilix'
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'
import { TestEventDeduplicationKeyGenerator } from '../../test/publishers/TestEventDeduplicationKeyGenerator'
import { cleanRedis } from '../../test/utils/cleanRedis'
import type {
Dependencies,
TestEventPublishPayloadsType,
TestEventsType,
} from '../../test/utils/testContext'
import { TestEvents, registerDependencies } from '../../test/utils/testContext'
import { type CommonSnsPublisher, CommonSnsPublisherFactory } from './CommonSnsPublisherFactory'
import { SnsPublisherManager } from './SnsPublisherManager'

const TEST_DEDUPLICATION_KEY_PREFIX = 'test_key_prefix'

describe('SnsPublisherManager', () => {
let diContainer: AwilixContainer<Dependencies>
let publisherManager: SnsPublisherManager<
CommonSnsPublisher<TestEventPublishPayloadsType>,
TestEventsType
>
let messageDeduplicationStore: RedisMessageDeduplicationStore
let messageDeduplicationKeyGenerator: MessageDeduplicationKeyGenerator

beforeAll(async () => {
diContainer = await registerDependencies(
{
publisherManager: asValue(() => undefined),
},
false,
)
messageDeduplicationStore = new RedisMessageDeduplicationStore(
{
redis: diContainer.cradle.redis,
},
{ keyPrefix: TEST_DEDUPLICATION_KEY_PREFIX },
)
messageDeduplicationKeyGenerator = new TestEventDeduplicationKeyGenerator()
})

beforeEach(() => {
publisherManager = new SnsPublisherManager(diContainer.cradle, {
metadataFiller: new CommonMetadataFiller({
serviceId: 'service',
}),
publisherFactory: new CommonSnsPublisherFactory(),
newPublisherOptions: {
handlerSpy: true,
messageIdField: 'id',
messageTypeField: 'type',
creationConfig: {
updateAttributesIfExists: true,
},
messageDeduplicationConfig: {
deduplicationStore: messageDeduplicationStore,
messageTypeToConfigMap: {
'entity.created': {
deduplicationKeyGenerator: messageDeduplicationKeyGenerator,
deduplicationWindowSeconds: 10,
},
// 'entity.update' is not configured on purpose
},
},
},
})
})

afterEach(async () => {
await cleanRedis(diContainer.cradle.redis)
})

afterAll(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

describe('publish', () => {
it('publishes a message and writes deduplication key to store when message type is configured with deduplication', async () => {
const message = {
payload: {
entityId: '1',
newData: 'msg',
},
type: 'entity.created',
} satisfies TestEventPublishPayloadsType

const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, message)

const spy = await publisherManager
.handlerSpy(TestEvents.created.snsTopic)
.waitForMessageWithId(publishedMessage.id)
expect(spy.processingResult).toBe('published')

const deduplicationKey = messageDeduplicationStore.retrieveKey(
messageDeduplicationKeyGenerator.generate(message),
)
expect(deduplicationKey).not.toBeNull()
})

it('does not publish the same message if deduplication key already exists', async () => {
const message = {
payload: {
entityId: '1',
newData: 'msg',
},
type: 'entity.created',
} satisfies TestEventPublishPayloadsType

// Message is published for the initial call
const publishedMessageFirstCall = await publisherManager.publish(
TestEvents.created.snsTopic,
message,
)

const spyFirstCall = await publisherManager
.handlerSpy(TestEvents.created.snsTopic)
.waitForMessageWithId(publishedMessageFirstCall.id)
expect(spyFirstCall.processingResult).toBe('published')

// Clear the spy, so we can check for the subsequent call
publisherManager.handlerSpy(TestEvents.created.snsTopic).clear()

// Message is not published for the subsequent call
const publishedMessageSecondCall = await publisherManager.publish(
TestEvents.created.snsTopic,
message,
)

const spySecondCall = publisherManager
.handlerSpy(TestEvents.created.snsTopic)
.checkForMessage({
id: publishedMessageSecondCall.id,
})
expect(spySecondCall).toBeUndefined()
})

it('works only for event types that are configured', async () => {
const message1 = {
payload: {
entityId: '1',
newData: 'msg',
},
type: 'entity.created',
} satisfies TestEventPublishPayloadsType
const message2 = {
payload: {
entityId: '1',
updatedData: 'msg',
},
type: 'entity.updated',
} satisfies TestEventPublishPayloadsType

// Message 1 is published for the initial call
const publishedMessageFirstCall = await publisherManager.publish(
TestEvents.created.snsTopic,
message1,
)

const spyFirstCall = await publisherManager
.handlerSpy(TestEvents.created.snsTopic)
.waitForMessageWithId(publishedMessageFirstCall.id)
expect(spyFirstCall.processingResult).toBe('published')

// Clear the spy, so wew can check for the subsequent call
publisherManager.handlerSpy(TestEvents.created.snsTopic).clear()

// Message 1 is not published for the subsequent call (deduplication works)
const publishedMessageSecondCall = await publisherManager.publish(
TestEvents.created.snsTopic,
message1,
)

const spySecondCall = publisherManager
.handlerSpy(TestEvents.created.snsTopic)
.checkForMessage({
id: publishedMessageSecondCall.id,
})
expect(spySecondCall).toBeUndefined()

// Clear the spy, so we can check for the subsequent call
publisherManager.handlerSpy(TestEvents.created.snsTopic).clear()

// Message 2 is published for the initial call
const publishedMessageThirdCall = await publisherManager.publish(
TestEvents.created.snsTopic,
message2,
)

const spyThirdCall = await publisherManager
.handlerSpy(TestEvents.created.snsTopic)
.waitForMessageWithId(publishedMessageThirdCall.id)
expect(spyThirdCall.processingResult).toBe('published')

// Clear the spy, so we can check for the subsequent call
publisherManager.handlerSpy(TestEvents.created.snsTopic).clear()

// Message 2 is published for the subsequent call (deduplication does not work)
const publishedMessageFourthCall = await publisherManager.publish(
TestEvents.created.snsTopic,
message2,
)

const spyFourthCall = await publisherManager
.handlerSpy(TestEvents.created.snsTopic)
.waitForMessageWithId(publishedMessageFourthCall.id)
expect(spyFourthCall.processingResult).toBe('published')
})
})
})
4 changes: 4 additions & 0 deletions packages/sns/lib/sns/SnsPublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ describe('SnsPublisherManager', () => {
// When
const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, {
payload: {
entityId: '1',
newData: 'msg',
},
type: 'entity.created',
Expand Down Expand Up @@ -78,6 +79,7 @@ describe('SnsPublisherManager', () => {
await expect(
publisherManager.publish(TestEvents.created.snsTopic, {
payload: {
entityId: '1',
// @ts-expect-error This should be causing a compilation error
updatedData: 'edwe',
},
Expand All @@ -104,6 +106,7 @@ describe('SnsPublisherManager', () => {
// @ts-expect-error Testing error scenario
type: 'dummy.type',
payload: {
entityId: '1',
newData: 'msg',
},
}),
Expand Down Expand Up @@ -157,6 +160,7 @@ describe('SnsPublisherManager', () => {
id: messageId,
type: 'entity.created',
payload: {
entityId: '1',
newData: 'msg',
},
})
Expand Down
2 changes: 2 additions & 0 deletions packages/sns/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@
"@biomejs/biome": "1.9.4",
"@kibertoad/biome-config": "^1.2.1",
"@message-queue-toolkit/core": "*",
"@message-queue-toolkit/redis-message-deduplication-store": "*",
"@message-queue-toolkit/s3-payload-store": "*",
"@message-queue-toolkit/sqs": "*",
"@types/node": "^22.7.5",
"@vitest/coverage-v8": "^2.1.5",
"awilix": "^12.0.1",
"awilix-manager": "^5.1.0",
"del-cli": "^6.0.0",
"ioredis": "^5.3.2",
"typescript": "^5.7.2",
"vitest": "^2.1.5"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { MessageDeduplicationKeyGenerator } from '@message-queue-toolkit/core'
import type {
PERMISSIONS_ADD_MESSAGE_TYPE,
PERMISSIONS_REMOVE_MESSAGE_TYPE,
} from '../consumers/userConsumerSchemas'

type PermissionMessagesType = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE

export class PermissionMessageDeduplicationKeyGenerator
implements MessageDeduplicationKeyGenerator<PermissionMessagesType>
{
generate(message: PermissionMessagesType): string {
return `${message.messageType}:${message.id}`
}
}
Loading

0 comments on commit 470cd49

Please sign in to comment.