Fixed Issue #258#365
Conversation
|
@Juwonlo Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits. You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀 |
There was a problem hiding this comment.
Pull request overview
Implements the backend pieces for GraphQL Subscriptions intended to stream execution logs to a real-time dashboard, using WebSockets + Redis-backed PubSub.
Changes:
- Added GraphQL subscription schema (
ExecutionLog) and subscription resolver with optionaltriggerIdfiltering. - Added a WebSocket subscription server wrapper using
graphql-ws, with JWT validation during WS connect. - Introduced Redis-based PubSub and began publishing execution logs from the BullMQ worker (currently on failures only), while also changing webhook delivery behavior.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| backend/src/worker/typeDefs.js | Adds GraphQL SDL for ExecutionLog + Subscription. |
| backend/src/worker/server.js | Adds Apollo/graphql-ws WebSocket server configuration with JWT auth on connect. |
| backend/src/worker/resolvers.js | Adds subscription resolver using withFilter + PubSub iterator. |
| backend/src/worker/pubsub.js | Adds Redis-backed PubSub implementation for subscriptions. |
| backend/src/worker/processor.js | Publishes execution logs on job failure; changes webhook sending from signed webhooks to plain axios.post. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| case 'webhook': { | ||
| if (!actionUrl) { | ||
| throw new Error('Missing actionUrl for webhook trigger'); | ||
| } | ||
|
|
||
| const payload = { | ||
| return await axios.post(actionUrl, { | ||
| contractId, | ||
| eventName, | ||
| payload: eventPayload, | ||
| }; | ||
|
|
||
| return await webhookService.sendSignedWebhook( | ||
| actionUrl, | ||
| payload, | ||
| trigger.webhookSecret | ||
| ); | ||
| }); |
There was a problem hiding this comment.
Webhook delivery no longer uses webhookService.sendSignedWebhook(...) and instead posts unsigned JSON directly. This removes HMAC signing that’s documented (backend/WEBHOOK_VERIFICATION.md) and implemented in src/services/webhook.service.js, breaking existing webhook verification and reducing security. Restore signed delivery (or provide an explicit, backwards-compatible opt-out).
| try { | ||
| const decoded = jwt.verify(token.replace('Bearer ', ''), process.env.JWT_SECRET || 'secret'); | ||
| return { user: decoded }; // Pass user info into context | ||
| } catch (error) { | ||
| logger.error('WebSocket authentication failed', { error: error.message }); | ||
| throw new Error('Authentication token is invalid'); | ||
| } |
There was a problem hiding this comment.
jwt.verify(..., process.env.JWT_SECRET || 'secret') introduces an insecure fallback: if JWT_SECRET is unset/misconfigured, anyone can mint valid tokens using the known string secret. Require JWT_SECRET (fail fast on startup) and avoid a hard-coded default.
| return { user: decoded }; // Pass user info into context | ||
| } catch (error) { | ||
| logger.error('WebSocket authentication failed', { error: error.message }); | ||
| throw new Error('Authentication token is invalid'); | ||
| } | ||
| } |
There was a problem hiding this comment.
onConnect in graphql-ws is only for accepting/rejecting the connection; returning { user: decoded } does not automatically become the GraphQL execution context (despite the comment). If resolvers need the authenticated user, persist it on ctx.extra in onConnect and provide a context function in useServer (or use onSubscribe) to inject { user } into the operation context.
| return { user: decoded }; // Pass user info into context | |
| } catch (error) { | |
| logger.error('WebSocket authentication failed', { error: error.message }); | |
| throw new Error('Authentication token is invalid'); | |
| } | |
| } | |
| ctx.extra.user = decoded; | |
| } catch (error) { | |
| logger.error('WebSocket authentication failed', { error: error.message }); | |
| throw new Error('Authentication token is invalid'); | |
| } | |
| }, | |
| context: async (ctx) => ({ | |
| user: ctx.extra.user, | |
| }), |
| async function configureGraphQLServer(httpServer) { | ||
| const schema = makeExecutableSchema({ typeDefs, resolvers }); | ||
|
|
||
| // Creating the WebSocket server for subscriptions | ||
| const wsServer = new WebSocketServer({ | ||
| server: httpServer, | ||
| path: '/graphql', | ||
| }); | ||
|
|
||
| // Connect WebSocket server with GraphQL | ||
| const serverCleanup = useServer({ | ||
| schema, | ||
| onConnect: async (ctx) => { | ||
| // Secure auth over WebSocket connections | ||
| const token = ctx.connectionParams?.authToken || ctx.connectionParams?.Authorization; | ||
| if (!token) { | ||
| logger.warn('WebSocket connection attempted without auth token'); | ||
| throw new Error('Authentication token is required'); | ||
| } | ||
|
|
||
| try { | ||
| const decoded = jwt.verify(token.replace('Bearer ', ''), process.env.JWT_SECRET || 'secret'); | ||
| return { user: decoded }; // Pass user info into context | ||
| } catch (error) { | ||
| logger.error('WebSocket authentication failed', { error: error.message }); | ||
| throw new Error('Authentication token is invalid'); | ||
| } | ||
| } | ||
| }, wsServer); | ||
|
|
||
| const server = new ApolloServer({ | ||
| schema, | ||
| plugins: [ | ||
| ApolloServerPluginDrainHttpServer({ httpServer }), | ||
| { | ||
| async serverWillStart() { | ||
| return { | ||
| async drainServer() { | ||
| await serverCleanup.dispose(); | ||
| }, | ||
| }; | ||
| }, | ||
| }, | ||
| ], | ||
| }); | ||
|
|
||
| await server.start(); | ||
|
|
||
| logger.info('Apollo GraphQL Subscriptions server initialized'); | ||
| return server; | ||
| } |
There was a problem hiding this comment.
configureGraphQLServer is exported but not invoked anywhere in backend/src (no references found), so subscriptions won’t actually be enabled. If this PR is meant to implement Issue #258 end-to-end, wire this into the HTTP server startup (or document the separate entrypoint) so /graphql WS is reachable.
| worker.on('completed', (job) => { | ||
| logger.info('Job completed', { | ||
| jobId: job.id, | ||
| actionType: job.data.trigger.actionType, | ||
| }); | ||
| }); | ||
|
|
||
| worker.on('failed', (job, err) => { | ||
| logger.error('Job failed', { | ||
| jobId: job?.id, | ||
| actionType: job?.data?.trigger?.actionType, | ||
| error: err.message, | ||
| attemptsRemaining: job ? job.opts.attempts - job.attemptsMade : 0, | ||
| }); | ||
|
|
||
| if (job) { | ||
| pubsub.publish('EXECUTION_LOG', { | ||
| executionLog: { | ||
| jobId: job.id, | ||
| triggerId: String(job.data.trigger._id), | ||
| actionType: job.data.trigger.actionType, | ||
| status: 'FAILED', | ||
| error: err.message, | ||
| timestamp: new Date().toISOString() | ||
| } | ||
| }); | ||
| } |
There was a problem hiding this comment.
Execution log publishing is currently only emitted on job failure. For “real-time dashboard” execution logs, you likely also want to publish on completed (and possibly start/progress) with status: 'SUCCEEDED' (and error: null) so subscribers can reflect successful executions too.
| const serverCleanup = useServer({ | ||
| schema, | ||
| onConnect: async (ctx) => { | ||
| // Secure auth over WebSocket connections | ||
| const token = ctx.connectionParams?.authToken || ctx.connectionParams?.Authorization; | ||
| if (!token) { | ||
| logger.warn('WebSocket connection attempted without auth token'); | ||
| throw new Error('Authentication token is required'); | ||
| } | ||
|
|
||
| try { | ||
| const decoded = jwt.verify(token.replace('Bearer ', ''), process.env.JWT_SECRET || 'secret'); | ||
| return { user: decoded }; // Pass user info into context | ||
| } catch (error) { | ||
| logger.error('WebSocket authentication failed', { error: error.message }); | ||
| throw new Error('Authentication token is invalid'); | ||
| } | ||
| } | ||
| }, wsServer); |
There was a problem hiding this comment.
There are existing Node tests under backend/__tests__, but no unit/integration tests were added for the new subscription auth + filtering behavior. Add tests that cover: (1) rejecting WS connections without/with invalid JWT, (2) allowing valid JWT, and (3) executionLog(triggerId) filtering behavior.
| @@ -0,0 +1,16 @@ | |||
| c.onst typeDefs = `#graphql | |||
There was a problem hiding this comment.
Line 1 has a syntax error (c.onst). This file won’t load and will prevent the GraphQL schema from being created. Change it to a valid const declaration.
| c.onst typeDefs = `#graphql | |
| const typeDefs = `#graphql |
| error: String | ||
| timestamp: String! | ||
| } | ||
|
|
There was a problem hiding this comment.
The schema defines Subscription but no Query root (and no explicit schema { subscription: Subscription }). makeExecutableSchema/GraphQL validation will fail with “Query root type must be provided”. Add an explicit schema definition or include at least a minimal type Query { _empty: String }.
| type Query { | |
| _empty: String | |
| } |
| const { ApolloServer } = require('@apollo/server'); | ||
| const { ApolloServerPluginDrainHttpServer } = require('@apollo/server/plugin/drainHttpServer'); | ||
| const { makeExecutableSchema } = require('@graphql-tools/schema'); | ||
| const { WebSocketServer } = require('ws'); | ||
| const { useServer } = require('graphql-ws/lib/use/ws'); | ||
| const jwt = require('jsonwebtoken'); | ||
| const typeDefs = require('./typeDefs'); | ||
| const resolvers = require('./resolvers'); |
There was a problem hiding this comment.
This module imports new runtime dependencies (@apollo/server, @graphql-tools/schema, graphql-ws, ws, graphql-subscriptions, graphql-redis-subscriptions) that are not present in backend/package.json. When this code is wired in, require() will fail at runtime. Add the required packages to backend/package.json (dependencies) and lockfile.
| const telegramService = require('../services/telegram.service'); | ||
| const webhookService = require('../services/webhook.service'); | ||
| const logger = require('../config/logger'); | ||
| const pubsub = require('../graphql/pubsub'); |
There was a problem hiding this comment.
const pubsub = require('../graphql/pubsub'); points to a non-existent path (backend/src/graphql does not exist). This throws during worker initialization and disables the BullMQ worker (caught in src/server.js, effectively turning off background processing). Import the actual pubsub module (likely ./pubsub within src/worker) and ensure both publisher and subscription server use the same instance/config.
| const pubsub = require('../graphql/pubsub'); | |
| const pubsub = require('./pubsub'); |
Closes #258
GraphQL Subscriptions for real-time dashboard --- Fixed Issue #258