Skip to content

Commit cf729c7

Browse files
committed
feat: use redis-semaphore for lock management, refactor
1 parent 49d81ee commit cf729c7

29 files changed

+799
-918
lines changed

README.md

Lines changed: 83 additions & 64 deletions
Large diffs are not rendered by default.

packages/amqp/lib/AbstractAmqpConsumer.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ export abstract class AbstractAmqpConsumer<
180180
// requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop
181181
if (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) {
182182
// TODO: Add retry delay + republish message updating internal properties
183-
this.queueMessageForRetry(originalMessage)
183+
this.channel.nack(message as Message, false, true)
184184
this.handleMessageProcessed({
185185
message: parsedMessage,
186186
processingResult: 'retryLater',
@@ -295,10 +295,6 @@ export abstract class AbstractAmqpConsumer<
295295
)
296296
}
297297

298-
protected override queueMessageForRetry(message: MessagePayloadType): void {
299-
this.channel.nack(message as Message, false, true)
300-
}
301-
302298
private deserializeMessage(
303299
message: Message,
304300
): Either<'abort', ParseMessageResult<MessagePayloadType>> {

packages/amqp/lib/AbstractAmqpPublisher.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,5 @@ export abstract class AbstractAmqpPublisher<
157157
override processMessage(): Promise<Either<'retryLater', 'success'>> {
158158
throw new Error('Not implemented for publisher')
159159
}
160-
161-
protected override queueMessageForRetry(): Promise<void> {
162-
throw new Error('Not implemented for publisher')
163-
}
164160
/* c8 ignore stop */
165161
}

packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ describe('AmqpPermissionConsumer', () => {
156156
messageId: '1',
157157
messageType: 'add',
158158
messageDeduplicationId: undefined,
159+
messageDeduplicationWindowSeconds: undefined,
159160
processingResult: 'consumed',
160161
queueName: AmqpPermissionConsumer.QUEUE_NAME,
161162
messageTimestamp: expect.any(Number),

packages/core/index.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,10 @@ export {
7979
isOffloadedPayloadPointerPayload,
8080
} from './lib/payload-store/offloadedPayloadMessageSchemas'
8181
export {
82-
type PublisherMessageDeduplicationMessageTypeConfig,
83-
type PublisherMessageDeduplicationStore,
82+
type MessageDeduplicationStore,
8483
type MessageDeduplicationConfig,
85-
type ConsumerMessageDeduplicationMessageTypeConfig,
86-
type ConsumerMessageDeduplicationStore,
87-
ConsumerMessageDeduplicationKeyStatus,
84+
type ReleasableLock,
85+
DeduplicationRequester,
86+
AcquireLockTimeoutError,
87+
noopReleasableLock,
8888
} from './lib/message-deduplication/messageDeduplicationTypes'
Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,7 @@
11
import { z } from 'zod'
22

3-
// Private interface to provide JSDoc feature
4-
interface PublisherMessageDeduplicationMessageTypeInternal {
5-
/** How many seconds to keep the deduplication key in the store for a particular message type */
6-
deduplicationWindowSeconds: number
7-
}
8-
9-
export const PUBLISHER_MESSAGE_DEDUPLICATION_MESSAGE_TYPE_SCHEMA = z.object({
10-
deduplicationWindowSeconds: z.number().int().gt(0),
11-
})
12-
13-
export type PublisherMessageDeduplicationMessageType = z.infer<
14-
typeof PUBLISHER_MESSAGE_DEDUPLICATION_MESSAGE_TYPE_SCHEMA
15-
> &
16-
PublisherMessageDeduplicationMessageTypeInternal
17-
18-
// Private interface to provide JSDoc feature
19-
interface ConsumerMessageDeduplicationMessageTypeInternal {
20-
/** How many seconds to keep the deduplication key in the store for a particular message type after message is successfully processed */
21-
deduplicationWindowSeconds: number
22-
23-
/** How many seconds it is expected to take to process a message of a particular type */
24-
maximumProcessingTimeSeconds: number
25-
}
26-
27-
export const CONSUMER_MESSAGE_DEDUPLICATION_MESSAGE_TYPE_SCHEMA =
28-
PUBLISHER_MESSAGE_DEDUPLICATION_MESSAGE_TYPE_SCHEMA.extend({
29-
maximumProcessingTimeSeconds: z.number().int().gt(0),
30-
})
31-
32-
export type ConsumerMessageDeduplicationMessageType = z.infer<
33-
typeof CONSUMER_MESSAGE_DEDUPLICATION_MESSAGE_TYPE_SCHEMA
34-
> &
35-
ConsumerMessageDeduplicationMessageTypeInternal
3+
export const MESSAGE_DEDUPLICATION_WINDOW_SECONDS_SCHEMA = z
4+
.number()
5+
.int()
6+
.gt(0)
7+
.describe('message deduplication window in seconds')
Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,48 @@
1-
import type {
2-
ConsumerMessageDeduplicationMessageType,
3-
PublisherMessageDeduplicationMessageType,
4-
} from './messageDeduplicationSchemas'
1+
import type { Either } from '@lokalise/node-core'
52

6-
export interface PublisherMessageDeduplicationStore {
3+
export interface ReleasableLock {
4+
release(): Promise<void>
5+
}
6+
7+
export class AcquireLockTimeoutError extends Error {}
8+
9+
export interface MessageDeduplicationStore {
710
/**
811
* Stores a deduplication key in case it does not already exist.
912
* @param {string} key - deduplication key
1013
* @param {string} value - value to store
1114
* @param {number} ttlSeconds - time to live in seconds
12-
* @returns {boolean} - true if the key was stored, false if it already existed
15+
* @returns {Promise<boolean>} - true if the key was stored, false if it already existed
1316
*/
1417
setIfNotExists(key: string, value: string, ttlSeconds: number): Promise<boolean>
1518

16-
/** Retrieves value associated with deduplication key */
17-
getByKey(key: string): Promise<string | null>
18-
}
19-
20-
export type PublisherMessageDeduplicationMessageTypeConfig =
21-
PublisherMessageDeduplicationMessageType
22-
23-
export interface ConsumerMessageDeduplicationStore extends PublisherMessageDeduplicationStore {
2419
/**
25-
* Retrieves TTL of the deduplication key
26-
*
20+
* Acquires locks for a given key
2721
* @param {string} key - deduplication key
28-
* @returns {number|null} - TTL of the deduplication key in seconds or null if the key does not exist
22+
* @returns {Promise<Either<AcquireLockTimeoutError | Error, ReleasableLock>>} - a promise that resolves to a ReleasableLock if the lock was acquired, AcquireLockTimeoutError error if the lock could not be acquired due to timeout, or an Error if the lock could not be acquired for another reason
2923
*/
30-
getKeyTtl(key: string): Promise<number | null>
31-
32-
/** Sets a value for the deduplication key or updates it if it already exists */
33-
setOrUpdate(key: string, value: string, ttlSeconds: number): Promise<void>
24+
acquireLock(key: string): Promise<Either<AcquireLockTimeoutError | Error, ReleasableLock>>
3425

35-
/** Deletes the deduplication key */
36-
deleteKey(key: string): Promise<void>
26+
/**
27+
* Checks if a deduplication key exists in the store
28+
* @param {string} key - deduplication key
29+
* @returns {Promise<boolean>} - true if the key exists, false otherwise
30+
*/
31+
keyExists(key: string): Promise<boolean>
3732
}
3833

39-
export type ConsumerMessageDeduplicationMessageTypeConfig = ConsumerMessageDeduplicationMessageType
40-
41-
export type MessageDeduplicationConfig<
42-
TStore extends ConsumerMessageDeduplicationStore | PublisherMessageDeduplicationStore,
43-
TConfig extends
44-
| ConsumerMessageDeduplicationMessageTypeConfig
45-
| PublisherMessageDeduplicationMessageTypeConfig,
46-
> = {
34+
export type MessageDeduplicationConfig = {
4735
/** The store to use for storage and retrieval of deduplication keys */
48-
deduplicationStore: TStore
36+
deduplicationStore: MessageDeduplicationStore
37+
}
4938

50-
/** The configuration for deduplication for each message type */
51-
messageTypeToConfigMap: Record<string, TConfig>
39+
export enum DeduplicationRequester {
40+
Consumer = 'consumer',
41+
Publisher = 'publisher',
5242
}
5343

54-
export enum ConsumerMessageDeduplicationKeyStatus {
55-
PROCESSING = 'PROCESSING',
56-
PROCESSED = 'PROCESSED',
44+
export const DEFAULT_DEDUPLICATION_WINDOW_SECONDS = 10
45+
46+
export const noopReleasableLock: ReleasableLock = {
47+
release: async () => {},
5748
}

packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export const OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA = z
99
offloadedPayloadPointer: z.string().min(1),
1010
offloadedPayloadSize: z.number().int().positive(),
1111
})
12-
// Pass-through allows to pass message ID, type and timestamp that are using dynamic keys.
12+
// Pass-through allows to pass message ID, type, timestamp and message-deduplication-related fields that are using dynamic keys.
1313
.passthrough()
1414

1515
export type OffloadedPayloadPointerPayload = z.infer<

0 commit comments

Comments
 (0)