Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions backend/__tests__/trigger.workflow.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const { validationSchemas } = require('../src/middleware/validation.middleware');
const Trigger = require('../src/models/trigger.model');
const triggerController = require('../src/controllers/trigger.controller');
const ipWhitelistService = require('../src/services/ipWhitelist.service');

const originalSave = Trigger.prototype.save;
const originalValidateUrl = ipWhitelistService.validateUrl;

function req(body = {}) {
return {
body,
get() {
return 'test-agent';
},
ip: '127.0.0.1',
user: {
id: 'user-1',
organization: { _id: 'org-1' },
},
};
}

function res() {
return {
statusCode: 200,
payload: undefined,
status(code) {
this.statusCode = code;
return this;
},
json(payload) {
this.payload = payload;
return this;
},
};
}

test.afterEach(() => {
Trigger.prototype.save = originalSave;
ipWhitelistService.validateUrl = originalValidateUrl;
});

test('trigger create validation accepts workflow steps without top-level actionUrl', () => {
const { error, value } = validationSchemas.triggerCreate.validate({
contractId: 'contract-1',
eventName: 'Transfer',
steps: [
{ id: 'notifyWebhook', actionType: 'webhook', actionUrl: 'https://example.com/hook' },
{ id: 'notifyTelegram', actionType: 'telegram', actionUrl: 'chat-1' },
],
});

assert.equal(error, undefined);
assert.equal(value.steps.length, 2);
});

test('trigger create validation rejects workflow with top-level actionUrl', () => {
const { error } = validationSchemas.triggerCreate.validate({
contractId: 'contract-1',
eventName: 'Transfer',
actionUrl: 'https://example.com/top-level',
steps: [
{ id: 'notifyWebhook', actionType: 'webhook', actionUrl: 'https://example.com/hook' },
],
});

assert.match(error.message, /cannot also define top-level actionUrl/);
});

test('trigger update validation rejects workflow with top-level actionUrl', () => {
const { error } = validationSchemas.triggerUpdate.validate({
actionUrl: 'https://example.com/top-level',
steps: [
{ id: 'notifyWebhook', actionType: 'webhook', actionUrl: 'https://example.com/hook' },
],
});

assert.match(error.message, /cannot also define top-level actionUrl/);
});

test('trigger create validation rejects duplicate step ids', () => {
const { error } = validationSchemas.triggerCreate.validate({
contractId: 'contract-1',
eventName: 'Transfer',
steps: [
{ id: 'notify', actionType: 'webhook', actionUrl: 'https://example.com/one' },
{ id: 'notify', actionType: 'webhook', actionUrl: 'https://example.com/two' },
],
});

assert.match(error.message, /Workflow step ids must be unique/);
});

test('createTrigger validates webhook URLs inside workflow steps', async () => {
const validated = [];

ipWhitelistService.validateUrl = async (url, organizationId, options) => {
validated.push({ url, organizationId, options });
return { warnings: [`warning for ${url}`] };
};
Trigger.prototype.save = async function save() {
return this;
};

const response = res();
await triggerController.createTrigger(req({
contractId: 'contract-1',
eventName: 'Transfer',
steps: [
{ id: 'firstHook', actionType: 'webhook', actionUrl: 'https://example.com/one' },
{ id: 'sendMessage', actionType: 'telegram', actionUrl: 'chat-1' },
{ id: 'secondHook', actionType: 'webhook', actionUrl: 'https://example.com/two' },
],
}), response, () => {});

assert.deepEqual(validated, [
{
url: 'https://example.com/one',
organizationId: 'org-1',
options: { allowDnsFailure: true },
},
{
url: 'https://example.com/two',
organizationId: 'org-1',
options: { allowDnsFailure: true },
},
]);
assert.deepEqual(response.payload.warnings, [
'step firstHook: warning for https://example.com/one',
'step secondHook: warning for https://example.com/two',
]);
});
35 changes: 35 additions & 0 deletions backend/__tests__/workflow.processor.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const processor = require('../src/worker/processor');

test('batch workflow dispatch executes one workflow per event payload', async () => {
const trigger = {
_id: 'trigger-1',
organization: 'org-1',
contractId: 'contract-1',
eventName: 'Transfer',
batchingConfig: { continueOnError: true },
steps: [
{ id: 'notify', actionType: 'webhook', actionUrl: 'https://example.com/hook' },
],
};

const calls = [];
const result = await processor.executeBatchAction(trigger, [
{ sequence: 1 },
{ sequence: 2 },
], {
runIdPrefix: 'job-123',
executeStep: async (stepTrigger, eventPayload, options) => {
calls.push({ stepTrigger, eventPayload, options });
return { sequence: eventPayload.sequence };
},
});

assert.equal(result.total, 2);
assert.equal(result.successful, 2);
assert.equal(result.failed, 0);
assert.equal(calls.length, 2);
assert.equal(calls[0].options.context.runId, 'job-123:0');
assert.equal(calls[1].options.context.runId, 'job-123:1');
});
146 changes: 146 additions & 0 deletions backend/__tests__/workflow.service.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const {
WorkflowExecutionError,
executeWorkflow,
} = require('../src/services/workflow.service');
const { resolveTemplates } = require('../src/utils/templater');

function trigger(overrides = {}) {
return {
_id: 'trigger-1',
organization: 'org-1',
contractId: 'contract-1',
eventName: 'Transfer',
steps: [],
...overrides,
};
}

test('workflow executes sequential steps and stores keyed results', async () => {
const calls = [];
const result = await executeWorkflow(trigger({
steps: [
{ id: 'first', actionType: 'webhook', actionUrl: 'https://example.com/first' },
{ id: 'second', actionType: 'telegram', actionUrl: 'chat-1' },
],
}), { amount: 10 }, {
runId: 'run-1',
executeStep: async (stepTrigger, eventPayload, options) => {
calls.push({ stepTrigger, eventPayload, options });
return { receivedStep: stepTrigger.id };
},
});

assert.equal(result.status, 'succeeded');
assert.deepEqual(result.context.stepOrder, ['first', 'second']);
assert.equal(result.context.steps.first.success, true);
assert.equal(result.context.steps.second.output.receivedStep, 'second');
assert.equal(calls[1].options.context.steps.first.output.receivedStep, 'first');
});

test('default runIf success skips the next step after a failure', async () => {
let error;
try {
await executeWorkflow(trigger({
steps: [
{ id: 'first', actionType: 'webhook', actionUrl: 'https://example.com/first' },
{ id: 'second', actionType: 'webhook', actionUrl: 'https://example.com/second' },
],
}), {}, {
runId: 'run-2',
executeStep: async (stepTrigger) => {
if (stepTrigger.id === 'first') {
throw new Error('downstream failed');
}
return { ok: true };
},
});
} catch (caught) {
error = caught;
}

assert.ok(error instanceof WorkflowExecutionError);
assert.equal(error.result.status, 'failed');
assert.equal(error.result.context.steps.first.success, false);
assert.equal(error.result.context.steps.second.skipped, true);
});

test('runIf failure allows a compensating step to run', async () => {
let error;
try {
await executeWorkflow(trigger({
steps: [
{ id: 'primary', actionType: 'webhook', actionUrl: 'https://example.com/primary' },
{ id: 'compensate', actionType: 'telegram', actionUrl: 'chat-1', runIf: 'failure' },
],
}), {}, {
runId: 'run-3',
executeStep: async (stepTrigger) => {
if (stepTrigger.id === 'primary') throw new Error('failed');
return { alertSent: true };
},
});
} catch (caught) {
error = caught;
}

assert.ok(error instanceof WorkflowExecutionError);
assert.equal(error.result.context.steps.primary.success, false);
assert.equal(error.result.context.steps.compensate.success, true);
assert.deepEqual(error.result.context.steps.compensate.output, { alertSent: true });
});

test('continueOnError returns failed workflow result instead of throwing', async () => {
const result = await executeWorkflow(trigger({
workflowConfig: { continueOnError: true },
steps: [
{ id: 'primary', actionType: 'webhook', actionUrl: 'https://example.com/primary' },
{ id: 'cleanup', actionType: 'telegram', actionUrl: 'chat-1', runIf: 'failure' },
],
}), {}, {
runId: 'run-4',
executeStep: async (stepTrigger) => {
if (stepTrigger.id === 'primary') throw new Error('failed');
return { ok: true };
},
});

assert.equal(result.status, 'failed');
assert.equal(result.context.steps.cleanup.success, true);
});

test('templates resolve against keyed workflow context', async () => {
const calls = [];
await executeWorkflow(trigger({
steps: [
{ id: 'fetchUser', actionType: 'webhook', actionUrl: 'https://example.com/fetch' },
{
id: 'notifyUser',
actionType: 'webhook',
actionUrl: 'https://example.com/users/{{steps.fetchUser.output.userId}}',
config: {
message: 'User {{steps.fetchUser.output.userId}} moved {{event.amount}} tokens',
},
},
],
}), { amount: 25 }, {
runId: 'run-5',
executeStep: async (stepTrigger) => {
calls.push(stepTrigger);
if (stepTrigger.id === 'fetchUser') return { userId: 'user-123' };
return { notified: stepTrigger.actionUrl };
},
});

assert.equal(calls[1].actionUrl, 'https://example.com/users/user-123');
assert.equal(calls[1].config.message, 'User user-123 moved 25 tokens');
});

test('missing template paths are left unchanged', () => {
const value = resolveTemplates({
url: 'https://example.com/{{steps.missing.output.id}}',
}, { steps: {} });

assert.equal(value.url, 'https://example.com/{{steps.missing.output.id}}');
});
36 changes: 33 additions & 3 deletions backend/src/controllers/trigger.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,36 @@ const asyncHandler = require('../utils/asyncHandler');
const ipWhitelistService = require('../services/ipWhitelist.service');

async function validateWebhookDestinationIfNeeded(body, organizationId, currentTrigger = null) {
const warnings = [];
const steps = body.steps ?? currentTrigger?.steps ?? [];

if (Array.isArray(steps) && steps.length > 0) {
for (const step of steps) {
if (step.actionType !== 'webhook' || !step.actionUrl) {
continue;
}

const result = await ipWhitelistService.validateUrl(step.actionUrl, organizationId, {
allowDnsFailure: true,
});
warnings.push(...(result.warnings || []).map((warning) => `step ${step.id}: ${warning}`));
}
return warnings;
}

const actionType = body.actionType ?? currentTrigger?.actionType ?? 'webhook';
const actionUrl = body.actionUrl ?? currentTrigger?.actionUrl;

if (actionType !== 'webhook' || !actionUrl) {
return [];
return warnings;
}

const result = await ipWhitelistService.validateUrl(actionUrl, organizationId, {
allowDnsFailure: true,
});

return result.warnings || [];
warnings.push(...(result.warnings || []));
return warnings;
}

exports.createTrigger = asyncHandler(async (req, res) => {
Expand Down Expand Up @@ -135,9 +153,21 @@ exports.updateTrigger = asyncHandler(async (req, res) => {
existingTrigger
);

const updatePayload = { ...req.body };
const hasWorkflowSteps = Array.isArray(updatePayload.steps) && updatePayload.steps.length > 0;
if (hasWorkflowSteps) {
delete updatePayload.actionUrl;
}
const updateOperation = hasWorkflowSteps
? {
$set: updatePayload,
$unset: { actionUrl: '' },
}
: updatePayload;

const trigger = await Trigger.findOneAndUpdate(
{ _id: req.params.id, organization: req.user.organization._id },
req.body,
updateOperation,
{ new: true, runValidators: true }
);

Expand Down
Loading
Loading