Skip to content
Merged

Dev #220

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -845,4 +845,41 @@ describe('VotingPowerTriggerHandler - eventId deduplication', () => {
// Should send 2 notifications (one for each delegation received)
expect(sentNotifications).toHaveLength(2);
});

it('should NOT send duplicate notifications when same message is processed twice (idempotency)', async () => {
// Simulates the Logic System re-sending the same event due to cursor not advancing
// The deduplication should block the second run entirely

const walletA = '0xWalletA000000000000000000000000000000001';
const stubUser1: User = { id: 'user-1', channel: 'telegram', channel_user_id: '111', created_at: new Date() };

const { handler, sentNotifications } = createHandlerWithDeduplication({
[walletA]: [stubUser1]
});

const message: DispatcherMessage = {
triggerId: 'voting-power-changed',
events: [
{
daoId: 'test-dao',
accountId: walletA,
transactionHash: '0xDuplicateTxHash',
changeType: 'other',
delta: '1000',
logIndex: 0,
chainId: 1,
timestamp: 1234567890
}
]
};

// First run: should send notification
await handler.handleMessage(message);
const firstRunCount = sentNotifications.length;
expect(firstRunCount).toBe(1);

// Second run (same message): deduplication should block it
await handler.handleMessage(message);
expect(sentNotifications.length).toBe(firstRunCount);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
): Promise<void> {
const { daoId, accountId, sourceAccountId, delta, transactionHash, chainId, votingPower, logIndex } = votingPowerEvent;

const subscribers = await this.getNotificationSubscribers(
const { subscribers, eventId } = await this.getNotificationSubscribers(
accountId, // who receives the delegation
daoId,
transactionHash,
Expand Down Expand Up @@ -137,7 +137,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
chainId
});

await this.sendNotificationsToSubscribers(subscribers, notificationMessage, transactionHash, daoId, metadata, buttons);
await this.sendNotificationsToSubscribers(subscribers, notificationMessage, eventId, daoId, metadata, buttons);
}

/**
Expand All @@ -160,7 +160,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
return;
}

const subscribers = await this.getNotificationSubscribers(
const { subscribers, eventId } = await this.getNotificationSubscribers(
sourceAccountId, // who MADE the delegation
daoId,
transactionHash,
Expand Down Expand Up @@ -219,7 +219,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
chainId
});

await this.sendNotificationsToSubscribers(subscribers, notificationMessage, transactionHash, daoId, metadata, buttons);
await this.sendNotificationsToSubscribers(subscribers, notificationMessage, eventId, daoId, metadata, buttons);
}

/**
Expand All @@ -232,7 +232,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
): Promise<void> {
const { daoId, accountId, changeType, transactionHash, chainId, transfer, logIndex } = votingPowerEvent;

const subscribers = await this.getNotificationSubscribers(
const { subscribers, eventId } = await this.getNotificationSubscribers(
accountId, daoId, transactionHash, logIndex, walletOwnersMap, daoSubscribersMap
);
if (subscribers.length === 0) return;
Expand All @@ -242,7 +242,7 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
: this.buildGenericNotification(votingPowerEvent);

const buttons = buildButtons({ triggerType: 'votingPowerChange', txHash: transactionHash, chainId });
await this.sendNotificationsToSubscribers(subscribers, message, transactionHash, daoId, metadata, buttons);
await this.sendNotificationsToSubscribers(subscribers, message, eventId, daoId, metadata, buttons);
}

/**
Expand Down Expand Up @@ -293,7 +293,6 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {

/**
* Shared method to get notification subscribers with deduplication
*
* eventId format: ${transactionHash}-${logIndex}-${accountId}-voting-power
*/
private async getNotificationSubscribers(
Expand All @@ -303,10 +302,12 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
logIndex: number,
walletOwnersMap: Record<string, User[]>,
daoSubscribersMap: Record<string, User[]>
): Promise<User[]> {
): Promise<{ subscribers: User[]; eventId: string }> {
const eventId = `${transactionHash}-${logIndex}-${accountId}-voting-power`;

// Get wallet owners from cache
const walletOwners = walletOwnersMap[accountId] || [];
if (walletOwners.length === 0) return [];
if (walletOwners.length === 0) return { subscribers: [], eventId };

// Get DAO subscribers from cache
const daoSubscribers = daoSubscribersMap[daoId] || [];
Expand All @@ -316,8 +317,8 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
daoSubscribers.some(sub => sub.id === owner.id)
);

if (subscribedOwners.length === 0) return [];
const eventId = `${transactionHash}-${logIndex}-${accountId}-voting-power`;
if (subscribedOwners.length === 0) return { subscribers: [], eventId };

// Check deduplication for all subscribed owners at once
const shouldSendNotifications = await this.subscriptionClient.shouldSend(
subscribedOwners,
Expand All @@ -326,10 +327,10 @@ export class VotingPowerTriggerHandler extends BaseTriggerHandler {
);

// Final filtered list of subscribers
const finalSubscribers = subscribedOwners.filter(owner =>
const subscribers = subscribedOwners.filter(owner =>
shouldSendNotifications.some(notification => notification.user_id === owner.id)
);
return finalSubscribers;
return { subscribers, eventId };
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ describe('Slack Voting Power Trigger - Integration Test', () => {
}

// Verify database records
// eventId format: ${transactionHash}-${logIndex}-${accountId}-voting-power
const expectedEventId = `0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef-0-${userWalletAddress}-voting-power`;
const notifications = await dbHelper.getNotifications();
const vpNotification = notifications.find(n =>
n.user_id === user.id &&
n.event_id === '0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef' &&
n.event_id === expectedEventId &&
n.dao_id === TEST_DAO_ID
);
expect(vpNotification).toBeDefined();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ export class VotingPowerChangedTrigger extends Trigger<ProcessedVotingPowerHisto

await this.dispatcherService.sendMessage(message);

// Update the last processed timestamp to the most recent timestamp
// Update the last processed timestamp to the most recent timestamp + 1 second
// Since data comes ordered by timestamp asc, the last item has the latest timestamp
this.lastProcessedTimestamp = data[data.length - 1].timestamp;
// Adding 1 avoids reprocessing the same event since the API uses >= (gte) for fromDate
this.lastProcessedTimestamp = String(Number(data[data.length - 1].timestamp) + 1);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions apps/logic-system/tests/voting-power-trigger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ describe('VotingPowerChangedTrigger', () => {
});
});

it('should update timestamp to the last item in array', async () => {
it('should update timestamp to the last item in array + 1 second', async () => {
await trigger.process(mockVotingPowerData);

// Should update to timestamp of last item (chronologically last)
const lastProcessed = (trigger as any).lastProcessedTimestamp;
expect(lastProcessed).toBe('1625184000'); // Last item timestamp
expect(lastProcessed).toBe('1625184001'); // Last item timestamp + 1
});

it('should handle single item correctly', async () => {
Expand All @@ -85,7 +85,7 @@ describe('VotingPowerChangedTrigger', () => {
});

const lastProcessed = (trigger as any).lastProcessedTimestamp;
expect(lastProcessed).toBe('1625097600');
expect(lastProcessed).toBe('1625097601'); // timestamp + 1
});
});

Expand All @@ -109,7 +109,7 @@ describe('VotingPowerChangedTrigger', () => {

// Verify the second call used the updated timestamp
const secondCallArgs = mockVotingPowerRepository.listVotingPowerHistory.mock.calls[1][0];
expect(secondCallArgs).toBe('1625097600'); // Timestamp from first execution
expect(secondCallArgs).toBe('1625097601'); // Timestamp from first execution
});

it('should not process when no new data available', async () => {
Expand Down