feat: remove promise.all to prevent resource exhaustion#2632
Conversation
…e IDs" This reverts commit c03f9d8.
|
Thanks for opening this pull request and contributing to the project! The next step is for the maintainers to review your changes. If everything looks good, it will be approved and merged into the main branch. In the meantime, anyone in the community is encouraged to test this pull request and provide feedback. ✅ How to confirm it worksIf you’ve tested this PR, please comment below with: This helps us speed up the review and merge process. 📦 To test this PR locally:If you encounter any issues or have feedback, feel free to comment as well. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThe PR changes createParticipantNodes to encrypt recipients sequentially with per-recipient try/catch that logs and skips failures, yielding to the event loop between recipients. A small utility yieldEventLoop() is added. ChangesMessage Encryption Error Handling
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/Socket/messages-send.ts (1)
558-607:⚠️ Potential issue | 🔴 CriticalFix incorrect
p-limitusage and removeanycasts increateParticipantNodes(src/Socket/messages-send.ts, ~558-607)
patchedMessages.map(limit(async (...) => { ... }, {}))is invalid:limit(fn, ...args)returns a Promise, but.map()expects a mapper function, so this will fail at runtime; also{}would be passed as the async fn’s first argument.- Use
patchedMessages.map(item => limit(async () => { const { recipientJid: jid, message: patchedMessage } = item; ... }))(or equivalent wrapper).- Remove
(patchedMessages as any)and the: anyparameter by typingpatchedMessagesand the callback properly.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/Socket/messages-send.ts` around lines 558 - 607, The mapped encryptionPromises incorrectly calls limit as the mapper and uses any-casts; change to map each patchedMessages item into a call that returns limit(async () => { ... }) so limit receives the async function and map returns an array of Promises (i.e., replace patchedMessages.map(limit(async ({ recipientJid: jid, message: patchedMessage }: any) => { ... }, {})) with patchedMessages.map(item => limit(async () => { const { recipientJid: jid, message: patchedMessage } = item; ... })) and remove the (patchedMessages as any) and : any parameter types by properly typing patchedMessages and the callback; keep existing logic that uses jidDecode, meId/meLid/meLidUser, dsmMessage, encodeWAMessage, encryptionMutex.mutex, and signalRepository.encryptMessage and preserve the try/catch and logger calls so behavior is unchanged.Source: Coding guidelines
🧹 Nitpick comments (1)
src/Socket/messages-send.ts (1)
556-556: ⚡ Quick winConsider making encryption concurrency configurable.
The hardcoded value of
10may not be optimal for all deployment scenarios (serverless vs VPS, different CPU/memory constraints). Consider adding anencryptionConcurrencyoption toSocketConfigwith10as the default.♻️ Suggested approach
In the
SocketConfigtype (likely insrc/Types/index.ts):encryptionConcurrency?: numberThen at line 122:
const encryptionLimiter = pLimit({ concurrency: config.encryptionConcurrency ?? 10 })🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/Socket/messages-send.ts` at line 556, Add a configurable encryption concurrency option: extend the SocketConfig type (e.g., add encryptionConcurrency?: number) and use it when constructing the pLimit limiter instead of the hardcoded 10; update the code that defines "const limit = pLimit({ concurrency: 10 })" to read the concurrency from config (fallback to 10 if undefined) so the pLimit call (and any references to "limit") honor config.encryptionConcurrency.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/Socket/messages-send.ts`:
- Around line 556-557: The pLimit instance (currently created as "const limit =
pLimit({ concurrency: 10 })" inside createParticipantNodes) must be moved to the
socket/module scope so all createParticipantNodes calls share a single
concurrency budget: create the limiter immediately after the existing
"encryptionMutex" declaration (same scope as encryptionMutex) and remove the
local "limit" declaration from inside createParticipantNodes; update references
in createParticipantNodes to use the now-top-level "limit" variable so
concurrent encryptions are globally limited.
---
Outside diff comments:
In `@src/Socket/messages-send.ts`:
- Around line 558-607: The mapped encryptionPromises incorrectly calls limit as
the mapper and uses any-casts; change to map each patchedMessages item into a
call that returns limit(async () => { ... }) so limit receives the async
function and map returns an array of Promises (i.e., replace
patchedMessages.map(limit(async ({ recipientJid: jid, message: patchedMessage }:
any) => { ... }, {})) with patchedMessages.map(item => limit(async () => { const
{ recipientJid: jid, message: patchedMessage } = item; ... })) and remove the
(patchedMessages as any) and : any parameter types by properly typing
patchedMessages and the callback; keep existing logic that uses jidDecode,
meId/meLid/meLidUser, dsmMessage, encodeWAMessage, encryptionMutex.mutex, and
signalRepository.encryptMessage and preserve the try/catch and logger calls so
behavior is unchanged.
---
Nitpick comments:
In `@src/Socket/messages-send.ts`:
- Line 556: Add a configurable encryption concurrency option: extend the
SocketConfig type (e.g., add encryptionConcurrency?: number) and use it when
constructing the pLimit limiter instead of the hardcoded 10; update the code
that defines "const limit = pLimit({ concurrency: 10 })" to read the concurrency
from config (fallback to 10 if undefined) so the pLimit call (and any references
to "limit") honor config.encryptionConcurrency.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: ebc33fef-7283-44fb-b7c3-ce1abf48468f
⛔ Files ignored due to path filters (2)
package-lock.jsonis excluded by!**/package-lock.jsonyarn.lockis excluded by!**/yarn.lock,!**/*.lock
📒 Files selected for processing (2)
package.jsonsrc/Socket/messages-send.ts
There was a problem hiding this comment.
1 issue found across 4 files
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/Socket/messages-send.ts (1)
72-72:⚠️ Potential issue | 🟠 Major | ⚡ Quick winUnused import —
pLimitis imported but never used.The
pLimitimport is present but the code no longer uses it. Thefor...ofloop at lines 559–608 executes encryptions sequentially (one at a time), not with a concurrency limit of 10 as stated in the PR objectives.Either:
- Remove this unused import if sequential execution is intentional, or
- Restore the concurrency limiter at module scope (as the prior review suggested) to match the PR's stated goal of "capping parallel encryption tasks at 10"
Option 2: Restore concurrency limiting at module scope
At line 122, after
encryptionMutex:const encryptionMutex = makeKeyedMutex() +const encryptionLimiter = pLimit(10)Then at lines 559–608, replace the sequential
for...ofwith limited parallel execution:-const nodes: BinaryNode[] = [] - -for (const { recipientJid: jid, message: patchedMessage } of patchedMessages as any) { - try { - // ... encryption logic ... - } catch (err) { - logger.error({ jid, err }, 'Failed to encrypt for recipient') - } -} +const nodes = (await Promise.all( + (patchedMessages as { recipientJid: string; message: proto.IMessage }[]).map( + ({ recipientJid: jid, message: patchedMessage }) => + encryptionLimiter(async () => { + try { + if (!jid) return null + // ... encryption logic returning node ... + } catch (err) { + logger.error({ jid, err }, 'Failed to encrypt for recipient') + return null + } + }) + ) +)).filter((n): n is BinaryNode => n !== null)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/Socket/messages-send.ts` at line 72, The import pLimit is unused: either remove the import or reinstate the concurrency limiter to cap parallel encryption tasks at 10; to fix, choose one approach—(A) delete the pLimit import and keep the existing sequential encryption loop inside the function that handles message encryption (look for the for...of loop iterating messages and the encryptionMutex variable), or (B) restore a module-scoped limiter by creating a pLimit(10) instance near encryptionMutex and replace the sequential for...of with Promise-based limited tasks that call the existing per-message encrypt function (the same function invoked inside the loop) via limiter(() => encryptMessage(...)) and await Promise.all on the scheduled tasks to maintain the intended concurrency cap.
🧹 Nitpick comments (1)
src/Socket/messages-send.ts (1)
559-559: ⚡ Quick winAvoid
anyin new code.The cast
patchedMessages as anybypasses type checking. Define a proper type or use the existing shape frompatchMessageBeforeSending.-for (const { recipientJid: jid, message: patchedMessage } of patchedMessages as any) { +for (const { recipientJid: jid, message: patchedMessage } of patchedMessages as { recipientJid: string; message: proto.IMessage }[]) {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/Socket/messages-send.ts` at line 559, The loop currently bypasses type checking with "patchedMessages as any"; replace that cast by giving patchedMessages a proper typed array using the return/type shape from patchMessageBeforeSending (e.g., an interface/alias with recipientJid and patchedMessage/message fields) and use that type in the for...of signature (for (const { recipientJid: jid, message: patchedMessage } of patchedMessages: PatchedMessage[])), or import/reuse the existing return type from patchMessageBeforeSending so the compiler can validate recipientJid and patchedMessage instead of relying on any.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@src/Socket/messages-send.ts`:
- Line 72: The import pLimit is unused: either remove the import or reinstate
the concurrency limiter to cap parallel encryption tasks at 10; to fix, choose
one approach—(A) delete the pLimit import and keep the existing sequential
encryption loop inside the function that handles message encryption (look for
the for...of loop iterating messages and the encryptionMutex variable), or (B)
restore a module-scoped limiter by creating a pLimit(10) instance near
encryptionMutex and replace the sequential for...of with Promise-based limited
tasks that call the existing per-message encrypt function (the same function
invoked inside the loop) via limiter(() => encryptMessage(...)) and await
Promise.all on the scheduled tasks to maintain the intended concurrency cap.
---
Nitpick comments:
In `@src/Socket/messages-send.ts`:
- Line 559: The loop currently bypasses type checking with "patchedMessages as
any"; replace that cast by giving patchedMessages a proper typed array using the
return/type shape from patchMessageBeforeSending (e.g., an interface/alias with
recipientJid and patchedMessage/message fields) and use that type in the
for...of signature (for (const { recipientJid: jid, message: patchedMessage } of
patchedMessages: PatchedMessage[])), or import/reuse the existing return type
from patchMessageBeforeSending so the compiler can validate recipientJid and
patchedMessage instead of relying on any.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: cf128699-4154-4f16-b00c-0af03cb1987c
📒 Files selected for processing (1)
src/Socket/messages-send.ts
|
Can run bartender @jlucaso1 @purpshell ? |
This function can block the event loop for up to 2 seconds under a high volume of connections. Combined with 20+ concurrent connections on the same server, this can cause noticeable latency. The issue comes from using Promise.all for CPU-bound tasks. Replacing it with sequential execution plus periodic yields to the event loop prevents long event loop stalls and keeps the server more responsive.
This pull request refactors the way participant message nodes are created and encrypted in the
makeMessagesSocketfunction. The main improvement is simplifying the logic by replacing the use ofPromise.allwith a sequentialfor...ofloop, which enhances error handling and code clarity.Refactoring and simplification of message encryption:
Promise.allwith a synchronousfor...ofloop to process message encryption for each recipient, allowing for immediate error handling and skipping invalid recipients.nodesarray, removing the need to filter outnullvalues after encryption attempts.Code readability and maintainability:
continueinstead of returningnullwithin the encryption loop.encryptionPromisesarray and its associated mapping logic, streamlining the code structure.createParticipantNodesfunction definition.