Custom Payload Transformer (WebAssembly Support) [ Fixed Issue #257 ]#366
Custom Payload Transformer (WebAssembly Support) [ Fixed Issue #257 ]#366Juwonlo wants to merge 2 commits intoEDOHWARES:mainfrom
Conversation
|
@Juwonlo Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits. You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀 |
There was a problem hiding this comment.
Pull request overview
Implements a custom payload transformation runtime that executes user-provided WebAssembly (Wasm) against event payloads, and wires it into the BullMQ worker so triggers can optionally transform payloads before dispatching actions.
Changes:
- Added a worker-thread-based Wasm payload transformer with timeout support.
- Integrated optional Wasm transformation into the action processor (including batch handling) and added a benchmark script.
- Added GraphQL subscriptions/pubsub scaffolding intended to publish execution logs.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| backend/src/worker/wasmTransformer.js | Adds worker-thread Wasm execution and payload transformation logic. |
| backend/src/worker/wasm.benchmark.js | Adds a benchmark script to measure transformation/thread overhead. |
| backend/src/worker/processor.js | Invokes the Wasm transformer before executing trigger actions; adds failure pubsub publishing. |
| backend/src/worker/server.js | Adds an Apollo + graphql-ws subscription server configuration. |
| backend/src/worker/typeDefs.js | Adds GraphQL schema definitions for execution log subscriptions. |
| backend/src/worker/resolvers.js | Adds subscription resolver using withFilter and pubsub iterator. |
| backend/src/worker/pubsub.js | Adds Redis-backed GraphQL pubsub implementation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| const { ApolloServer } = require('@apollo/server'); | ||
| const { ApolloServerPluginDrainHttpServer } = require('@apollo/server/plugin/drainHttpServer'); | ||
| const { makeExecutableSchema } = require('@graphql-tools/schema'); | ||
| const { WebSocketServer } = require('ws'); | ||
| const { useServer } = require('graphql-ws/lib/use/ws'); | ||
| const jwt = require('jsonwebtoken'); |
There was a problem hiding this comment.
This file introduces imports for packages that are not listed in backend/package.json (e.g., @apollo/server, @apollo/server/plugin/drainHttpServer, @graphql-tools/schema, ws, graphql-ws). Add the missing dependencies (or remove the code) so the backend workspace can install and start successfully.
| transformPayload: async (wasmBase64, payload, timeoutMs = 1000) => { | ||
| return new Promise((resolve, reject) => { | ||
| const worker = new Worker(__filename, { | ||
| workerData: { wasmBase64, payload } | ||
| }); | ||
|
|
||
| const timer = setTimeout(() => { | ||
| worker.terminate(); | ||
| reject(new Error(`WASM execution timed out after ${timeoutMs}ms`)); | ||
| }, timeoutMs); | ||
|
|
||
| worker.on('message', (msg) => { | ||
| clearTimeout(timer); | ||
| if (msg.error) reject(new Error(msg.error)); | ||
| else resolve(msg.result); | ||
| }); | ||
|
|
||
| worker.on('error', (err) => { | ||
| clearTimeout(timer); | ||
| reject(err); | ||
| }); | ||
|
|
||
| worker.on('exit', (code) => { | ||
| clearTimeout(timer); | ||
| if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`)); | ||
| }); |
There was a problem hiding this comment.
New transformation behavior is introduced here (worker thread execution, timeouts, ABI export validation, output decoding/JSON parsing), but there are no unit/integration tests added alongside it. Given the repo uses node --test with backend/__tests__, please add tests for success/failure cases (invalid base64, missing exports, timeout, invalid JSON, large output) to prevent regressions.
| const { RedisPubSub } = require('graphql-redis-subscriptions'); | ||
| const Redis = require('ioredis'); |
There was a problem hiding this comment.
This module depends on graphql-redis-subscriptions (and ioredis), but graphql-redis-subscriptions isn’t listed in backend/package.json right now. Add it to the backend workspace dependencies so installs don’t fail.
| const { withFilter } = require('graphql-subscriptions'); | ||
| const pubsub = require('./pubsub'); | ||
|
|
There was a problem hiding this comment.
graphql-subscriptions is imported here, but it’s not currently declared in backend/package.json. Add the missing dependency (or replace this with the project’s existing subscription utilities) to avoid runtime/module resolution failures.
| const { withFilter } = require('graphql-subscriptions'); | |
| const pubsub = require('./pubsub'); | |
| const pubsub = require('./pubsub'); | |
| const withFilter = (asyncIteratorFn, filterFn) => (...args) => { | |
| const asyncIterator = asyncIteratorFn(...args); | |
| const getNextFilteredValue = async () => { | |
| while (true) { | |
| const result = await asyncIterator.next(); | |
| if (result.done) { | |
| return result; | |
| } | |
| const payload = result.value; | |
| const variables = args[1] || {}; | |
| if (await filterFn(payload, variables, ...args)) { | |
| return { value: payload, done: false }; | |
| } | |
| } | |
| }; | |
| return { | |
| async next() { | |
| return getNextFilteredValue(); | |
| }, | |
| return() { | |
| if (typeof asyncIterator.return === 'function') { | |
| return asyncIterator.return(); | |
| } | |
| return Promise.resolve({ value: undefined, done: true }); | |
| }, | |
| throw(error) { | |
| if (typeof asyncIterator.throw === 'function') { | |
| return asyncIterator.throw(error); | |
| } | |
| return Promise.reject(error); | |
| }, | |
| [Symbol.asyncIterator]() { | |
| return this; | |
| } | |
| }; | |
| }; |
| case 'webhook': { | ||
| if (!actionUrl) { | ||
| throw new Error('Missing actionUrl for webhook trigger'); | ||
| } | ||
|
|
||
| const payload = { | ||
| return await axios.post(actionUrl, { | ||
| contractId, | ||
| eventName, | ||
| payload: eventPayload, | ||
| }; | ||
|
|
||
| return await webhookService.sendSignedWebhook( | ||
| actionUrl, | ||
| payload, | ||
| trigger.webhookSecret | ||
| ); | ||
| }); |
There was a problem hiding this comment.
The webhook action was changed from webhookService.sendSignedWebhook(...) to a plain axios.post(...), which removes the HMAC signing and request options previously used for secure delivery. Please restore signed webhook delivery (or re-implement equivalent signature + timestamp headers) to avoid breaking existing webhook verification expectations.
| /** | ||
| * Configures the Apollo Server with WebSocket Subscriptions support. | ||
| * | ||
| * @param {Object} httpServer HTTP server instance from Express | ||
| * @returns {ApolloServer} the configured Apollo server instance | ||
| */ | ||
| async function configureGraphQLServer(httpServer) { | ||
| const schema = makeExecutableSchema({ typeDefs, resolvers }); |
There was a problem hiding this comment.
The PR description/linked Issue #257 is specifically about adding a custom payload transformer (Wasm runtime + benchmarks). This PR also adds a GraphQL subscriptions server/pubsub/typeDefs/resolvers which isn’t mentioned in the description or acceptance criteria. Either update the PR description to explain this additional scope and how it relates, or move the subscriptions work into a separate PR.
| @@ -0,0 +1,16 @@ | |||
| c.onst typeDefs = `#graphql | |||
There was a problem hiding this comment.
Line 1 has a syntax error (c.onst) which will prevent the module from loading. Change it to a valid const declaration so the schema can be imported.
| c.onst typeDefs = `#graphql | |
| const typeDefs = `#graphql |
| const telegramService = require('../services/telegram.service'); | ||
| const webhookService = require('../services/webhook.service'); | ||
| const logger = require('../config/logger'); | ||
| const pubsub = require('../graphql/pubsub'); |
There was a problem hiding this comment.
This import path looks incorrect: there is no src/graphql/pubsub directory in the backend workspace, but a new src/worker/pubsub.js file exists. Update the require to point at the actual pubsub module (e.g., ./pubsub) so the worker doesn’t crash on startup.
| const pubsub = require('../graphql/pubsub'); | |
| const pubsub = require('./pubsub'); |
| case 'webhook': { | ||
| if (!actionUrl) { | ||
| throw new Error('Missing actionUrl for webhook trigger'); | ||
| } | ||
|
|
||
| const payload = { | ||
| await axios.post(actionUrl, { | ||
| contractId, | ||
| eventName, | ||
| payload: eventPayload, | ||
| batchIndex: i, | ||
| batchSize: eventPayloads.length, | ||
| batchPayloads: eventPayloads, // Send the full batch for webhooks | ||
| }; | ||
|
|
||
| await webhookService.sendSignedWebhook( | ||
| actionUrl, | ||
| payload, | ||
| trigger.webhookSecret | ||
| ); | ||
| }); |
There was a problem hiding this comment.
Same issue in the batch webhook path: sending unsigned requests via axios.post drops the webhook HMAC signature and related options. Please switch back to signed webhook delivery (or add equivalent signing headers) for batch webhooks as well.
| let outLen = 0; | ||
| while (outArray[outLen] !== 0 && outLen < 5000000) outLen++; // 5MB limit max |
There was a problem hiding this comment.
The null-terminated output scan can run past the end of memory.buffer: new Uint8Array(memory.buffer, outPtr) has a finite length, but the loop only checks outLen < 5000000. If there’s no 0 byte before the end of the view, outLen keeps increasing and new Uint8Array(memory.buffer, outPtr, outLen) can throw a RangeError. Bound the loop by outArray.length (and fail if no terminator is found within the limit) before decoding/JSON parsing.
| let outLen = 0; | |
| while (outArray[outLen] !== 0 && outLen < 5000000) outLen++; // 5MB limit max | |
| const maxOutLen = Math.min(outArray.length, 5000000); // 5MB limit max | |
| let outLen = 0; | |
| while (outLen < maxOutLen && outArray[outLen] !== 0) outLen++; | |
| if (outLen === maxOutLen) { | |
| throw new Error( | |
| outArray.length > 5000000 | |
| ? 'WASM output exceeds maximum allowed size or is missing null terminator' | |
| : 'WASM output is missing null terminator' | |
| ); | |
| } |
Closes #257
Fixed Issue #257