diff --git a/.env.demo b/.env.demo index 786570c4..aea23746 100644 --- a/.env.demo +++ b/.env.demo @@ -88,3 +88,33 @@ INDICIO_TEST_GENESIS='{"reqSignature":{},"txn":{"data":{"data":{"alias":"OpsNode {"reqSignature":{},"txn":{"data":{"data":{"alias":"idlab-node01","blskey":"2fjJVi33U1tCTjW77cJaf1NLz7EzWkVNzR9BEQpVVK64MJpRKNUzt6k7Td2U8yqU5hGyAFH5N7ZymSB55TnpC3rJYLVTcGXZeXpmrQx3mwnXNyfTDnxfTpdQ1KMoFeZoDPZ8acfaH8GWeW2jL1qREE52tetBf4tXTeshmWzGkEN7r4y","blskey_pop":"RSjiM6dYUmN2rv2ca7dUCmEKrivq12rhxhXUKHdmSwUxbCmcijsgoERjYG7MqxhKLjSAJ5715K23fVEc6uK1kTenKmYCcCts8MLMAQG8Upb22nfgHJ3py8RwRoACeAjFF3myAMNRJJPhUdv96drJdwkGRv7f6JjvoB5KWVQYTNgheP","client_ip":"205.159.92.17","client_port":"9702","node_ip":"205.159.92.16","node_port":"9701","services":["VALIDATOR"]},"dest":"8czYgwmLDazVrBHuo53Tyx7Tw8ZhvnoC2BfhQGir4r8F"},"metadata":{"from":"PN8wFxLKjdkwyxoEEXwyz2"},"type":"0"},"txnMetadata":{"seqNo":5,"txnId":"9237eca7d2a203f6e1779f63064d2f22cf28e1bcd4e6fe5d791b15e82969acdc"},"ver":"1"} {"reqSignature":{},"txn":{"data":{"data":{"alias":"lorica-identity-node1","blskey":"wUh24sVCQ8PHDgSb343g2eLxjD5vwxsrETfuV2sbwMNnYon9nhbaK5jcWTekvXtyiwxHxuiCCoZwKS97MQEAeC2oLbbMeKjYm212QwSnm7aKLEqTStXht35VqZvZLT7Q3mPQRYLjMGixdn4ocNHrBTMwPUQYycEqwaHWgE1ncDueXY","blskey_pop":"R2sMwF7UW6AaD4ALa1uB1YVPuP6JsdJ7LsUoViM9oySFqFt34C1x1tdHDysS9wwruzaaEFui6xNPqJ8eu3UBqcFKkoWhdsMqCALwe63ytxPwvtLtCffJLhHAcgrPC7DorXYdqhdG2cevdqc5oqFEAaKoFDBf12p5SsbbM4PYWCmVCb","client_ip":"35.225.220.151","client_port":"9702","node_ip":"35.224.26.110","node_port":"9701","services":["VALIDATOR"]},"dest":"k74ZsZuUaJEcB8RRxMwkCwdE5g1r9yzA3nx41qvYqYf"},"metadata":{"from":"Ex6hzsJFYzNJ7kzbfncNeU"},"type":"0"},"txnMetadata":{"seqNo":6,"txnId":"6880673ce4ae4a2352f103d2a6ae20469dd070f2027283a1da5e62a64a59d688"},"ver":"1"} {"reqSignature":{},"txn":{"data":{"data":{"alias":"cysecure-itn","blskey":"GdCvMLkkBYevRFi93b6qaj9G2u1W6Vnbg8QhRD1chhrWR8vRE8x9x7KXVeUBPFf6yW5qq2JCfA2frc8SGni2RwjtTagezfwAwnorLhVJqS5ZxTi4pgcw6smebnt4zWVhTkh6ugDHEypHwNQBcw5WhBZcEJKgNbyVLnHok9ob6cfr3u","blskey_pop":"RbH9mY7M5p3UB3oj4sT1skYwMkxjoUnja8eTYfcm83VcNbxC9zR9pCiRhk4q1dJT3wkDBPGNKnk2p83vaJYLcgMuJtzoWoJAWAxjb3Mcq8Agf6cgQpBuzBq2uCzFPuQCAhDS4Kv9iwA6FsRnfvoeFTs1hhgSJVxQzDWMVTVAD9uCqu","client_ip":"35.169.19.171","client_port":"9702","node_ip":"54.225.56.21","node_port":"9701","services":["VALIDATOR"]},"dest":"4ETBDmHzx8iDQB6Xygmo9nNXtMgq9f6hxGArNhQ6Hh3u"},"metadata":{"from":"uSXXXEdBicPHMMhr3ddNF"},"type":"0"},"txnMetadata":{"seqNo":7,"txnId":"3c21718b07806b2f193b35953dda5b68b288efd551dce4467ce890703d5ba549"},"ver":"1"}' + +# ─── Data Purge — Record Deletion ──────────────────────────────────────────── +# Automatically deletes credential/proof/session records after a configurable TTL. +# Two independent flows — enable one or both: +# Flow A (NATS): schedules deletion at offer-creation time via NATS JetStream +# Flow B (Cron): periodically scans DB and deletes records older than TTL + +PURGE_ENABLED=false +PURGE_WEBHOOK_ENABLED=false + +# ── Flow A: NATS (event-driven, requires NATS connection below) ─────────────── +PURGE_NATS_ENABLED=false +PURGE_NATS_TTL_SECONDS=2592000 + +# ── Flow B: Cron (DB scan, no NATS required) ────────────────────────────────── +PURGE_CRON_ENABLED=false +PURGE_CRON_TTL_SECONDS=2592000 +PURGE_CRON_SCHEDULE=0 * * * * + +# ── Optional: per-type record filtering (default: all 4 types) ──────────────── +# PURGE_DIDCOMM_CREDENTIAL=true +# PURGE_DIDCOMM_PROOF=true +# PURGE_OID4VC_ISSUANCE=true +# PURGE_OID4VC_VERIFICATION=true + +# ─── NATS Connection (required for Flow A) ──────────────────────────────────── +NATS_SERVERS=nats://localhost:4223 +# Auth type — one of: nkey | creds | usernamePassword | none +NATS_AUTH_TYPE=nkey +NATS_NKEY_SEED= diff --git a/.env.sample b/.env.sample index 577eb8c2..591a04d0 100644 --- a/.env.sample +++ b/.env.sample @@ -86,3 +86,35 @@ INDICIO_TEST_GENESIS=`{"reqSignature":{},"txn":{"data":{"data":{"alias":"OpsNode {"reqSignature":{},"txn":{"data":{"data":{"alias":"idlab-node01","blskey":"2fjJVi33U1tCTjW77cJaf1NLz7EzWkVNzR9BEQpVVK64MJpRKNUzt6k7Td2U8yqU5hGyAFH5N7ZymSB55TnpC3rJYLVTcGXZeXpmrQx3mwnXNyfTDnxfTpdQ1KMoFeZoDPZ8acfaH8GWeW2jL1qREE52tetBf4tXTeshmWzGkEN7r4y","blskey_pop":"RSjiM6dYUmN2rv2ca7dUCmEKrivq12rhxhXUKHdmSwUxbCmcijsgoERjYG7MqxhKLjSAJ5715K23fVEc6uK1kTenKmYCcCts8MLMAQG8Upb22nfgHJ3py8RwRoACeAjFF3myAMNRJJPhUdv96drJdwkGRv7f6JjvoB5KWVQYTNgheP","client_ip":"205.159.92.17","client_port":"9702","node_ip":"205.159.92.16","node_port":"9701","services":["VALIDATOR"]},"dest":"8czYgwmLDazVrBHuo53Tyx7Tw8ZhvnoC2BfhQGir4r8F"},"metadata":{"from":"PN8wFxLKjdkwyxoEEXwyz2"},"type":"0"},"txnMetadata":{"seqNo":5,"txnId":"9237eca7d2a203f6e1779f63064d2f22cf28e1bcd4e6fe5d791b15e82969acdc"},"ver":"1"} {"reqSignature":{},"txn":{"data":{"data":{"alias":"lorica-identity-node1","blskey":"wUh24sVCQ8PHDgSb343g2eLxjD5vwxsrETfuV2sbwMNnYon9nhbaK5jcWTekvXtyiwxHxuiCCoZwKS97MQEAeC2oLbbMeKjYm212QwSnm7aKLEqTStXht35VqZvZLT7Q3mPQRYLjMGixdn4ocNHrBTMwPUQYycEqwaHWgE1ncDueXY","blskey_pop":"R2sMwF7UW6AaD4ALa1uB1YVPuP6JsdJ7LsUoViM9oySFqFt34C1x1tdHDysS9wwruzaaEFui6xNPqJ8eu3UBqcFKkoWhdsMqCALwe63ytxPwvtLtCffJLhHAcgrPC7DorXYdqhdG2cevdqc5oqFEAaKoFDBf12p5SsbbM4PYWCmVCb","client_ip":"35.225.220.151","client_port":"9702","node_ip":"35.224.26.110","node_port":"9701","services":["VALIDATOR"]},"dest":"k74ZsZuUaJEcB8RRxMwkCwdE5g1r9yzA3nx41qvYqYf"},"metadata":{"from":"Ex6hzsJFYzNJ7kzbfncNeU"},"type":"0"},"txnMetadata":{"seqNo":6,"txnId":"6880673ce4ae4a2352f103d2a6ae20469dd070f2027283a1da5e62a64a59d688"},"ver":"1"} {"reqSignature":{},"txn":{"data":{"data":{"alias":"cysecure-itn","blskey":"GdCvMLkkBYevRFi93b6qaj9G2u1W6Vnbg8QhRD1chhrWR8vRE8x9x7KXVeUBPFf6yW5qq2JCfA2frc8SGni2RwjtTagezfwAwnorLhVJqS5ZxTi4pgcw6smebnt4zWVhTkh6ugDHEypHwNQBcw5WhBZcEJKgNbyVLnHok9ob6cfr3u","blskey_pop":"RbH9mY7M5p3UB3oj4sT1skYwMkxjoUnja8eTYfcm83VcNbxC9zR9pCiRhk4q1dJT3wkDBPGNKnk2p83vaJYLcgMuJtzoWoJAWAxjb3Mcq8Agf6cgQpBuzBq2uCzFPuQCAhDS4Kv9iwA6FsRnfvoeFTs1hhgSJVxQzDWMVTVAD9uCqu","client_ip":"35.169.19.171","client_port":"9702","node_ip":"54.225.56.21","node_port":"9701","services":["VALIDATOR"]},"dest":"4ETBDmHzx8iDQB6Xygmo9nNXtMgq9f6hxGArNhQ6Hh3u"},"metadata":{"from":"uSXXXEdBicPHMMhr3ddNF"},"type":"0"},"txnMetadata":{"seqNo":7,"txnId":"3c21718b07806b2f193b35953dda5b68b288efd551dce4467ce890703d5ba549"},"ver":"1"}` +# ─── Data Purge ─────────────────────────────────────────────────────────────── +# Automatically deletes records after a configurable TTL. +# Two independent flows — enable one or both: +# Flow A (NATS): schedules deletion at offer-creation time via NATS JetStream +# Flow B (Cron): periodically scans DB and deletes records older than TTL + +# Master switch — set to true to activate purge system +PURGE_ENABLED= +# Set to false to skip webhook calls after deletion (default: true) +PURGE_WEBHOOK_ENABLED= + +# ── Flow A: NATS (requires NATS_SERVERS below) ──────────────────────────────── +PURGE_NATS_ENABLED= +PURGE_NATS_TTL_SECONDS= # seconds from offer creation to deletion (e.g. 2592000 = 30 days) + +# ── Flow B: Cron (DB scan, no NATS required) ────────────────────────────────── +PURGE_CRON_ENABLED= +PURGE_CRON_TTL_SECONDS= # records older than this (createdAt) are deleted (e.g. 2592000 = 30 days) +PURGE_CRON_SCHEDULE= # cron expression for scan frequency (e.g. 0 * * * * = hourly) + +# ── Optional: per-type record filtering ─────────────────────────────────────── +# Uncomment to restrict purge to specific record types (default: all types enabled) +# PURGE_DIDCOMM_CREDENTIAL=true +# PURGE_DIDCOMM_PROOF=true +# PURGE_OID4VC_ISSUANCE=true +# PURGE_OID4VC_VERIFICATION=true + +# ─── NATS Connection (required for Flow A) ──────────────────────────────────── +NATS_SERVERS= +# Auth type — one of: nkey | creds | usernamePassword | none +NATS_AUTH_TYPE= +NATS_NKEY_SEED= # required when NATS_AUTH_TYPE=nkey # required when NATS_AUTH_TYPE=usernamePassword diff --git a/package.json b/package.json index febab0df..30143edb 100644 --- a/package.json +++ b/package.json @@ -76,6 +76,8 @@ "express-rate-limit": "^7.5.0", "joi": "^17.13.3", "jsonwebtoken": "^9.0.2", + "nats": "^2.29.3", + "node-cron": "^4.2.1", "node-fetch": "2", "patch-package": "^8.0.0", "reflect-metadata": "^0.2.2", @@ -97,6 +99,7 @@ "@types/jsonwebtoken": "^9.0.9", "@types/multer": "^1.4.12", "@types/node": "^20.17.0", + "@types/node-cron": "^3.0.11", "@types/ref-array-di": "^1.2.8", "@types/ref-struct-di": "^1.1.12", "@types/supertest": "^6.0.3", diff --git a/src/cliAgent.ts b/src/cliAgent.ts index 791c5137..744e20c8 100644 --- a/src/cliAgent.ts +++ b/src/cliAgent.ts @@ -66,6 +66,9 @@ import { readFile } from 'fs/promises' import { IndicioAcceptanceMechanism, IndicioTransactionAuthorAgreement, Network, NetworkName } from './enums' import { setupServer } from './server' +import { buildPurgeConfig } from './purge/PurgeTypes' +import { validatePurgeConfig } from './purge/PurgeConfigValidator' +import { initPurgeSchedulers, stopPurgeSchedulers, getNatsPurgeScheduler, getCronPurgeScheduler } from './purge/PurgeSchedulerFactory' import { generateSecretKey } from './utils/helpers' import { TsLogger } from './utils/logger' import { @@ -361,6 +364,7 @@ const getWithTenantModules = ( // return secretKey // } + export async function runRestAgent(restConfig: AriesRestConfig) { const { endpoints, @@ -556,7 +560,34 @@ export async function runRestAgent(restConfig: AriesRestConfig) { logger.info(`*** API Key: ${apiKey}`) + // Start purge schedulers if enabled (NATS and Cron are independent) + const purgeConfig = buildPurgeConfig() + if (purgeConfig) { + await validatePurgeConfig(purgeConfig) + initPurgeSchedulers(purgeConfig.natsConfig.enabled, purgeConfig.cronConfig.enabled) + + const purgeWebhookUrl = purgeConfig.webhookEnabled ? webhookUrl : undefined + + if (purgeConfig.natsConfig.enabled) { + await getNatsPurgeScheduler()!.start(agent, purgeConfig, purgeWebhookUrl) + } + + if (purgeConfig.cronConfig.enabled) { + await getCronPurgeScheduler()!.start(agent, purgeConfig, purgeWebhookUrl) + } + } + app.listen(adminPort, () => { logger.info(`Successfully started server on port ${adminPort}`) }) + + // Graceful shutdown + const shutdown = async () => { + agent.config.logger.info('[Shutdown] Stopping services...') + await stopPurgeSchedulers() + process.exit(0) + } + + process.on('SIGTERM', shutdown) + process.on('SIGINT', shutdown) } diff --git a/src/controllers/didcomm/credentials/CredentialController.ts b/src/controllers/didcomm/credentials/CredentialController.ts index 315a2c0e..0da39061 100644 --- a/src/controllers/didcomm/credentials/CredentialController.ts +++ b/src/controllers/didcomm/credentials/CredentialController.ts @@ -14,6 +14,8 @@ import { injectable } from 'tsyringe' import { SCOPES } from '../../../enums' import ErrorHandlingService from '../../../errorHandlingService' +import { SchedulePurge } from '../../../purge/decorators/SchedulePurge' +import { PurgeRecordType } from '../../../purge/PurgeTypes' import { AgentType } from '../../../types' import { CredentialExchangeRecordExample, RecordId } from '../../examples' import { @@ -163,6 +165,7 @@ export class CredentialController extends Controller { */ @Example(CredentialExchangeRecordExample) @Post('/create-offer') + @SchedulePurge(PurgeRecordType.DIDCOMM_CREDENTIAL, (r) => (r as any)?.id) public async createOffer(@Request() request: Req, @Body() createOfferOptions: CreateOfferOptions) { try { const offer = await request.agent.modules.didcomm.credentials.offerCredential(createOfferOptions) @@ -173,6 +176,7 @@ export class CredentialController extends Controller { } @Post('/create-offer-oob') + @SchedulePurge(PurgeRecordType.DIDCOMM_CREDENTIAL, (r) => (r as any)?.credentialExchangeRecordId) public async createOfferOob(@Request() request: Req, @Body() outOfBandOption: CreateOfferOobOptions) { try { let invitationDid: string | undefined @@ -230,6 +234,7 @@ export class CredentialController extends Controller { }), outOfBandRecord: outOfBandRecord.toJSON(), outOfBandRecordId: outOfBandRecord.id, + credentialExchangeRecordId: offerOob.credentialExchangeRecord.id, credentialRequestThId: offerOob.credentialExchangeRecord.threadId, invitationDid, } diff --git a/src/controllers/didcomm/proofs/ProofController.ts b/src/controllers/didcomm/proofs/ProofController.ts index 01832fb2..f971497d 100644 --- a/src/controllers/didcomm/proofs/ProofController.ts +++ b/src/controllers/didcomm/proofs/ProofController.ts @@ -16,6 +16,8 @@ import { injectable } from 'tsyringe' import { SCOPES } from '../../../enums' import ErrorHandlingService from '../../../errorHandlingService' +import { SchedulePurge } from '../../../purge/decorators/SchedulePurge' +import { PurgeRecordType } from '../../../purge/PurgeTypes' import { ProofRecordExample, RecordId } from '../../examples' import { AcceptProofProposal, @@ -118,6 +120,7 @@ export class ProofController extends Controller { */ @Post('/request-proof') @Example(ProofRecordExample) + @SchedulePurge(PurgeRecordType.DIDCOMM_PROOF, (r) => (r as any)?.id) public async requestProof(@Request() request: Req, @Body() requestProofOptions: RequestProofOptions) { try { const requestProofPayload = { @@ -143,6 +146,7 @@ export class ProofController extends Controller { */ @Post('create-request-oob') @Example(ProofRecordExample) + @SchedulePurge(PurgeRecordType.DIDCOMM_PROOF, (r) => (r as any)?.proofRecordId) public async createRequest(@Request() request: Req, @Body() createRequestOptions: CreateProofRequestOobOptions) { try { let routing: DidCommRouting @@ -200,6 +204,7 @@ export class ProofController extends Controller { }), outOfBandRecord: outOfBandRecord.toJSON(), invitationDid, + proofRecordId: proof.proofRecord.id, proofRecordThId: proof.proofRecord.threadId, proofMessageId: proof.message.thread?.threadId || proof.message.threadId || proof.message.id, } diff --git a/src/controllers/openid4vc/issuance-sessions/issuance-sessions.Controller.ts b/src/controllers/openid4vc/issuance-sessions/issuance-sessions.Controller.ts index 01b1df1d..d575e226 100644 --- a/src/controllers/openid4vc/issuance-sessions/issuance-sessions.Controller.ts +++ b/src/controllers/openid4vc/issuance-sessions/issuance-sessions.Controller.ts @@ -6,6 +6,8 @@ import { injectable } from 'tsyringe' import { SCOPES } from '../../../enums' // eslint-disable-next-line import/order import ErrorHandlingService from '../../../errorHandlingService' +import { SchedulePurge } from '../../../purge/decorators/SchedulePurge' +import { PurgeRecordType } from '../../../purge/PurgeTypes' // import { AgentWithRootOrTenant } from '../../types/agent' import { OpenId4VcIssuanceSessionsCreateOffer } from '../types/issuer.types' @@ -26,6 +28,7 @@ export class IssuanceSessionsController extends Controller { * Creates a credential offer with the specified credential configurations and authorization type. */ @Post('/create-credential-offer') + @SchedulePurge(PurgeRecordType.OID4VC_ISSUANCE, (r) => (r as any)?.issuanceSession?.id) public async createCredentialOffer(@Request() request: Req, @Body() options: OpenId4VcIssuanceSessionsCreateOffer) { try { return await issuanceSessionService.createCredentialOffer(options, request) diff --git a/src/controllers/openid4vc/verifier-sessions/verification-sessions.Controller.ts b/src/controllers/openid4vc/verifier-sessions/verification-sessions.Controller.ts index 7e6385b1..b6d945cd 100644 --- a/src/controllers/openid4vc/verifier-sessions/verification-sessions.Controller.ts +++ b/src/controllers/openid4vc/verifier-sessions/verification-sessions.Controller.ts @@ -5,6 +5,8 @@ import { injectable } from 'tsyringe' import { SCOPES } from '../../../enums' import ErrorHandlingService from '../../../errorHandlingService' +import { SchedulePurge } from '../../../purge/decorators/SchedulePurge' +import { PurgeRecordType } from '../../../purge/PurgeTypes' import { CreateAuthorizationRequest, OpenId4VCDCQLVerificationSessionRecord } from '../types/verifier.types' import { VerificationSessionsService } from './verification-sessions.service' @@ -21,6 +23,7 @@ export class VerificationSessionsController extends Controller { * Create an authorization request, acting as a Relying Party (RP) */ @Post('/create-presentation-request') + @SchedulePurge(PurgeRecordType.OID4VC_VERIFICATION, (r) => (r as any)?.verificationSession?.id) public async createProofRequest( @Request() request: Req, @Body() createAuthorizationRequest: CreateAuthorizationRequest, diff --git a/src/purge/PurgeConfigValidator.ts b/src/purge/PurgeConfigValidator.ts new file mode 100644 index 00000000..b2f316bf --- /dev/null +++ b/src/purge/PurgeConfigValidator.ts @@ -0,0 +1,46 @@ +import { connect } from 'nats' + +import { buildNatsAuthenticator } from '../utils/NatsAuthenticator' +import type { NatsConfig, PurgeConfig } from './PurgeTypes' + +export async function validatePurgeConfig(config: PurgeConfig): Promise { + const { natsConfig, cronConfig } = config + + if (!natsConfig.enabled && !cronConfig.enabled) { + throw new Error( + '[Purge] PURGE_ENABLED=true but neither PURGE_NATS_ENABLED nor PURGE_CRON_ENABLED is set to true. ' + + 'Enable at least one mode.', + ) + } + + if (natsConfig.enabled) { + await verifyNatsJetStream(natsConfig.nats) + } +} + +async function verifyNatsJetStream(nats: NatsConfig): Promise { + let nc: Awaited> | null = null + + try { + nc = await connect({ + servers: nats.servers, + ...buildNatsAuthenticator(nats), + timeout: 5000, + maxReconnectAttempts: 0, + }) + } catch (err: any) { + throw new Error( + `[Purge] PURGE_NATS_ENABLED=true but cannot connect to NATS at ${nats.servers.join(', ')}: ${err?.message}`, + ) + } + + try { + await nc.jetstreamManager() + } catch (err: any) { + throw new Error( + `[Purge] Connected to NATS but JetStream is not enabled. Start NATS with the -js flag. Error: ${err?.message}`, + ) + } finally { + await nc.close() + } +} diff --git a/src/purge/PurgeConstants.ts b/src/purge/PurgeConstants.ts new file mode 100644 index 00000000..245386ca --- /dev/null +++ b/src/purge/PurgeConstants.ts @@ -0,0 +1,46 @@ +import { PurgeRecordType } from './PurgeTypes' + +export const PURGE_STREAM = 'PURGE' + +// Schedule and execution subjects must be in the same stream for Nats-Schedule-Target to work +export const PURGE_SCHEDULER_SUBJECTS: Record = { + [PurgeRecordType.DIDCOMM_CREDENTIAL]: 'purge.schedule.didcomm.credential', + [PurgeRecordType.DIDCOMM_PROOF]: 'purge.schedule.didcomm.proof', + [PurgeRecordType.OID4VC_ISSUANCE]: 'purge.schedule.oid4vc.issuance', + [PurgeRecordType.OID4VC_VERIFICATION]: 'purge.schedule.oid4vc.verification', +} + +export const PURGE_EXECUTION_SUBJECTS: Record = { + [PurgeRecordType.DIDCOMM_CREDENTIAL]: 'purge.execute.didcomm.credential', + [PurgeRecordType.DIDCOMM_PROOF]: 'purge.execute.didcomm.proof', + [PurgeRecordType.OID4VC_ISSUANCE]: 'purge.execute.oid4vc.issuance', + [PurgeRecordType.OID4VC_VERIFICATION]: 'purge.execute.oid4vc.verification', +} + +export const PURGE_CONSUMER_NAMES: Record = { + [PurgeRecordType.DIDCOMM_CREDENTIAL]: 'purge-worker-didcomm-credential', + [PurgeRecordType.DIDCOMM_PROOF]: 'purge-worker-didcomm-proof', + [PurgeRecordType.OID4VC_ISSUANCE]: 'purge-worker-oid4vc-issuance', + [PurgeRecordType.OID4VC_VERIFICATION]: 'purge-worker-oid4vc-verification', +} + +// Added to ttlSeconds when computing stream max_age — covers worker processing time after fire +export const PURGE_STREAM_BUFFER_NS = 7 * 24 * 60 * 60 * 1_000_000_000 + +export const PURGE_CONSUMER_ACK_WAIT_NS = 30 * 1_000_000_000 + +export const PURGE_CONSUMER_MAX_DELIVER = 3 + +export const PURGE_CONSUMER_BACKOFF_NS = [ + 5_000_000_000, // 5s + 30_000_000_000, // 30s +] + +export const PURGE_WEBHOOK_PATHS: Record = { + [PurgeRecordType.DIDCOMM_CREDENTIAL]: '/purge/didcomm-credential', + [PurgeRecordType.DIDCOMM_PROOF]: '/purge/didcomm-proof', + [PurgeRecordType.OID4VC_ISSUANCE]: '/purge/oid4vc-issuance', + [PurgeRecordType.OID4VC_VERIFICATION]: '/purge/oid4vc-verification', +} + +export const PURGE_WEBHOOK_RETRY_DELAYS_MS = [1000, 5000, 30000] diff --git a/src/purge/PurgeDeleteRecord.ts b/src/purge/PurgeDeleteRecord.ts new file mode 100644 index 00000000..6de526e4 --- /dev/null +++ b/src/purge/PurgeDeleteRecord.ts @@ -0,0 +1,34 @@ +import type { Agent } from '@credo-ts/core' + +import { OpenId4VcIssuanceSessionRepository, OpenId4VcVerificationSessionRepository } from '@credo-ts/openid4vc' + +import { PurgeRecordType } from './PurgeTypes' + +export async function deletePurgeRecord(agent: Agent, recordType: PurgeRecordType, recordId: string): Promise { + switch (recordType) { + case PurgeRecordType.DIDCOMM_CREDENTIAL: + await (agent as any).modules.didcomm.credentials.deleteById(recordId) + break + + case PurgeRecordType.DIDCOMM_PROOF: + await (agent as any).modules.didcomm.proofs.deleteById(recordId) + break + + case PurgeRecordType.OID4VC_ISSUANCE: { + const repo = agent.dependencyManager.resolve(OpenId4VcIssuanceSessionRepository) + await repo.deleteById(agent.context, recordId) + break + } + + case PurgeRecordType.OID4VC_VERIFICATION: { + const repo = agent.dependencyManager.resolve(OpenId4VcVerificationSessionRepository) + await repo.deleteById(agent.context, recordId) + break + } + + default: { + const _exhaustive: never = recordType + throw new Error(`[Purge] Unhandled record type: ${_exhaustive}`) + } + } +} diff --git a/src/purge/PurgeSchedulerFactory.ts b/src/purge/PurgeSchedulerFactory.ts new file mode 100644 index 00000000..22454280 --- /dev/null +++ b/src/purge/PurgeSchedulerFactory.ts @@ -0,0 +1,23 @@ +import { CronPurgeScheduler } from './schedulers/CronPurgeScheduler' +import { NatsPurgeScheduler } from './schedulers/NatsPurgeScheduler' + +let _natsScheduler: NatsPurgeScheduler | null = null +let _cronScheduler: CronPurgeScheduler | null = null + +export function getNatsPurgeScheduler(): NatsPurgeScheduler | null { + return _natsScheduler +} + +export function getCronPurgeScheduler(): CronPurgeScheduler | null { + return _cronScheduler +} + +export function initPurgeSchedulers(natsEnabled: boolean, cronEnabled: boolean): void { + if (natsEnabled) _natsScheduler = new NatsPurgeScheduler() + if (cronEnabled) _cronScheduler = new CronPurgeScheduler() +} + +export async function stopPurgeSchedulers(): Promise { + await _natsScheduler?.stop() + await _cronScheduler?.stop() +} diff --git a/src/purge/PurgeTypes.ts b/src/purge/PurgeTypes.ts new file mode 100644 index 00000000..ee9c5a92 --- /dev/null +++ b/src/purge/PurgeTypes.ts @@ -0,0 +1,117 @@ +export interface NatsConfig { + servers: string[] + nkeySeed?: string + credentialsFile?: string + username?: string + password?: string +} + +export type AgentMode = 'shared' | 'dedicated' + +export enum PurgeRecordType { + DIDCOMM_CREDENTIAL = 'didcomm_credential', + DIDCOMM_PROOF = 'didcomm_proof', + OID4VC_ISSUANCE = 'oid4vc_issuance', + OID4VC_VERIFICATION = 'oid4vc_verification', +} + +export interface PurgeJob { + recordId: string + recordType: PurgeRecordType + tenantId: string + agentMode: AgentMode + scheduledAt: string +} + +export interface PurgeNatsConfig { + enabled: boolean + ttlSeconds: number + nats: NatsConfig + recordTypes: PurgeRecordType[] +} + +export interface PurgeCronConfig { + enabled: boolean + ttlSeconds: number + cronSchedule: string + recordTypes: PurgeRecordType[] +} + +export interface PurgeConfig { + natsConfig: PurgeNatsConfig + cronConfig: PurgeCronConfig + webhookEnabled: boolean +} + +export function buildPurgeConfig(): PurgeConfig | undefined { + if (process.env.PURGE_ENABLED !== 'true') return undefined + + const natsEnabled = process.env.PURGE_NATS_ENABLED === 'true' + const cronEnabled = process.env.PURGE_CRON_ENABLED === 'true' + + if (!natsEnabled && !cronEnabled) return undefined + + return { + natsConfig: { + enabled: natsEnabled, + ttlSeconds: parseTtlSeconds(process.env.PURGE_NATS_TTL_SECONDS, 'PURGE_NATS_TTL_SECONDS'), + nats: { + servers: (process.env.NATS_SERVERS || 'nats://localhost:4222').split(',').map((s) => s.trim()).filter(Boolean), + nkeySeed: process.env.NATS_NKEY_SEED, + credentialsFile: process.env.NATS_CREDENTIALS_FILE, + username: process.env.NATS_USER, + password: process.env.NATS_PASSWORD, + }, + recordTypes: buildPurgeRecordTypes(), + }, + cronConfig: { + enabled: cronEnabled, + ttlSeconds: parseTtlSeconds(process.env.PURGE_CRON_TTL_SECONDS, 'PURGE_CRON_TTL_SECONDS'), + cronSchedule: process.env.PURGE_CRON_SCHEDULE || '0 * * * *', + recordTypes: buildPurgeRecordTypes(), + }, + webhookEnabled: process.env.PURGE_WEBHOOK_ENABLED !== 'false', + } +} + +function parseTtlSeconds(value: string | undefined, envKey: string, defaultSeconds = 2592000): number { + if (value === undefined || value === '') return defaultSeconds + const parsed = Number(value) + if (!Number.isInteger(parsed) || parsed <= 0) { + throw new Error(`[Purge] ${envKey} must be a positive integer, got: "${value}"`) + } + return parsed +} + +function buildPurgeRecordTypes(): PurgeRecordType[] { + const envFlags: Record = { + PURGE_DIDCOMM_CREDENTIAL: PurgeRecordType.DIDCOMM_CREDENTIAL, + PURGE_DIDCOMM_PROOF: PurgeRecordType.DIDCOMM_PROOF, + PURGE_OID4VC_ISSUANCE: PurgeRecordType.OID4VC_ISSUANCE, + PURGE_OID4VC_VERIFICATION: PurgeRecordType.OID4VC_VERIFICATION, + } + + const anyEnvSet = Object.keys(envFlags).some((key) => process.env[key] !== undefined) + + if (anyEnvSet) { + const selected = Object.entries(envFlags) + .filter(([key]) => process.env[key] === 'true') + .map(([, type]) => type) + + if (selected.length === 0) { + throw new Error( + '[Purge] At least one PURGE_* record type flag must be set to "true" when any flag is present. ' + + 'Set PURGE_ENABLED=false to disable purge entirely.', + ) + } + + return selected + } + + return [ + PurgeRecordType.DIDCOMM_CREDENTIAL, + PurgeRecordType.DIDCOMM_PROOF, + PurgeRecordType.OID4VC_ISSUANCE, + PurgeRecordType.OID4VC_VERIFICATION, + ] +} diff --git a/src/purge/PurgeWebhook.ts b/src/purge/PurgeWebhook.ts new file mode 100644 index 00000000..fa1dbf55 --- /dev/null +++ b/src/purge/PurgeWebhook.ts @@ -0,0 +1,56 @@ +import type { Logger } from '@credo-ts/core' + +import { sleep } from '../utils/webhook' +import { PURGE_WEBHOOK_PATHS, PURGE_WEBHOOK_RETRY_DELAYS_MS } from './PurgeConstants' +import type { PurgeRecordType } from './PurgeTypes' + +export enum PurgeDeletionStatus { + DELETED = 'deleted', + ALREADY_ABSENT = 'already-absent', +} + +export async function sendPurgeWebhook( + webhookUrl: string, + recordId: string, + recordType: PurgeRecordType, + tenantId: string, + status: PurgeDeletionStatus, + logger: Logger, +): Promise { + const url = `${webhookUrl}${PURGE_WEBHOOK_PATHS[recordType]}` + const body = { + eventType: 'purge.deletion.complete', + occurredAt: new Date().toISOString(), + tenantId, + deletion: { + recordId, + recordType, + status, + deletedAt: new Date().toISOString(), + }, + } + + const delays = PURGE_WEBHOOK_RETRY_DELAYS_MS + const maxAttempts = delays.length + 1 + + for (let attempt = 0; attempt < maxAttempts; attempt++) { + try { + const res = await fetch(url, { + method: 'POST', + body: JSON.stringify(body), + headers: { 'Content-Type': 'application/json' }, + signal: AbortSignal.timeout(10000), + }) + if (!res.ok) throw new Error(`HTTP ${res.status}`) + logger.debug('[Purge] Webhook delivered', { url, recordId }) + return + } catch (err: any) { + if (attempt === maxAttempts - 1) { + logger.warn('[Purge] Webhook failed after all retries', { url, recordId, error: err?.message }) + } else { + logger.debug(`[Purge] Webhook attempt ${attempt + 1} failed — retrying in ${delays[attempt]}ms`) + await sleep(delays[attempt]) + } + } + } +} diff --git a/src/purge/PurgeWorker.ts b/src/purge/PurgeWorker.ts new file mode 100644 index 00000000..6ce2bcce --- /dev/null +++ b/src/purge/PurgeWorker.ts @@ -0,0 +1,111 @@ +import type { Agent } from '@credo-ts/core' +import type { Consumer } from 'nats' + +import { RecordNotFoundError } from '@credo-ts/core' +import { StringCodec } from 'nats' + +import { PURGE_CONSUMER_MAX_DELIVER } from './PurgeConstants' +import { deletePurgeRecord } from './PurgeDeleteRecord' +import { sendPurgeWebhook, PurgeDeletionStatus } from './PurgeWebhook' +import type { PurgeJob } from './PurgeTypes' +import { PurgeRecordType } from './PurgeTypes' + +const sc = StringCodec() + +export class PurgeWorker { + private recordType: PurgeRecordType + private consumerName: string + private webhookUrl: string | undefined + + constructor(recordType: PurgeRecordType, consumerName: string, webhookUrl?: string) { + this.recordType = recordType + this.consumerName = consumerName + this.webhookUrl = webhookUrl + } + + async start(agent: Agent, consumer: Consumer): Promise { + console.log(`[Purge][Worker] Started — consumer=${this.consumerName} recordType=${this.recordType}`) + agent.config.logger.info('[Purge] Worker started', { consumer: this.consumerName }) + + while (true) { + const messages = await consumer.consume() + console.log(`[Purge][Worker] Consuming messages — consumer=${this.consumerName}`) + for await (const msg of messages) { + await this.processMessage(msg, agent) + } + console.warn(`[Purge][Worker] Consume loop ended — restarting consumer=${this.consumerName}`) + agent.config.logger.warn('[Purge] Consume loop ended — restarting', { consumer: this.consumerName }) + } + } + + private async processMessage(msg: any, agent: Agent): Promise { + let job: PurgeJob | undefined + + try { + job = JSON.parse(sc.decode(msg.data)) as PurgeJob + } catch { + agent.config.logger.error('[Purge] Failed to parse job — discarding', { consumer: this.consumerName }) + msg.ack() + return + } + + const { recordId, recordType, tenantId, agentMode } = job + const logger = agent.config.logger + const deliveryCount: number = msg.info.deliveryCount + + if (recordType !== this.recordType) { + logger.error('[Purge] Job record type mismatch — discarding', { + expected: this.recordType, + received: recordType, + recordId, + }) + msg.ack() + return + } + + console.log(`[Purge][Worker] Job received — recordType=${recordType} recordId=${recordId} tenantId="${tenantId}" deliveryCount=${deliveryCount}`) + logger.info('[Purge] Job received', { recordId, recordType, tenantId, deliveryCount }) + + try { + if (agentMode === 'shared') { + await (agent as any).modules.tenants.withTenantAgent({ tenantId }, async (tenantAgent: Agent) => { + await deletePurgeRecord(tenantAgent, this.recordType, recordId) + }) + } else { + await deletePurgeRecord(agent, this.recordType, recordId) + } + + console.log(`[Purge][Worker] Record deleted — recordType=${recordType} recordId=${recordId} tenantId="${tenantId}"`) + logger.info('[Purge] Record deleted', { recordId, recordType, tenantId }) + msg.ack() + + if (this.webhookUrl) { + await sendPurgeWebhook(this.webhookUrl, recordId, this.recordType, tenantId, PurgeDeletionStatus.DELETED, logger) + } + } catch (err: any) { + if (err instanceof RecordNotFoundError) { + console.warn(`[Purge][Worker] Record already absent — recordType=${recordType} recordId=${recordId}`) + logger.warn('[Purge] Record already absent — treating as success', { recordId, recordType }) + msg.ack() + + if (this.webhookUrl) { + await sendPurgeWebhook(this.webhookUrl, recordId, this.recordType, tenantId, PurgeDeletionStatus.ALREADY_ABSENT, logger) + } + return + } + + console.warn(`[Purge][Worker] Job failed — recordType=${recordType} recordId=${recordId} deliveryCount=${deliveryCount}`, err?.message) + logger.warn('[Purge] Job failed', { recordId, recordType, deliveryCount, error: err?.message }) + + if (deliveryCount >= PURGE_CONSUMER_MAX_DELIVER) { + console.error(`[Purge][Worker] Job dropped after max retries — recordType=${recordType} recordId=${recordId} tenantId="${tenantId}"`) + logger.error('[Purge] Job dropped after max retries', { recordId, recordType, tenantId, deliveryCount }) + msg.ack() + } else { + console.log(`[Purge][Worker] Nacking job for retry — recordType=${recordType} recordId=${recordId} deliveryCount=${deliveryCount}`) + msg.nak() + } + } + } + +} diff --git a/src/purge/decorators/SchedulePurge.ts b/src/purge/decorators/SchedulePurge.ts new file mode 100644 index 00000000..cb7a316e --- /dev/null +++ b/src/purge/decorators/SchedulePurge.ts @@ -0,0 +1,55 @@ +import type { AgentMode, PurgeRecordType } from '../PurgeTypes' + +import { getCronPurgeScheduler, getNatsPurgeScheduler } from '../PurgeSchedulerFactory' + +export function SchedulePurge( + recordType: PurgeRecordType, + idExtractor: (result: unknown) => string | undefined, +) { + return function (_target: object, _key: string, descriptor: PropertyDescriptor) { + const original = descriptor.value as (...args: unknown[]) => Promise + + descriptor.value = async function (...args: unknown[]) { + const result = await original.apply(this, args) + + const scheduler = getNatsPurgeScheduler() + + if (!scheduler) { + console.warn(`[Purge] @SchedulePurge(${recordType}): NATS scheduler not initialized — skipping`) + return result + } + + const recordId = idExtractor(result) + + if (!recordId) { + const resultKeys = result && typeof result === 'object' ? Object.keys(result) : typeof result + console.warn(`[Purge] @SchedulePurge(${recordType}): could not extract recordId — result shape: ${JSON.stringify(resultKeys)}`) + return result + } + + const request = args[0] as any + // TenantAgent.context.contextCorrelationId = `tenant-${tenantId}` (Credo internals) + const contextCorrelationId: string = (request?.agent as any)?.context?.contextCorrelationId ?? '' + const tenantId: string = contextCorrelationId.startsWith('tenant-') + ? contextCorrelationId.slice('tenant-'.length) + : '' + const agentMode: AgentMode = tenantId ? 'shared' : 'dedicated' + + console.info(`[Purge] Scheduling purge: ${recordType} recordId=${recordId} tenantId="${tenantId}" agentMode=${agentMode}`) + + // Fire-and-forget: purge scheduling must not block record creation. + // If NATS publish fails and cron is disabled, this record will not be purged. + scheduler.schedulePurge(recordType, recordId, tenantId, agentMode).catch((err: Error) => { + const hasCronFallback = getCronPurgeScheduler() !== null + const level = hasCronFallback ? 'warn' : 'error' + console[level]( + `[Purge] Failed to schedule NATS purge for ${recordType}:${recordId} — ${hasCronFallback ? 'cron fallback active' : 'NO cron fallback, record may leak'}: ${err?.message}`, + ) + }) + + return result + } + + return descriptor + } +} diff --git a/src/purge/schedulers/CronPurgeScheduler.ts b/src/purge/schedulers/CronPurgeScheduler.ts new file mode 100644 index 00000000..cd781116 --- /dev/null +++ b/src/purge/schedulers/CronPurgeScheduler.ts @@ -0,0 +1,197 @@ +import type { Agent } from '@credo-ts/core' +import type { ScheduledTask } from 'node-cron' + +import { RecordNotFoundError } from '@credo-ts/core' +import { OpenId4VcIssuanceSessionRepository, OpenId4VcVerificationSessionRepository } from '@credo-ts/openid4vc' +import cron from 'node-cron' + +import { deletePurgeRecord } from '../PurgeDeleteRecord' +import { sendPurgeWebhook, PurgeDeletionStatus } from '../PurgeWebhook' +import type { PurgeConfig } from '../PurgeTypes' +import { PurgeRecordType } from '../PurgeTypes' + +export class CronPurgeScheduler { + private job: ScheduledTask | null = null + private isRunning = false + + async start(agent: Agent, config: PurgeConfig, webhookUrl: string | undefined): Promise { + const { cronConfig } = config + + this.job = cron.schedule(cronConfig.cronSchedule, () => { + console.log(`[Purge][Cron] Tick fired — schedule="${cronConfig.cronSchedule}"`) + if (this.isRunning) { + console.warn('[Purge][Cron] Scan still running — skipping this tick') + agent.config.logger.warn('[Purge] Cron scan still running — skipping this tick') + return + } + this.isRunning = true + this.runScan(agent, config, webhookUrl) + .catch((err: Error) => { + console.error('[Purge][Cron] Scan error', err?.message) + agent.config.logger.error('[Purge] Cron scan error', { error: err?.message }) + }) + .finally(() => { this.isRunning = false }) + }) + + console.log(`[Purge][Cron] Scheduler started — schedule="${cronConfig.cronSchedule}" ttlSeconds=${cronConfig.ttlSeconds} recordTypes=${cronConfig.recordTypes.join(', ')}`) + agent.config.logger.info('[Purge] CronPurgeScheduler started', { + cronSchedule: cronConfig.cronSchedule, + ttlSeconds: cronConfig.ttlSeconds, + recordTypes: cronConfig.recordTypes, + }) + } + + async stop(): Promise { + this.job?.stop() + this.job = null + } + + private async runScan(agent: Agent, config: PurgeConfig, webhookUrl: string | undefined): Promise { + const logger = agent.config.logger + const isShared = typeof (agent as any).modules?.tenants?.getAllTenants === 'function' + + console.log(`[Purge][Cron] Scan started — agentMode=${isShared ? 'shared' : 'dedicated'}`) + logger.info('[Purge] Cron scan started', { agentMode: isShared ? 'shared' : 'dedicated' }) + + let totalDeleted = 0 + + if (isShared) { + const tenants: Array<{ id: string }> = await (agent as any).modules.tenants.getAllTenants() + + for (const tenant of tenants) { + try { + await (agent as any).modules.tenants.withTenantAgent( + { tenantId: tenant.id }, + async (tenantAgent: Agent) => { + const count = await this.scanTenant(tenantAgent, tenant.id, config, webhookUrl) + totalDeleted += count + }, + ) + } catch (err: any) { + console.error(`[Purge][Cron] Failed to scan tenant tenantId=${tenant.id}`, err?.message) + logger.error('[Purge] Failed to scan tenant', { tenantId: tenant.id, error: err?.message }) + } + } + } else { + totalDeleted = await this.scanTenant(agent, '', config, webhookUrl) + } + + console.log(`[Purge][Cron] Scan completed — totalDeleted=${totalDeleted}`) + logger.info('[Purge] Cron scan completed', { totalDeleted }) + } + + private async scanTenant( + tenantAgent: Agent, + tenantId: string, + config: PurgeConfig, + webhookUrl: string | undefined, + ): Promise { + let deleted = 0 + const { cronConfig } = config + + for (const recordType of cronConfig.recordTypes) { + try { + const expiredIds = await this.queryExpiredRecords(tenantAgent, recordType, cronConfig.ttlSeconds) + console.log(`[Purge][Cron] Queried expired records — recordType=${recordType} tenantId="${tenantId}" found=${expiredIds.length}`) + + for (const recordId of expiredIds) { + console.log(`[Purge][Cron] Deleting record — recordType=${recordType} recordId=${recordId} tenantId="${tenantId}"`) + if (await this.deleteAndNotify(tenantAgent, recordId, recordType, tenantId, webhookUrl)) { + deleted++ + } + } + } catch (err: any) { + console.error(`[Purge][Cron] Error scanning recordType=${recordType} tenantId="${tenantId}"`, err?.message) + tenantAgent.config.logger.error('[Purge] Error scanning record type', { + tenantId, + recordType, + error: err?.message, + }) + } + } + + return deleted + } + + private async queryExpiredRecords( + agent: Agent, + recordType: PurgeRecordType, + ttlSeconds: number, + ): Promise { + const cutoffMs = Date.now() - ttlSeconds * 1000 + const ids: string[] = [] + + switch (recordType) { + case PurgeRecordType.DIDCOMM_CREDENTIAL: { + const records = await (agent as any).modules.didcomm.credentials.findAllByQuery({}) + for (const r of records) { + if (r.createdAt && new Date(r.createdAt).getTime() < cutoffMs) ids.push(r.id) + } + break + } + + case PurgeRecordType.DIDCOMM_PROOF: { + const records = await (agent as any).modules.didcomm.proofs.findAllByQuery({}) + for (const r of records) { + if (r.createdAt && new Date(r.createdAt).getTime() < cutoffMs) ids.push(r.id) + } + break + } + + case PurgeRecordType.OID4VC_ISSUANCE: { + const repo = agent.dependencyManager.resolve(OpenId4VcIssuanceSessionRepository) + const records = await repo.findByQuery(agent.context, {}) + for (const r of records) { + if (r.createdAt && new Date(r.createdAt).getTime() < cutoffMs) ids.push(r.id) + } + break + } + + case PurgeRecordType.OID4VC_VERIFICATION: { + const repo = agent.dependencyManager.resolve(OpenId4VcVerificationSessionRepository) + const records = await repo.findByQuery(agent.context, {}) + for (const r of records) { + if (r.createdAt && new Date(r.createdAt).getTime() < cutoffMs) ids.push(r.id) + } + break + } + } + + return ids + } + + private async deleteAndNotify( + agent: Agent, + recordId: string, + recordType: PurgeRecordType, + tenantId: string, + webhookUrl: string | undefined, + ): Promise { + const logger = agent.config.logger + let status: PurgeDeletionStatus + + try { + await deletePurgeRecord(agent, recordType, recordId) + console.log(`[Purge][Cron] Record deleted — recordType=${recordType} recordId=${recordId} tenantId="${tenantId}"`) + logger.info('[Purge] Record deleted by cron', { recordId, recordType, tenantId }) + status = PurgeDeletionStatus.DELETED + } catch (err: any) { + if (err instanceof RecordNotFoundError) { + console.warn(`[Purge][Cron] Record already absent — recordType=${recordType} recordId=${recordId}`) + logger.warn('[Purge] Record already absent — skipping', { recordId, recordType }) + status = PurgeDeletionStatus.ALREADY_ABSENT + } else { + console.error(`[Purge][Cron] Failed to delete record — recordType=${recordType} recordId=${recordId}`, err?.message) + logger.error('[Purge] Failed to delete record', { recordId, recordType, error: err?.message }) + return false + } + } + + if (webhookUrl) { + await sendPurgeWebhook(webhookUrl, recordId, recordType, tenantId, status, logger) + + } + + return true + } +} diff --git a/src/purge/schedulers/NatsPurgeScheduler.ts b/src/purge/schedulers/NatsPurgeScheduler.ts new file mode 100644 index 00000000..609bd4bd --- /dev/null +++ b/src/purge/schedulers/NatsPurgeScheduler.ts @@ -0,0 +1,220 @@ +import type { Agent } from '@credo-ts/core' +import type { JetStreamClient, JetStreamManager, NatsConnection } from 'nats' + +import { AckPolicy, DiscardPolicy, RetentionPolicy, StorageType, StringCodec, connect, headers } from 'nats' + +import { buildNatsAuthenticator } from '../../utils/NatsAuthenticator' + +import { + NATS_ERR_CONSUMER_ALREADY_EXISTS, + NATS_ERR_STREAM_ALREADY_EXISTS, + NATS_MAX_RECONNECT_ATTEMPTS, + NATS_RECONNECT_TIME_WAIT_MS, +} from '../../utils/NatsConstants' +import { + PURGE_CONSUMER_ACK_WAIT_NS, + PURGE_CONSUMER_BACKOFF_NS, + PURGE_CONSUMER_MAX_DELIVER, + PURGE_CONSUMER_NAMES, + PURGE_EXECUTION_SUBJECTS, + PURGE_SCHEDULER_SUBJECTS, + PURGE_STREAM, + PURGE_STREAM_BUFFER_NS, +} from '../PurgeConstants' +import type { AgentMode, PurgeConfig, PurgeJob } from '../PurgeTypes' +import { PurgeRecordType } from '../PurgeTypes' +import { PurgeWorker } from '../PurgeWorker' + +const sc = StringCodec() + +export class NatsPurgeScheduler { + private nc: NatsConnection | null = null + private js: JetStreamClient | null = null + private jsm: JetStreamManager | null = null + private ttlSeconds = 0 + private recordTypes: PurgeRecordType[] = [] + + async start(agent: Agent, config: PurgeConfig, webhookUrl: string | undefined): Promise { + const { natsConfig } = config + this.ttlSeconds = natsConfig.ttlSeconds + this.recordTypes = natsConfig.recordTypes + + this.nc = await connect({ + servers: natsConfig.nats.servers, + ...buildNatsAuthenticator(natsConfig.nats), + maxReconnectAttempts: NATS_MAX_RECONNECT_ATTEMPTS, + reconnectTimeWait: NATS_RECONNECT_TIME_WAIT_MS, + }) + this.js = this.nc.jetstream() + this.jsm = await this.nc.jetstreamManager() + + console.log(`[Purge][NATS] Connected to NATS server(s): ${natsConfig.nats.servers}`) + console.log('[Purge][NATS] Provisioning streams...') + agent.config.logger.info('[Purge] Provisioning NATS streams...') + await this.provisionStreams() + console.log('[Purge][NATS] Streams ready') + agent.config.logger.info('[Purge] NATS streams ready') + + console.log('[Purge][NATS] Provisioning consumers...') + agent.config.logger.info('[Purge] Provisioning NATS consumers...') + await this.provisionConsumers() + console.log(`[Purge][NATS] Consumers ready — recordTypes=${this.recordTypes.join(', ')}`) + agent.config.logger.info('[Purge] NATS consumers ready') + + await this.startWorkers(agent, webhookUrl) + + console.log(`[Purge][NATS] Scheduler started — ttlSeconds=${this.ttlSeconds} recordTypes=${this.recordTypes.join(', ')}`) + agent.config.logger.info('[Purge] NatsPurgeScheduler started', { ttlSeconds: this.ttlSeconds }) + } + + async schedulePurge( + recordType: PurgeRecordType, + recordId: string, + tenantId: string, + agentMode: AgentMode, + ): Promise { + if (!this.js) throw new Error('[Purge] NatsPurgeScheduler not started') + + const fireAt = new Date(Date.now() + this.ttlSeconds * 1000).toISOString() + const job: PurgeJob = { recordId, recordType, tenantId, agentMode, scheduledAt: fireAt } + + // tenantScope ensures subjects and dedup IDs are unique across tenants in shared mode + const tenantScope = agentMode === 'shared' ? tenantId : 'dedicated' + // Subject is unique per tenant+record — NATS allows only one active schedule per subject + const scheduleSubject = `${PURGE_SCHEDULER_SUBJECTS[recordType]}.${tenantScope}.${recordId}` + + const h = headers() + h.set('Nats-Schedule', `@at ${fireAt}`) + h.set('Nats-Schedule-Target', PURGE_EXECUTION_SUBJECTS[recordType]) + h.set('Nats-Msg-Id', `purge-${recordType}-${tenantScope}-${recordId}`) + + await this.js.publish(scheduleSubject, sc.encode(JSON.stringify(job)), { headers: h }) + + console.info(`[Purge] Scheduled: ${recordType} recordId=${recordId} tenantId="${tenantId}" agentMode=${agentMode} fireAt=${fireAt}`) + } + + async stop(): Promise { + if (this.nc) { + await this.nc.drain() + this.nc = null + this.js = null + this.jsm = null + } + } + + private async provisionStreams(): Promise { + if (!this.jsm) throw new Error('[Purge] Not connected') + + await this.addOrUpdateStream({ + name: PURGE_STREAM, + subjects: ['purge.schedule.>', 'purge.execute.>'], + retention: RetentionPolicy.Limits, + storage: StorageType.File, + max_age: this.ttlSeconds * 1_000_000_000 + PURGE_STREAM_BUFFER_NS, + discard: DiscardPolicy.Old, + allow_msg_schedules: true, + }) + } + + private async addOrUpdateStream(config: any): Promise { + if (!this.jsm) throw new Error('[Purge] Not connected') + try { + await this.jsm.streams.add(config) + } catch (err: any) { + if (this.isAlreadyExistsError(err)) { + await this.jsm.streams.update(config.name, config) + } else if (err?.message?.includes('subjects overlap')) { + // Stale streams from a previous version — delete and retry + console.warn('[Purge] Subject overlap detected — purging stale streams and retrying') + await this.deleteStaleStreams(config.subjects) + await this.jsm.streams.add(config) + } else { + throw err + } + } + } + + private async deleteStaleStreams(_subjects: string[]): Promise { + if (!this.jsm) return + const list = await this.jsm.streams.list().next() + for (const stream of list) { + if (stream.config.name === PURGE_STREAM) continue + // Only delete streams that explicitly claim purge.* subjects — never touch unrelated streams + const isPurgeStream = stream.config.subjects?.some( + (s: string) => s.startsWith('purge.schedule.') || s.startsWith('purge.execute.'), + ) + if (isPurgeStream) { + console.warn(`[Purge] Deleting stale purge stream: ${stream.config.name}`) + await this.jsm.streams.delete(stream.config.name) + } + } + } + + private async provisionConsumers(): Promise { + if (!this.jsm) throw new Error('[Purge] Not connected') + + for (const recordType of this.recordTypes) { + try { + await this.jsm.consumers.add(PURGE_STREAM, { + durable_name: PURGE_CONSUMER_NAMES[recordType], + ack_policy: AckPolicy.Explicit, + ack_wait: PURGE_CONSUMER_ACK_WAIT_NS, + max_deliver: PURGE_CONSUMER_MAX_DELIVER, + backoff: PURGE_CONSUMER_BACKOFF_NS, + filter_subject: PURGE_EXECUTION_SUBJECTS[recordType], + }) + } catch (err: any) { + if (!this.isAlreadyExistsError(err)) throw err + } + } + } + + private async startWorkers(agent: Agent, webhookUrl: string | undefined): Promise { + if (!this.js) throw new Error('[Purge] Not connected') + + for (const recordType of this.recordTypes) { + const consumerName = PURGE_CONSUMER_NAMES[recordType] + console.log(`[Purge][NATS] Starting worker — recordType=${recordType} consumer=${consumerName}`) + agent.config.logger.info('[Purge] Starting worker', { recordType, consumerName }) + this.runWorkerWithRestart(agent, recordType, consumerName, webhookUrl) + } + } + + private runWorkerWithRestart( + agent: Agent, + recordType: PurgeRecordType, + consumerName: string, + webhookUrl: string | undefined, + attempt = 0, + ): void { + if (!this.js) return + + const delayMs = Math.min(1000 * 2 ** attempt, 60_000) + + const launch = async () => { + const consumer = await this.js!.consumers.get(PURGE_STREAM, consumerName) + const worker = new PurgeWorker(recordType, consumerName, webhookUrl) + await worker.start(agent, consumer) + } + + launch() + .then(() => console.log(`[Purge][NATS] Worker launched — consumer=${consumerName}`)) + .catch((err: Error) => { + if (!this.nc) return // scheduler stopped — do not restart + console.error(`[Purge][NATS] Worker crashed — consumer=${consumerName} attempt=${attempt} retryIn=${delayMs}ms`, err?.message) + agent.config.logger.error('[Purge] Worker crashed — restarting', { consumerName, attempt, delayMs, error: err?.message }) + setTimeout(() => this.runWorkerWithRestart(agent, recordType, consumerName, webhookUrl, attempt + 1), delayMs) + }) + } + + private isAlreadyExistsError(err: any): boolean { + const msg: string = err?.message ?? '' + return ( + msg.includes('stream name already in use') || + msg.includes('consumer name already in use') || + err?.api_error?.err_code === NATS_ERR_STREAM_ALREADY_EXISTS || + err?.api_error?.err_code === NATS_ERR_CONSUMER_ALREADY_EXISTS + ) + } +} + diff --git a/src/routes/swagger.json b/src/routes/swagger.json index 92368c70..cd877124 100644 --- a/src/routes/swagger.json +++ b/src/routes/swagger.json @@ -9848,6 +9848,9 @@ "credentialRequestThId": { "type": "string" }, + "credentialExchangeRecordId": { + "type": "string" + }, "outOfBandRecordId": { "type": "string" }, @@ -9864,6 +9867,7 @@ "required": [ "invitationDid", "credentialRequestThId", + "credentialExchangeRecordId", "outOfBandRecordId", "outOfBandRecord", "invitation", @@ -10409,6 +10413,9 @@ "proofRecordThId": { "type": "string" }, + "proofRecordId": { + "type": "string" + }, "invitationDid": { "type": "string" }, @@ -10425,6 +10432,7 @@ "required": [ "proofMessageId", "proofRecordThId", + "proofRecordId", "invitationDid", "outOfBandRecord", "invitation", diff --git a/src/server.ts b/src/server.ts index 75c315f6..31042563 100644 --- a/src/server.ts +++ b/src/server.ts @@ -40,8 +40,12 @@ export const setupServer = async ( config: ServerConfig, apiKey?: string, ) => { - await otelSDK.start() - agent.config.logger.info('OpenTelemetry SDK started') + if (process.env.OTEL_ENABLED === 'true') { + await otelSDK.start() + agent.config.logger.info('OpenTelemetry SDK started') + } else { + agent.config.logger.info('OpenTelemetry SDK disabled (set OTEL_ENABLED=true to enable)') + } validateAuthConfig() container.registerInstance(Agent, agent as Agent) fs.writeFileSync('config.json', JSON.stringify(config, null, 2)) diff --git a/src/tracer.ts b/src/tracer.ts index ed0ecbe1..3c7048fc 100644 --- a/src/tracer.ts +++ b/src/tracer.ts @@ -27,6 +27,8 @@ const resource = new resourceFromAttributes({ [SemanticResourceAttributes.SERVICE_INSTANCE_ID]: process.env.HOSTNAME, }) +const otelEnabled = process.env.OTEL_ENABLED === 'true' + const traceExporter = new OTLPTraceExporter({ url: process.env.OTEL_TRACES_OTLP_ENDPOINT, headers: { @@ -40,8 +42,13 @@ const logExporter = new OTLPLogExporter({ Authorization: `Api-Key ${process.env.OTEL_HEADERS_KEY}`, }, }) + const logProvider = new LoggerProvider({ resource }) -logProvider.addLogRecordProcessor(new BatchLogRecordProcessor(logExporter)) +// Only add the BatchLogRecordProcessor when OTEL is enabled — otherwise it +// tries to connect to localhost:4318 on every flush even if SDK never started. +if (otelEnabled) { + logProvider.addLogRecordProcessor(new BatchLogRecordProcessor(logExporter)) +} export const otelLogger: Logger = logProvider.getLogger('credo-controller-logger') export const otelLoggerProviderInstance = logProvider diff --git a/src/utils/NatsAuthenticator.ts b/src/utils/NatsAuthenticator.ts new file mode 100644 index 00000000..a3420589 --- /dev/null +++ b/src/utils/NatsAuthenticator.ts @@ -0,0 +1,31 @@ +import { readFileSync } from 'node:fs' + +import type { Authenticator } from 'nats' +import { credsAuthenticator, nkeyAuthenticator, usernamePasswordAuthenticator } from 'nats' + +import type { NatsConfig } from '../purge/PurgeTypes' + +export type NatsAuthType = 'nkey' | 'creds' | 'usernamePassword' | 'none' + +export function buildNatsAuthenticator(nats: NatsConfig): { authenticator?: Authenticator } { + const authType = (process.env.NATS_AUTH_TYPE as NatsAuthType) || 'none' + + switch (authType) { + case 'nkey': + if (!nats.nkeySeed) throw new Error('[NATS] NATS_AUTH_TYPE=nkey but NATS_NKEY_SEED is not set') + return { authenticator: nkeyAuthenticator(new TextEncoder().encode(nats.nkeySeed)) } + + case 'creds': + if (!nats.credentialsFile) throw new Error('[NATS] NATS_AUTH_TYPE=creds but NATS_CREDENTIALS_FILE is not set') + return { authenticator: credsAuthenticator(readFileSync(nats.credentialsFile)) } + + case 'usernamePassword': + if (!nats.username || !nats.password) + throw new Error('[NATS] NATS_AUTH_TYPE=usernamePassword but NATS_USER or NATS_PASSWORD is not set') + return { authenticator: usernamePasswordAuthenticator(nats.username, nats.password) } + + case 'none': + default: + return {} + } +} diff --git a/src/utils/NatsConstants.ts b/src/utils/NatsConstants.ts new file mode 100644 index 00000000..43d85f9f --- /dev/null +++ b/src/utils/NatsConstants.ts @@ -0,0 +1,9 @@ +// -1 means retry forever — the agent should not stop trying to reconnect. +export const NATS_MAX_RECONNECT_ATTEMPTS = -1 + +// How long (ms) to wait between reconnect attempts. +export const NATS_RECONNECT_TIME_WAIT_MS = 2000 + +// JetStream API error codes returned when a stream or consumer already exists. +export const NATS_ERR_STREAM_ALREADY_EXISTS = 10058 +export const NATS_ERR_CONSUMER_ALREADY_EXISTS = 10148 diff --git a/yarn.lock b/yarn.lock index 1f60614e..095114a6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2754,6 +2754,11 @@ dependencies: "@types/express" "*" +"@types/node-cron@^3.0.11": + version "3.0.11" + resolved "https://registry.yarnpkg.com/@types/node-cron/-/node-cron-3.0.11.tgz#70b7131f65038ae63cfe841354c8aba363632344" + integrity sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg== + "@types/node-fetch@^2.6.4": version "2.6.13" resolved "https://registry.yarnpkg.com/@types/node-fetch/-/node-fetch-2.6.13.tgz#e0c9b7b5edbdb1b50ce32c127e85e880872d56ee" @@ -6470,6 +6475,13 @@ napi-postinstall@^0.3.0: resolved "https://registry.yarnpkg.com/napi-postinstall/-/napi-postinstall-0.3.4.tgz#7af256d6588b5f8e952b9190965d6b019653bbb9" integrity sha512-PHI5f1O0EP5xJ9gQmFGMS6IZcrVvTjpXjz7Na41gTE7eE2hK11lg04CECCYEEjdc17EV4DO+fkGEtt7TpTaTiQ== +nats@^2.29.3: + version "2.29.3" + resolved "https://registry.yarnpkg.com/nats/-/nats-2.29.3.tgz#86099c9bc193464d4f8cbb1d3504335bd2719ec6" + integrity sha512-tOQCRCwC74DgBTk4pWZ9V45sk4d7peoE2njVprMRCBXrhJ5q5cYM7i6W+Uvw2qUrcfOSnuisrX7bEx3b3Wx4QA== + dependencies: + nkeys.js "1.1.0" + natural-compare@^1.4.0: version "1.4.0" resolved "https://registry.yarnpkg.com/natural-compare/-/natural-compare-1.4.0.tgz#4abebfeed7541f2c27acfb29bdbbd15c8d5ba4f7" @@ -6509,6 +6521,13 @@ ngrok@^4.3.3: optionalDependencies: hpagent "^0.1.2" +nkeys.js@1.1.0: + version "1.1.0" + resolved "https://registry.yarnpkg.com/nkeys.js/-/nkeys.js-1.1.0.tgz#de83a9a13f396c5b6d7c412788f4b9f7f35d4c18" + integrity sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg== + dependencies: + tweetnacl "1.0.3" + node-addon-api@^2.0.0: version "2.0.2" resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-2.0.2.tgz#432cfa82962ce494b132e9d72a15b29f71ff5d32" @@ -6519,6 +6538,11 @@ node-addon-api@^3.0.0: resolved "https://registry.yarnpkg.com/node-addon-api/-/node-addon-api-3.2.1.tgz#81325e0a2117789c0128dab65e7e38f07ceba161" integrity sha512-mmcei9JghVNDYydghQmeDX8KoAm0FAiYyIcUt/N4nhyAipB17pllZQDOJD2fotxABnt4Mdz+dKTO7eftLg4d0A== +node-cron@^4.2.1: + version "4.2.1" + resolved "https://registry.yarnpkg.com/node-cron/-/node-cron-4.2.1.tgz#6979be4aee4702f06322d21220df8de252c8e265" + integrity sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg== + node-fetch@2, node-fetch@^2.7.0: version "2.7.0" resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.7.0.tgz#d0f0fa6e3e2dc1d27efcd8ad99d550bda94d187d" @@ -7839,6 +7863,11 @@ tsyringe@^4.10.0, tsyringe@^4.6.0: dependencies: tslib "^1.9.3" +tweetnacl@1.0.3: + version "1.0.3" + resolved "https://registry.yarnpkg.com/tweetnacl/-/tweetnacl-1.0.3.tgz#ac0af71680458d8a6378d0d0d050ab1407d35596" + integrity sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw== + type-check@^0.4.0, type-check@~0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/type-check/-/type-check-0.4.0.tgz#07b8203bfa7056c0657050e3ccd2c37730bab8f1"