diff --git a/backend/__tests__/trigger.workflow.test.js b/backend/__tests__/trigger.workflow.test.js new file mode 100644 index 0000000..aaf90e1 --- /dev/null +++ b/backend/__tests__/trigger.workflow.test.js @@ -0,0 +1,134 @@ +const test = require('node:test'); +const assert = require('node:assert/strict'); +const { validationSchemas } = require('../src/middleware/validation.middleware'); +const Trigger = require('../src/models/trigger.model'); +const triggerController = require('../src/controllers/trigger.controller'); +const ipWhitelistService = require('../src/services/ipWhitelist.service'); + +const originalSave = Trigger.prototype.save; +const originalValidateUrl = ipWhitelistService.validateUrl; + +function req(body = {}) { + return { + body, + get() { + return 'test-agent'; + }, + ip: '127.0.0.1', + user: { + id: 'user-1', + organization: { _id: 'org-1' }, + }, + }; +} + +function res() { + return { + statusCode: 200, + payload: undefined, + status(code) { + this.statusCode = code; + return this; + }, + json(payload) { + this.payload = payload; + return this; + }, + }; +} + +test.afterEach(() => { + Trigger.prototype.save = originalSave; + ipWhitelistService.validateUrl = originalValidateUrl; +}); + +test('trigger create validation accepts workflow steps without top-level actionUrl', () => { + const { error, value } = validationSchemas.triggerCreate.validate({ + contractId: 'contract-1', + eventName: 'Transfer', + steps: [ + { id: 'notifyWebhook', actionType: 'webhook', actionUrl: 'https://example.com/hook' }, + { id: 'notifyTelegram', actionType: 'telegram', actionUrl: 'chat-1' }, + ], + }); + + assert.equal(error, undefined); + assert.equal(value.steps.length, 2); +}); + +test('trigger create validation rejects workflow with top-level actionUrl', () => { + const { error } = validationSchemas.triggerCreate.validate({ + contractId: 'contract-1', + eventName: 'Transfer', + actionUrl: 'https://example.com/top-level', + steps: [ + { id: 'notifyWebhook', actionType: 'webhook', actionUrl: 'https://example.com/hook' }, + ], + }); + + assert.match(error.message, /cannot also define top-level actionUrl/); +}); + +test('trigger update validation rejects workflow with top-level actionUrl', () => { + const { error } = validationSchemas.triggerUpdate.validate({ + actionUrl: 'https://example.com/top-level', + steps: [ + { id: 'notifyWebhook', actionType: 'webhook', actionUrl: 'https://example.com/hook' }, + ], + }); + + assert.match(error.message, /cannot also define top-level actionUrl/); +}); + +test('trigger create validation rejects duplicate step ids', () => { + const { error } = validationSchemas.triggerCreate.validate({ + contractId: 'contract-1', + eventName: 'Transfer', + steps: [ + { id: 'notify', actionType: 'webhook', actionUrl: 'https://example.com/one' }, + { id: 'notify', actionType: 'webhook', actionUrl: 'https://example.com/two' }, + ], + }); + + assert.match(error.message, /Workflow step ids must be unique/); +}); + +test('createTrigger validates webhook URLs inside workflow steps', async () => { + const validated = []; + + ipWhitelistService.validateUrl = async (url, organizationId, options) => { + validated.push({ url, organizationId, options }); + return { warnings: [`warning for ${url}`] }; + }; + Trigger.prototype.save = async function save() { + return this; + }; + + const response = res(); + await triggerController.createTrigger(req({ + contractId: 'contract-1', + eventName: 'Transfer', + steps: [ + { id: 'firstHook', actionType: 'webhook', actionUrl: 'https://example.com/one' }, + { id: 'sendMessage', actionType: 'telegram', actionUrl: 'chat-1' }, + { id: 'secondHook', actionType: 'webhook', actionUrl: 'https://example.com/two' }, + ], + }), response, () => {}); + + assert.deepEqual(validated, [ + { + url: 'https://example.com/one', + organizationId: 'org-1', + options: { allowDnsFailure: true }, + }, + { + url: 'https://example.com/two', + organizationId: 'org-1', + options: { allowDnsFailure: true }, + }, + ]); + assert.deepEqual(response.payload.warnings, [ + 'step firstHook: warning for https://example.com/one', + 'step secondHook: warning for https://example.com/two', + ]); +}); diff --git a/backend/__tests__/workflow.processor.test.js b/backend/__tests__/workflow.processor.test.js new file mode 100644 index 0000000..69196c8 --- /dev/null +++ b/backend/__tests__/workflow.processor.test.js @@ -0,0 +1,35 @@ +const test = require('node:test'); +const assert = require('node:assert/strict'); +const processor = require('../src/worker/processor'); + +test('batch workflow dispatch executes one workflow per event payload', async () => { + const trigger = { + _id: 'trigger-1', + organization: 'org-1', + contractId: 'contract-1', + eventName: 'Transfer', + batchingConfig: { continueOnError: true }, + steps: [ + { id: 'notify', actionType: 'webhook', actionUrl: 'https://example.com/hook' }, + ], + }; + + const calls = []; + const result = await processor.executeBatchAction(trigger, [ + { sequence: 1 }, + { sequence: 2 }, + ], { + runIdPrefix: 'job-123', + executeStep: async (stepTrigger, eventPayload, options) => { + calls.push({ stepTrigger, eventPayload, options }); + return { sequence: eventPayload.sequence }; + }, + }); + + assert.equal(result.total, 2); + assert.equal(result.successful, 2); + assert.equal(result.failed, 0); + assert.equal(calls.length, 2); + assert.equal(calls[0].options.context.runId, 'job-123:0'); + assert.equal(calls[1].options.context.runId, 'job-123:1'); +}); diff --git a/backend/__tests__/workflow.service.test.js b/backend/__tests__/workflow.service.test.js new file mode 100644 index 0000000..066187f --- /dev/null +++ b/backend/__tests__/workflow.service.test.js @@ -0,0 +1,146 @@ +const test = require('node:test'); +const assert = require('node:assert/strict'); +const { + WorkflowExecutionError, + executeWorkflow, +} = require('../src/services/workflow.service'); +const { resolveTemplates } = require('../src/utils/templater'); + +function trigger(overrides = {}) { + return { + _id: 'trigger-1', + organization: 'org-1', + contractId: 'contract-1', + eventName: 'Transfer', + steps: [], + ...overrides, + }; +} + +test('workflow executes sequential steps and stores keyed results', async () => { + const calls = []; + const result = await executeWorkflow(trigger({ + steps: [ + { id: 'first', actionType: 'webhook', actionUrl: 'https://example.com/first' }, + { id: 'second', actionType: 'telegram', actionUrl: 'chat-1' }, + ], + }), { amount: 10 }, { + runId: 'run-1', + executeStep: async (stepTrigger, eventPayload, options) => { + calls.push({ stepTrigger, eventPayload, options }); + return { receivedStep: stepTrigger.id }; + }, + }); + + assert.equal(result.status, 'succeeded'); + assert.deepEqual(result.context.stepOrder, ['first', 'second']); + assert.equal(result.context.steps.first.success, true); + assert.equal(result.context.steps.second.output.receivedStep, 'second'); + assert.equal(calls[1].options.context.steps.first.output.receivedStep, 'first'); +}); + +test('default runIf success skips the next step after a failure', async () => { + let error; + try { + await executeWorkflow(trigger({ + steps: [ + { id: 'first', actionType: 'webhook', actionUrl: 'https://example.com/first' }, + { id: 'second', actionType: 'webhook', actionUrl: 'https://example.com/second' }, + ], + }), {}, { + runId: 'run-2', + executeStep: async (stepTrigger) => { + if (stepTrigger.id === 'first') { + throw new Error('downstream failed'); + } + return { ok: true }; + }, + }); + } catch (caught) { + error = caught; + } + + assert.ok(error instanceof WorkflowExecutionError); + assert.equal(error.result.status, 'failed'); + assert.equal(error.result.context.steps.first.success, false); + assert.equal(error.result.context.steps.second.skipped, true); +}); + +test('runIf failure allows a compensating step to run', async () => { + let error; + try { + await executeWorkflow(trigger({ + steps: [ + { id: 'primary', actionType: 'webhook', actionUrl: 'https://example.com/primary' }, + { id: 'compensate', actionType: 'telegram', actionUrl: 'chat-1', runIf: 'failure' }, + ], + }), {}, { + runId: 'run-3', + executeStep: async (stepTrigger) => { + if (stepTrigger.id === 'primary') throw new Error('failed'); + return { alertSent: true }; + }, + }); + } catch (caught) { + error = caught; + } + + assert.ok(error instanceof WorkflowExecutionError); + assert.equal(error.result.context.steps.primary.success, false); + assert.equal(error.result.context.steps.compensate.success, true); + assert.deepEqual(error.result.context.steps.compensate.output, { alertSent: true }); +}); + +test('continueOnError returns failed workflow result instead of throwing', async () => { + const result = await executeWorkflow(trigger({ + workflowConfig: { continueOnError: true }, + steps: [ + { id: 'primary', actionType: 'webhook', actionUrl: 'https://example.com/primary' }, + { id: 'cleanup', actionType: 'telegram', actionUrl: 'chat-1', runIf: 'failure' }, + ], + }), {}, { + runId: 'run-4', + executeStep: async (stepTrigger) => { + if (stepTrigger.id === 'primary') throw new Error('failed'); + return { ok: true }; + }, + }); + + assert.equal(result.status, 'failed'); + assert.equal(result.context.steps.cleanup.success, true); +}); + +test('templates resolve against keyed workflow context', async () => { + const calls = []; + await executeWorkflow(trigger({ + steps: [ + { id: 'fetchUser', actionType: 'webhook', actionUrl: 'https://example.com/fetch' }, + { + id: 'notifyUser', + actionType: 'webhook', + actionUrl: 'https://example.com/users/{{steps.fetchUser.output.userId}}', + config: { + message: 'User {{steps.fetchUser.output.userId}} moved {{event.amount}} tokens', + }, + }, + ], + }), { amount: 25 }, { + runId: 'run-5', + executeStep: async (stepTrigger) => { + calls.push(stepTrigger); + if (stepTrigger.id === 'fetchUser') return { userId: 'user-123' }; + return { notified: stepTrigger.actionUrl }; + }, + }); + + assert.equal(calls[1].actionUrl, 'https://example.com/users/user-123'); + assert.equal(calls[1].config.message, 'User user-123 moved 25 tokens'); +}); + +test('missing template paths are left unchanged', () => { + const value = resolveTemplates({ + url: 'https://example.com/{{steps.missing.output.id}}', + }, { steps: {} }); + + assert.equal(value.url, 'https://example.com/{{steps.missing.output.id}}'); +}); diff --git a/backend/src/controllers/trigger.controller.js b/backend/src/controllers/trigger.controller.js index 56a802b..ef5fa38 100644 --- a/backend/src/controllers/trigger.controller.js +++ b/backend/src/controllers/trigger.controller.js @@ -5,18 +5,36 @@ const asyncHandler = require('../utils/asyncHandler'); const ipWhitelistService = require('../services/ipWhitelist.service'); async function validateWebhookDestinationIfNeeded(body, organizationId, currentTrigger = null) { + const warnings = []; + const steps = body.steps ?? currentTrigger?.steps ?? []; + + if (Array.isArray(steps) && steps.length > 0) { + for (const step of steps) { + if (step.actionType !== 'webhook' || !step.actionUrl) { + continue; + } + + const result = await ipWhitelistService.validateUrl(step.actionUrl, organizationId, { + allowDnsFailure: true, + }); + warnings.push(...(result.warnings || []).map((warning) => `step ${step.id}: ${warning}`)); + } + return warnings; + } + const actionType = body.actionType ?? currentTrigger?.actionType ?? 'webhook'; const actionUrl = body.actionUrl ?? currentTrigger?.actionUrl; if (actionType !== 'webhook' || !actionUrl) { - return []; + return warnings; } const result = await ipWhitelistService.validateUrl(actionUrl, organizationId, { allowDnsFailure: true, }); - return result.warnings || []; + warnings.push(...(result.warnings || [])); + return warnings; } exports.createTrigger = asyncHandler(async (req, res) => { @@ -135,9 +153,21 @@ exports.updateTrigger = asyncHandler(async (req, res) => { existingTrigger ); + const updatePayload = { ...req.body }; + const hasWorkflowSteps = Array.isArray(updatePayload.steps) && updatePayload.steps.length > 0; + if (hasWorkflowSteps) { + delete updatePayload.actionUrl; + } + const updateOperation = hasWorkflowSteps + ? { + $set: updatePayload, + $unset: { actionUrl: '' }, + } + : updatePayload; + const trigger = await Trigger.findOneAndUpdate( { _id: req.params.id, organization: req.user.organization._id }, - req.body, + updateOperation, { new: true, runValidators: true } ); diff --git a/backend/src/middleware/validation.middleware.js b/backend/src/middleware/validation.middleware.js index 3296d95..e6bc182 100644 --- a/backend/src/middleware/validation.middleware.js +++ b/backend/src/middleware/validation.middleware.js @@ -28,6 +28,52 @@ const filtersSchema = Joi.array() 'any.invalid': '{{#message}}', }); +const workflowStepIdSchema = Joi.string() + .trim() + .pattern(/^[A-Za-z][A-Za-z0-9_-]{0,63}$/) + .required() + .messages({ + 'string.pattern.base': 'Workflow step id must start with a letter and contain only letters, numbers, underscores, or hyphens', + }); + +const workflowStepSchema = Joi.object({ + id: workflowStepIdSchema, + name: Joi.string().trim(), + actionType: Joi.string().valid('webhook', 'discord', 'email', 'telegram').required(), + actionUrl: Joi.string().trim(), + webhookSecret: Joi.string(), + config: Joi.object().unknown(true).default({}), + runIf: Joi.string().valid('success', 'failure', 'always').default('success'), +}).custom((value, helpers) => { + if (['webhook', 'discord'].includes(value.actionType) && !value.actionUrl) { + return helpers.error('any.invalid', { + message: `Workflow step "${value.id}" requires actionUrl for ${value.actionType}`, + }); + } + if (['webhook', 'discord'].includes(value.actionType)) { + const { error } = Joi.string().uri().validate(value.actionUrl); + if (error) { + return helpers.error('any.invalid', { + message: `Workflow step "${value.id}" actionUrl must be a valid URI`, + }); + } + } + return value; +}, 'workflow step validation').messages({ + 'any.invalid': '{{#message}}', +}); + +const workflowStepsSchema = Joi.array() + .items(workflowStepSchema) + .max(20) + .unique('id') + .messages({ + 'array.unique': 'Workflow step ids must be unique', + }); + +const workflowConfigSchema = Joi.object({ + continueOnError: Joi.boolean().default(false), +}); const headerSchema = Joi.object({ key: Joi.string().trim().required().pattern(/^[_a-zA-Z0-9-]+$/), value: Joi.string().required() @@ -66,6 +112,49 @@ const validationSchemas = { triggerCreate: Joi.object({ contractId: Joi.string().trim().required(), eventName: Joi.string().trim().required(), + actionType: Joi.string().valid('webhook', 'discord', 'email', 'telegram'), + actionUrl: Joi.string().trim().uri(), + isActive: Joi.boolean().default(true), + lastPolledLedger: Joi.number().integer().min(0).default(0), + filters: filtersSchema.default([]), + steps: workflowStepsSchema.default([]), + workflowConfig: workflowConfigSchema.default({ continueOnError: false }), + }).custom((value, helpers) => { + const hasWorkflowSteps = Array.isArray(value.steps) && value.steps.length > 0; + if (hasWorkflowSteps && value.actionUrl) { + return helpers.error('any.invalid', { + message: 'Workflow triggers cannot also define top-level actionUrl', + }); + } + if (!hasWorkflowSteps && !value.actionUrl) { + return helpers.error('any.invalid', { + message: 'Trigger actionUrl is required when steps are not provided', + }); + } + return value; + }, 'trigger workflow validation').messages({ + 'any.invalid': '{{#message}}', + }), + triggerUpdate: Joi.object({ + contractId: Joi.string().trim(), + eventName: Joi.string().trim(), + actionType: Joi.string().valid('webhook', 'discord', 'email', 'telegram'), + actionUrl: Joi.string().trim().uri(), + isActive: Joi.boolean(), + lastPolledLedger: Joi.number().integer().min(0), + filters: filtersSchema, + steps: workflowStepsSchema, + workflowConfig: workflowConfigSchema, + }).min(1).custom((value, helpers) => { + const hasWorkflowSteps = Array.isArray(value.steps) && value.steps.length > 0; + if (hasWorkflowSteps && value.actionUrl) { + return helpers.error('any.invalid', { + message: 'Workflow triggers cannot also define top-level actionUrl', + }); + } + return value; + }, 'trigger workflow update validation').messages({ + 'any.invalid': '{{#message}}', actionType: Joi.string().valid('webhook', 'discord', 'email', 'telegram').default('webhook'), actionUrl: Joi.string().trim().uri().required(), webhookSecret: Joi.string().when('actionType', { diff --git a/backend/src/models/trigger.model.js b/backend/src/models/trigger.model.js index 7c10ebd..ec334b6 100644 --- a/backend/src/models/trigger.model.js +++ b/backend/src/models/trigger.model.js @@ -29,6 +29,44 @@ const filterSchema = new mongoose.Schema({ }, }, { _id: false }); +const WORKFLOW_ACTION_TYPES = ['webhook', 'discord', 'email', 'telegram']; +const WORKFLOW_RUN_IF = ['success', 'failure', 'always']; +const WORKFLOW_STEP_ID_PATTERN = /^[A-Za-z][A-Za-z0-9_-]{0,63}$/; + +const workflowStepSchema = new mongoose.Schema({ + id: { + type: String, + required: true, + trim: true, + match: WORKFLOW_STEP_ID_PATTERN, + }, + name: { + type: String, + trim: true, + }, + actionType: { + type: String, + enum: WORKFLOW_ACTION_TYPES, + required: true, + }, + actionUrl: { + type: String, + trim: true, + }, + webhookSecret: { + type: String, + }, + config: { + type: mongoose.Schema.Types.Mixed, + default: {}, + }, + runIf: { + type: String, + enum: WORKFLOW_RUN_IF, + default: 'success', + }, +}, { _id: false }); + const triggerSchema = new mongoose.Schema({ organization: { type: mongoose.Schema.Types.ObjectId, @@ -57,7 +95,9 @@ const triggerSchema = new mongoose.Schema({ }, actionUrl: { type: String, - required: true + required: function requiredActionUrl() { + return !this.steps || this.steps.length === 0; + } }, webhookSecret: { type: String, @@ -147,6 +187,15 @@ const triggerSchema = new mongoose.Schema({ type: [filterSchema], default: [], }, + steps: { + type: [workflowStepSchema], + default: [], + }, + workflowConfig: { + continueOnError: { + type: Boolean, + default: false, + }, // Custom headers for webhook actions customHeaders: { type: [{ @@ -259,4 +308,7 @@ const Trigger = mongoose.model('Trigger', triggerSchema); module.exports = Trigger; module.exports.FILTER_OPERATORS = FILTER_OPERATORS; +module.exports.WORKFLOW_ACTION_TYPES = WORKFLOW_ACTION_TYPES; +module.exports.WORKFLOW_RUN_IF = WORKFLOW_RUN_IF; +module.exports.WORKFLOW_STEP_ID_PATTERN = WORKFLOW_STEP_ID_PATTERN; module.exports.MAX_CONSECUTIVE_FAILURES = MAX_CONSECUTIVE_FAILURES; diff --git a/backend/src/routes/docs.routes.js b/backend/src/routes/docs.routes.js index fb10bc9..04923e5 100644 --- a/backend/src/routes/docs.routes.js +++ b/backend/src/routes/docs.routes.js @@ -35,6 +35,7 @@ const router = express.Router(); * - webhook * - discord * - email + * - telegram * default: webhook * example: webhook * actionUrl: @@ -42,6 +43,13 @@ const router = express.Router(); * format: uri * description: Destination URL or integration endpoint for the action. * example: https://example.com/webhooks/event-horizon + * steps: + * type: array + * description: Ordered workflow steps. When present, top-level actionUrl is not used. + * items: + * $ref: '#/components/schemas/WorkflowStep' + * workflowConfig: + * $ref: '#/components/schemas/WorkflowConfig' * isActive: * type: boolean * default: true @@ -92,16 +100,70 @@ const router = express.Router(); * - webhook * - discord * - email + * - telegram * default: webhook * example: webhook * actionUrl: * type: string * format: uri * example: https://example.com/webhooks/event-horizon + * steps: + * type: array + * description: Ordered workflow steps. Omit actionUrl when steps are provided. + * items: + * $ref: '#/components/schemas/WorkflowStep' + * workflowConfig: + * $ref: '#/components/schemas/WorkflowConfig' * isActive: * type: boolean * default: true * example: true + * WorkflowStep: + * type: object + * required: + * - id + * - actionType + * properties: + * id: + * type: string + * pattern: '^[A-Za-z][A-Za-z0-9_-]{0,63}$' + * example: notifyWebhook + * name: + * type: string + * example: Notify partner webhook + * actionType: + * type: string + * enum: + * - webhook + * - discord + * - email + * - telegram + * example: webhook + * actionUrl: + * type: string + * format: uri + * example: https://example.com/workflow-step + * webhookSecret: + * type: string + * description: Optional per-step webhook signing secret. + * config: + * type: object + * additionalProperties: true + * description: Action-specific settings. String values may reference workflow context templates. + * runIf: + * type: string + * enum: + * - success + * - failure + * - always + * default: success + * WorkflowConfig: + * type: object + * properties: + * continueOnError: + * type: boolean + * default: false + * description: Return a failed workflow result instead of throwing after failed steps. * authConfig: * type: object * properties: @@ -233,4 +295,4 @@ router.use('/', swaggerUi.serve, swaggerUi.setup(swaggerSpec, { customSiteTitle: 'EventHorizon API Docs', })); -module.exports = router; \ No newline at end of file +module.exports = router; diff --git a/backend/src/routes/trigger.routes.js b/backend/src/routes/trigger.routes.js index e896e93..131d9da 100644 --- a/backend/src/routes/trigger.routes.js +++ b/backend/src/routes/trigger.routes.js @@ -148,6 +148,7 @@ router.put('/:id', authMiddleware, permissionMiddleware('update_trigger'), auditMiddleware.auditUpdate(), + validateBody(validationSchemas.triggerUpdate), triggerController.updateTrigger ); diff --git a/backend/src/services/actionExecutor.service.js b/backend/src/services/actionExecutor.service.js new file mode 100644 index 0000000..e03b3d7 --- /dev/null +++ b/backend/src/services/actionExecutor.service.js @@ -0,0 +1,98 @@ +const telegramService = require('./telegram.service'); +const webhookService = require('./webhook.service'); + +function workflowPayload(context, stepId) { + if (!context) return undefined; + + return { + runId: context.runId, + stepId, + previousSteps: context.steps, + stepOrder: context.stepOrder, + lastResult: context.lastResult, + }; +} + +async function executeSingleAction(trigger, eventPayload, options = {}) { + const { actionType, actionUrl, contractId, eventName } = trigger; + const { context, stepId, webhookPayload } = options; + + switch (actionType) { + case 'email': { + const { sendEventNotification } = require('./email.service'); + return await sendEventNotification({ + trigger, + payload: eventPayload, + }); + } + + case 'discord': { + const { sendDiscordNotification } = require('./discord.service'); + if (!actionUrl) { + throw new Error('Missing actionUrl for Discord trigger'); + } + + const discordPayload = { + embeds: [{ + title: `Event: ${eventName}`, + description: `Contract: ${contractId}`, + fields: [ + { + name: 'Payload', + value: `\`\`\`json\n${JSON.stringify(eventPayload, null, 2).slice(0, 1000)}\n\`\`\``, + }, + ], + color: 0x5865F2, + timestamp: new Date().toISOString(), + }], + }; + + return await sendDiscordNotification(actionUrl, discordPayload); + } + + case 'telegram': { + const botToken = trigger.botToken || trigger.config?.botToken || process.env.TELEGRAM_BOT_TOKEN; + const chatId = trigger.chatId || trigger.config?.chatId || actionUrl; + if (!botToken || !chatId) { + throw new Error('Missing botToken or chatId for Telegram trigger'); + } + + const message = trigger.config?.message || `🔔 *Event Triggered*\n\n` + + `*Event:* ${telegramService.escapeMarkdownV2(eventName)}\n` + + `*Contract:* \`${telegramService.escapeMarkdownV2(contractId)}\`\n\n` + + `*Payload:*\n\`\`\`\n${telegramService.escapeMarkdownV2(JSON.stringify(eventPayload, null, 2))}\n\`\`\``; + + return await telegramService.sendTelegramMessage(botToken, chatId, message); + } + + case 'webhook': { + if (!actionUrl) { + throw new Error('Missing actionUrl for webhook trigger'); + } + + const payload = webhookPayload || { + contractId, + eventName, + payload: eventPayload, + }; + const workflow = workflowPayload(context, stepId); + if (workflow) { + payload.workflow = workflow; + } + + return await webhookService.sendSignedWebhook( + actionUrl, + payload, + trigger.webhookSecret || trigger.config?.webhookSecret, + { organizationId: trigger.organization } + ); + } + + default: + throw new Error(`Unsupported action type: ${actionType}`); + } +} + +module.exports = { + executeSingleAction, +}; diff --git a/backend/src/services/workflow.service.js b/backend/src/services/workflow.service.js index 06b328a..a7893bb 100644 --- a/backend/src/services/workflow.service.js +++ b/backend/src/services/workflow.service.js @@ -1,3 +1,157 @@ +const crypto = require('crypto'); +const { executeSingleAction } = require('./actionExecutor.service'); +const logger = require('../config/logger'); +const { resolveTemplates } = require('../utils/templater'); + +class WorkflowExecutionError extends Error { + constructor(message, result) { + super(message); + this.name = 'WorkflowExecutionError'; + this.result = result; + } +} + +function generateRunId() { + if (typeof crypto.randomUUID === 'function') { + return crypto.randomUUID(); + } + return crypto.randomBytes(16).toString('hex'); +} + +function shouldRunStep(runIf, previousResult) { + const condition = runIf || 'success'; + if (condition === 'always') return true; + if (!previousResult) return condition === 'success'; + if (condition === 'success') return previousResult.success === true; + if (condition === 'failure') return previousResult.success === false; + return false; +} + +function createContext(trigger, eventPayload, runId) { + return { + runId: runId || generateRunId(), + event: eventPayload, + trigger: { + id: trigger._id, + contractId: trigger.contractId, + eventName: trigger.eventName, + organization: trigger.organization, + }, + steps: {}, + stepOrder: [], + lastResult: null, + }; +} + +function buildStepTrigger(trigger, step) { + return { + ...trigger, + ...step, + contractId: trigger.contractId, + eventName: trigger.eventName, + organization: trigger.organization, + webhookSecret: step.webhookSecret || step.config?.webhookSecret || trigger.webhookSecret, + }; +} + +async function executeWorkflow(trigger, eventPayload, options = {}) { + const steps = Array.isArray(trigger.steps) ? trigger.steps : []; + if (steps.length === 0) { + throw new Error('Workflow trigger requires at least one step'); + } + + const executor = options.executeStep || executeSingleAction; + const context = createContext(trigger, eventPayload, options.runId); + const continueOnError = trigger.workflowConfig?.continueOnError === true; + let previousResult = null; + let failed = false; + + for (const rawStep of steps) { + const plainStep = typeof rawStep.toObject === 'function' + ? rawStep.toObject({ depopulate: true }) + : rawStep; + const step = resolveTemplates(plainStep, context); + const startedAt = Date.now(); + + if (!shouldRunStep(step.runIf, previousResult)) { + const skipped = { + id: step.id, + actionType: step.actionType, + success: false, + skipped: true, + output: null, + error: `Skipped because runIf "${step.runIf || 'success'}" was not satisfied`, + durationMs: 0, + }; + context.steps[step.id] = skipped; + context.stepOrder.push(step.id); + context.lastResult = skipped; + previousResult = skipped; + continue; + } + + try { + const output = await executor( + buildStepTrigger(trigger, step), + eventPayload, + { context, stepId: step.id } + ); + const result = { + id: step.id, + actionType: step.actionType, + success: true, + skipped: false, + output, + error: null, + durationMs: Date.now() - startedAt, + }; + context.steps[step.id] = result; + context.stepOrder.push(step.id); + context.lastResult = result; + previousResult = result; + } catch (error) { + failed = true; + const result = { + id: step.id, + actionType: step.actionType, + success: false, + skipped: false, + output: null, + error: error.message, + durationMs: Date.now() - startedAt, + }; + context.steps[step.id] = result; + context.stepOrder.push(step.id); + context.lastResult = result; + previousResult = result; + + logger.error('Workflow step failed', { + runId: context.runId, + stepId: step.id, + actionType: step.actionType, + error: error.message, + }); + } + } + + const result = { + runId: context.runId, + status: failed ? 'failed' : 'succeeded', + context, + }; + + if (failed && !continueOnError) { + throw new WorkflowExecutionError('Workflow execution failed', result); + } + + return result; +} + +module.exports = { + WorkflowExecutionError, + executeWorkflow, + shouldRunStep, +}; "use strict"; /** diff --git a/backend/src/utils/templater.js b/backend/src/utils/templater.js new file mode 100644 index 0000000..a01ea51 --- /dev/null +++ b/backend/src/utils/templater.js @@ -0,0 +1,48 @@ +const TEMPLATE_PATTERN = /\{\{\s*([A-Za-z0-9_.-]+)\s*\}\}/g; + +function getByPath(source, path) { + if (!source || !path) return undefined; + + return path.split('.').reduce((current, segment) => { + if (current == null || !Object.prototype.hasOwnProperty.call(Object(current), segment)) { + return undefined; + } + return current[segment]; + }, source); +} + +function stringifyTemplateValue(value) { + if (value == null) return ''; + if (typeof value === 'object') return JSON.stringify(value); + return String(value); +} + +function resolveTemplateString(value, context) { + return value.replace(TEMPLATE_PATTERN, (match, path) => { + const resolved = getByPath(context, path); + return resolved === undefined ? match : stringifyTemplateValue(resolved); + }); +} + +function resolveTemplates(value, context) { + if (typeof value === 'string') { + return resolveTemplateString(value, context); + } + + if (Array.isArray(value)) { + return value.map((item) => resolveTemplates(item, context)); + } + + if (value && typeof value === 'object') { + return Object.fromEntries( + Object.entries(value).map(([key, child]) => [key, resolveTemplates(child, context)]) + ); + } + + return value; +} + +module.exports = { + getByPath, + resolveTemplates, +}; diff --git a/backend/src/worker/poller.js b/backend/src/worker/poller.js index 6ccc1aa..5fdaaaa 100644 --- a/backend/src/worker/poller.js +++ b/backend/src/worker/poller.js @@ -120,6 +120,11 @@ try { }); // Fallback: direct execution with full action routing + const { executeSingleAction } = require('../services/actionExecutor.service'); + const { executeWorkflow } = require('../services/workflow.service'); + + enqueueAction = async function executeTriggerActionDirect(trigger, eventPayload) { + const { actionType, contractId, eventName } = trigger; const axios = require('axios'); const { sendEventNotification } = require('../services/email.service'); const { sendDiscordNotification } = require('../services/discord.service'); @@ -137,6 +142,8 @@ try { eventName, }); + if (trigger.steps?.length > 0) { + return await executeWorkflow(trigger, eventPayload); try { let result; switch (actionType) { @@ -239,6 +246,8 @@ try { } throw err; } + + return await executeSingleAction(trigger, eventPayload); }; } diff --git a/backend/src/worker/processor.js b/backend/src/worker/processor.js index 49ff9f7..6025075 100644 --- a/backend/src/worker/processor.js +++ b/backend/src/worker/processor.js @@ -1,5 +1,7 @@ const { Worker } = require('bullmq'); const Redis = require('ioredis'); +const { executeSingleAction } = require('../services/actionExecutor.service'); +const { executeWorkflow } = require('../services/workflow.service'); const axios = require('axios'); const { performance } = require('perf_hooks'); const { sendEventNotification } = require('../services/email.service'); @@ -62,6 +64,9 @@ const connection = new Redis(connectionConfig); * Execute the action based on the trigger type */ async function executeAction(job) { + const { trigger, eventPayload, eventPayloads, isBatch } = job.data; + const { contractId, eventName } = trigger; + const actionType = trigger.steps?.length > 0 ? 'workflow' : trigger.actionType; let { trigger, eventPayload, eventPayloads, isBatch } = job.data; const { actionType, actionUrl, contractId, eventName } = trigger; @@ -159,6 +164,11 @@ async function executeAction(job) { } if (isBatch) { + return await executeBatchAction(trigger, eventPayloads, { runIdPrefix: job.id }); + } + + if (trigger.steps?.length > 0) { + return await executeWorkflow(trigger, eventPayload, { runId: String(job.id) }); result = await executeBatchAction(trigger, eventPayloads); } else { result = await executeSingleAction(trigger, eventPayload); @@ -275,13 +285,16 @@ async function executeSingleAction(trigger, eventPayload) { default: throw new Error(`Unsupported action type: ${actionType}`); } + + return await executeSingleAction(trigger, eventPayload); } /** * Execute a batch action with error handling for individual events */ -async function executeBatchAction(trigger, eventPayloads) { - const { actionType, actionUrl, contractId, eventName, batchingConfig } = trigger; +async function executeBatchAction(trigger, eventPayloads, options = {}) { + const { contractId, eventName, batchingConfig } = trigger; + const actionType = trigger.steps?.length > 0 ? 'workflow' : trigger.actionType; const continueOnError = batchingConfig?.continueOnError ?? true; let webhookHeaders = {}; @@ -319,6 +332,22 @@ async function executeBatchAction(trigger, eventPayloads) { const eventPayload = eventPayloads[i]; try { + if (trigger.steps?.length > 0) { + await executeWorkflow(trigger, eventPayload, { + runId: options.runIdPrefix ? `${options.runIdPrefix}:${i}` : undefined, + executeStep: options.executeStep, + }); + } else { + const webhookPayload = actionType === 'webhook' ? { + contractId, + eventName, + payload: eventPayload, + batchIndex: i, + batchSize: eventPayloads.length, + batchPayloads: eventPayloads, + } : undefined; + + await executeSingleAction(trigger, eventPayload, { webhookPayload }); switch (actionType) { case 'email': { const { sendEventNotification } = require('../services/email.service'); @@ -650,6 +679,7 @@ module.exports = { createWorker, connection, executeAction, + executeBatchAction, executeSingleAction, executeBatchAction, executeWebhookBatchAction diff --git a/backend/src/worker/queue.js b/backend/src/worker/queue.js index 631a314..7e8c301 100644 --- a/backend/src/worker/queue.js +++ b/backend/src/worker/queue.js @@ -30,6 +30,13 @@ const getActionQueue = (network = 'testnet') => { const enqueueAction = async (trigger, eventPayload) => { const network = trigger.network || 'testnet'; const queue = getActionQueue(network); + const jobName = trigger.steps?.length > 0 + ? `workflow-${trigger._id}` + : `${trigger.actionType}-${trigger._id}`; + + await queue.add( + jobName, + { trigger, eventPayload }, const _traceContext = injectContextIntoCarrier({}); @@ -72,4 +79,5 @@ const cleanQueue = async () => { } }; -module.exports = { getActionQueue, enqueueAction, enqueueBatchAction, getQueueStats, cleanQueue, queues }; \ No newline at end of file +module.exports = { getActionQueue, enqueueAction, getQueueStats, cleanQueue, queues }; +module.exports = { getActionQueue, enqueueAction, enqueueBatchAction, getQueueStats, cleanQueue, queues }; diff --git a/docs/workflows.md b/docs/workflows.md new file mode 100644 index 0000000..2548948 --- /dev/null +++ b/docs/workflows.md @@ -0,0 +1,155 @@ +# Conditional Action Workflows + +EventHorizon triggers can execute ordered backend workflows by adding `steps` to the trigger payload. Workflows are backend-only and run sequentially for each matched Soroban event. + +## Schema + +```json +{ + "contractId": "CAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", + "eventName": "Transfer", + "steps": [ + { + "id": "notifyPartner", + "name": "Notify partner system", + "actionType": "webhook", + "actionUrl": "https://partner.example.com/events", + "webhookSecret": "optional-step-secret", + "runIf": "success", + "config": {} + }, + { + "id": "sendFallbackAlert", + "actionType": "telegram", + "actionUrl": "123456789", + "runIf": "failure", + "config": { + "message": "Partner webhook failed for {{event.transactionHash}}" + } + } + ] +} +``` + +Workflow step fields: + +- `id`: required stable identifier. It must start with a letter and contain only letters, numbers, underscores, or hyphens. +- `name`: optional display name used for logs and future UI. +- `actionType`: one of `webhook`, `discord`, `email`, or `telegram`. +- `actionUrl`: destination URL or integration identifier. Required for `webhook` and `discord`. +- `webhookSecret`: optional per-step webhook signing secret. +- `config`: action-specific settings. String values can use workflow templates. +- `runIf`: `success`, `failure`, or `always`. Defaults to `success`. + +When `steps` is present, the trigger must not include top-level `actionUrl`. Existing single-action triggers without `steps` continue to run through the previous action path. + +## Execution Rules + +Steps execute in array order. Each step result is written into a workflow context: + +```json +{ + "runId": "bullmq-job-id-or-generated-id", + "event": {}, + "trigger": { + "contractId": "CAX...", + "eventName": "Transfer" + }, + "stepOrder": ["notifyPartner"], + "steps": { + "notifyPartner": { + "id": "notifyPartner", + "actionType": "webhook", + "success": true, + "skipped": false, + "output": {}, + "error": null, + "durationMs": 42 + } + }, + "lastResult": {} +} +``` + +`runIf` is evaluated against the immediately previous step result: + +- `success`: run only if the previous step succeeded. For the first step, this evaluates to true. +- `failure`: run only if the previous step failed. +- `always`: run regardless of the previous step result. + +If a step fails, later `runIf: "failure"` steps can run as compensating actions. Unless `workflowConfig.continueOnError` is set to `true`, the workflow reports failure after all eligible steps are evaluated. + +`continueOnError` only changes whether a failed workflow throws at the end. It does not make default `runIf: "success"` steps continue after a failure. To continue running later steps after a failed step, set those later steps to `runIf: "always"` or `runIf: "failure"`. + +## State Passing + +Templates are resolved before each step executes. Templates are read-only dot paths against the workflow context and do not evaluate code. + +```json +{ + "id": "notifyUser", + "actionType": "webhook", + "actionUrl": "https://example.com/users/{{steps.lookupUser.output.userId}}", + "config": { + "message": "Transfer amount: {{event.amount}}" + } +} +``` + +Missing paths are left unchanged so bad templates are visible in downstream payloads and logs. + +Webhook workflow steps include workflow metadata in the signed body: + +```json +{ + "contractId": "CAX...", + "eventName": "Transfer", + "payload": {}, + "workflow": { + "runId": "123", + "stepId": "notifyPartner", + "previousSteps": {}, + "stepOrder": [], + "lastResult": null + } +} +``` + +Receivers should use `workflow.runId` and `workflow.stepId` for deduplication. + +## Retry And Idempotency + +BullMQ retries the whole job when a workflow fails. This means a previously successful step may run again on retry. Workflow receivers should be idempotent and dedupe by `runId` plus `stepId`. + +Persisted step resume is intentionally out of scope for the first workflow backend PR. + +## Benchmark Notes + +Workflow overhead is small compared with outbound network latency. A local mocked executor benchmark can be run from a Node REPL or temporary script: + +```js +const { executeWorkflow } = require('./backend/src/services/workflow.service'); + +async function bench(iterations) { + const trigger = { + contractId: 'contract', + eventName: 'Event', + steps: [ + { id: 'a', actionType: 'webhook', actionUrl: 'https://example.com/a' }, + { id: 'b', actionType: 'webhook', actionUrl: 'https://example.com/{{steps.a.output.id}}' } + ] + }; + const start = Date.now(); + for (let i = 0; i < iterations; i++) { + await executeWorkflow(trigger, { i }, { + runId: `bench-${i}`, + executeStep: async (step) => ({ id: step.id }) + }); + } + return { iterations, totalMs: Date.now() - start }; +} + +Promise.all([bench(10), bench(100), bench(1000)]).then(console.log); +``` + +The benchmark excludes external APIs by using a mocked step executor.