Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export class VoteConfirmationTriggerHandler extends BaseTriggerHandler<VoteWithD

return replacePlaceholders(messageTemplate, {
daoId: vote.daoId,
proposalTitle: vote.proposalTitle,
proposalTitle: vote.proposalTitle ?? undefined,
votingPower,
...(hasReason && { reason: vote.reason! })
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export interface OffchainProposalData {
daoId: string;
title: string;
discussion: string;
link: string;
state: string;
created: number;
}
Expand Down Expand Up @@ -35,6 +36,7 @@ export class OffchainProposalFactory {
daoId,
title: `Test Snapshot Proposal ${proposalId}`,
discussion: '',
link: '',
state: 'active',
created: futureTimestamp,
...overrides,
Expand Down
2 changes: 1 addition & 1 deletion apps/integrated-tests/src/mocks/graphql-mock-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class GraphQLMockSetup {
filtered = filtered.filter(p => p.daoId === config.headers['anticapture-dao-id']);
}
return Promise.resolve({
data: { data: { offchainProposals: { items: filtered.map(p => ({ id: p.id, title: p.title, discussion: p.discussion, state: p.state, created: p.created })), totalCount: filtered.length } } }
data: { data: { offchainProposals: { items: filtered.map(p => ({ id: p.id, title: p.title, discussion: p.discussion, link: p.link, state: p.state, created: p.created })), totalCount: filtered.length } } }
});
}

Expand Down
6 changes: 5 additions & 1 deletion apps/logic-system/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { VotingReminderTrigger } from './triggers/voting-reminder-trigger';
import { ProposalRepository } from './repositories/proposal.repository';
import { OffchainProposalRepository } from './repositories/offchain-proposal.repository';
import { VotingPowerRepository } from './repositories/voting-power.repository';
import { ThresholdRepository } from './repositories/threshold.repository';
import { VotesRepository } from './repositories/votes.repository';
import { RabbitMQDispatcherService } from './api-clients/rabbitmq-dispatcher.service';
import { AnticaptureClient } from '@notification-system/anticapture-client';
Expand Down Expand Up @@ -41,16 +42,18 @@ export class App {
const proposalRepository = new ProposalRepository(anticaptureClient);
const offchainProposalRepository = new OffchainProposalRepository(anticaptureClient);
const votingPowerRepository = new VotingPowerRepository(anticaptureClient);
const thresholdRepository = new ThresholdRepository(anticaptureClient);
const votesRepository = new VotesRepository(anticaptureClient);

this.initPromise = this.initializeRabbitMQ(rabbitmqUrl, proposalRepository, offchainProposalRepository, votingPowerRepository, votesRepository, triggerInterval, initialTimestamp);
this.initPromise = this.initializeRabbitMQ(rabbitmqUrl, proposalRepository, offchainProposalRepository, votingPowerRepository, thresholdRepository, votesRepository, triggerInterval, initialTimestamp);
}

private async initializeRabbitMQ(
rabbitmqUrl: string,
proposalRepository: ProposalRepository,
offchainProposalRepository: OffchainProposalRepository,
votingPowerRepository: VotingPowerRepository,
thresholdRepository: ThresholdRepository,
votesRepository: VotesRepository,
triggerInterval: number,
initialTimestamp?: string
Expand Down Expand Up @@ -78,6 +81,7 @@ export class App {
this.votingPowerTrigger = new VotingPowerChangedTrigger(
dispatcherService,
votingPowerRepository,
thresholdRepository,
triggerInterval
);

Expand Down
46 changes: 46 additions & 0 deletions apps/logic-system/src/repositories/threshold.repository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {
AnticaptureClient,
FeedEventType,
FeedRelevance
} from '@notification-system/anticapture-client';

interface CacheEntry {
value: string;
fetchedAt: number;
}

const ONE_DAY_MS = 86_400_000;

export class ThresholdRepository {
private cache = new Map<string, CacheEntry>();

constructor(
private readonly anticaptureClient: AnticaptureClient,
private readonly cacheTtlMs: number = ONE_DAY_MS
) {}

async getThreshold(daoId: string, type: FeedEventType): Promise<string | null> {
const cacheKey = `${daoId}:${type}`;
const cached = this.cache.get(cacheKey);

if (cached && Date.now() - cached.fetchedAt < this.cacheTtlMs) {
return cached.value;
}

try {
const threshold = await this.anticaptureClient.getEventThreshold(daoId, type, FeedRelevance.High);

if (threshold !== null) {
this.cache.set(cacheKey, { value: threshold, fetchedAt: Date.now() });
}

return threshold;
} catch (error) {
console.warn(
`[ThresholdRepository] Error fetching threshold for ${daoId}/${type}:`,
error instanceof Error ? error.message : error
);
return null;
}
}
}
34 changes: 28 additions & 6 deletions apps/logic-system/src/triggers/voting-power-changed-trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

import { Trigger } from './base-trigger';
import { VotingPowerRepository } from '../repositories/voting-power.repository';
import { ThresholdRepository } from '../repositories/threshold.repository';
import { DispatcherService, DispatcherMessage } from '../interfaces/dispatcher.interface';
import { ProcessedVotingPowerHistory } from '@notification-system/anticapture-client';
import { ProcessedVotingPowerHistory, FeedEventType } from '@notification-system/anticapture-client';

const triggerId = 'voting-power-changed';

Expand All @@ -16,6 +17,7 @@ export class VotingPowerChangedTrigger extends Trigger<ProcessedVotingPowerHisto
constructor(
private readonly dispatcherService: DispatcherService,
private readonly votingPowerRepository: VotingPowerRepository,
private readonly thresholdRepository: ThresholdRepository,
interval: number
) {
super(triggerId, interval);
Expand All @@ -40,17 +42,37 @@ export class VotingPowerChangedTrigger extends Trigger<ProcessedVotingPowerHisto
return;
}

// Always advance the timestamp cursor even if all events are filtered out,
// to avoid reprocessing the same events on every cycle
this.lastProcessedTimestamp = String(Number(data[data.length - 1].timestamp) + 1);

const filtered = await this.filterByThreshold(data);
if (filtered.length === 0) {
return;
}

const message: DispatcherMessage<ProcessedVotingPowerHistory> = {
triggerId: this.id,
events: data
events: filtered
};

await this.dispatcherService.sendMessage(message);
}

// Update the last processed timestamp to the most recent timestamp + 1 second
// Since data comes ordered by timestamp asc, the last item has the latest timestamp
// Adding 1 avoids reprocessing the same event since the API uses >= (gte) for fromDate
this.lastProcessedTimestamp = String(Number(data[data.length - 1].timestamp) + 1);
private async filterByThreshold(
data: ProcessedVotingPowerHistory[]
): Promise<ProcessedVotingPowerHistory[]> {
const keep = await Promise.all(
data.map(async (event) => {
const type = event.changeType.toUpperCase();
if (!Object.values(FeedEventType).includes(type as FeedEventType)) return true;

const threshold = await this.thresholdRepository.getThreshold(event.daoId, type as FeedEventType);
return threshold === null || Math.abs(Number(event.delta)) >= Number(threshold);
})
);

return data.filter((_, i) => keep[i]);
Comment on lines +65 to +75
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't this be added to the votes endpoint query? i.e, votes(fromDelta: <vote high relevance>)

we currently don't have it supported, but it would be a piece of cake to add it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On votes, yes!

But on this case, (voting power changes), it would not work. The flow here is:

  • fetch ALL vp changes;
  • loop -> filter considering the event type (delegation or transfer).

As the threshold could be different for each type, we cannot filter on the query. We need to get all, then filter locally. The other option would be to use "relevance" as a query parameter, then the api filter on the repo-layer (but it looks ugly).

}

/**
Expand Down
7 changes: 7 additions & 0 deletions apps/logic-system/tests/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ export const createMockVotingPowerRepository = () => ({
listVotingPowerHistory: jest.fn()
});

/**
* Creates a mocked ThresholdRepository
*/
export const createMockThresholdRepository = () => ({
getThreshold: jest.fn<() => Promise<string | null>>()
});

// Sample voting power data for tests
export const mockVotingPowerData = [
createVotingPowerHistory(),
Expand Down
102 changes: 102 additions & 0 deletions apps/logic-system/tests/threshold-repository.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { describe, it, expect, jest, beforeEach } from '@jest/globals';
import { ThresholdRepository } from '../src/repositories/threshold.repository';
import { FeedEventType, FeedRelevance } from '@notification-system/anticapture-client';

const createMockAnticaptureClient = () => ({
getEventThreshold: jest.fn<() => Promise<string | null>>()
});

describe('ThresholdRepository', () => {
let repository: ThresholdRepository;
let mockClient: ReturnType<typeof createMockAnticaptureClient>;

beforeEach(() => {
jest.clearAllMocks();
mockClient = createMockAnticaptureClient();
repository = new ThresholdRepository(mockClient as any, 300_000);
});

describe('getThreshold', () => {
it('should fetch threshold from client on cache miss', async () => {
mockClient.getEventThreshold.mockResolvedValue('40000000000000000000000');

const result = await repository.getThreshold('ENS', FeedEventType.Delegation);

expect(result).toBe('40000000000000000000000');
expect(mockClient.getEventThreshold).toHaveBeenCalledWith(
'ENS', FeedEventType.Delegation, FeedRelevance.High
);
});

it('should return cached value on cache hit', async () => {
mockClient.getEventThreshold.mockResolvedValue('40000000000000000000000');

await repository.getThreshold('ENS', FeedEventType.Delegation);
const result = await repository.getThreshold('ENS', FeedEventType.Delegation);

expect(result).toBe('40000000000000000000000');
expect(mockClient.getEventThreshold).toHaveBeenCalledTimes(1);
});

it('should cache separately per daoId and type', async () => {
mockClient.getEventThreshold
.mockResolvedValueOnce('1000')
.mockResolvedValueOnce('2000')
.mockResolvedValueOnce('3000');

const r1 = await repository.getThreshold('ENS', FeedEventType.Delegation);
const r2 = await repository.getThreshold('ENS', FeedEventType.Transfer);
const r3 = await repository.getThreshold('UNISWAP', FeedEventType.Delegation);

expect(r1).toBe('1000');
expect(r2).toBe('2000');
expect(r3).toBe('3000');
expect(mockClient.getEventThreshold).toHaveBeenCalledTimes(3);
});

it('should refetch after TTL expires', async () => {
const shortTtlRepo = new ThresholdRepository(mockClient as any, 100);
mockClient.getEventThreshold
.mockResolvedValueOnce('1000')
.mockResolvedValueOnce('2000');

const r1 = await shortTtlRepo.getThreshold('ENS', FeedEventType.Delegation);
expect(r1).toBe('1000');

await new Promise(resolve => setTimeout(resolve, 150));

const r2 = await shortTtlRepo.getThreshold('ENS', FeedEventType.Delegation);
expect(r2).toBe('2000');
expect(mockClient.getEventThreshold).toHaveBeenCalledTimes(2);
});

it('should return null when client returns null (fail-open)', async () => {
mockClient.getEventThreshold.mockResolvedValue(null);

const result = await repository.getThreshold('ENS', FeedEventType.Delegation);

expect(result).toBeNull();
});

it('should not cache null responses', async () => {
mockClient.getEventThreshold
.mockResolvedValueOnce(null)
.mockResolvedValueOnce('5000');

const r1 = await repository.getThreshold('ENS', FeedEventType.Delegation);
const r2 = await repository.getThreshold('ENS', FeedEventType.Delegation);

expect(r1).toBeNull();
expect(r2).toBe('5000');
expect(mockClient.getEventThreshold).toHaveBeenCalledTimes(2);
});

it('should return null when client throws (fail-open)', async () => {
mockClient.getEventThreshold.mockRejectedValue(new Error('Network error'));

const result = await repository.getThreshold('ENS', FeedEventType.Delegation);

expect(result).toBeNull();
});
});
});
Loading