From 76754378dbf2256e19eeb52312b85118471fcee9 Mon Sep 17 00:00:00 2001 From: Banx17 Date: Tue, 23 Jun 2026 00:25:55 +0100 Subject: [PATCH] feat: implement pledge system with records, recurring donations, reminders and APIs --- prisma/schema.prisma | 74 +++++++- src/controllers/pledge.controller.ts | 175 ++++++++++++++++++ src/routes/pledge.routes.ts | 36 ++++ src/services/pledge.service.test.ts | 208 ++++++++++++++++++++++ src/services/pledge.service.ts | 257 +++++++++++++++++++++++++++ src/workers/pledge.worker.ts | 147 +++++++++++++++ 6 files changed, 896 insertions(+), 1 deletion(-) create mode 100644 src/controllers/pledge.controller.ts create mode 100644 src/routes/pledge.routes.ts create mode 100644 src/services/pledge.service.test.ts create mode 100644 src/services/pledge.service.ts create mode 100644 src/workers/pledge.worker.ts diff --git a/prisma/schema.prisma b/prisma/schema.prisma index ec51f6c..ab10e16 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -52,7 +52,8 @@ model User { campaigns Campaign[] auditLogs AuditLog[] notifications Notification[] - kycSubmissions KYCSubmission[] + kycSubmissions KYCSubmission[] + pledges Pledge[] @@index([email]) @@index([walletAddress]) @@ -229,6 +230,7 @@ model Campaign { suspensions Suspension[] appeals Appeal[] fraudReports FraudReport[] + pledges Pledge[] @@index([organizationId]) @@index([userId]) @@ -745,3 +747,73 @@ model FraudReport { @@index([reporterId]) @@index([createdAt]) } + +// ─── Pledge System ─────────────────────────────────────────────────────────── + +enum PledgeType { + ONE_OFF + RECURRING +} + +enum PledgeCadence { + WEEKLY + MONTHLY +} + +enum PledgeStatus { + ACTIVE + PAUSED + CANCELLED + FAILED + COMPLETED +} + +enum PledgeAttemptStatus { + PENDING + SUCCESS + FAILED +} + +model Pledge { + id String @id @default(uuid()) + donorId String + campaignId String? + amount Decimal + currency String @default("USD") + type PledgeType + cadence PledgeCadence? + startDate DateTime + nextRunAt DateTime? + endDate DateTime? + status PledgeStatus @default(ACTIVE) + idempotencyKey String? @unique + metadata Json? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + donor User @relation(fields: [donorId], references: [id]) + campaign Campaign? @relation(fields: [campaignId], references: [id]) + attempts PledgeAttempt[] + + @@index([donorId]) + @@index([status, nextRunAt]) + @@map("pledges") +} + +model PledgeAttempt { + id String @id @default(uuid()) + pledgeId String + attemptAt DateTime @default(now()) + status PledgeAttemptStatus @default(PENDING) + providerReference String? + failureReason String? + retryCount Int @default(0) + metadata Json? + createdAt DateTime @default(now()) + + pledge Pledge @relation(fields: [pledgeId], references: [id]) + + @@index([pledgeId]) + @@index([status]) + @@map("pledge_attempts") +} diff --git a/src/controllers/pledge.controller.ts b/src/controllers/pledge.controller.ts new file mode 100644 index 0000000..faa9a4f --- /dev/null +++ b/src/controllers/pledge.controller.ts @@ -0,0 +1,175 @@ +import { Request, Response, NextFunction } from 'express'; +import { PledgeService } from '../services/pledge.service'; +import { PledgeType, PledgeCadence, PledgeStatus } from '@prisma/client'; +import logger from '../utils/logger'; + +export function createPledgeController(pledgeService: PledgeService) { + /** + * @notice POST /pledges + * Create a new pledge + */ + async function createPledge(req: Request, res: Response, next: NextFunction) { + try { + const { + donorId, + campaignId, + amount, + currency, + type, + cadence, + startDate, + endDate, + idempotencyKey, + metadata, + } = req.body; + + if (!amount || !type || !startDate) { + return res.status(400).json({ + error: { code: 'bad_request', message: 'amount, type, and startDate are required' }, + }); + } + + if (!Object.values(PledgeType).includes(type)) { + return res.status(400).json({ + error: { code: 'bad_request', message: `type must be ONE_OFF or RECURRING` }, + }); + } + + const pledge = await pledgeService.createPledge({ + donorId: donorId ?? (req as any).user?.id, + campaignId, + amount: Number(amount), + currency, + type, + cadence, + startDate: new Date(startDate), + endDate: endDate ? new Date(endDate) : undefined, + idempotencyKey, + metadata, + }); + + return res.status(201).json({ status: 'success', data: pledge }); + } catch (error: any) { + if (error.message?.includes('required')) { + return res.status(400).json({ error: { code: 'bad_request', message: error.message } }); + } + next(error); + } + } + + /** + * @notice GET /pledges/:id + * Get pledge details with recent attempts + */ + async function getPledge(req: Request, res: Response, next: NextFunction) { + try { + const pledge = await pledgeService.getPledgeById(req.params.id); + if (!pledge) { + return res.status(404).json({ error: { code: 'not_found', message: 'Pledge not found' } }); + } + return res.json({ status: 'success', data: pledge }); + } catch (error) { + next(error); + } + } + + /** + * @notice GET /pledges + * List pledges with pagination + */ + async function listPledges(req: Request, res: Response, next: NextFunction) { + try { + const { status, page, limit } = req.query; + const donorId = (req as any).user?.id; + + const result = await pledgeService.listPledges({ + donorId, + status: status as PledgeStatus | undefined, + page: page ? Number(page) : undefined, + limit: limit ? Number(limit) : undefined, + }); + + return res.json({ status: 'success', ...result }); + } catch (error) { + next(error); + } + } + + /** + * @notice POST /pledges/:id/cancel + * Cancel a pledge + */ + async function cancelPledge(req: Request, res: Response, next: NextFunction) { + try { + const { reason } = req.body; + const pledge = await pledgeService.cancelPledge(req.params.id, reason); + return res.json({ status: 'success', data: pledge }); + } catch (error: any) { + if (error.message === 'Pledge not found') { + return res.status(404).json({ error: { code: 'not_found', message: error.message } }); + } + if (error.message?.includes('already cancelled')) { + return res.status(409).json({ error: { code: 'conflict', message: error.message } }); + } + next(error); + } + } + + /** + * @notice GET /admin/pledges + * Admin: list all pledges + */ + async function adminListPledges(req: Request, res: Response, next: NextFunction) { + try { + const { status, donorId, page, limit } = req.query; + const result = await pledgeService.listPledges({ + donorId: donorId as string | undefined, + status: status as PledgeStatus | undefined, + page: page ? Number(page) : undefined, + limit: limit ? Number(limit) : undefined, + }); + return res.json({ status: 'success', ...result }); + } catch (error) { + next(error); + } + } + + /** + * @notice POST /admin/pledges/:id/pause + * Admin: pause or resume a pledge + */ + async function adminPausePledge(req: Request, res: Response, next: NextFunction) { + try { + const { action } = req.body; + const pledge = action === 'resume' + ? await pledgeService.resumePledge(req.params.id) + : await pledgeService.pausePledge(req.params.id); + return res.json({ status: 'success', data: pledge }); + } catch (error) { + next(error); + } + } + + /** + * @notice GET /admin/pledges/:id/attempts + * Admin: list attempts for a pledge + */ + async function adminListAttempts(req: Request, res: Response, next: NextFunction) { + try { + const attempts = await pledgeService.listAttempts(req.params.id); + return res.json({ status: 'success', data: attempts }); + } catch (error) { + next(error); + } + } + + return { + createPledge, + getPledge, + listPledges, + cancelPledge, + adminListPledges, + adminPausePledge, + adminListAttempts, + }; +} \ No newline at end of file diff --git a/src/routes/pledge.routes.ts b/src/routes/pledge.routes.ts new file mode 100644 index 0000000..a4d437d --- /dev/null +++ b/src/routes/pledge.routes.ts @@ -0,0 +1,36 @@ +import { Router } from 'express'; +import { PrismaClient } from '@prisma/client'; +import { PledgeService } from '../services/pledge.service'; +import { createPledgeController } from '../controllers/pledge.controller'; + +/** + * @notice Creates pledge router with injected dependencies + */ +export function createPledgeRouter(prisma: PrismaClient): Router { + const router = Router(); + const pledgeService = new PledgeService(prisma); + const controller = createPledgeController(pledgeService); + + // Donor endpoints + router.post('/', controller.createPledge); + router.get('/', controller.listPledges); + router.get('/:id', controller.getPledge); + router.post('/:id/cancel', controller.cancelPledge); + + return router; +} + +/** + * @notice Creates admin pledge router + */ +export function createAdminPledgeRouter(prisma: PrismaClient): Router { + const router = Router(); + const pledgeService = new PledgeService(prisma); + const controller = createPledgeController(pledgeService); + + router.get('/', controller.adminListPledges); + router.post('/:id/pause', controller.adminPausePledge); + router.get('/:id/attempts', controller.adminListAttempts); + + return router; +} \ No newline at end of file diff --git a/src/services/pledge.service.test.ts b/src/services/pledge.service.test.ts new file mode 100644 index 0000000..d56984a --- /dev/null +++ b/src/services/pledge.service.test.ts @@ -0,0 +1,208 @@ +import { PledgeService } from './pledge.service'; +import { PledgeType, PledgeCadence, PledgeStatus, PledgeAttemptStatus } from '@prisma/client'; +import { Decimal } from '@prisma/client/runtime/library'; + +const mockPrisma = { + pledge: { + findUnique: jest.fn(), + findMany: jest.fn(), + create: jest.fn(), + update: jest.fn(), + count: jest.fn(), + }, + pledgeAttempt: { + create: jest.fn(), + findMany: jest.fn(), + }, +}; + +describe('PledgeService', () => { + let service: PledgeService; + + beforeEach(() => { + jest.clearAllMocks(); + service = new PledgeService(mockPrisma as any); + }); + + describe('createPledge', () => { + it('creates a one-off pledge', async () => { + const pledge = { + id: 'pledge-1', + type: PledgeType.ONE_OFF, + status: PledgeStatus.ACTIVE, + amount: new Decimal(100), + }; + mockPrisma.pledge.create.mockResolvedValueOnce(pledge); + + const result = await service.createPledge({ + donorId: 'donor-1', + amount: 100, + type: PledgeType.ONE_OFF, + startDate: new Date(Date.now() + 86400000), + }); + + expect(result).toEqual(pledge); + expect(mockPrisma.pledge.create).toHaveBeenCalledTimes(1); + }); + + it('creates a recurring pledge with cadence', async () => { + const pledge = { + id: 'pledge-2', + type: PledgeType.RECURRING, + cadence: PledgeCadence.MONTHLY, + status: PledgeStatus.ACTIVE, + }; + mockPrisma.pledge.create.mockResolvedValueOnce(pledge); + + const result = await service.createPledge({ + donorId: 'donor-1', + amount: 50, + type: PledgeType.RECURRING, + cadence: PledgeCadence.MONTHLY, + startDate: new Date(), + }); + + expect(result.type).toBe(PledgeType.RECURRING); + }); + + it('throws when RECURRING pledge has no cadence', async () => { + await expect( + service.createPledge({ + donorId: 'donor-1', + amount: 50, + type: PledgeType.RECURRING, + startDate: new Date(), + }), + ).rejects.toThrow('cadence is required'); + }); + + it('returns existing pledge for duplicate idempotency key', async () => { + const existing = { id: 'pledge-existing', idempotencyKey: 'key-123' }; + mockPrisma.pledge.findUnique.mockResolvedValueOnce(existing); + + const result = await service.createPledge({ + donorId: 'donor-1', + amount: 100, + type: PledgeType.ONE_OFF, + startDate: new Date(), + idempotencyKey: 'key-123', + }); + + expect(result).toEqual(existing); + expect(mockPrisma.pledge.create).not.toHaveBeenCalled(); + }); + }); + + describe('getPledgeById', () => { + it('returns pledge with attempts', async () => { + const pledge = { id: 'pledge-1', attempts: [] }; + mockPrisma.pledge.findUnique.mockResolvedValueOnce(pledge); + + const result = await service.getPledgeById('pledge-1'); + expect(result).toEqual(pledge); + }); + + it('returns null when pledge not found', async () => { + mockPrisma.pledge.findUnique.mockResolvedValueOnce(null); + const result = await service.getPledgeById('missing'); + expect(result).toBeNull(); + }); + }); + + describe('cancelPledge', () => { + it('cancels an active pledge', async () => { + const pledge = { id: 'pledge-1', status: PledgeStatus.ACTIVE, metadata: {} }; + mockPrisma.pledge.findUnique.mockResolvedValueOnce(pledge); + mockPrisma.pledge.update.mockResolvedValueOnce({ + ...pledge, + status: PledgeStatus.CANCELLED, + }); + + const result = await service.cancelPledge('pledge-1', 'No longer needed'); + expect(result.status).toBe(PledgeStatus.CANCELLED); + }); + + it('throws when pledge not found', async () => { + mockPrisma.pledge.findUnique.mockResolvedValueOnce(null); + await expect(service.cancelPledge('missing')).rejects.toThrow('Pledge not found'); + }); + + it('throws when pledge already cancelled', async () => { + mockPrisma.pledge.findUnique.mockResolvedValueOnce({ + id: 'pledge-1', + status: PledgeStatus.CANCELLED, + }); + await expect(service.cancelPledge('pledge-1')).rejects.toThrow('already cancelled'); + }); + }); + + describe('listPledges', () => { + it('returns paginated pledges', async () => { + mockPrisma.pledge.findMany.mockResolvedValueOnce([{ id: 'p1' }]); + mockPrisma.pledge.count.mockResolvedValueOnce(1); + + const result = await service.listPledges({ donorId: 'donor-1' }); + expect(result.data).toHaveLength(1); + expect(result.pagination.total).toBe(1); + }); + }); + + describe('markAttemptSuccess', () => { + it('marks one-off pledge as COMPLETED', async () => { + const pledge = { + id: 'pledge-1', + type: PledgeType.ONE_OFF, + nextRunAt: new Date(), + cadence: null, + endDate: null, + }; + mockPrisma.pledge.findUnique.mockResolvedValueOnce(pledge); + mockPrisma.pledge.update.mockResolvedValueOnce({ + ...pledge, + status: PledgeStatus.COMPLETED, + nextRunAt: null, + }); + + await service.markAttemptSuccess('pledge-1'); + expect(mockPrisma.pledge.update).toHaveBeenCalledWith({ + where: { id: 'pledge-1' }, + data: { status: PledgeStatus.COMPLETED, nextRunAt: null }, + }); + }); + + it('updates nextRunAt for recurring pledge', async () => { + const pledge = { + id: 'pledge-2', + type: PledgeType.RECURRING, + cadence: PledgeCadence.WEEKLY, + nextRunAt: new Date(), + endDate: null, + }; + mockPrisma.pledge.findUnique.mockResolvedValueOnce(pledge); + mockPrisma.pledge.update.mockResolvedValueOnce(pledge); + + await service.markAttemptSuccess('pledge-2'); + expect(mockPrisma.pledge.update).toHaveBeenCalledWith( + expect.objectContaining({ + where: { id: 'pledge-2' }, + data: expect.objectContaining({ nextRunAt: expect.any(Date) }), + }), + ); + }); + }); + + describe('recordAttempt', () => { + it('creates a pledge attempt', async () => { + const attempt = { id: 'attempt-1', pledgeId: 'pledge-1' }; + mockPrisma.pledgeAttempt.create.mockResolvedValueOnce(attempt); + + const result = await service.recordAttempt( + 'pledge-1', + PledgeAttemptStatus.SUCCESS, + { providerReference: 'ref-123' }, + ); + + expect(result).toEqual(attempt); + }); + }); +}); \ No newline at end of file diff --git a/src/services/pledge.service.ts b/src/services/pledge.service.ts new file mode 100644 index 0000000..6558232 --- /dev/null +++ b/src/services/pledge.service.ts @@ -0,0 +1,257 @@ +import { PrismaClient, PledgeType, PledgeCadence, PledgeStatus, PledgeAttemptStatus } from '@prisma/client'; +import { Decimal } from '@prisma/client/runtime/library'; +import logger from '../config/logger'; + +export interface CreatePledgeInput { + donorId: string; + campaignId?: string; + amount: number; + currency?: string; + type: PledgeType; + cadence?: PledgeCadence; + startDate: Date; + endDate?: Date; + idempotencyKey?: string; + metadata?: Record; +} + +export interface ListPledgesInput { + donorId?: string; + status?: PledgeStatus; + page?: number; + limit?: number; +} + +/** + * @notice Calculates the next run date based on cadence + */ +function calculateNextRunAt(current: Date, cadence: PledgeCadence): Date { + const next = new Date(current); + if (cadence === PledgeCadence.WEEKLY) { + next.setDate(next.getDate() + 7); + } else if (cadence === PledgeCadence.MONTHLY) { + next.setMonth(next.getMonth() + 1); + } + return next; +} + +export class PledgeService { + constructor(private prisma: PrismaClient) {} + + /** + * @notice Create a new pledge with idempotency support + */ + async createPledge(input: CreatePledgeInput) { + // Check idempotency + if (input.idempotencyKey) { + const existing = await this.prisma.pledge.findUnique({ + where: { idempotencyKey: input.idempotencyKey }, + }); + if (existing) { + logger.info('Pledge already exists for idempotency key', { + idempotencyKey: input.idempotencyKey, + }); + return existing; + } + } + + if (input.type === PledgeType.RECURRING && !input.cadence) { + throw new Error('cadence is required for RECURRING pledges'); + } + + const nextRunAt = input.type === PledgeType.RECURRING + ? calculateNextRunAt(input.startDate, input.cadence!) + : input.startDate; + + const pledge = await this.prisma.pledge.create({ + data: { + donorId: input.donorId, + campaignId: input.campaignId, + amount: new Decimal(input.amount), + currency: input.currency ?? 'USD', + type: input.type, + cadence: input.cadence, + startDate: input.startDate, + nextRunAt, + endDate: input.endDate, + idempotencyKey: input.idempotencyKey, + metadata: input.metadata, + }, + }); + + logger.info('Pledge created', { pledgeId: pledge.id, type: pledge.type }); + return pledge; + } + + /** + * @notice Get pledge by ID with recent attempts + */ + async getPledgeById(pledgeId: string) { + return this.prisma.pledge.findUnique({ + where: { id: pledgeId }, + include: { + attempts: { + orderBy: { createdAt: 'desc' }, + take: 10, + }, + }, + }); + } + + /** + * @notice List pledges with pagination and filters + */ + async listPledges(input: ListPledgesInput) { + const page = input.page ?? 1; + const limit = input.limit ?? 20; + const offset = (page - 1) * limit; + + const where = { + ...(input.donorId && { donorId: input.donorId }), + ...(input.status && { status: input.status }), + }; + + const [pledges, total] = await Promise.all([ + this.prisma.pledge.findMany({ + where, + orderBy: { createdAt: 'desc' }, + skip: offset, + take: limit, + }), + this.prisma.pledge.count({ where }), + ]); + + return { + data: pledges, + pagination: { page, limit, total, totalPages: Math.ceil(total / limit) }, + }; + } + + /** + * @notice Cancel a pledge + */ + async cancelPledge(pledgeId: string, reason?: string) { + const pledge = await this.prisma.pledge.findUnique({ where: { id: pledgeId } }); + + if (!pledge) throw new Error('Pledge not found'); + if (pledge.status === PledgeStatus.CANCELLED) { + throw new Error('Pledge is already cancelled'); + } + + const updated = await this.prisma.pledge.update({ + where: { id: pledgeId }, + data: { + status: PledgeStatus.CANCELLED, + metadata: { + ...(pledge.metadata as Record ?? {}), + cancelReason: reason, + cancelledAt: new Date().toISOString(), + }, + }, + }); + + logger.info('Pledge cancelled', { pledgeId, reason }); + return updated; + } + + /** + * @notice Record a pledge attempt + */ + async recordAttempt( + pledgeId: string, + status: PledgeAttemptStatus, + options?: { + providerReference?: string; + failureReason?: string; + retryCount?: number; + metadata?: Record; + }, + ) { + return this.prisma.pledgeAttempt.create({ + data: { + pledgeId, + status, + providerReference: options?.providerReference, + failureReason: options?.failureReason, + retryCount: options?.retryCount ?? 0, + metadata: options?.metadata, + }, + }); + } + + /** + * @notice Get pledges due for processing + */ + async getDuePledges() { + return this.prisma.pledge.findMany({ + where: { + status: PledgeStatus.ACTIVE, + nextRunAt: { lte: new Date() }, + }, + }); + } + + /** + * @notice Update pledge after successful attempt + */ + async markAttemptSuccess(pledgeId: string) { + const pledge = await this.prisma.pledge.findUnique({ where: { id: pledgeId } }); + if (!pledge) throw new Error('Pledge not found'); + + if (pledge.type === PledgeType.ONE_OFF) { + return this.prisma.pledge.update({ + where: { id: pledgeId }, + data: { status: PledgeStatus.COMPLETED, nextRunAt: null }, + }); + } + + // Recurring — calculate next run + const nextRunAt = calculateNextRunAt( + pledge.nextRunAt ?? new Date(), + pledge.cadence!, + ); + + // Check if past end date + if (pledge.endDate && nextRunAt > pledge.endDate) { + return this.prisma.pledge.update({ + where: { id: pledgeId }, + data: { status: PledgeStatus.COMPLETED, nextRunAt: null }, + }); + } + + return this.prisma.pledge.update({ + where: { id: pledgeId }, + data: { nextRunAt }, + }); + } + + /** + * @notice Pause a pledge + */ + async pausePledge(pledgeId: string) { + return this.prisma.pledge.update({ + where: { id: pledgeId }, + data: { status: PledgeStatus.PAUSED }, + }); + } + + /** + * @notice Resume a paused pledge + */ + async resumePledge(pledgeId: string) { + return this.prisma.pledge.update({ + where: { id: pledgeId }, + data: { status: PledgeStatus.ACTIVE }, + }); + } + + /** + * @notice List attempts for a pledge (admin) + */ + async listAttempts(pledgeId: string) { + return this.prisma.pledgeAttempt.findMany({ + where: { pledgeId }, + orderBy: { createdAt: 'desc' }, + }); + } +} \ No newline at end of file diff --git a/src/workers/pledge.worker.ts b/src/workers/pledge.worker.ts new file mode 100644 index 0000000..b71bf71 --- /dev/null +++ b/src/workers/pledge.worker.ts @@ -0,0 +1,147 @@ +import { PrismaClient, PledgeAttemptStatus, PledgeStatus } from '@prisma/client'; +import { PledgeService } from '../services/pledge.service'; +import logger from '../config/logger'; + +const MAX_RETRIES = parseInt(process.env.PLEDGE_MAX_RETRIES ?? '3', 10); +const REMINDER_WINDOW_DAYS = parseInt(process.env.PLEDGE_REMINDER_WINDOW_DAYS ?? '3', 10); + +/** + * @notice Exponential backoff delay in ms for retry attempts + */ +function backoffDelay(retryCount: number): number { + return Math.min(1000 * Math.pow(2, retryCount), 24 * 60 * 60 * 1000); +} + +/** + * @notice Mock payment processor — replace with real integration + */ +async function processPayment(pledgeId: string, amount: number): Promise { + // Replace this with actual payment service call + logger.info('Processing payment', { pledgeId, amount }); + return `ref-${pledgeId}-${Date.now()}`; +} + +export class PledgeWorker { + private pledgeService: PledgeService; + private intervalId: NodeJS.Timeout | null = null; + + constructor(private prisma: PrismaClient) { + this.pledgeService = new PledgeService(prisma); + } + + /** + * @notice Process all due pledges + */ + async processDuePledges(): Promise { + const duePledges = await this.pledgeService.getDuePledges(); + + logger.info(`Processing ${duePledges.length} due pledges`); + + for (const pledge of duePledges) { + try { + const providerReference = await processPayment( + pledge.id, + pledge.amount.toNumber(), + ); + + await this.pledgeService.recordAttempt(pledge.id, PledgeAttemptStatus.SUCCESS, { + providerReference, + }); + + await this.pledgeService.markAttemptSuccess(pledge.id); + + logger.info('Pledge processed successfully', { pledgeId: pledge.id }); + } catch (error: any) { + logger.error('Pledge processing failed', { pledgeId: pledge.id, error: error.message }); + + // Get retry count from latest attempt + const attempts = await this.pledgeService.listAttempts(pledge.id); + const retryCount = attempts.length; + + await this.pledgeService.recordAttempt(pledge.id, PledgeAttemptStatus.FAILED, { + failureReason: error.message, + retryCount, + }); + + if (retryCount >= MAX_RETRIES) { + await this.prisma.pledge.update({ + where: { id: pledge.id }, + data: { status: PledgeStatus.FAILED }, + }); + logger.warn('Pledge marked as FAILED after max retries', { pledgeId: pledge.id }); + } else { + // Schedule retry with backoff + const delay = backoffDelay(retryCount); + const nextRunAt = new Date(Date.now() + delay); + await this.prisma.pledge.update({ + where: { id: pledge.id }, + data: { nextRunAt }, + }); + logger.info('Pledge retry scheduled', { pledgeId: pledge.id, nextRunAt }); + } + } + } + } + + /** + * @notice Send reminders for pledges due within reminder window + */ + async sendReminders(): Promise { + const windowEnd = new Date(); + windowEnd.setDate(windowEnd.getDate() + REMINDER_WINDOW_DAYS); + + const upcomingPledges = await this.prisma.pledge.findMany({ + where: { + status: PledgeStatus.ACTIVE, + nextRunAt: { + gte: new Date(), + lte: windowEnd, + }, + }, + }); + + logger.info(`Sending reminders for ${upcomingPledges.length} upcoming pledges`); + + for (const pledge of upcomingPledges) { + logger.info('Reminder sent for pledge', { + pledgeId: pledge.id, + nextRunAt: pledge.nextRunAt, + donorId: pledge.donorId, + }); + // Integrate with notification.service.ts here + } + } + + /** + * @notice Start the worker on a schedule + * @param intervalMs - How often to run (default: 60 seconds) + */ + start(intervalMs: number = 60_000): void { + if (process.env.PLEDGE_WORKER_ENABLED !== 'true') { + logger.info('Pledge worker disabled via PLEDGE_WORKER_ENABLED env var'); + return; + } + + logger.info('Pledge worker started', { intervalMs }); + + this.intervalId = setInterval(async () => { + try { + await this.processDuePledges(); + await this.sendReminders(); + } catch (error) { + logger.error('Pledge worker error', { error }); + } + }, intervalMs); + } + + /** + * @notice Stop the worker + */ + stop(): void { + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = null; + logger.info('Pledge worker stopped'); + } + } +} \ No newline at end of file