From 13eceb25e6b4c70730d6f2dec578402c2a07e01b Mon Sep 17 00:00:00 2001 From: Nicholas Griffin Date: Tue, 28 Jan 2025 20:10:49 +0000 Subject: [PATCH] feat: append messageIds to standard and timeout errors --- src/consumer.ts | 3 ++ src/errors.ts | 33 ++++++++++++++++++-- test/tests/consumer.test.ts | 62 +++++++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/src/consumer.ts b/src/consumer.ts index 44f24d1..7d5cef9 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -549,12 +549,14 @@ export class Consumer extends TypedEventEmitter { throw toTimeoutError( err, `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`, + message ); } if (err instanceof Error) { throw toStandardError( err, `Unexpected message handler failure: ${err.message}`, + message ); } throw err; @@ -581,6 +583,7 @@ export class Consumer extends TypedEventEmitter { throw toStandardError( err, `Unexpected message handler failure: ${err.message}`, + messages, ); } throw err; diff --git a/src/errors.ts b/src/errors.ts index 5fbcb85..d81a9a9 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -1,3 +1,5 @@ +import { Message } from "@aws-sdk/client-sqs"; + import { AWSError } from "./types.js"; class SQSError extends Error { @@ -17,6 +19,7 @@ class SQSError extends Error { } class TimeoutError extends Error { + messageIds: string[]; cause: Error; time: Date; @@ -24,10 +27,12 @@ class TimeoutError extends Error { super(message); this.message = message; this.name = "TimeoutError"; + this.messageIds = []; } } class StandardError extends Error { + messageIds: string[]; cause: Error; time: Date; @@ -35,6 +40,7 @@ class StandardError extends Error { super(message); this.message = message; this.name = "StandardError"; + this.messageIds = []; } } @@ -90,15 +96,32 @@ function toSQSError( return sqsError; } +/** + * Gets the message IDs from the message. + * @param message The message that was received from SQS. + */ +function getMessageIds(message: Message | Message[]): string[] { + if (Array.isArray(message)) { + return message.map((m) => m.MessageId); + } + return [message.MessageId]; +} + /** * Formats an Error to the StandardError type. * @param err The error object that was received. * @param message The message to send with the error. + * @param sqsMessage The message that was received from SQS. */ -function toStandardError(err: Error, message: string): StandardError { +function toStandardError( + err: Error, + message: string, + sqsMessage: Message | Message[], +): StandardError { const error = new StandardError(message); error.cause = err; error.time = new Date(); + error.messageIds = getMessageIds(sqsMessage); return error; } @@ -107,11 +130,17 @@ function toStandardError(err: Error, message: string): StandardError { * Formats an Error to the TimeoutError type. * @param err The error object that was received. * @param message The message to send with the error. + * @param sqsMessage The message that was received from SQS. */ -function toTimeoutError(err: TimeoutError, message: string): TimeoutError { +function toTimeoutError( + err: TimeoutError, + message: string, + sqsMessage: Message | Message[], +): TimeoutError { const error = new TimeoutError(message); error.cause = err; error.time = new Date(); + error.messageIds = getMessageIds(sqsMessage); return error; } diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 974277c..3d4eb20 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -466,6 +466,7 @@ describe("Consumer", () => { "Unexpected message handler failure: Processing error", ); assert.equal(message.MessageId, "123"); + assert.deepEqual((err as any).messageIds, ["123"]); }); it("fires an `error` event when an `SQSError` occurs processing a message", async () => { @@ -1707,6 +1708,67 @@ describe("Consumer", () => { assert.ok(err); assert.equal(err.message, "Error changing visibility timeout: failed"); }); + + it("includes messageIds in timeout errors", async () => { + const handleMessageTimeout = 500; + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessage: () => + new Promise((resolve) => setTimeout(resolve, 1000)), + handleMessageTimeout, + sqs, + authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + }); + + consumer.start(); + const [err]: any = await Promise.all([ + pEvent(consumer, "timeout_error"), + clock.tickAsync(handleMessageTimeout), + ]); + consumer.stop(); + + assert.ok(err); + assert.equal( + err.message, + `Message handler timed out after ${handleMessageTimeout}ms: Operation timed out.`, + ); + assert.deepEqual(err.messageIds, ["123"]); + }); + + it("includes messageIds in batch processing errors", async () => { + sqs.send.withArgs(mockReceiveMessage).resolves({ + Messages: [ + { MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" }, + { MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" }, + ], + }); + + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessageBatch: () => { + throw new Error("Batch processing error"); + }, + batchSize: 2, + sqs, + authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + }); + + consumer.start(); + const [err]: any = await Promise.all([ + pEvent(consumer, "error"), + clock.tickAsync(100), + ]); + consumer.stop(); + + assert.ok(err); + assert.equal( + err.message, + "Unexpected message handler failure: Batch processing error" + ); + assert.deepEqual(err.messageIds, ["1", "2"]); + }); }); describe("event listeners", () => {